🎈 perf: 优化代码

This commit is contained in:
AdminWhaleFall
2022-04-08 22:35:22 +08:00
parent 32874aa02e
commit dcbaa4f4fb
6 changed files with 133 additions and 176 deletions

View File

@@ -4,9 +4,9 @@
"url": "https://biz.caiyunapp.com/v1/send_sms_code",
"method": "POST",
"header": {
"Referer": "https://fanyi.caiyunapp.com/",
"Cookie": "UM_distinctid=17fd5c7a9ba69a-0200a7005bf45a-56171958-146d15-17fd5c7a9bb749; _gid=GA1.2.2046680529.1648971157; _gat_gtag_UA_185151443_2=1; _ga=GA1.2.44459633.1648559084; _ga_65TZCJSDBD=GS1.1.1648971156.4.1.1648971164.0; _ga_R9YPR75N68=GS1.1.1648971156.4.1.1648971164.52",
"Cy-Token": "token 9876032166"
"Cy-Token": "token 9876032166",
"X-Authorization": "token:qgemv4jr1y38jyq6vhvi",
"app-name": "xy"
},
"data": {
"phone_num": "[phone]",

View File

@@ -1,45 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2022/1/11 1:46
# @Author : WhaleFall
# @Site :
# @File : config.py
# @Description : 项目的配置文件
import os
from pathlib import Path
import platform
import sys
class Config:
BASE_DIR = Path(__file__).resolve().parent # 项目绝对目录
SQLITE_PATH = os.path.join(BASE_DIR, 'db', 'data.db')
# 默认UA
DEFAULT_UA = "Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 " \
"Safari/534.50"
LOGS_DIR = Path(BASE_DIR, 'logs')
if platform.system() == "Windows":
DATABASE_URI = 'sqlite:///' + SQLITE_PATH
else:
DATABASE_URI = 'sqlite:////' + SQLITE_PATH
engine = create_engine(DATABASE_URI) # 数据库引擎
def __init__(self):
dirs = [Path(self.BASE_DIR, 'db'), self.LOGS_DIR]
for dir_ in dirs:
dir_.mkdir(exist_ok=True)
config = Config() # 实例化配置
log_config = {
# 添加接收器
"handlers": [
# 写入日志文件不必用颜色,rotation="1 day": 文件超过一天就会分割
# run_{time:YYYY_M_D}.log: 日志格式
{"sink": str(config.LOGS_DIR) + "/run_{time:YYYY_M_D}.log", "rotation": "1 day", "encoding": "utf-8",
"backtrace": True, "diagnose": True, "colorize": False, "level": "DEBUG"},
# 标准输出流
{"sink": sys.stdout, "backtrace": True, "diagnose": True, "colorize": True, "level": "DEBUG"},
],
}

View File

@@ -1 +1,4 @@
httpx
httpx
click
flask
flask_cors

View File

@@ -1,126 +0,0 @@
# encoding=utf8
import httpx
import json
import sys
import pathlib
import asyncio
from loguru import logger
from pydantic import BaseModel, validator
from typing import Optional, List, Union
from datetime import datetime
from urllib3 import disable_warnings
disable_warnings()
# logger config
logger.remove()
logger.add(
sink=sys.stdout,
format="<green>{time:YYYY-MM-DD at HH:mm:ss}</green> - <level>{level}</level> - <level>{message}</level>",
colorize=True,
backtrace=True
)
default_header = {
"User-Agent": "Mozilla/5.0 (Linux; U; Android 10; zh-cn; Mi 10 Build/QKQ1.191117.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.147 Mobile Safari/537.36 XiaoMi/MiuiBrowser/13.5.40"
}
# current directory
path = pathlib.Path(__file__).parent
phone = "19820294267"
class API(BaseModel):
desc: str = "Default"
url: str
method: str = "GET"
header: Optional[dict]
data: Optional[Union[str, dict]]
def load_json() -> List[API]:
"""
load json for api.json
:return: api list
"""
json_path = pathlib.Path(path, 'api.json')
if not json_path.exists():
logger.error("Json file not exists!")
return None
with open(json_path.resolve(), mode="r", encoding="utf8") as j:
try:
datas = json.loads(j.read())
APIs = [
API(**data)
for data in datas
]
return APIs
except Exception as why:
logger.error(f"Json file syntax error:{why}")
return None
def timestamp_new() -> str:
"""返回整数字符串时间戳"""
return str(int(datetime.now().timestamp()))
def replace_data(content: Union[str, dict]) -> str:
if not phone:
return content
# 统一转换成 str 再替换.
content = str(content).replace("[phone]", phone).replace(
"[timestamp]", timestamp_new()).replace("'",'"')
# 尝试 json 化
try:
return json.loads(content)
except:
return content
def handle_API(API: API) -> API:
"""
:param API: one API basemodel
:return: API basemodel
"""
API.data = replace_data(API.data)
API.url = replace_data(API.url)
return API
async def rqs(API: API):
"""requests api async function
:param API: one API basemodel
:return:
"""
API = handle_API(API)
# print(API.dict())
async with httpx.AsyncClient(headers=default_header) as client:
try:
if not isinstance(API.data, dict):
resp = await client.request(method=API.method, headers=API.header,
url=API.url, data=API.data)
else:
resp = await client.request(
method=API.method, headers=API.header, url=API.url, json=API.data)
logger.success(f"{API.desc}-{resp.text}")
except httpx.HTTPError as exc:
logger.error(f"{API.desc} Error:{exc}")
@logger.catch
async def main():
APIs = load_json()
if APIs is None:
return
# 接收一个元组需要用 * 传参
await asyncio.gather(
*(rqs(api)
for api in APIs)
)
if __name__ == "__main__":
asyncio.run(main())

79
smsboom_async.py Normal file
View File

@@ -0,0 +1,79 @@
# encoding=utf8
# 短信发送异步版.....
import asyncio
import json
import pathlib
import sys
import httpx
from loguru import logger
from urllib3 import disable_warnings
from typing import List
from utils import API, default_header
disable_warnings()
# logger config
logger.remove()
logger.add(
sink=sys.stdout,
format="<green>{time:YYYY-MM-DD at HH:mm:ss}</green> - <level>{level}</level> - <level>{message}</level>",
colorize=True,
backtrace=True
)
# current directory
path = pathlib.Path(__file__).parent
phone = "19820294267"
def load_json() -> List[API]:
"""load json for api.json
:return: api list
"""
json_path = pathlib.Path(path, 'api.json')
if not json_path.exists():
logger.error("Json file not exists!")
return None
with open(json_path.resolve(), mode="r", encoding="utf8") as j:
try:
datas = json.loads(j.read())
APIs = [
API(**data)
for data in datas
]
return APIs
except Exception as why:
logger.error(f"Json file syntax error:{why}")
return None
async def rqs(api: API):
async with httpx.AsyncClient(headers=default_header) as c:
try:
if isinstance(api.data, dict):
resp = await c.request(method=api.method, url=api.url, json=api.data, headers=api.header)
else:
resp = await c.request(method=api.method, url=api.url, data=api.data, headers=api.header)
logger.success(f"{api.desc}-请求成功-{resp.text}")
except httpx.HTTPError as why:
logger.error(f"{api.desc}-请求错误-{why}")
@logger.catch
async def main():
APIs = load_json()
if APIs is None:
return
# 接收一个元组需要用 * 传参
await asyncio.gather(
*(rqs(api.handle_API(phone))
for api in APIs)
)
if __name__ == "__main__":
phone = input(">>需要轰炸的手机号码:")
asyncio.run(main())

View File

@@ -1,10 +1,20 @@
# coding=utf-8
# 读写数据库模块
# utils 实用工具类
import sqlite3
import json
from datetime import datetime
from pathlib import Path
from pydantic import BaseModel
from typing import Union, Optional
default_header = {
"User-Agent": "Mozilla/5.0 (Linux; U; Android 10; zh-cn; Mi 10 Build/QKQ1.191117.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.147 Mobile Safari/537.36 XiaoMi/MiuiBrowser/13.5.40"
}
class Sql(object):
"""处理SQL数据"""
def __init__(self) -> None:
'''初始化数据库'''
# 数据库路径
@@ -59,3 +69,39 @@ CREATE TABLE IF NOT EXISTS API200 (
'''对象被删除时执行的函数'''
print(f"共改变{self.client.total_changes}条数据!,正在关闭数据库连接......")
self.client.close()
class API(BaseModel):
"""处理自定义 API 数据"""
desc: str = "Default"
url: str
method: str = "GET"
header: Optional[Union[str, dict]] = default_header
data: Optional[Union[str, dict]]
def replace_data(self, content: Union[str, dict], phone: str) -> str:
# 统一转换成 str 再替换. ' -> "
content = str(content).replace("[phone]", phone).replace(
"[timestamp]", self.timestamp_new()).replace("'", '"')
# 尝试 json 化
try:
return json.loads(content)
except:
return content
def timestamp_new(self) -> str:
"""返回整数字符串时间戳"""
return str(int(datetime.now().timestamp()))
def handle_API(self, phone: str):
""" 传入手机号处理 API
:param API: one API basemodel
:return: API basemodel
"""
# 如果传入的 header 是字符串,就转为字典.
if (isinstance(self.header, str) and self.header):
self.header = json.loads(self.header.replace("'", '"'))
self.data = self.replace_data(self.data, phone)
self.url = self.replace_data(self.url, phone)
return self