mirror of
https://github.com/WhaleFell/SMSBoom.git
synced 2026-02-04 02:43:24 +08:00
142 lines
4.7 KiB
Python
142 lines
4.7 KiB
Python
# encoding=utf8
|
|
# 请求的方法
|
|
import httpx
|
|
from httpx import Limits
|
|
from typing import Union, List
|
|
import asyncio
|
|
|
|
|
|
from utils import default_header_user_agent
|
|
|
|
from utils.models import API
|
|
from utils.log import logger
|
|
|
|
|
|
def reqAPI(api: API, client: Union[httpx.Client, httpx.AsyncClient]) -> httpx.Response:
|
|
if isinstance(api.data, dict):
|
|
resp = client.request(method=api.method, json=api.data,
|
|
headers=api.header, url=api.url, timeout=10)
|
|
else:
|
|
resp = client.request(method=api.method, data=api.data,
|
|
headers=api.header, url=api.url, timeout=10)
|
|
return resp
|
|
|
|
|
|
|
|
def reqFuncByProxy(api: Union[API, str], phone: Union[tuple, str], proxy: dict) -> bool:
|
|
|
|
"""通过代理请求接口方法"""
|
|
# 多手机号支持
|
|
if isinstance(phone, tuple):
|
|
phone_lst = [_ for _ in phone]
|
|
else:
|
|
phone_lst = [phone]
|
|
with httpx.Client(headers=default_header_user_agent(), verify=False, proxies=proxy) as client:
|
|
for ph in phone_lst:
|
|
try:
|
|
if isinstance(api, API):
|
|
api = api.handle_API(ph)
|
|
resp = reqAPI(api, client)
|
|
logger.info(f"{api.desc}-{resp.text[:30]}")
|
|
else:
|
|
api = api.replace("[phone]", ph).replace(" ", "").replace('\n', '').replace('\r', '')
|
|
resp = client.get(url=api, headers=default_header_user_agent())
|
|
logger.info(f"GETAPI接口-{resp.text[:30]}")
|
|
return True
|
|
except httpx.HTTPError as why:
|
|
logger.error(f"请求失败{why}")
|
|
return False
|
|
|
|
|
|
def reqFunc(api: Union[API, str], phone: Union[tuple, str]) -> bool:
|
|
|
|
"""请求接口方法"""
|
|
# 多手机号支持
|
|
if isinstance(phone, tuple):
|
|
phone_lst = [_ for _ in phone]
|
|
else:
|
|
phone_lst = [phone]
|
|
with httpx.Client(headers=default_header_user_agent(), verify=False) as client:
|
|
for ph in phone_lst:
|
|
try:
|
|
if isinstance(api, API):
|
|
api = api.handle_API(ph)
|
|
resp = reqAPI(api, client)
|
|
logger.info(f"{api.desc}-{resp.text[:30]}")
|
|
else:
|
|
api = api.replace("[phone]", ph).replace(" ", "").replace('\n', '').replace('\r', '')
|
|
resp = client.get(url=api, headers=default_header_user_agent())
|
|
logger.info(f"GETAPI接口-{resp.text[:30]}")
|
|
return True
|
|
except httpx.HTTPError as why:
|
|
logger.error(f"请求失败{why}")
|
|
return False
|
|
|
|
|
|
async def asyncReqs(src: Union[API, str], phone: Union[tuple, str], semaphore):
|
|
"""异步请求方法
|
|
|
|
:param:
|
|
:return:
|
|
|
|
"""
|
|
# 多手机号支持
|
|
if isinstance(phone, tuple):
|
|
phone_lst = [_ for _ in phone]
|
|
else:
|
|
phone_lst = [phone]
|
|
async with semaphore:
|
|
async with httpx.AsyncClient(
|
|
limits=Limits(max_connections=1000,
|
|
max_keepalive_connections=2000),
|
|
headers=default_header,
|
|
verify=False,
|
|
timeout=99999
|
|
) as c:
|
|
|
|
for ph in phone_lst:
|
|
try:
|
|
if isinstance(src, API):
|
|
src = src.handle_API(ph)
|
|
r = await reqAPI(src, c)
|
|
else:
|
|
# 利用元组传参安全因为元组不可修改
|
|
s = (src.replace(" ", "").replace("\n", "").replace("\t", "").replace(
|
|
"&", "").replace('\n', '').replace('\r', ''),)
|
|
r = await c.get(*s)
|
|
return r
|
|
except httpx.HTTPError as why:
|
|
logger.error(f"异步请求失败{type(why)}")
|
|
# logger.error(f"异步请求失败{why}")
|
|
# import aiofiles
|
|
# async with aiofiles.open("error.txt","a",encoding="utf-8") as f:
|
|
# await f.write(f"{str(s[0]) if str(s[0]) else str(src)}\n")
|
|
except TypeError:
|
|
logger.error("类型错误")
|
|
except Exception as wy:
|
|
logger.exception(f"异步失败{wy}")
|
|
|
|
|
|
def callback(result):
|
|
"""异步回调函数"""
|
|
log = result.result()
|
|
if log is not None:
|
|
logger.info(f"请求结果:{log.text[:30]}")
|
|
|
|
|
|
|
|
async def runAsync(apis: List[Union[API,str]], phone: Union[tuple, str]):
|
|
|
|
|
|
tasks = []
|
|
|
|
for api in apis:
|
|
semaphore = asyncio.Semaphore(999999)
|
|
task = asyncio.ensure_future(asyncReqs(api, phone, semaphore))
|
|
task.add_done_callback(callback)
|
|
tasks.append(task)
|
|
|
|
await asyncio.gather(
|
|
*tasks
|
|
)
|