mirror of
https://github.com/WhaleFell/SMSBoom.git
synced 2026-02-04 10:53:37 +08:00
🐞 fix: 修复bug
This commit is contained in:
@@ -1,10 +1,10 @@
|
||||
# encoding=utf8
|
||||
# app 工厂函数
|
||||
from flask import Flask
|
||||
from flask import Flask,current_app
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_admin import Admin
|
||||
from flask_admin.contrib.sqla import ModelView
|
||||
from flask_babelex import Babel
|
||||
from flask_babelex import Babel
|
||||
import sys
|
||||
import os
|
||||
from loguru import logger
|
||||
@@ -28,9 +28,8 @@ logger.add(
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
# app config
|
||||
|
||||
|
||||
class AppConfig:
|
||||
SQLALCHEMY_DATABASE_URI = prefix + \
|
||||
os.path.join(app.root_path, 'data.db') # 数据库路径
|
||||
@@ -40,18 +39,23 @@ class AppConfig:
|
||||
# 密钥
|
||||
SESSION_TYPE = 'filesystem'
|
||||
SECRET_KEY = os.urandom(24)
|
||||
|
||||
|
||||
BABEL_DEFAULT_LOCALE = 'zh_CN' # 汉化
|
||||
|
||||
TEST_PHONE = "19820294268" # 测试手机号
|
||||
|
||||
|
||||
app.config.from_object(AppConfig)
|
||||
# 设置模板全局变量
|
||||
# print(app.config.get("TEST_PHONE"))
|
||||
app.add_template_global(current_app,"current_app")
|
||||
|
||||
# 扩展
|
||||
db = SQLAlchemy(app)
|
||||
babel = Babel(app)
|
||||
|
||||
admin = Admin(app, name="短信接口调试", template_mode='bootstrap3')
|
||||
from .model import ApisModelVies,Apis
|
||||
from .model import ApisModelVies, Apis
|
||||
admin.add_view(ApisModelVies(Apis, db.session))
|
||||
|
||||
# buleprint
|
||||
|
||||
@@ -3,6 +3,13 @@
|
||||
from . import db
|
||||
from datetime import datetime
|
||||
from . import ModelView
|
||||
import json
|
||||
from typing import Union,Optional
|
||||
from pydantic import BaseModel
|
||||
|
||||
default_header = {
|
||||
"User-Agent": "Mozilla/5.0 (Linux; U; Android 10; zh-cn; Mi 10 Build/QKQ1.191117.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.147 Mobile Safari/537.36 XiaoMi/MiuiBrowser/13.5.40"
|
||||
}
|
||||
|
||||
class ApisModelVies(ModelView):
|
||||
create_template = 'api_edit.html'
|
||||
@@ -23,3 +30,37 @@ class Apis(db.Model):
|
||||
header = db.Column(db.String(9999)) # 请求头
|
||||
data = db.Column(db.String(9999)) # 请求数据
|
||||
add_time = db.Column(db.DateTime(), default=datetime.now) # 添加时间
|
||||
|
||||
|
||||
class API(BaseModel):
|
||||
desc: str = "Default"
|
||||
url: str
|
||||
method: str = "GET"
|
||||
header: Optional[Union[str, dict]] = default_header
|
||||
data: Optional[Union[str, dict]]
|
||||
|
||||
def replace_data(self, content: Union[str, dict], phone) -> str:
|
||||
# 统一转换成 str 再替换.
|
||||
content = str(content).replace("[phone]", phone).replace(
|
||||
"[timestamp]", self.timestamp_new()).replace("'",'"')
|
||||
# 尝试 json 化
|
||||
try:
|
||||
return json.loads(content)
|
||||
except:
|
||||
return content
|
||||
|
||||
def timestamp_new(self) -> str:
|
||||
"""返回整数字符串时间戳"""
|
||||
return str(int(datetime.now().timestamp()))
|
||||
|
||||
def handle_API(self, phone=None):
|
||||
"""
|
||||
:param API: one API basemodel
|
||||
:return: API basemodel
|
||||
"""
|
||||
if isinstance(self.header, str) and self.header:
|
||||
self.header = self.replace_data(self.header, phone)
|
||||
self.data = self.replace_data(self.data, phone)
|
||||
self.url = self.replace_data(self.url, phone)
|
||||
return self
|
||||
|
||||
|
||||
52
flask_app/static/test.js
Normal file
52
flask_app/static/test.js
Normal file
@@ -0,0 +1,52 @@
|
||||
|
||||
|
||||
function getValue() {
|
||||
let desc = $("#desc").val();
|
||||
let url = $("#url").val();
|
||||
let method = $("#method").val();
|
||||
let header = $("#header").val();
|
||||
let phone = $("#phone").val();
|
||||
let data_ = $("#data").val();
|
||||
|
||||
let data = {
|
||||
"desc": desc,
|
||||
"url": url,
|
||||
"method": method,
|
||||
"header": header,
|
||||
"phone": phone,
|
||||
"data": data_
|
||||
};
|
||||
|
||||
return data;
|
||||
};
|
||||
|
||||
$(document).ready(function () {
|
||||
|
||||
|
||||
|
||||
$("#test").click(function () {
|
||||
|
||||
$.ajax({
|
||||
type: "POST",
|
||||
url: "/testapi/",
|
||||
contentType: "application/json",
|
||||
data: JSON.stringify(getValue()),
|
||||
dataType: "json",
|
||||
success: function (response) {
|
||||
if (response.status == 0) {
|
||||
$("#suc").show().text("请求成功!:" + response.resp);
|
||||
} else {
|
||||
$("#suc").attr("class", "alert alert-warning").show().text("请求失败!:" + response.resp);
|
||||
}
|
||||
},
|
||||
error: function (XMLHttpRequest, textStatus, errorThrown) {
|
||||
$("#error").show().text("发送请求错误请检查后端接口:" + textStatus);
|
||||
},
|
||||
});
|
||||
|
||||
});
|
||||
|
||||
// console.log(desc, url, method, header);
|
||||
|
||||
|
||||
});
|
||||
@@ -4,6 +4,13 @@
|
||||
<h2>短信接口添加</h2>
|
||||
<p>替换字符: 手机号<code>[phone]</code> 时间戳<code>[timestamp]</code></p>
|
||||
{{ super() }}
|
||||
<input type="button" class="btn btn-primary" value="测试接口">
|
||||
<input type="text" value="{{ current_app.config.get("TEST_PHONE") }}" id="phone">
|
||||
<input type="button" class="btn btn-primary" value="测试接口" id="test">
|
||||
{% endblock %}
|
||||
|
||||
|
||||
{% block tail %}
|
||||
<div id="error" class="alert alert-danger" style="display: none;">请求错误!</div>
|
||||
<div id="suc" class="alert alert-success" style="display: none;">请求成功!</div>
|
||||
<script src="/static/test.js" type="text/javascript"></script>
|
||||
{% endblock %}
|
||||
|
||||
@@ -1,70 +1,7 @@
|
||||
# encoding=utf8
|
||||
from pathlib import Path
|
||||
from loguru import logger
|
||||
import sys
|
||||
from pydantic import BaseModel, validator
|
||||
from typing import Optional, Union
|
||||
import httpx
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
# logger config
|
||||
logger.remove()
|
||||
logger.add(
|
||||
sink=sys.stdout,
|
||||
format="<green>{time:YYYY-MM-DD at HH:mm:ss}</green> - <level>{level}</level> - <level>{message}</level>",
|
||||
colorize=True,
|
||||
backtrace=True
|
||||
)
|
||||
|
||||
json_path = Path(Path(__file__).parent.parent, "api.json")
|
||||
if not json_path.exists():
|
||||
logger.error("Json file not exists in default directory!")
|
||||
sys.exit(1)
|
||||
|
||||
default_header = {
|
||||
"User-Agent": "Mozilla/5.0 (Linux; U; Android 10; zh-cn; Mi 10 Build/QKQ1.191117.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.147 Mobile Safari/537.36 XiaoMi/MiuiBrowser/13.5.40"
|
||||
}
|
||||
|
||||
|
||||
class API(BaseModel):
|
||||
desc: str = "Default"
|
||||
url: str
|
||||
method: str = "GET"
|
||||
header: Optional[Union[str, dict]] = default_header
|
||||
data: Optional[Union[str, dict]]
|
||||
|
||||
|
||||
def replace_data(self, content: Union[str, dict], phone) -> str:
|
||||
if not phone:
|
||||
return content
|
||||
# 统一转换成 str 再替换.
|
||||
content = str(content).replace("[phone]", phone).replace(
|
||||
"[timestamp]", self.timestamp_new()).replace("'",'"')
|
||||
# 尝试 json 化
|
||||
try:
|
||||
return json.loads(content)
|
||||
except:
|
||||
return content
|
||||
|
||||
def timestamp_new(self) -> str:
|
||||
"""返回整数字符串时间戳"""
|
||||
return str(int(datetime.now().timestamp()))
|
||||
|
||||
def handle_API(self, phone=None):
|
||||
"""
|
||||
:param API: one API basemodel
|
||||
:return: API basemodel
|
||||
"""
|
||||
# if isinstance(self.data, str):
|
||||
# if self.data:
|
||||
# self.data = json.loads(self.data)
|
||||
if isinstance(self.header, str):
|
||||
if self.header:
|
||||
self.header = json.loads(self.header)
|
||||
self.data = self.replace_data(self.data, phone)
|
||||
self.url = self.replace_data(self.url, phone)
|
||||
return self
|
||||
from .model import API, default_header
|
||||
|
||||
|
||||
def test_resq(api: API, phone) -> httpx.Response:
|
||||
@@ -78,7 +15,7 @@ def test_resq(api: API, phone) -> httpx.Response:
|
||||
if not isinstance(api.data, dict):
|
||||
print("data")
|
||||
resp = client.request(method=api.method, headers=api.header,
|
||||
url=api.url, data=api.data)
|
||||
url=api.url, data=api.data)
|
||||
else:
|
||||
print('json')
|
||||
resp = client.request(
|
||||
|
||||
@@ -1,9 +1,25 @@
|
||||
# encoding=utf8
|
||||
# flask app views
|
||||
from . import main
|
||||
import json
|
||||
from ..model import Apis, API
|
||||
from ..utils import test_resq
|
||||
from flask_app import db
|
||||
from flask import request, jsonify
|
||||
|
||||
@main.route("/",methods=['GET','POST'])
|
||||
|
||||
@main.route("/", methods=['GET', 'POST'])
|
||||
def index():
|
||||
return "index"
|
||||
|
||||
|
||||
@main.route("/testapi/", methods=['GET', 'POST'])
|
||||
def testapi():
|
||||
try:
|
||||
req = request.json
|
||||
api = API(**req)
|
||||
resp = test_resq(api, phone=req.get('phone'))
|
||||
print(resp.text)
|
||||
return jsonify({"status": 0, "resp": resp.text})
|
||||
except Exception as why:
|
||||
return jsonify({"status": 1, "resp": why})
|
||||
|
||||
@@ -6,6 +6,10 @@ from pathlib import Path
|
||||
import json
|
||||
from flask_app import db, app
|
||||
from flask_app.model import Apis
|
||||
from utils import API
|
||||
|
||||
json_path = Path(app.root_path).parent.joinpath(
|
||||
"api.json")
|
||||
|
||||
|
||||
@click.command()
|
||||
@@ -23,8 +27,7 @@ def init(drop):
|
||||
@logger.catch()
|
||||
def json2sqlite():
|
||||
"""将json数据转为sqlite数据库"""
|
||||
j = Path(app.root_path).parent.joinpath(
|
||||
"api.json").read_text(encoding="utf8")
|
||||
j = json_path.read_text(encoding="utf8")
|
||||
|
||||
jss = json.loads(j)
|
||||
for js in jss:
|
||||
@@ -47,6 +50,36 @@ def json2sqlite():
|
||||
logger.success("json To sqlite 成功!")
|
||||
|
||||
|
||||
@click.command()
|
||||
@logger.catch()
|
||||
def sqlite2json():
|
||||
"""将sqlite数据转为json"""
|
||||
apis = Apis.query.all()
|
||||
apis_ = []
|
||||
for api in apis:
|
||||
# print(api.url)
|
||||
data = {
|
||||
"desc": api.desc,
|
||||
"url": api.url,
|
||||
"method": api.method,
|
||||
"data": api.data,
|
||||
"header": api.header,
|
||||
}
|
||||
try:
|
||||
api = API(**data).handle_API()
|
||||
apis_.append(api.dict())
|
||||
except:
|
||||
pass
|
||||
|
||||
data = json.loads(json_path.read_text(encoding='utf8'))
|
||||
with open(json_path, mode="w", encoding="utf8") as j:
|
||||
try:
|
||||
json.dump(data, j, ensure_ascii=False, sort_keys=False)
|
||||
logger.success("sqlite->json 成功!")
|
||||
except Exception:
|
||||
logger.exception("写入到 json 文件错误!")
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.option('--host', '-h', help='监听地址', default="0.0.0.0")
|
||||
@click.option('--port', '-p', help='监听端口', default=9090)
|
||||
@@ -63,6 +96,7 @@ def cli():
|
||||
cli.add_command(init)
|
||||
cli.add_command(start)
|
||||
cli.add_command(json2sqlite)
|
||||
cli.add_command(sqlite2json)
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
||||
|
||||
10
utils.py
10
utils.py
@@ -81,11 +81,13 @@ class API(BaseModel):
|
||||
|
||||
def replace_data(self, content: Union[str, dict], phone: str) -> str:
|
||||
# 统一转换成 str 再替换. ' -> "
|
||||
content = str(content).replace("[phone]", phone).replace(
|
||||
"[timestamp]", self.timestamp_new()).replace("'", '"')
|
||||
if phone:
|
||||
content = str(content).replace("[phone]", phone).replace(
|
||||
"[timestamp]", self.timestamp_new()).replace("'", '"')
|
||||
|
||||
# 尝试 json 化
|
||||
try:
|
||||
return json.loads(content)
|
||||
return json.loads(content.replace("'", '"'))
|
||||
except:
|
||||
return content
|
||||
|
||||
@@ -93,7 +95,7 @@ class API(BaseModel):
|
||||
"""返回整数字符串时间戳"""
|
||||
return str(int(datetime.now().timestamp()))
|
||||
|
||||
def handle_API(self, phone: str):
|
||||
def handle_API(self, phone: str=None):
|
||||
""" 传入手机号处理 API
|
||||
:param API: one API basemodel
|
||||
:return: API basemodel
|
||||
|
||||
Reference in New Issue
Block a user