diff --git a/app/api/endpoints/douban.py b/app/api/endpoints/douban.py index dcaaf7d2..3de5ad8d 100644 --- a/app/api/endpoints/douban.py +++ b/app/api/endpoints/douban.py @@ -1,55 +1,16 @@ -from typing import List, Any +from typing import Any, List -import requests -from fastapi import APIRouter, Depends, Response +from fastapi import APIRouter, Depends from app import schemas from app.chain.douban import DoubanChain -from app.core.config import settings from app.core.context import MediaInfo -from app.core.security import verify_token, verify_resource_token +from app.core.security import verify_token from app.schemas import MediaType -from app.utils.http import RequestUtils router = APIRouter() -@router.get("/img", summary="豆瓣图片代理") -def douban_img(imgurl: str, _: schemas.TokenPayload = Depends(verify_resource_token)) -> Any: - """ - 豆瓣图片代理 - """ - - def __download_image(url: str) -> requests.Response: - return RequestUtils(headers={ - 'Referer': "https://movie.douban.com/" - }, ua=settings.USER_AGENT).get_res(url=url) - - if not imgurl: - return None - if settings.GLOBAL_IMAGE_CACHE: - # 获取Url中除域名外的路径 - url_path = "/".join(imgurl.split('/')[3:]) - # 生成缓存文件路径 - cache_path = settings.CACHE_PATH / 'images' / url_path - # 如果缓存文件不存在,下载图片并保存 - if not cache_path.exists(): - response = __download_image(imgurl) - if response: - if not cache_path.parent.exists(): - cache_path.parent.mkdir(parents=True) - with open(cache_path, 'wb') as f: - f.write(response.content) - return Response(content=response.content, media_type="image/jpeg") - else: - return Response(content=cache_path.read_bytes(), media_type="image/jpeg") - else: - response = __download_image(imgurl) - if response: - return Response(content=response.content, media_type="image/jpeg") - return None - - @router.get("/person/{person_id}", summary="人物详情", response_model=schemas.MediaPerson) def douban_person(person_id: int, _: schemas.TokenPayload = Depends(verify_token)) -> Any: diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index 7f81cb24..dd33e843 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -1,9 +1,13 @@ +import io import json +import tempfile import time from datetime import datetime +from pathlib import Path from typing import Any, Union import tailer +from PIL import Image from fastapi import APIRouter, Depends, HTTPException, Response from fastapi.responses import StreamingResponse @@ -16,16 +20,19 @@ from app.core.security import verify_apitoken, verify_resource_token, verify_tok from app.db.models import User from app.db.systemconfig_oper import SystemConfigOper from app.db.user_oper import get_current_active_superuser +from app.helper.mediaserver import MediaServerHelper from app.helper.message import MessageHelper from app.helper.progress import ProgressHelper from app.helper.rule import RuleHelper from app.helper.sites import SitesHelper +from app.log import logger from app.monitor import Monitor from app.scheduler import Scheduler from app.schemas.types import SystemConfigKey from app.utils.http import RequestUtils from app.utils.security import SecurityUtils from app.utils.system import SystemUtils +from app.utils.url import UrlUtils from version import APP_VERSION router = APIRouter() @@ -38,14 +45,36 @@ def proxy_img(imgurl: str, proxy: bool = False, 图片代理,可选是否使用代理服务器 """ if not imgurl: - return None - if proxy: - response = RequestUtils(ua=settings.USER_AGENT, proxies=settings.PROXY).get_res(url=imgurl) - else: - response = RequestUtils(ua=settings.USER_AGENT).get_res(url=imgurl) - if response: - return Response(content=response.content, media_type="image/jpeg") - return None + raise HTTPException(status_code=404, detail="Not Found") + + # 媒体服务器添加图片代理支持 + hosts = [config.config.get("host") for config in MediaServerHelper().get_configs().values() if + config and config.config and config.config.get("host")] + allowed_domains = set(settings.SECURITY_IMAGE_DOMAINS) | set(hosts) + + # 验证URL安全性 + if not SecurityUtils.is_safe_url(imgurl, allowed_domains, strict=True): + raise HTTPException(status_code=404, detail="Not Found") + + referer = "https://movie.douban.com/" if "doubanio.com" in imgurl else None + proxies = settings.PROXY if proxy else None + mime_type = "image/jpeg" + + response = RequestUtils(ua=settings.USER_AGENT, proxies=proxies, referer=referer).get_res(url=imgurl) + if not response: + logger.debug(f"Failed to fetch image from URL: {imgurl}") + raise HTTPException(status_code=502, detail="Failed to fetch the image from the remote server.") + + # 验证下载的内容是否为有效图片 + try: + Image.open(io.BytesIO(response.content)).verify() + except Exception as e: + logger.debug(f"Invalid image format for URL {imgurl}: {e}") + raise HTTPException(status_code=400, detail="Invalid image format.") + + # 获取 MIME 类型 + mime_type = response.headers.get("Content-Type") or UrlUtils.get_mime_type(imgurl, mime_type) + return Response(content=response.content, media_type=mime_type) @router.get("/cache/image", summary="图片缓存") @@ -53,27 +82,64 @@ def cache_img(url: str, _: schemas.TokenPayload = Depends(verify_resource_token) """ 本地缓存图片文件 """ - # 获取Url中除域名外的路径 - url_path = "/".join(url.split('/')[3:]) - # 生成缓存文件路径 - cache_path = settings.CACHE_PATH / 'images' / url_path - # 豆瓣设置Referer - referer = None - if 'doubanio.com' in url: - referer = "https://movie.douban.com/" - # 如果缓存文件不存在,下载图片并保存 - if not cache_path.exists(): - response = RequestUtils(ua=settings.USER_AGENT, referer=referer).get_res(url=url) - if response: - if not cache_path.parent.exists(): - cache_path.parent.mkdir(parents=True) - with open(cache_path, 'wb') as f: - f.write(response.content) - return Response(content=response.content, media_type="image/jpeg") - else: - return None - else: - return Response(content=cache_path.read_bytes(), media_type="image/jpeg") + # 如果没有启用全局图片缓存,则默认使用图片代理的方案 + if not settings.GLOBAL_IMAGE_CACHE: + return proxy_img(imgurl=url) + + if not url: + raise HTTPException(status_code=404, detail="Not Found") + + # 验证URL安全性 + if not SecurityUtils.is_safe_url(url, settings.SECURITY_IMAGE_DOMAINS): + raise HTTPException(status_code=404, detail="Not Found") + + # 生成缓存路径 + url_path = SecurityUtils.sanitize_url_path(url) + cache_path = settings.CACHE_PATH / "images" / url_path + + # 确保缓存路径和文件类型合法 + if not SecurityUtils.is_safe_path(settings.CACHE_PATH, cache_path, settings.SECURITY_IMAGE_SUFFIXES): + raise HTTPException(status_code=404, detail="Not Found") + + referer = "https://movie.douban.com/" if "doubanio.com" in url else None + mime_type = "image/jpeg" + + # 如果缓存文件已存在,直接读取并返回 + if cache_path.exists(): + try: + content = cache_path.read_bytes() + return Response(content=content, media_type=UrlUtils.get_mime_type(cache_path, mime_type)) + except Exception as e: + logger.debug(f"Failed to read cache file {cache_path}: {e}") + raise HTTPException(status_code=400, detail="Internal Server Error") + + # 请求远程图片 + response = RequestUtils(ua=settings.USER_AGENT, referer=referer).get_res(url=url) + if not response: + raise HTTPException(status_code=502, detail="Failed to fetch the image from the remote server") + + # 创建父目录并保存图片 + if not cache_path.parent.exists(): + cache_path.parent.mkdir(parents=True, exist_ok=True) + + try: + with tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file: + tmp_file.write(response.content) + temp_path = Path(tmp_file.name) + temp_path.rename(cache_path) + except Exception as e: + logger.debug(f"Failed to write cache file {cache_path}: {e}") + raise HTTPException(status_code=400, detail="Internal Server Error") + + # 验证下载的内容是否为有效图片 + try: + Image.open(io.BytesIO(response.content)).verify() + except Exception as e: + logger.debug(f"Invalid image format for URL {url}: {e}") + raise HTTPException(status_code=400, detail="Invalid image format") + + media_type = response.headers.get("Content-Type") or UrlUtils.get_mime_type(url, mime_type) + return Response(content=response.content, media_type=media_type) @router.get("/global", summary="查询非敏感系统设置", response_model=schemas.Response) diff --git a/app/core/config.py b/app/core/config.py index f3969ac8..445e71e5 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -4,10 +4,10 @@ import secrets import sys import threading from pathlib import Path -from typing import Optional, List, Any, Type, Tuple, Dict +from typing import Any, Dict, List, Optional, Tuple, Type from dotenv import set_key -from pydantic import BaseSettings, validator, BaseModel +from pydantic import BaseModel, BaseSettings, validator from app.log import logger from app.utils.system import SystemUtils @@ -197,6 +197,11 @@ class ConfigModel(BaseModel): BIG_MEMORY_MODE: bool = False # 全局图片缓存,将媒体图片缓存到本地 GLOBAL_IMAGE_CACHE: bool = False + # 允许的图片缓存域名 + SECURITY_IMAGE_DOMAINS: List[str] = ["image.tmdb.org", "static-mdb.v.geilijiasu.com", "doubanio.com", "lain.bgm.tv", + "raw.githubusercontent.com", "github.com"] + # 允许的图片文件后缀格式 + SECURITY_IMAGE_SUFFIXES: List[str] = [".jpg", ".jpeg", ".png", ".webp", ".gif", ".svg"] class Settings(BaseSettings, ConfigModel): diff --git a/app/utils/security.py b/app/utils/security.py index 4b10df09..098f5949 100644 --- a/app/utils/security.py +++ b/app/utils/security.py @@ -1,11 +1,16 @@ +from hashlib import sha256 from pathlib import Path -from typing import Optional, Set +from typing import List, Optional, Set, Union +from urllib.parse import quote, urlparse + +from app.log import logger class SecurityUtils: @staticmethod - def is_safe_path(base_path: Path, user_path: Path, allowed_suffixes: Optional[Set[str]] = None) -> bool: + def is_safe_path(base_path: Path, user_path: Path, + allowed_suffixes: Optional[Union[Set[str], List[str]]] = None) -> bool: """ 验证用户提供的路径是否在基准目录内,并检查文件类型是否合法,防止目录遍历攻击 @@ -24,13 +29,85 @@ class SecurityUtils: if base_path_resolved != user_path_resolved and base_path_resolved not in user_path_resolved.parents: return False - # 如果指定了 allowed_suffixes,进一步检查文件后缀 - if allowed_suffixes and user_path.is_file() and user_path.suffix not in allowed_suffixes: - return False + if allowed_suffixes is not None: + allowed_suffixes = set(allowed_suffixes) + if user_path.suffix.lower() not in allowed_suffixes: + return False - # 所有检查通过 return True except Exception as e: - # 捕获并记录路径解析时的异常 - print(f"Error occurred while resolving paths: {e}") + logger.debug(f"Error occurred while validating paths: {e}") return False + + @staticmethod + def is_safe_url(url: str, allowed_domains: Union[Set[str], List[str]], strict: bool = True) -> bool: + """ + 验证URL是否在允许的域名列表中,包括带有端口的域名。 + + :param url: 需要验证的 URL + :param allowed_domains: 允许的域名集合,域名可以包含端口 + :param strict: 是否严格匹配一级域名(默认为 False,允许多级域名) + :return: 如果URL合法且在允许的域名列表中,返回 True;否则返回 False + """ + try: + # 解析URL + parsed_url = urlparse(url) + + # 检查URL的scheme和netloc + if not parsed_url.scheme or not parsed_url.netloc: + return False + + # 仅允许 http 或 https 协议 + if parsed_url.scheme not in {"http", "https"}: + return False + + # 获取完整的 netloc(包括 IP 和端口)并转换为小写 + netloc = parsed_url.netloc.lower() + allowed_domains = {d.lower() for d in allowed_domains} + + if not netloc: + return False + + if strict: + # 严格匹配一级域名,要求完全匹配或者子域名精确匹配 + domain_parts = netloc.split(".") + for allowed_domain in allowed_domains: + allowed_parts = allowed_domain.split(".") + if domain_parts[-len(allowed_parts):] == allowed_parts: + return True + else: + # 允许匹配多级域名,或者完全匹配的 netloc(包括 IP:port) + for allowed_domain in allowed_domains: + if netloc == allowed_domain or netloc.endswith(f".{allowed_domain}"): + return True + + return False + except Exception as e: + logger.debug(f"Error occurred while validating URL: {e}") + return False + + @staticmethod + def sanitize_url_path(url: str, max_length: int = 120) -> str: + """ + 将 URL 的路径部分进行编码,确保合法字符,并对路径长度进行压缩处理(如果超出最大长度) + + :param url: 需要处理的 URL + :param max_length: 路径允许的最大长度,超出时进行压缩 + :return: 处理后的路径字符串 + """ + # 解析 URL,获取路径部分 + parsed_url = urlparse(url) + path = parsed_url.path.lstrip("/") + + # 对路径中的特殊字符进行编码 + safe_path = quote(path) + + # 如果路径过长,进行压缩处理 + if len(safe_path) > max_length: + # 使用 SHA-256 对路径进行哈希,取前 16 位作为压缩后的路径 + hash_value = sha256(safe_path.encode()).hexdigest()[:16] + # 使用哈希值代替过长的路径,同时保留文件扩展名 + file_extension = Path(safe_path).suffix.lower() if Path(safe_path).suffix else "" + safe_path = f"compressed_{hash_value}{file_extension}" + + return safe_path diff --git a/app/utils/url.py b/app/utils/url.py index 39630ce7..15bac9e2 100644 --- a/app/utils/url.py +++ b/app/utils/url.py @@ -1,5 +1,7 @@ -from typing import Optional -from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse +import mimetypes +from pathlib import Path +from typing import Optional, Union +from urllib.parse import parse_qs, urlencode, urljoin, urlparse, urlunparse from app.log import logger @@ -69,3 +71,27 @@ class UrlUtils: except Exception as e: logger.debug(f"Error combining URL: {e}") return None + + @staticmethod + def get_mime_type(path_or_url: Union[str, Path], default_type: str = "application/octet-stream") -> str: + """ + 根据文件路径或 URL 获取 MIME 类型,如果无法获取则返回默认类型 + + :param path_or_url: 文件路径 (Path) 或 URL (str) + :param default_type: 无法获取类型时返回的默认 MIME 类型 + :return: 获取到的 MIME 类型或默认类型 + """ + try: + # 如果是 Path 类型,转换为字符串 + if isinstance(path_or_url, Path): + path_or_url = str(path_or_url) + + # 尝试根据路径或 URL 获取 MIME 类型 + mime_type, _ = mimetypes.guess_type(path_or_url) + # 如果无法推测到类型,返回默认类型 + if not mime_type: + return default_type + return mime_type + except Exception as e: + logger.debug(f"Error get_mime_type: {e}") + return default_type