#!/usr/bin/env python3
# coding=utf-8
# 爬取轰炸平台接口
from loguru import logger
import httpx
import requests
import re
from utils import Sql
import queue
import pathlib
import threading
import sys
import json
from prettytable import PrettyTable
import click
import urllib3
# Silence urllib3 warnings process-wide.
# NOTE(review): presumably meant to hide the insecure-request warnings that go
# with the verify=False clients used in this file — confirm which HTTP client
# actually emits them.
urllib3.disable_warnings()
# logger config
# Drop the handlers loguru has registered so far, then install a single
# stdout sink that uses the timestamp/level/message format below.
logger.remove()
logger.add(
sink=sys.stdout,
format="{time:YYYY-MM-DD at HH:mm:ss} - {level} - {message}",
colorize=True,
backtrace=True
)
# Directory that contains this script; hz-web.json is looked up relative to it.
path = pathlib.Path(__file__).parent
# Default HTTP request headers (a desktop Chrome User-Agent) shared by the
# request code in this file.
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.9 Safari/537.36",
}
class SMS(object):
    """Per-website crawler state: target address, request settings and shared helpers.

    Attributes:
        url: Address of the website, exactly as passed to the constructor.
        header: HTTP request headers (the module-level ``header`` dict).
        key: Query-string suffix that is appended to ``url`` for requests.
        api_queue: ``queue.Queue`` created empty by the constructor.
        db: ``Sql`` helper instance created by the constructor.
        lock: ``threading.Lock`` created by the constructor.
        ok_api: Integer counter, initialised to 0.
    """

    # Phone number embedded in the default request key.
    default_phone = "15019682928"
    # Default request key (query string) used when the caller supplies none.
    key_default = f"?hm={default_phone}&ok="

    def __init__(self, website, key) -> None:
        """Store the target website and set up the crawler's working state.

        :param website: address of the website to crawl.
        :param key: query-string suffix for the website; an empty string or
            ``None`` selects ``key_default``.
        """
        self.url = website
        self.header = header
        # Bug fix: ``self.key`` used to be assigned only when ``key == ""``,
        # so any other value (a custom key, or ``None``) left the attribute
        # missing and the custom key was silently dropped.
        self.key = self._resolve_key(key)
        self.api_queue = queue.Queue()
        self.db = Sql()
        self.lock = threading.Lock()
        self.ok_api = 0

    @classmethod
    def _resolve_key(cls, key):
        """Return ``key`` when it is non-empty, otherwise the class default key."""
        return key if key else cls.key_default
def get_sms_api(self):
'''请求短信轰炸平台'''
with httpx.Client(verify=False) as ses:
ses.get(self.url, headers=self.header)
resp = ses.get(f"{self.url}{self.key}", headers=self.header)
pat = re.compile(r"
tuple:
"""check api web is ok?
:return: tuple
"""
if url is None:
return
with httpx.Client(headers=header, verify=False) as client:
try:
resp = client.get(url=url).text
title = re.findall('
(.*?)', resp)
if title:
logger.info(f"{url} title:{title[0]}")
return (title[0], url)
except httpx.HTTPError as why:
logger.error(f"{url} 请求错误! {why}")
return
def load_api_web():
    """Load the candidate websites from ``hz-web.json`` and probe each of them.

    The file is read from the directory of this script. The ``url`` of every
    entry is handed to ``test_api_web``; entries for which it returns a
    ``(title, url)`` result are collected, logged as a table and, after an
    interactive confirmation, written back to ``hz-web.json``.

    :return: list of ``{"url", "key", "title"}`` dicts for the websites that
        answered, or ``None`` when the file is missing, is not valid JSON, or
        does not hold a JSON array.
    """
    json_path = pathlib.Path(path, 'hz-web.json')
    if not json_path.exists():
        logger.error(f"hz-web.json not exists in {str(json_path)}!")
        return None

    try:
        webs = json.loads(json_path.read_text(encoding="utf8"))
    except json.decoder.JSONDecodeError as why:
        logger.error(f"json syctax error! {why}")
        return None
    if not isinstance(webs, list):
        # Anything but an array cannot be iterated as a list of site entries.
        logger.error("hz-web.json must contain a JSON array of objects!")
        return None

    table = PrettyTable(["标题", "链接"])
    ok_web = []
    for web in webs:
        if not isinstance(web, dict):
            logger.warning(f"skip invalid entry in hz-web.json: {web!r}")
            continue
        # .get() instead of ['url']: test_api_web returns nothing for a None
        # url, so an entry without "url" is skipped rather than aborting the
        # whole run with a KeyError.
        result = test_api_web(web.get('url'))
        if result:
            title, url = result
            table.add_row([title, url])
            ok_web.append({"url": url, "key": web.get('key'), "title": title})

    logger.success(f"有效的轰炸网站:\n{table}")
    answer = input(">>是否写入 hz-web.json?(Y/n)")
    # Accept "y" as well as "Y". An empty answer still means "no", because
    # saving overwrites the input file.
    if answer.strip().lower() == "y":
        try:
            # Serialise first: if this fails the existing file is untouched,
            # whereas opening it in "w" mode beforehand would truncate it.
            payload = json.dumps(ok_web, ensure_ascii=False)
            json_path.write_text(payload, encoding="utf8")
        except (TypeError, ValueError, OSError) as why:
            logger.error(f"write hz-web.json error {why}")
        else:
            logger.success("save hz-web.json success!")
    return ok_web
# Root click command group of this script; the sub-commands are attached to
# it with ``cli.add_command`` near the end of the file. No docstring is added
# because click would display it as the group's help text, which is
# currently empty.
@click.group()
def cli():
    return None
# The user-facing help text is passed through ``help=`` so that it stays
# exactly as before; click uses an explicit ``help`` instead of the docstring.
@click.command(help="根据目录下的 hz-web.json 文件更新接口")
def spider_all():
    """Crawl every website returned by ``load_api_web``.

    Each website is crawled with ``SMS(...).main()``. A failure on one
    website is logged and the loop continues with the next one.
    """
    websites = load_api_web()
    # load_api_web() returns None when hz-web.json is missing or invalid;
    # iterating that value used to raise TypeError.
    if not websites:
        return

    for website in websites:
        logger.info(f"正在爬取:{website['url']}")
        try:
            # The stored key may be None; SMS only falls back to its default
            # key for an empty string, so normalise it here.
            SMS(website=website['url'], key=website.get('key') or "").main()
        except Exception as why:
            # Deliberately broad: one broken website must not stop the rest.
            logger.critical(f"爬取:{website['url']} 出错:{why}")
# The user-facing help text is passed through ``help=`` so that it stays
# exactly as before; click uses an explicit ``help`` instead of the docstring.
@click.command(help="爬取单个网址.")
@click.option('--url', help='轰炸网站的网址,结尾需要带/', prompt=True)
@click.option('--key', help='网址携带的参数(可选)', default="")
def spider_one(url, key):
    """Crawl a single website.

    :param url: address of the website; click prompts for it when it is not
        given, and the option help asks for a trailing ``/``.
    :param key: optional query parameters for the address, ``""`` by default.
    """
    try:
        crawler = SMS(website=url, key=key)
        crawler.main()
    except Exception as why:
        # Any failure is reported through the logger instead of a traceback.
        logger.critical(f"爬取:{url} 出错:{why}")
# The user-facing help text is passed through ``help=`` so that it stays
# exactly as before; click uses an explicit ``help`` instead of the docstring.
@click.command(help="保存api到 GETAPI.json 文件")
@logger.catch
def save_api():
    """Write every record returned by ``Sql().select()`` to ``GETAPI.json``.

    The file name is relative, so it is created in the current working
    directory. Exceptions raised here are handled by ``logger.catch``.
    """
    api_lst = list(Sql().select())
    # ensure_ascii=False emits non-ASCII characters unescaped, so the file
    # must be opened as UTF-8 explicitly rather than with the platform's
    # default encoding (which the other file accesses here also avoid).
    with open("GETAPI.json", mode="w", encoding="utf8") as fp:
        json.dump(api_lst, fp, ensure_ascii=False)
    logger.success("写入到 GETAPI.json 成功!")
# Attach the sub-commands to the root group, in their original order.
for _command in (spider_all, spider_one, save_api):
    cli.add_command(_command)

# Start the command-line interface only when the file is run as a script.
if __name__ == '__main__':
    cli()