Merge pull request #2848 from InfinityPacer/feature/security

This commit is contained in:
jxxghp
2024-10-14 06:49:38 +08:00
committed by GitHub
8 changed files with 265 additions and 154 deletions

View File

@@ -1,55 +1,16 @@
from typing import List, Any
from typing import Any, List
import requests
from fastapi import APIRouter, Depends, Response
from fastapi import APIRouter, Depends
from app import schemas
from app.chain.douban import DoubanChain
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.security import verify_token, verify_resource_token
from app.core.security import verify_token
from app.schemas import MediaType
from app.utils.http import RequestUtils
router = APIRouter()
@router.get("/img", summary="豆瓣图片代理")
def douban_img(imgurl: str, _: schemas.TokenPayload = Depends(verify_resource_token)) -> Any:
    """
    Proxy a Douban image, optionally caching it on local disk.

    When ``settings.GLOBAL_IMAGE_CACHE`` is enabled the image is stored under
    ``settings.CACHE_PATH / 'images'`` using the URL's path (everything after
    the host) as the relative file name; later requests are served from that file.

    :param imgurl: absolute URL of the image to fetch
    :param _: resource-token payload; only used to enforce authentication
    :return: a ``Response`` with media type ``image/jpeg``, or ``None`` when the URL is
             empty, the derived cache path is unsafe, or the download failed
    """

    def __download_image(url: str) -> requests.Response:
        # The request is sent with a Douban Referer header and the configured User-Agent
        return RequestUtils(headers={
            'Referer': "https://movie.douban.com/"
        }, ua=settings.USER_AGENT).get_res(url=url)

    if not imgurl:
        return None

    # NOTE(review): imgurl is fetched as given; no host allow-list is applied in this function.
    if settings.GLOBAL_IMAGE_CACHE:
        # Path portion of the URL: everything after "scheme://host/"
        url_path = "/".join(imgurl.split('/')[3:])
        # Resolve both paths so that ".." segments or symlinks cannot move the
        # cache file outside of the image cache directory
        cache_root = (settings.CACHE_PATH / 'images').resolve()
        cache_path = (cache_root / url_path).resolve()
        if cache_root not in cache_path.parents:
            return None
        if not cache_path.exists():
            # Cache miss: download, persist, then serve
            response = __download_image(imgurl)
            if response:
                # exist_ok avoids a failure when a concurrent request created the directory first
                cache_path.parent.mkdir(parents=True, exist_ok=True)
                with open(cache_path, 'wb') as f:
                    f.write(response.content)
                return Response(content=response.content, media_type="image/jpeg")
        else:
            # Cache hit: serve the stored bytes
            return Response(content=cache_path.read_bytes(), media_type="image/jpeg")
    else:
        response = __download_image(imgurl)
        if response:
            return Response(content=response.content, media_type="image/jpeg")
    return None
@router.get("/person/{person_id}", summary="人物详情", response_model=schemas.MediaPerson)
def douban_person(person_id: int,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:

View File

@@ -1,30 +1,38 @@
import io
import json
import tempfile
import time
from datetime import datetime
from typing import Union, Any
from pathlib import Path
from typing import Any, Union
import tailer
from fastapi import APIRouter, Depends, Response
from PIL import Image
from fastapi import APIRouter, Depends, HTTPException, Response
from fastapi.responses import StreamingResponse
from app import schemas
from app.chain.search import SearchChain
from app.chain.system import SystemChain
from app.core.config import settings, global_vars
from app.core.config import global_vars, settings
from app.core.module import ModuleManager
from app.core.security import verify_token, verify_apitoken, verify_resource_token
from app.core.security import verify_apitoken, verify_resource_token, verify_token
from app.db.models import User
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser
from app.helper.mediaserver import MediaServerHelper
from app.helper.message import MessageHelper
from app.helper.progress import ProgressHelper
from app.helper.rule import RuleHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.monitor import Monitor
from app.scheduler import Scheduler
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.security import SecurityUtils
from app.utils.system import SystemUtils
from app.utils.url import UrlUtils
from version import APP_VERSION
router = APIRouter()
@@ -37,14 +45,36 @@ def proxy_img(imgurl: str, proxy: bool = False,
图片代理,可选是否使用代理服务器
"""
if not imgurl:
return None
if proxy:
response = RequestUtils(ua=settings.USER_AGENT, proxies=settings.PROXY).get_res(url=imgurl)
else:
response = RequestUtils(ua=settings.USER_AGENT).get_res(url=imgurl)
if response:
return Response(content=response.content, media_type="image/jpeg")
return None
raise HTTPException(status_code=404, detail="Not Found")
# 媒体服务器添加图片代理支持
hosts = [config.config.get("host") for config in MediaServerHelper().get_configs().values() if
config and config.config and config.config.get("host")]
allowed_domains = set(settings.SECURITY_IMAGE_DOMAINS) | set(hosts)
# 验证URL安全性
if not SecurityUtils.is_safe_url(imgurl, allowed_domains, strict=True):
raise HTTPException(status_code=404, detail="Not Found")
referer = "https://movie.douban.com/" if "doubanio.com" in imgurl else None
proxies = settings.PROXY if proxy else None
mime_type = "image/jpeg"
response = RequestUtils(ua=settings.USER_AGENT, proxies=proxies, referer=referer).get_res(url=imgurl)
if not response:
logger.debug(f"Failed to fetch image from URL: {imgurl}")
raise HTTPException(status_code=502, detail="Failed to fetch the image from the remote server.")
# 验证下载的内容是否为有效图片
try:
Image.open(io.BytesIO(response.content)).verify()
except Exception as e:
logger.debug(f"Invalid image format for URL {imgurl}: {e}")
raise HTTPException(status_code=502, detail="Invalid image format.")
# 获取 MIME 类型
mime_type = response.headers.get("Content-Type") or UrlUtils.get_mime_type(imgurl, mime_type)
return Response(content=response.content, media_type=mime_type)
@router.get("/cache/image", summary="图片缓存")
@@ -52,27 +82,63 @@ def cache_img(url: str, _: schemas.TokenPayload = Depends(verify_resource_token)
"""
本地缓存图片文件
"""
# 获取Url中除域名外的路径
url_path = "/".join(url.split('/')[3:])
# 生成缓存文件路径
cache_path = settings.CACHE_PATH / 'images' / url_path
# 豆瓣设置Referer
referer = None
if 'doubanio.com' in url:
referer = "https://movie.douban.com/"
# 如果缓存文件不存在,下载图片并保存
if not cache_path.exists():
response = RequestUtils(ua=settings.USER_AGENT, referer=referer).get_res(url=url)
if response:
if not cache_path.parent.exists():
cache_path.parent.mkdir(parents=True)
with open(cache_path, 'wb') as f:
f.write(response.content)
return Response(content=response.content, media_type="image/jpeg")
else:
return None
else:
return Response(content=cache_path.read_bytes(), media_type="image/jpeg")
# 如果没有启用全局图片缓存,则默认使用图片代理的方案
if not settings.GLOBAL_IMAGE_CACHE:
return proxy_img(imgurl=url)
if not url:
raise HTTPException(status_code=404, detail="Not Found")
# 验证URL安全性
if not SecurityUtils.is_safe_url(url, settings.SECURITY_IMAGE_DOMAINS):
raise HTTPException(status_code=404, detail="Not Found")
# 生成缓存路径
url_path = SecurityUtils.sanitize_url_path(url)
cache_path = settings.CACHE_PATH / "images" / url_path
# 确保缓存路径和文件类型合法
if not SecurityUtils.is_safe_path(settings.CACHE_PATH, cache_path, settings.SECURITY_IMAGE_SUFFIXES):
raise HTTPException(status_code=404, detail="Not Found")
referer = "https://movie.douban.com/" if "doubanio.com" in url else None
mime_type = "image/jpeg"
# 如果缓存文件已存在,直接读取并返回
if cache_path.exists():
try:
content = cache_path.read_bytes()
return Response(content=content, media_type=UrlUtils.get_mime_type(cache_path, mime_type))
except Exception as e:
logger.debug(f"Failed to read cache file {cache_path}: {e}")
raise HTTPException(status_code=400, detail="Internal Server Error")
# 请求远程图片
response = RequestUtils(ua=settings.USER_AGENT, referer=referer).get_res(url=url)
if not response:
raise HTTPException(status_code=502, detail="Failed to fetch the image from the remote server")
# 验证下载的内容是否为有效图片
try:
Image.open(io.BytesIO(response.content)).verify()
except Exception as e:
logger.debug(f"Invalid image format for URL {url}: {e}")
raise HTTPException(status_code=502, detail="Invalid image format")
# 创建父目录并保存图片
if not cache_path.parent.exists():
cache_path.parent.mkdir(parents=True, exist_ok=True)
try:
with tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file:
tmp_file.write(response.content)
temp_path = Path(tmp_file.name)
temp_path.rename(cache_path)
except Exception as e:
logger.debug(f"Failed to write cache file {cache_path}: {e}")
media_type = response.headers.get("Content-Type") or UrlUtils.get_mime_type(url, mime_type)
return Response(content=response.content, media_type=media_type)
@router.get("/global", summary="查询非敏感系统设置", response_model=schemas.Response)
@@ -214,6 +280,12 @@ def get_logging(length: int = 50, logfile: str = "moviepilot.log",
"""
log_path = settings.LOG_PATH / logfile
if not SecurityUtils.is_safe_path(settings.LOG_PATH, log_path, allowed_suffixes={".log"}):
raise HTTPException(status_code=404, detail="Not Found")
if not log_path.exists() or not log_path.is_file():
raise HTTPException(status_code=404, detail="Not Found")
def log_generator():
# 读取文件末尾50行不使用tailer模块
with open(log_path, 'r', encoding='utf-8') as f:

View File

@@ -4,10 +4,10 @@ import secrets
import sys
import threading
from pathlib import Path
from typing import Optional, List, Any, Type, Tuple, Dict
from typing import Any, Dict, List, Optional, Tuple, Type
from dotenv import set_key
from pydantic import BaseSettings, validator, BaseModel
from pydantic import BaseModel, BaseSettings, validator
from app.log import logger
from app.utils.system import SystemUtils
@@ -197,6 +197,11 @@ class ConfigModel(BaseModel):
BIG_MEMORY_MODE: bool = False
# 全局图片缓存,将媒体图片缓存到本地
GLOBAL_IMAGE_CACHE: bool = False
# 允许的图片缓存域名
SECURITY_IMAGE_DOMAINS: List[str] = ["image.tmdb.org", "static-mdb.v.geilijiasu.com", "doubanio.com", "lain.bgm.tv",
"raw.githubusercontent.com", "github.com"]
# 允许的图片文件后缀格式
SECURITY_IMAGE_SUFFIXES: List[str] = [".jpg", ".jpeg", ".png", ".webp", ".gif", ".svg"]
class Settings(BaseSettings, ConfigModel):

View File

@@ -130,8 +130,8 @@ def __set_or_refresh_resource_token_cookie(request: Request, response: Response,
key=settings.PROJECT_NAME,
value=resource_token,
httponly=True,
secure=request.url.scheme == "https",
samesite="strict"
secure=request.url.scheme == "https", # 根据当前请求的协议设置 secure 属性
samesite="lax" # 不同浏览器对 "Strict" 的处理可能不同,设置 SameSite 为 "Lax",以平衡安全性和兼容性
)

View File

@@ -291,15 +291,15 @@ class Plex:
query = {"X-Plex-Token": self._token}
if image_type == "Poster":
if item.thumb:
image_url = RequestUtils.combine_url(host=self._playhost, path=item.thumb, query=query)
image_url = UrlUtils.combine_url(host=self._playhost, path=item.thumb, query=query)
else:
# 默认使用art也就是Backdrop进行处理
if item.art:
image_url = RequestUtils.combine_url(host=self._playhost, path=item.art, query=query)
image_url = UrlUtils.combine_url(host=self._playhost, path=item.art, query=query)
# 这里对episode进行特殊处理实际上episode的Backdrop是Poster
# 也有个别情况比如机智的凡人小子episode就是Poster因此这里把episode的优先级降低默认还是取art
if not image_url and item.TYPE == "episode" and item.thumb:
image_url = RequestUtils.combine_url(host=self._playhost, path=item.thumb, query=query)
image_url = UrlUtils.combine_url(host=self._playhost, path=item.thumb, query=query)
else:
if image_type == "Poster":
images = self._plex.fetchItems(ekey=f"{ekey}/posters",
@@ -825,7 +825,7 @@ class Plex:
if not self._session:
return
try:
url = RequestUtils.adapt_request_url(host=self._host, endpoint=endpoint)
url = UrlUtils.adapt_request_url(host=self._host, endpoint=endpoint)
kwargs.setdefault("headers", self.__get_request_headers())
kwargs.setdefault("raise_exception", True)
request_method = getattr(RequestUtils(session=self._session), f"{method}_res", None)

View File

@@ -1,13 +1,11 @@
from typing import Union, Any, Optional
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
from typing import Any, Optional, Union
import requests
import urllib3
from requests import Session, Response
from requests import Response, Session
from urllib3.exceptions import InsecureRequestWarning
from app.log import logger
from app.utils.url import UrlUtils
urllib3.disable_warnings(InsecureRequestWarning)
@@ -225,68 +223,4 @@ class RequestUtils:
cookie_dict[cstr[0].strip()] = cstr[1].strip()
if array:
return [{"name": k, "value": v} for k, v in cookie_dict.items()]
return cookie_dict
@staticmethod
def standardize_base_url(host: str) -> str:
"""
标准化提供的主机地址确保它以http://或https://开头,并且以斜杠(/)结尾
:param host: 提供的主机地址字符串
:return: 标准化后的主机地址字符串
"""
if not host:
return host
if not host.endswith("/"):
host += "/"
if not host.startswith("http://") and not host.startswith("https://"):
host = "http://" + host
return host
@staticmethod
def adapt_request_url(host: str, endpoint: str) -> Optional[str]:
"""
基于传入的host适配请求的URL确保每个请求的URL是完整的用于在发送请求前自动处理和修正请求的URL。
:param host: 主机头
:param endpoint: 端点
:return: 完整的请求URL字符串
"""
if not host and not endpoint:
return None
if endpoint.startswith(("http://", "https://")):
return endpoint
host = UrlUtils.standardize_base_url(host)
return urljoin(host, endpoint) if host else endpoint
@staticmethod
def combine_url(host: str, path: Optional[str] = None, query: Optional[dict] = None) -> Optional[str]:
    """
    Combine a host, a path and query parameters into one complete URL.

    :param host: str, host address, e.g. https://example.com
    :param path: Optional[str], endpoint path which may already carry query
                 parameters, e.g. /path/to/resource?current=1
    :param query: Optional[dict], extra query parameters, e.g. {"key": "value"};
                  they replace parameters of the same name already present in ``path``
    :return: str, the complete URL, or None when any step raises
    """
    try:
        # A missing path means the root path
        target_path = '/' if path is None else path
        base = UrlUtils.standardize_base_url(host)
        # Join host and path, then split the result into its components
        parts = urlparse(urljoin(base, target_path))
        # Start from the query parameters already in the URL and overlay the extra ones
        merged_params = parse_qs(parts.query)
        if query:
            merged_params.update(query.items())
        # Re-encode the query string and rebuild the URL around it
        encoded_query = urlencode(merged_params, doseq=True)
        return str(urlunparse(parts._replace(query=encoded_query)))
    except Exception as e:
        logger.debug(f"Error combining URL: {e}")
        return None
return cookie_dict

113
app/utils/security.py Normal file
View File

@@ -0,0 +1,113 @@
from hashlib import sha256
from pathlib import Path
from typing import List, Optional, Set, Union
from urllib.parse import quote, urlparse
from app.log import logger
class SecurityUtils:
    """Static helpers for validating paths and URLs that originate from untrusted input."""

    @staticmethod
    def is_safe_path(base_path: Path, user_path: Path,
                     allowed_suffixes: Optional[Union[Set[str], List[str]]] = None) -> bool:
        """
        Check that a user-supplied path stays inside a base directory and, optionally,
        that its file suffix is allowed. Used to block directory traversal.

        :param base_path: base directory, the root that may be accessed
        :param user_path: user-supplied path that must be located inside ``base_path``
        :param allowed_suffixes: allowed file suffixes (compared case-insensitively);
                                 ``None`` disables the suffix check
        :return: True when the path is inside the base directory (or is the base directory
                 itself) and the suffix is allowed; False otherwise, including when
                 resolving the paths raises
        """
        try:
            # resolve() makes the paths absolute and collapses symlinks and ".." segments
            base_path_resolved = base_path.resolve()
            user_path_resolved = user_path.resolve()

            # The user path must be the base directory itself or one of its descendants
            if base_path_resolved != user_path_resolved and base_path_resolved not in user_path_resolved.parents:
                return False

            if allowed_suffixes is not None:
                # Lower-case both sides so an upper-case configured suffix still matches
                normalized_suffixes = {suffix.lower() for suffix in allowed_suffixes}
                if user_path.suffix.lower() not in normalized_suffixes:
                    return False

            return True
        except Exception as e:
            logger.debug(f"Error occurred while validating paths: {e}")
            return False

    @staticmethod
    def is_safe_url(url: str, allowed_domains: Union[Set[str], List[str]], strict: bool = True) -> bool:
        """
        Check that a URL uses http/https and that its netloc matches an allowed domain.

        The comparison is made against the lower-cased netloc, so an allowed entry
        may include a port. Sub-domains of an allowed entry are accepted in both modes.

        :param url: URL to validate
        :param allowed_domains: allowed domains; empty entries are ignored
        :param strict: when True (the default) the dot-separated labels of the netloc are
                       compared with the labels of each allowed entry as a suffix; when
                       False the netloc must equal an allowed entry or end with
                       "." + entry
        :return: True when the URL is well-formed and matches an allowed domain;
                 False otherwise, including when parsing raises
        """
        try:
            parsed_url = urlparse(url)
            # Only http and https are allowed (this also rejects a missing scheme)
            if parsed_url.scheme not in {"http", "https"}:
                return False

            # Full netloc (host or IP, plus port if present), lower-cased
            netloc = parsed_url.netloc.lower()
            if not netloc:
                return False

            # Reject authorities containing a backslash or whitespace: HTTP clients may
            # cut the host at such characters, so the host that is actually contacted
            # could differ from the one validated here
            if "\\" in netloc or any(ch.isspace() for ch in netloc):
                return False

            # Drop empty entries: an empty allowed domain would match any netloc ending in "."
            allowed = {d.strip().lower() for d in allowed_domains if d and d.strip()}

            domain_parts = netloc.split(".")
            for allowed_domain in allowed:
                if strict:
                    # Label-wise suffix comparison
                    allowed_parts = allowed_domain.split(".")
                    if domain_parts[-len(allowed_parts):] == allowed_parts:
                        return True
                elif netloc == allowed_domain or netloc.endswith(f".{allowed_domain}"):
                    # Exact netloc match (including IP:port) or a sub-domain
                    return True

            return False
        except Exception as e:
            logger.debug(f"Error occurred while validating URL: {e}")
            return False

    @staticmethod
    def sanitize_url_path(url: str, max_length: int = 120) -> str:
        """
        Percent-encode the path part of a URL and shorten it when it is too long.

        :param url: URL to process
        :param max_length: maximum length of the encoded path; longer paths are replaced
                           by a name derived from a hash of the encoded path
        :return: the encoded path without leading "/", or
                 ``compressed_<16 hex chars><lower-cased suffix>`` for over-long paths
        """
        # Path part of the URL, without the leading slashes
        path = urlparse(url).path.lstrip("/")

        # Percent-encode special characters in the path
        safe_path = quote(path)

        if len(safe_path) > max_length:
            # First 16 hex characters of the SHA-256 digest replace the over-long path;
            # the file suffix is kept
            hash_value = sha256(safe_path.encode()).hexdigest()[:16]
            file_extension = Path(safe_path).suffix.lower()
            safe_path = f"compressed_{hash_value}{file_extension}"

        return safe_path

View File

@@ -1,5 +1,7 @@
from typing import Optional
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
import mimetypes
from pathlib import Path
from typing import Optional, Union
from urllib.parse import parse_qs, urlencode, urljoin, urlparse, urlunparse
from app.log import logger
@@ -69,3 +71,27 @@ class UrlUtils:
except Exception as e:
logger.debug(f"Error combining URL: {e}")
return None
@staticmethod
def get_mime_type(path_or_url: Union[str, Path], default_type: str = "application/octet-stream") -> str:
"""
根据文件路径或 URL 获取 MIME 类型,如果无法获取则返回默认类型
:param path_or_url: 文件路径 (Path) 或 URL (str)
:param default_type: 无法获取类型时返回的默认 MIME 类型
:return: 获取到的 MIME 类型或默认类型
"""
try:
# 如果是 Path 类型,转换为字符串
if isinstance(path_or_url, Path):
path_or_url = str(path_or_url)
# 尝试根据路径或 URL 获取 MIME 类型
mime_type, _ = mimetypes.guess_type(path_or_url)
# 如果无法推测到类型,返回默认类型
if not mime_type:
return default_type
return mime_type
except Exception as e:
logger.debug(f"Error get_mime_type: {e}")
return default_type