diff --git a/flask_app/__init__.py b/flask_app/__init__.py
index 3b07407..7ccaca9 100644
--- a/flask_app/__init__.py
+++ b/flask_app/__init__.py
@@ -1,10 +1,10 @@
# encoding=utf8
# app 工厂函数
-from flask import Flask
+from flask import Flask,current_app
from flask_sqlalchemy import SQLAlchemy
from flask_admin import Admin
from flask_admin.contrib.sqla import ModelView
-from flask_babelex import Babel
+from flask_babelex import Babel
import sys
import os
from loguru import logger
@@ -28,9 +28,8 @@ logger.add(
app = Flask(__name__)
+
# app config
-
-
class AppConfig:
SQLALCHEMY_DATABASE_URI = prefix + \
os.path.join(app.root_path, 'data.db') # 数据库路径
@@ -40,18 +39,23 @@ class AppConfig:
# 密钥
SESSION_TYPE = 'filesystem'
SECRET_KEY = os.urandom(24)
-
+
BABEL_DEFAULT_LOCALE = 'zh_CN' # 汉化
+ TEST_PHONE = "19820294268" # 测试手机号
+
app.config.from_object(AppConfig)
+# 设置模板全局变量
+# print(app.config.get("TEST_PHONE"))
+app.add_template_global(current_app,"current_app")
# 扩展
db = SQLAlchemy(app)
babel = Babel(app)
admin = Admin(app, name="短信接口调试", template_mode='bootstrap3')
-from .model import ApisModelVies,Apis
+from .model import ApisModelVies, Apis
admin.add_view(ApisModelVies(Apis, db.session))
# buleprint
diff --git a/flask_app/model.py b/flask_app/model.py
index b178e41..9afcbd5 100644
--- a/flask_app/model.py
+++ b/flask_app/model.py
@@ -3,6 +3,13 @@
from . import db
from datetime import datetime
from . import ModelView
+import json
+from typing import Union,Optional
+from pydantic import BaseModel
+
+default_header = {
+ "User-Agent": "Mozilla/5.0 (Linux; U; Android 10; zh-cn; Mi 10 Build/QKQ1.191117.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.147 Mobile Safari/537.36 XiaoMi/MiuiBrowser/13.5.40"
+}
class ApisModelVies(ModelView):
create_template = 'api_edit.html'
@@ -23,3 +30,37 @@ class Apis(db.Model):
header = db.Column(db.String(9999)) # 请求头
data = db.Column(db.String(9999)) # 请求数据
add_time = db.Column(db.DateTime(), default=datetime.now) # 添加时间
+
+
+class API(BaseModel):
+ desc: str = "Default"
+ url: str
+ method: str = "GET"
+ header: Optional[Union[str, dict]] = default_header
+ data: Optional[Union[str, dict]]
+
+ def replace_data(self, content: Union[str, dict], phone) -> str:
+ # 统一转换成 str 再替换.
+ content = str(content).replace("[phone]", phone).replace(
+ "[timestamp]", self.timestamp_new()).replace("'",'"')
+ # 尝试 json 化
+ try:
+ return json.loads(content)
+ except:
+ return content
+
+ def timestamp_new(self) -> str:
+ """返回整数字符串时间戳"""
+ return str(int(datetime.now().timestamp()))
+
+ def handle_API(self, phone=None):
+ """
+ :param API: one API basemodel
+ :return: API basemodel
+ """
+ if isinstance(self.header, str) and self.header:
+ self.header = self.replace_data(self.header, phone)
+ self.data = self.replace_data(self.data, phone)
+ self.url = self.replace_data(self.url, phone)
+ return self
+
diff --git a/flask_app/static/test.js b/flask_app/static/test.js
new file mode 100644
index 0000000..9beeaf7
--- /dev/null
+++ b/flask_app/static/test.js
@@ -0,0 +1,52 @@
+
+
+function getValue() {
+ let desc = $("#desc").val();
+ let url = $("#url").val();
+ let method = $("#method").val();
+ let header = $("#header").val();
+ let phone = $("#phone").val();
+ let data_ = $("#data").val();
+
+ let data = {
+ "desc": desc,
+ "url": url,
+ "method": method,
+ "header": header,
+ "phone": phone,
+ "data": data_
+ };
+
+ return data;
+};
+
+$(document).ready(function () {
+
+
+
+ $("#test").click(function () {
+
+ $.ajax({
+ type: "POST",
+ url: "/testapi/",
+ contentType: "application/json",
+ data: JSON.stringify(getValue()),
+ dataType: "json",
+ success: function (response) {
+ if (response.status == 0) {
+ $("#suc").show().text("请求成功!:" + response.resp);
+ } else {
+ $("#suc").attr("class", "alert alert-warning").show().text("请求失败!:" + response.resp);
+ }
+ },
+ error: function (XMLHttpRequest, textStatus, errorThrown) {
+ $("#error").show().text("发送请求错误请检查后端接口:" + textStatus);
+ },
+ });
+
+ });
+
+ // console.log(desc, url, method, header);
+
+
+});
\ No newline at end of file
diff --git a/flask_app/templates/api_edit.html b/flask_app/templates/api_edit.html
index ba478b6..479e81c 100644
--- a/flask_app/templates/api_edit.html
+++ b/flask_app/templates/api_edit.html
@@ -4,6 +4,13 @@
短信接口添加
替换字符: 手机号[phone] 时间戳[timestamp]
{{ super() }}
-
+
+
{% endblock %}
+
+{% block tail %}
+请求错误!
+请求成功!
+
+{% endblock %}
diff --git a/flask_app/utils.py b/flask_app/utils.py
index f61463e..ffe3691 100644
--- a/flask_app/utils.py
+++ b/flask_app/utils.py
@@ -1,70 +1,7 @@
# encoding=utf8
-from pathlib import Path
-from loguru import logger
-import sys
-from pydantic import BaseModel, validator
-from typing import Optional, Union
import httpx
-from datetime import datetime
import json
-
-# logger config
-logger.remove()
-logger.add(
- sink=sys.stdout,
- format="{time:YYYY-MM-DD at HH:mm:ss} - {level} - {message}",
- colorize=True,
- backtrace=True
-)
-
-json_path = Path(Path(__file__).parent.parent, "api.json")
-if not json_path.exists():
- logger.error("Json file not exists in default directory!")
- sys.exit(1)
-
-default_header = {
- "User-Agent": "Mozilla/5.0 (Linux; U; Android 10; zh-cn; Mi 10 Build/QKQ1.191117.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.147 Mobile Safari/537.36 XiaoMi/MiuiBrowser/13.5.40"
-}
-
-
-class API(BaseModel):
- desc: str = "Default"
- url: str
- method: str = "GET"
- header: Optional[Union[str, dict]] = default_header
- data: Optional[Union[str, dict]]
-
-
- def replace_data(self, content: Union[str, dict], phone) -> str:
- if not phone:
- return content
- # 统一转换成 str 再替换.
- content = str(content).replace("[phone]", phone).replace(
- "[timestamp]", self.timestamp_new()).replace("'",'"')
- # 尝试 json 化
- try:
- return json.loads(content)
- except:
- return content
-
- def timestamp_new(self) -> str:
- """返回整数字符串时间戳"""
- return str(int(datetime.now().timestamp()))
-
- def handle_API(self, phone=None):
- """
- :param API: one API basemodel
- :return: API basemodel
- """
- # if isinstance(self.data, str):
- # if self.data:
- # self.data = json.loads(self.data)
- if isinstance(self.header, str):
- if self.header:
- self.header = json.loads(self.header)
- self.data = self.replace_data(self.data, phone)
- self.url = self.replace_data(self.url, phone)
- return self
+from .model import API, default_header
def test_resq(api: API, phone) -> httpx.Response:
@@ -78,7 +15,7 @@ def test_resq(api: API, phone) -> httpx.Response:
if not isinstance(api.data, dict):
print("data")
resp = client.request(method=api.method, headers=api.header,
- url=api.url, data=api.data)
+ url=api.url, data=api.data)
else:
print('json')
resp = client.request(
diff --git a/flask_app/views/views.py b/flask_app/views/views.py
index ca59619..e0d1513 100644
--- a/flask_app/views/views.py
+++ b/flask_app/views/views.py
@@ -1,9 +1,25 @@
# encoding=utf8
# flask app views
from . import main
+import json
+from ..model import Apis, API
+from ..utils import test_resq
from flask_app import db
+from flask import request, jsonify
-@main.route("/",methods=['GET','POST'])
+
+@main.route("/", methods=['GET', 'POST'])
def index():
return "index"
+
+@main.route("/testapi/", methods=['GET', 'POST'])
+def testapi():
+ try:
+ req = request.json
+ api = API(**req)
+ resp = test_resq(api, phone=req.get('phone'))
+ print(resp.text)
+ return jsonify({"status": 0, "resp": resp.text})
+ except Exception as why:
+ return jsonify({"status": 1, "resp": why})
diff --git a/run_fask_app.py b/run_fask_app.py
index 38cdd4d..40dfe24 100644
--- a/run_fask_app.py
+++ b/run_fask_app.py
@@ -6,6 +6,10 @@ from pathlib import Path
import json
from flask_app import db, app
from flask_app.model import Apis
+from utils import API
+
+json_path = Path(app.root_path).parent.joinpath(
+ "api.json")
@click.command()
@@ -23,8 +27,7 @@ def init(drop):
@logger.catch()
def json2sqlite():
"""将json数据转为sqlite数据库"""
- j = Path(app.root_path).parent.joinpath(
- "api.json").read_text(encoding="utf8")
+ j = json_path.read_text(encoding="utf8")
jss = json.loads(j)
for js in jss:
@@ -47,6 +50,36 @@ def json2sqlite():
logger.success("json To sqlite 成功!")
+@click.command()
+@logger.catch()
+def sqlite2json():
+ """将sqlite数据转为json"""
+ apis = Apis.query.all()
+ apis_ = []
+ for api in apis:
+ # print(api.url)
+ data = {
+ "desc": api.desc,
+ "url": api.url,
+ "method": api.method,
+ "data": api.data,
+ "header": api.header,
+ }
+ try:
+ api = API(**data).handle_API()
+ apis_.append(api.dict())
+ except:
+ pass
+
+ data = json.loads(json_path.read_text(encoding='utf8'))
+ with open(json_path, mode="w", encoding="utf8") as j:
+ try:
+ json.dump(data, j, ensure_ascii=False, sort_keys=False)
+ logger.success("sqlite->json 成功!")
+ except Exception:
+ logger.exception("写入到 json 文件错误!")
+
+
@click.command()
@click.option('--host', '-h', help='监听地址', default="0.0.0.0")
@click.option('--port', '-p', help='监听端口', default=9090)
@@ -63,6 +96,7 @@ def cli():
cli.add_command(init)
cli.add_command(start)
cli.add_command(json2sqlite)
+cli.add_command(sqlite2json)
if __name__ == "__main__":
cli()
diff --git a/utils.py b/utils.py
index 92258ab..39e9136 100644
--- a/utils.py
+++ b/utils.py
@@ -81,11 +81,13 @@ class API(BaseModel):
def replace_data(self, content: Union[str, dict], phone: str) -> str:
# 统一转换成 str 再替换. ' -> "
- content = str(content).replace("[phone]", phone).replace(
- "[timestamp]", self.timestamp_new()).replace("'", '"')
+ if phone:
+ content = str(content).replace("[phone]", phone).replace(
+ "[timestamp]", self.timestamp_new()).replace("'", '"')
+
# 尝试 json 化
try:
- return json.loads(content)
+ return json.loads(content.replace("'", '"'))
except:
return content
@@ -93,7 +95,7 @@ class API(BaseModel):
"""返回整数字符串时间戳"""
return str(int(datetime.now().timestamp()))
- def handle_API(self, phone: str):
+ def handle_API(self, phone: str=None):
""" 传入手机号处理 API
:param API: one API basemodel
:return: API basemodel