mirror of
https://github.com/WhaleFell/SMSBoom.git
synced 2026-02-06 03:43:50 +08:00
123 lines
3.7 KiB
Python
123 lines
3.7 KiB
Python
# encoding=utf8
|
|
import json
|
|
import time
|
|
from flask import Flask, make_response, request, jsonify, render_template
|
|
from flask_cors import CORS
|
|
from urllib3 import disable_warnings
|
|
|
|
from utils import *
|
|
|
|
disable_warnings()
|
|
|
|
app = Flask(__name__)
|
|
CORS(app, supports_credentials=True, resources="/*") # 跨域
|
|
|
|
# 解决与 vue 冲突
|
|
app.jinja_env.variable_start_string = '[['
|
|
app.jinja_env.variable_end_string = ']]'
|
|
|
|
def request_parse(req_data: request) -> dict:
|
|
'''解析请求数据并以字典的形式返回'''
|
|
if req_data.method == 'POST':
|
|
data = req_data.form
|
|
|
|
elif req_data.method == 'GET':
|
|
data = req_data.args
|
|
|
|
return dict(data)
|
|
|
|
|
|
class BaseResponse(BaseModel):
|
|
"""返回的响应"""
|
|
status: int = 0 # 状态码 0-->成功 1-->失败
|
|
msg: str = "前端显示的简短信息"
|
|
data: Optional[str]
|
|
|
|
@property
|
|
def resp(self):
|
|
'''BaseModel类型返回json'''
|
|
response = make_response(
|
|
json.dumps(
|
|
self.dict(),
|
|
ensure_ascii=False,
|
|
sort_keys=False
|
|
),
|
|
)
|
|
response.mimetype = 'application/json'
|
|
# 跨域设置
|
|
response.headers['Access-Control-Allow-Origin'] = '*'
|
|
response.headers['Access-Control-Allow-Methods'] = 'OPTIONS,HEAD,GET,POST'
|
|
response.headers['Access-Control-Allow-Headers'] = 'x-requested-with'
|
|
return response
|
|
|
|
|
|
@app.route("/", methods=['GET'])
|
|
def index():
|
|
return render_template('admin.html')
|
|
|
|
@app.route("/testapi/", methods=['POST'])
|
|
@logger.catch
|
|
def testapi():
|
|
brs = BaseResponse()
|
|
# 需要传入 json 数据
|
|
try:
|
|
jsonData = request.get_json()
|
|
api = API(**jsonData)
|
|
phone = jsonData.get('phone')
|
|
if not phone:
|
|
raise ValueError("参数 phone 没有!")
|
|
try:
|
|
resp = test_resq(api, phone)
|
|
brs.status = 0
|
|
brs.msg = f"请求成功!{resp}"
|
|
brs.data = resp.text
|
|
except Exception as why:
|
|
brs.status = 1
|
|
brs.msg = f"请求失败:{why}"
|
|
except Exception as why:
|
|
brs.status = 1
|
|
brs.msg = f"参数有误:{why}"
|
|
return brs.resp
|
|
|
|
|
|
@app.route("/submitapi/", methods=['POST'])
|
|
def submitapi():
|
|
"""提交API到json文件"""
|
|
# 需要传入 json 数据
|
|
jsonData = request.get_json()
|
|
api = API(**jsonData).handle_API()
|
|
data = json.loads(json_path.read_text(encoding='utf8'))
|
|
with open(json_path, mode="w", encoding="utf8") as j:
|
|
try:
|
|
data.append(api.dict())
|
|
json.dump(data, j, ensure_ascii=False, sort_keys=False)
|
|
return BaseResponse(status=0, msg="写入成功!").resp
|
|
except Exception as why:
|
|
return BaseResponse(status=1, msg=f"写入失败!{why}").resp
|
|
|
|
|
|
@app.route("/backapi/", methods=['GET', 'POST'])
|
|
def backjson():
|
|
"""备份json文件"""
|
|
try:
|
|
timeStruct = time.localtime(int(time.time()))
|
|
strTime = time.strftime("%Y_%m_%d_%H_%M_%S", timeStruct)
|
|
Path(json_path.parent, 'apiback').mkdir(exist_ok=True)
|
|
json_back_path = Path(json_path.parent, 'apiback',
|
|
f"api_back_{strTime}.json")
|
|
with open(json_back_path, mode="w") as j:
|
|
j_data = json.loads(json_path.read_text(encoding='utf8'))
|
|
json.dump(j_data, j, ensure_ascii=False, sort_keys=False)
|
|
return BaseResponse(status=0, msg="备份成功!").resp
|
|
except Exception as why:
|
|
return BaseResponse(status=1, msg=f"备份失败{why}").resp
|
|
|
|
|
|
@app.route("/downloadapi/", methods=['GET'])
|
|
def downloadapi():
|
|
"""下载接口文件"""
|
|
return json_path.read_text(encoding='utf8')
|
|
|
|
|
|
app.run(host="0.0.0.0", port=10981, debug=True)
|