#!/usr/bin/env python3
# coding=utf-8
# Crawl the API endpoints listed by SMS-bombing platform websites.
#
# NOTE(review): this file was recovered from a copy that had been collapsed
# onto two physical lines and passed through an HTML-tag stripper. Text that
# sat between a "<" and the next ">" was lost. The affected places are marked
# with NOTE(review) comments below and must be restored from the upstream
# source before the crawler can actually run.
import json
import pathlib
import queue
import re
import sys
import threading
from typing import Optional, Tuple

import click
import httpx
import requests  # not used by the surviving code; kept for the lost SMS methods
import urllib3
from loguru import logger
from prettytable import PrettyTable

from utils import Sql

urllib3.disable_warnings()

# logger config
logger.remove()
logger.add(
    sink=sys.stdout,
    format="{time:YYYY-MM-DD at HH:mm:ss} - {level} - {message}",
    colorize=True,
    backtrace=True
)

# Directory containing this script; hz-web.json is looked up here.
path = pathlib.Path(__file__).parent

# Request headers shared by every HTTP call in this module.
header = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.9 Safari/537.36",
}


class SMS(object):
    """Crawler for a single platform website.

    Attributes set in ``__init__``: ``url``, ``header``, ``key``,
    ``api_queue`` (a ``queue.Queue``), ``db`` (a ``Sql`` instance),
    ``lock`` (a ``threading.Lock``) and ``ok_api`` (int counter, starts at 0).
    """

    # Default request key: the query string appended to the site URL.
    default_phone = "15019682928"
    key_default = f"?hm={default_phone}&ok="

    def __init__(self, website, key) -> None:
        """Store the target site and its query-string key.

        :param website: base URL of the platform site.
        :param key: query string appended to ``website``; an empty string
            or ``None`` selects ``key_default``.
        """
        self.url = website
        self.header = header
        # The original only assigned self.key when key == "", so a custom key
        # (or a None coming from hz-web.json) left the attribute undefined.
        self.key = key if key else self.key_default
        self.api_queue = queue.Queue()
        self.db = Sql()
        self.lock = threading.Lock()
        self.ok_api = 0

    def get_sms_api(self):
        """Request the platform page (incomplete -- see the note below).

        :raises NotImplementedError: always, because the code that parsed
            the response was lost from this copy of the file.
        """
        with httpx.Client(verify=False) as ses:
            # The bare URL is requested before the keyed URL on the same
            # client; presumably to obtain session cookies -- TODO confirm.
            ses.get(self.url, headers=self.header)
            resp = ses.get(f"{self.url}{self.key}", headers=self.header)
        # NOTE(review): the original continued with `pat = re.compile(r"<...`
        # but the pattern and the remainder of this method were stripped.
        # Restore them from upstream; `resp` above is what they consumed.
        raise NotImplementedError(
            "SMS.get_sms_api: parsing code was lost from this copy of the file"
        )

    def main(self):
        """Entry point invoked by the ``spider_all`` / ``spider_one`` commands.

        :raises NotImplementedError: always; see the note below.
        """
        # NOTE(review): the callers prove this method exists, but its body
        # (and any other SMS methods) fell inside the stripped span.
        raise NotImplementedError(
            "SMS.main: implementation was lost from this copy of the file"
        )


def test_api_web(url: Optional[str]) -> Optional[Tuple[str, str]]:
    """Check whether a platform website responds and exposes a page title.

    :param url: address to probe; ``None`` is ignored.
    :return: ``(title, url)`` when the page was fetched and a title was
        found, otherwise ``None``.
    """
    if url is None:
        return None
    with httpx.Client(headers=header, verify=False) as client:
        try:
            resp = client.get(url=url).text
        except (httpx.HTTPError, httpx.InvalidURL) as why:
            # InvalidURL is not an HTTPError subclass; without it a malformed
            # entry in hz-web.json would abort the whole check run.
            logger.error(f"{url} 请求错误! {why}")
            return None
    # NOTE(review): the recovered pattern was '(.*?)', i.e. the surrounding
    # <title> tags were eaten by the tag stripper; restored here.
    title = re.findall('<title>(.*?)</title>', resp)
    if title:
        logger.info(f"{url} title:{title[0]}")
        return (title[0], url)
    return None


def load_api_web():
    """Load platform sites from hz-web.json and test each one.

    Reachable sites are printed as a table; after a ``Y``/``y``
    confirmation the file is rewritten with only those sites.

    :return: list of ``{"url", "key", "title"}`` dicts for reachable sites,
        or ``None`` when hz-web.json is missing or is not valid JSON.
    """
    json_path = pathlib.Path(path, 'hz-web.json')
    table = PrettyTable(["标题", "链接"])
    if not json_path.exists():
        logger.error(f"hz-web.json not exists in {str(json_path)}!")
        return None

    raw = json_path.read_text(encoding="utf8")
    try:
        webs = json.loads(raw)
    except json.decoder.JSONDecodeError as why:
        logger.error(f"json syctax error! \n{why}")
        return None

    ok_web = []
    for web in webs:
        result = test_api_web(web['url'])
        if result:
            table.add_row([result[0], result[1]])
            ok_web.append(
                {"url": result[1], "key": web.get('key'), "title": result[0]})

    logger.success(f"有效的轰炸网站:\n{table}")
    if input(">>是否写入 hz-web.json?(Y/n)").strip().lower() == "y":
        try:
            # Serialise before touching the file so a failure cannot leave
            # a truncated hz-web.json behind.
            payload = json.dumps(ok_web, ensure_ascii=False)
            json_path.write_text(payload, encoding="utf8")
            logger.success("save hz-web.json success!")
        except Exception as why:
            logger.error(f"write hz-web.json error {why}")
    return ok_web


# Root click command group; deliberately has no docstring so the generated
# --help output stays unchanged.
@click.group()
def cli():
    pass


# Crawl every site that load_api_web() reports as reachable. The docstring
# below is shown by click as the command's help text, so it is kept verbatim.
@click.command()
def spider_all():
    """
    根据目录下的 hz-web.json 文件更新接口
    """
    # load_api_web() returns None on a missing/invalid file; iterate nothing.
    websites = load_api_web() or []
    for website in websites:
        logger.info(f"正在爬取:{website['url']}")
        try:
            SMS(website=website['url'], key=website['key']).main()
        except Exception as why:
            # Top-level boundary: one failing site must not stop the others.
            logger.critical(f"爬取:{website['url']} 出错:{why}")


# Crawl a single site given on the command line. The docstring and option
# help strings are click help text and are kept verbatim.
@click.command()
@click.option('--url', help='轰炸网站的网址,结尾需要带/', prompt=True)
@click.option('--key', help='网址携带的参数(可选)', default="")
def spider_one(url, key):
    """爬取单个网址."""
    try:
        SMS(website=url, key=key).main()
    except Exception as why:
        logger.critical(f"爬取:{url} 出错:{why}")


# Dump every row returned by Sql().select() to GETAPI.json in the current
# working directory. The docstring is click help text and is kept verbatim.
@click.command()
@logger.catch
def save_api():
    """保存api到 GETAPI.json 文件"""
    db = Sql()
    api_lst = list(db.select())
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding must
    # be explicit instead of depending on the platform default.
    with open("GETAPI.json", mode="w", encoding="utf8") as j:
        json.dump(fp=j, obj=api_lst, ensure_ascii=False)
    logger.success("写入到 GETAPI.json 成功!")


cli.add_command(spider_all)
cli.add_command(spider_one)
cli.add_command(save_api)

if __name__ == '__main__':
    cli()