Merge pull request #2700 from InfinityPacer/feature/api-token

This commit is contained in:
jxxghp
2024-09-13 08:32:10 +08:00
committed by GitHub
7 changed files with 61 additions and 27 deletions

View File

@@ -4,7 +4,7 @@ from fastapi import APIRouter, Depends, Header
from app import schemas
from app.core.plugin import PluginManager
from app.core.security import verify_token
from app.core.security import verify_token, verify_apitoken
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser
from app.helper.plugin import PluginHelper
@@ -23,6 +23,10 @@ def register_plugin_api(plugin_id: str = None):
if r.path == api.get("path"):
router.routes.remove(r)
break
# 检查是否允许匿名访问,如果不允许匿名访问,则添加 API_TOKEN 验证
allow_anonymous = api.pop("allow_anonymous", False)
if not allow_anonymous:
api.setdefault("dependencies", []).append(Depends(verify_apitoken))
router.add_api_route(**api)

View File

@@ -123,7 +123,7 @@ def set_env_setting(env: dict,
v = ''
else:
v = str(v)
set_key(settings.CONFIG_PATH / "app.env", k, v)
set_key(SystemUtils.get_env_path(), k, v)
return schemas.Response(success=True)
@@ -180,7 +180,7 @@ def set_setting(key: str, value: Union[list, dict, bool, int, str] = None,
value = ''
else:
value = str(value)
set_key(settings.CONFIG_PATH / "app.env", key, value)
set_key(SystemUtils.get_env_path(), key, value)
else:
SystemConfigOper().set(key, value)
return schemas.Response(success=True)

View File

@@ -5,8 +5,10 @@ from pathlib import Path
from typing import Optional, List
from urllib.parse import urlparse
from dotenv import set_key
from pydantic import BaseSettings, validator
from app.log import logger
from app.utils.system import SystemUtils
@@ -61,7 +63,7 @@ class Settings(BaseSettings):
# 超级管理员
SUPERUSER: str = "admin"
# API密钥需要更换
API_TOKEN: str = "moviepilot"
API_TOKEN: Optional[str] = None
# 网络代理 IP:PORT
PROXY_HOST: Optional[str] = None
# 登录页面电影海报,tmdb/bing
@@ -117,7 +119,7 @@ class Settings(BaseSettings):
'.pcm', '.rmi', '.s3m', '.snd', '.spx', '.tak',
'.tta', '.vqf', '.wav', '.wma',
'.aifc', '.aiff', '.alac', '.adif', '.adts',
'.flac', '.midi', '.opus', '.sfalc']
'.flac', '.midi', '.opus', '.sfalc']
# 下载器临时文件后缀
DOWNLOAD_TMPEXT: list = ['.!qB', '.part']
# 下载器监视间隔(小时)
@@ -198,6 +200,17 @@ class Settings(BaseSettings):
except (ValueError, TypeError):
raise ValueError(f"{value} 格式错误,不是有效数字!")
@validator("API_TOKEN", pre=True, always=True)
def validate_api_token(cls, v):
if not v:
new_token = secrets.token_urlsafe(16)
logger.info(f"API_TOKEN 未设置,已随机生成新的 API_TOKEN{new_token}")
set_key(str(SystemUtils.get_env_path()), "API_TOKEN", new_token)
return new_token
elif len(v) < 16:
logger.warning("API_TOKEN 长度不足 16 个字符,存在安全隐患,建议尽快更换为更复杂的密钥!")
return v
@property
def INNER_CONFIG_PATH(self):
return self.ROOT_PATH / "config"
@@ -272,16 +285,16 @@ class Settings(BaseSettings):
"""
if self.PROXY_HOST:
parsed_url = urlparse(self.PROXY_HOST)
protocol = parsed_url.scheme or "" # 协议
username = parsed_url.username or "" # 用户名
password = parsed_url.password or "" # 密码
host = parsed_url.hostname or "" # 主机
port = parsed_url.port or "" # 端口
path = parsed_url.path or "" # 路径
netloc = parsed_url.netloc or "" # 用户名:密码@主机:端口
query = parsed_url.query or "" # 查询参数: ?key=value
params = parsed_url.params or "" # 使用;分割的参数
fragment = parsed_url.fragment or "" # 片段: #fragment
protocol = parsed_url.scheme or "" # 协议
username = parsed_url.username or "" # 用户名
password = parsed_url.password or "" # 密码
host = parsed_url.hostname or "" # 主机
port = parsed_url.port or "" # 端口
path = parsed_url.path or "" # 路径
netloc = parsed_url.netloc or "" # 用户名:密码@主机:端口
query = parsed_url.query or "" # 查询参数: ?key=value
params = parsed_url.params or "" # 使用;分割的参数
fragment = parsed_url.fragment or "" # 片段: #fragment
if not port:
if protocol == "https":
@@ -414,6 +427,8 @@ class Settings(BaseSettings):
class Config:
case_sensitive = True
env_file = SystemUtils.get_env_path()
env_file_encoding = "utf-8"
class GlobalVar(object):
@@ -451,10 +466,7 @@ class GlobalVar(object):
# 实例化配置
settings = Settings(
_env_file=Settings().CONFIG_PATH / "app.env",
_env_file_encoding="utf-8"
)
settings = Settings()
# 全局标识
global_vars = GlobalVar()

View File

@@ -426,7 +426,8 @@ class PluginManager(metaclass=Singleton):
"endpoint": self.xxx,
"methods": ["GET", "POST"],
"summary": "API名称",
"description": "API说明"
"description": "API说明",
"allow_anonymous": false
}]
"""
ret_apis = []

View File

@@ -2,7 +2,7 @@ import inspect
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Dict, Any
from typing import Dict, Any, Optional
import click
from pydantic import BaseSettings
@@ -14,6 +14,8 @@ class LogSettings(BaseSettings):
"""
日志设置
"""
# 配置文件目录
CONFIG_DIR: Optional[str] = None
# 是否为调试模式
DEBUG: bool = False
# 日志级别DEBUG、INFO、WARNING、ERROR等
@@ -27,12 +29,16 @@ class LogSettings(BaseSettings):
# 文件日志格式
LOG_FILE_FORMAT: str = "%(levelname)s%(asctime)s - %(message)s"
@property
def CONFIG_PATH(self):
return SystemUtils.get_config_path(self.CONFIG_DIR)
@property
def LOG_PATH(self):
"""
获取日志存储路径
"""
return SystemUtils.get_config_path() / "logs"
return self.CONFIG_PATH / "logs"
@property
def LOG_MAX_FILE_SIZE_BYTES(self):
@@ -43,7 +49,7 @@ class LogSettings(BaseSettings):
class Config:
case_sensitive = True
env_file = SystemUtils.get_config_path() / "app.env"
env_file = SystemUtils.get_env_path()
env_file_encoding = "utf-8"

View File

@@ -5,7 +5,7 @@ import re
import shutil
import sys
from pathlib import Path
from typing import List, Union, Tuple
from typing import List, Union, Tuple, Optional
import docker
import psutil
@@ -473,13 +473,24 @@ class SystemUtils:
return os.stat(src).st_dev == os.stat(dest).st_dev
@staticmethod
def get_config_path() -> Path:
def get_config_path(config_dir: Optional[str] = None) -> Path:
"""
获取配置路径
"""
if not config_dir:
config_dir = os.getenv("CONFIG_DIR")
if config_dir:
return Path(config_dir)
if SystemUtils.is_docker():
return Path("/config")
elif SystemUtils.is_frozen():
return Path(sys.executable).parent / "config"
else:
return Path(__file__).parents[2] / "config"
@staticmethod
def get_env_path() -> Path:
"""
获取配置路径
"""
return SystemUtils.get_config_path() / "app.env"