feat: 完成FlaskAPP接口

This commit is contained in:
AdminWhaleFall
2022-04-03 21:10:11 +08:00
parent 638260c9c4
commit 894259da85
8 changed files with 439 additions and 3 deletions

View File

@@ -12,5 +12,31 @@
"phone_num": "{phone}",
"area_code": "86"
}
},
{
"desc": "彩云小译",
"url": "https://biz.caiyunapp.com/v1/send_sms_code",
"method": "POST",
"header": {
"Referer": "https://fanyi.caiyunapp.com/",
"Cy-Token": "token 9876032166"
},
"data": {
"phone_num": "{phone}",
"area_code": "86"
}
},
{
"desc": "彩云小译",
"url": "https://biz.caiyunapp.com/v1/send_sms_code",
"method": "POST",
"header": {
"Referer": "https://fanyi.caiyunapp.com/",
"Cy-Token": "token 9876032166"
},
"data": {
"phone_num": "{phone}",
"area_code": "86"
}
}
]

View File

@@ -0,0 +1 @@
[{"desc": "˛ĘÔĆĐĄŇë", "url": "https://biz.caiyunapp.com/v1/send_sms_code", "method": "POST", "header": {"Referer": "https://fanyi.caiyunapp.com/", "Cookie": "UM_distinctid=17fd5c7a9ba69a-0200a7005bf45a-56171958-146d15-17fd5c7a9bb749; _gid=GA1.2.2046680529.1648971157; _gat_gtag_UA_185151443_2=1; _ga=GA1.2.44459633.1648559084; _ga_65TZCJSDBD=GS1.1.1648971156.4.1.1648971164.0; _ga_R9YPR75N68=GS1.1.1648971156.4.1.1648971164.52", "Cy-Token": "token 9876032166"}, "data": {"phone_num": "{phone}", "area_code": "86"}}, {"desc": "˛ĘÔĆĐĄŇë", "url": "https://biz.caiyunapp.com/v1/send_sms_code", "method": "POST", "header": {"Referer": "https://fanyi.caiyunapp.com/", "Cy-Token": "token 9876032166"}, "data": {"phone_num": "{phone}", "area_code": "86"}}, {"desc": "˛ĘÔĆĐĄŇë", "url": "https://biz.caiyunapp.com/v1/send_sms_code", "method": "POST", "header": {"Referer": "https://fanyi.caiyunapp.com/", "Cy-Token": "token 9876032166"}, "data": {"phone_num": "{phone}", "area_code": "86"}}]

3
flask_app/README.md Normal file
View File

@@ -0,0 +1,3 @@
# Flask APP For SMSBoom
用于添加测试更改短信接口的 Flask 项目,方便短信接口的维护.

Binary file not shown.

113
flask_app/app.py Normal file
View File

@@ -0,0 +1,113 @@
# encoding=utf8
import json
import time
from flask import Flask, make_response, request, jsonify
from flask_cors import CORS
from urllib3 import disable_warnings
from utils import *
disable_warnings()
app = Flask(__name__)
CORS(app, supports_credentials=True, resources="/*") # 跨域
def request_parse(req_data: request) -> dict:
'''解析请求数据并以字典的形式返回'''
if req_data.method == 'POST':
data = req_data.form
elif req_data.method == 'GET':
data = req_data.args
return dict(data)
class BaseResponse(BaseModel):
"""返回的响应"""
status: int = 0 # 状态码 0-->成功 1-->失败
msg: str = "前端显示的简短信息"
data: Optional[str]
@property
def resp(self):
'''BaseModel类型返回json'''
response = make_response(
json.dumps(
self.dict(),
ensure_ascii=False,
sort_keys=False
),
)
response.mimetype = 'application/json'
# 跨域设置
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Methods'] = 'OPTIONS,HEAD,GET,POST'
response.headers['Access-Control-Allow-Headers'] = 'x-requested-with'
return response
@app.route("/testapi/", methods=['POST'])
def testapi():
brs = BaseResponse()
# 需要传入 json 数据
try:
jsonData = request.get_json()
api = API(**jsonData)
phone = jsonData.get('phone')
if not phone:
raise ValueError("参数 phone 没有!")
try:
resp = test_resq(api, phone)
brs.status = 0
brs.msg = f"请求成功!{resp}"
brs.data = resp.text
except Exception as why:
brs.status = 1
brs.msg = f"请求失败:{why}"
except Exception as why:
brs.status = 1
brs.msg = f"参数有误:{why}"
return brs.resp
@app.route("/submitapi/", methods=['POST'])
def submitapi():
"""提交API到json文件"""
# 需要传入 json 数据
jsonData = request.get_json()
api = API(**jsonData)
data = json.loads(json_path.read_text(encoding='utf8'))
with open(json_path, mode="w", encoding="utf8") as j:
try:
data.append(api.dict())
json.dump(data, j, ensure_ascii=False, sort_keys=False)
return BaseResponse(status=0, msg="写入成功!").resp
except Exception as why:
return BaseResponse(status=1, msg=f"写入失败!{why}").resp
@app.route("/backapi/", methods=['GET'])
def backjson():
"""备份json文件"""
try:
timeStruct = time.localtime(int(time.time()))
strTime = time.strftime("%Y_%m_%d_%H_%M_%S", timeStruct)
Path.mkdir(Path(json_path.parent, 'apiback', exist_ok=True))
json_back_path = Path(json_path.parent, 'apiback',
f"api_back_{strTime}.json")
with open(json_back_path, mode="w") as j:
j_data = json.loads(json_path.read_text(encoding='utf8'))
json.dump(j_data, j, ensure_ascii=False, sort_keys=False)
return BaseResponse(status=0, msg="备份成功!").resp
except Exception as why:
return BaseResponse(status=1, msg=f"备份失败{why}").resp
@app.route("/downloadapi/", methods=['GET'])
def downloadapi():
"""下载接口文件"""
return json_path.read_text(encoding='utf8')
app.run(host="0.0.0.0", port=1098, debug=True)

View File

@@ -0,0 +1,205 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<!-- 屏蔽referrer -->
<meta name="referrer" content="never">
<title>短信轰炸接口调试工具.</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://cdn.staticfile.org/twitter-bootstrap/3.3.7/css/bootstrap.min.css">
<script src="https://cdn.staticfile.org/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdn.staticfile.org/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/vue@2"></script>
<script src="https://cdn.staticfile.org/axios/0.18.0/axios.min.js"></script>
<!-- 引入 layui.css -->
<link rel="stylesheet" href="https://unpkg.com/layui@2.6.8/dist/css/layui.css">
<!-- 引入 layui.js -->
<script src="https://unpkg.com/layui@2.6.8/dist/layui.js"></script>
</head>
<body>
<div class="container" id="app">
<div class="page-header">
<h1>抖音去水印解析<small>最新支持图文/视频! author:落落 <b><a href="https://lskyl.xyz/">Blog</a></b></small></h1>
</div>
<label for="basic-url">请输入复制的视频链接(自动提取URL)</label>
<!-- 输入框组 -->
<div class="input-group">
<span class="input-group-addon" id="basic-addon3">URL:</span>
<input v-model="raw_url" type="text" class="form-control" id="basic-url" aria-describedby="basic-addon3">
<span class="input-group-btn">
<button v-on:click="req_api" class="btn btn-info" type="button">Go!</button>
</span>
</div>
<br />
<div v-if="url" class="alert alert-info" role="alert">
<b>提取到的链接:</b> <a v-bind:href="url" class="alert-link" target="_blank">{{ url }}</a>
</div>
<div v-if="resp">
<div v-if="resp.status" class="alert alert-danger" role="alert">
<b>接口报错!Msg:</b> {{ resp.msg }}
</div>
<div v-else>
<div class="alert alert-success" role="alert">
<b>解析成功!Msg:</b> {{ resp.msg }}
</div>
<!-- Telegram share -->
<script async src="https://telegram.org/js/telegram-widget.js?18" v-bind:data-telegram-share-url="resp.data.p_url" v-bind:data-comment="resp.data.title" data-size="large"></script>
<!-- 解析结果可折叠面板 -->
<div class="panel-group" id="accordion">
<div class="panel panel-default">
<div class="panel-body">
<b v-if="resp.type_ === 'v'">Type:视频</b>
<b v-else>Type:图文</b>
</div>
</div>
<div class="panel panel-default">
<div class="panel-body">
<b>文案:</b> {{ resp.data.title }}
</div>
</div>
<div class="panel panel-default">
<div class="panel-body">
<b>作者:</b> {{ resp.data.author }}
</div>
</div>
<div class="panel panel-default">
<div class="panel-body">
<b>音乐:</b><a v-bind:href="resp.data.m_url" target="_blank"><b>(点击下载)</b></a>
<audio v-bind:src="resp.data.m_url" controls>
</div>
</div>
<div class="panel panel-default">
<div class="panel-heading">
<h4 class="panel-title">
<a data-toggle="collapse" data-parent="#accordion" href="#collapse_1">
预览图 (点击展开)
</a>
</h4>
</div>
<div id="collapse_1" class="panel-collapse collapse">
<div class="panel-body">
<a href="#" class="thumbnail">
<img v-bind:src="resp.data.p_url" alt="无法加载....">
<a v-bind:href="resp.data.p_url">
<button type="button" class="btn btn-info">Download</button>
</a>
</a>
</div>
</div>
</div>
<div v-if="resp.type_ === 'p'" class="panel panel-default">
<div class="panel-heading">
<h4 class="panel-title">
<a data-toggle="collapse" data-parent="#accordion" href="#collapse_2">
图文下载 (点击展开)
</a>
</h4>
</div>
<div id="collapse_2" class="panel-collapse collapse">
<div class="panel-body">
点击图片即可下载!
<div class="row">
<div v-for="img_url in resp.data.img_list" class="col-xs-6 col-md-3">
<a v-bind:href="img_url" target="_blank" class="thumbnail">
<img v-bind:src="img_url" alt="加载失败">
</a>
</div>
</div>
</div>
</div>
</div>
<div v-else>
<div class="panel panel-default">
<div class="panel-body">
<b>视频链接:</b>
<a v-bind:href="resp.data.v_url" target="_blank">(点击下载)</a>
<br />
<div class="embed-responsive embed-responsive-16by9">
<video ref="video" controls controlslist="nodownload" preload="none" width="100%"
height="100%" :poster="resp.data.p_url" :src="resp.data.v_url">
</video>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
<div v-if="error">
<div class="alert alert-danger" role="alert">
<b>请求失败,请检查接口 Why:</b> {{ resp }}
接口地址: <code>{{ api }}</code>
</div>
</div>
</div>
</body>
<script>
// api 地址基于 Flask 搭建,可能会失效。请知悉
// 重度使用者请自行通过腾讯云函数搭建!!!
var api = "https://service-dyjwokh6-1300913563.gz.apigw.tencentcs.com/dy/"
var app = new Vue({
el: '#app',
data: {
api: api,
raw_url: '',
url: '',
resp: '',
error: ''
},
methods: {
// 正则匹配字符串中的URL
httpString: function (s) {
if (!s) return ''
var reg = /(http:\/\/|https:\/\/)((\w|=|\?|\.|\/|&|-)+)/g;
var reg = /(https?|http|ftp|file):\/\/[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]/g;
s = s.match(reg);
this.url = s[0];
console.log(this.url);
return s[0];
},
req_api: function () {
layer.msg('请求接口中', {
icon: 16
, shade: 0.01
});
axios.get(api, {
params: {
url: this.url,
}
})
.then((response) => {
this.resp = response.data;
layer.closeAll('loading');
})
.catch((error) => {
this.error = true;
this.resp = error;
layer.closeAll('loading');
})
},
},
// 监听 raw_url 变化
watch: {
'raw_url': function (newVal) {
this.url = this.httpString(newVal);
},
},
});
</script>
</html>

86
flask_app/utils.py Normal file
View File

@@ -0,0 +1,86 @@
# encoding=utf8
from pathlib import Path
from loguru import logger
import sys
from pydantic import BaseModel, validator
from typing import Optional, Union
import httpx
from datetime import datetime
import json
# logger config
logger.remove()
logger.add(
sink=sys.stdout,
format="<green>{time:YYYY-MM-DD at HH:mm:ss}</green> - <level>{level}</level> - <level>{message}</level>",
colorize=True,
backtrace=True
)
json_path = Path(Path(__file__).parent.parent, "api.json")
if not json_path.exists():
logger.error("Json file not exists in default directory!")
sys.exit(1)
default_header = {
"User-Agent": "Mozilla/5.0 (Linux; U; Android 10; zh-cn; Mi 10 Build/QKQ1.191117.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.147 Mobile Safari/537.36 XiaoMi/MiuiBrowser/13.5.40"
}
class API(BaseModel):
desc: str = "Default"
url: str
method: str = "GET"
header: Optional[Union[str, dict]] = default_header
data: Optional[Union[str, dict]]
@validator('url')
def name_must_contain_space(cls, v: str):
"""验证链接是否正确"""
if not v.startswith('https' or 'http'):
raise ValueError('url must startswith http(s)!')
return v
def replace_data(self, content: Union[str, dict], phone) -> str:
if isinstance(content, dict):
for key, value in content.items():
content[key] = value.replace("{phone}", phone).replace(
"{timestamp}", self.timestamp_new())
else:
if isinstance(content, str):
content.replace("{phone}", phone).replace(
"{timestamp}", self.timestamp_new())
return content
def timestamp_new(self) -> str:
"""返回整数字符串时间戳"""
return str(int(datetime.now().timestamp()))
def handle_API(self, phone):
"""
:param API: one API basemodel
:return: API basemodel
"""
if self.method != "POST":
self.method = "GET"
self.data = self.replace_data(self.data, phone)
self.url = self.replace_data(self.url, phone)
if isinstance(self.header, str):
self.header = json.loads(self.header)
return self
def test_resq(api: API, phone) -> httpx.Response:
"""测试 API 返回响应
:param api: API model
:param phone: 手机号
:return: httpx 请求对象.
"""
api = api.handle_API(phone)
with httpx.Client(headers=default_header, timeout=8) as client:
if not isinstance(api.data, dict):
client.request(method=api.method, headers=api.header,
url=api.url, data=api.data)
resp = client.request(
method=api.method, headers=api.header, url=api.url, json=api.data)
return resp

View File

@@ -1,4 +1,4 @@
# coding=utf-8
# encoding=utf8
import httpx
import json
import sys
@@ -79,8 +79,10 @@ def replace_data(content: Union[str, dict]) -> str:
content[key] = value.replace("{phone}", phone).replace(
"{timestamp}", timestamp_new())
else:
content.replace("{phone}", phone).replace(
"{timestamp}", timestamp_new())
# fix: add str判断
if isinstance(content, str):
content.replace("{phone}", phone).replace(
"{timestamp}", timestamp_new())
return content