diff --git a/app/api/endpoints/douban.py b/app/api/endpoints/douban.py index dcaaf7d2..3de5ad8d 100644 --- a/app/api/endpoints/douban.py +++ b/app/api/endpoints/douban.py @@ -1,55 +1,16 @@ -from typing import List, Any +from typing import Any, List -import requests -from fastapi import APIRouter, Depends, Response +from fastapi import APIRouter, Depends from app import schemas from app.chain.douban import DoubanChain -from app.core.config import settings from app.core.context import MediaInfo -from app.core.security import verify_token, verify_resource_token +from app.core.security import verify_token from app.schemas import MediaType -from app.utils.http import RequestUtils router = APIRouter() -@router.get("/img", summary="豆瓣图片代理") -def douban_img(imgurl: str, _: schemas.TokenPayload = Depends(verify_resource_token)) -> Any: - """ - 豆瓣图片代理 - """ - - def __download_image(url: str) -> requests.Response: - return RequestUtils(headers={ - 'Referer': "https://movie.douban.com/" - }, ua=settings.USER_AGENT).get_res(url=url) - - if not imgurl: - return None - if settings.GLOBAL_IMAGE_CACHE: - # 获取Url中除域名外的路径 - url_path = "/".join(imgurl.split('/')[3:]) - # 生成缓存文件路径 - cache_path = settings.CACHE_PATH / 'images' / url_path - # 如果缓存文件不存在,下载图片并保存 - if not cache_path.exists(): - response = __download_image(imgurl) - if response: - if not cache_path.parent.exists(): - cache_path.parent.mkdir(parents=True) - with open(cache_path, 'wb') as f: - f.write(response.content) - return Response(content=response.content, media_type="image/jpeg") - else: - return Response(content=cache_path.read_bytes(), media_type="image/jpeg") - else: - response = __download_image(imgurl) - if response: - return Response(content=response.content, media_type="image/jpeg") - return None - - @router.get("/person/{person_id}", summary="人物详情", response_model=schemas.MediaPerson) def douban_person(person_id: int, _: schemas.TokenPayload = Depends(verify_token)) -> Any: diff --git 
a/app/api/endpoints/system.py b/app/api/endpoints/system.py index cfb439b5..6cdbbd08 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -1,30 +1,38 @@ +import io import json +import tempfile import time from datetime import datetime -from typing import Union, Any +from pathlib import Path +from typing import Any, Union import tailer -from fastapi import APIRouter, Depends, Response +from PIL import Image +from fastapi import APIRouter, Depends, HTTPException, Response from fastapi.responses import StreamingResponse from app import schemas from app.chain.search import SearchChain from app.chain.system import SystemChain -from app.core.config import settings, global_vars +from app.core.config import global_vars, settings from app.core.module import ModuleManager -from app.core.security import verify_token, verify_apitoken, verify_resource_token +from app.core.security import verify_apitoken, verify_resource_token, verify_token from app.db.models import User from app.db.systemconfig_oper import SystemConfigOper from app.db.user_oper import get_current_active_superuser +from app.helper.mediaserver import MediaServerHelper from app.helper.message import MessageHelper from app.helper.progress import ProgressHelper from app.helper.rule import RuleHelper from app.helper.sites import SitesHelper +from app.log import logger from app.monitor import Monitor from app.scheduler import Scheduler from app.schemas.types import SystemConfigKey from app.utils.http import RequestUtils +from app.utils.security import SecurityUtils from app.utils.system import SystemUtils +from app.utils.url import UrlUtils from version import APP_VERSION router = APIRouter() @@ -37,14 +45,36 @@ def proxy_img(imgurl: str, proxy: bool = False, 图片代理,可选是否使用代理服务器 """ if not imgurl: - return None - if proxy: - response = RequestUtils(ua=settings.USER_AGENT, proxies=settings.PROXY).get_res(url=imgurl) - else: - response = RequestUtils(ua=settings.USER_AGENT).get_res(url=imgurl) - if 
response: - return Response(content=response.content, media_type="image/jpeg") - return None + raise HTTPException(status_code=404, detail="Not Found") + + # 媒体服务器添加图片代理支持 + hosts = [config.config.get("host") for config in MediaServerHelper().get_configs().values() if + config and config.config and config.config.get("host")] + allowed_domains = set(settings.SECURITY_IMAGE_DOMAINS) | set(hosts) + + # 验证URL安全性 + if not SecurityUtils.is_safe_url(imgurl, allowed_domains, strict=True): + raise HTTPException(status_code=404, detail="Not Found") + + referer = "https://movie.douban.com/" if "doubanio.com" in imgurl else None + proxies = settings.PROXY if proxy else None + mime_type = "image/jpeg" + + response = RequestUtils(ua=settings.USER_AGENT, proxies=proxies, referer=referer).get_res(url=imgurl) + if not response: + logger.debug(f"Failed to fetch image from URL: {imgurl}") + raise HTTPException(status_code=502, detail="Failed to fetch the image from the remote server.") + + # 验证下载的内容是否为有效图片 + try: + Image.open(io.BytesIO(response.content)).verify() + except Exception as e: + logger.debug(f"Invalid image format for URL {imgurl}: {e}") + raise HTTPException(status_code=502, detail="Invalid image format.") + + # 获取 MIME 类型 + mime_type = response.headers.get("Content-Type") or UrlUtils.get_mime_type(imgurl, mime_type) + return Response(content=response.content, media_type=mime_type) @router.get("/cache/image", summary="图片缓存") @@ -52,27 +82,63 @@ def cache_img(url: str, _: schemas.TokenPayload = Depends(verify_resource_token) """ 本地缓存图片文件 """ - # 获取Url中除域名外的路径 - url_path = "/".join(url.split('/')[3:]) - # 生成缓存文件路径 - cache_path = settings.CACHE_PATH / 'images' / url_path - # 豆瓣设置Referer - referer = None - if 'doubanio.com' in url: - referer = "https://movie.douban.com/" - # 如果缓存文件不存在,下载图片并保存 - if not cache_path.exists(): - response = RequestUtils(ua=settings.USER_AGENT, referer=referer).get_res(url=url) - if response: - if not cache_path.parent.exists(): - 
cache_path.parent.mkdir(parents=True) - with open(cache_path, 'wb') as f: - f.write(response.content) - return Response(content=response.content, media_type="image/jpeg") - else: - return None - else: - return Response(content=cache_path.read_bytes(), media_type="image/jpeg") + # 如果没有启用全局图片缓存,则默认使用图片代理的方案 + if not settings.GLOBAL_IMAGE_CACHE: + return proxy_img(imgurl=url) + + if not url: + raise HTTPException(status_code=404, detail="Not Found") + + # 验证URL安全性 + if not SecurityUtils.is_safe_url(url, settings.SECURITY_IMAGE_DOMAINS): + raise HTTPException(status_code=404, detail="Not Found") + + # 生成缓存路径 + url_path = SecurityUtils.sanitize_url_path(url) + cache_path = settings.CACHE_PATH / "images" / url_path + + # 确保缓存路径和文件类型合法 + if not SecurityUtils.is_safe_path(settings.CACHE_PATH, cache_path, settings.SECURITY_IMAGE_SUFFIXES): + raise HTTPException(status_code=404, detail="Not Found") + + referer = "https://movie.douban.com/" if "doubanio.com" in url else None + mime_type = "image/jpeg" + + # 如果缓存文件已存在,直接读取并返回 + if cache_path.exists(): + try: + content = cache_path.read_bytes() + return Response(content=content, media_type=UrlUtils.get_mime_type(cache_path, mime_type)) + except Exception as e: + logger.debug(f"Failed to read cache file {cache_path}: {e}") + raise HTTPException(status_code=400, detail="Internal Server Error") + + # 请求远程图片 + response = RequestUtils(ua=settings.USER_AGENT, referer=referer).get_res(url=url) + if not response: + raise HTTPException(status_code=502, detail="Failed to fetch the image from the remote server") + + # 验证下载的内容是否为有效图片 + try: + Image.open(io.BytesIO(response.content)).verify() + except Exception as e: + logger.debug(f"Invalid image format for URL {url}: {e}") + raise HTTPException(status_code=502, detail="Invalid image format") + + # 创建父目录并保存图片 + if not cache_path.parent.exists(): + cache_path.parent.mkdir(parents=True, exist_ok=True) + + try: + with tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as 
tmp_file: + tmp_file.write(response.content) + temp_path = Path(tmp_file.name) + temp_path.rename(cache_path) + except Exception as e: + logger.debug(f"Failed to write cache file {cache_path}: {e}") + + media_type = response.headers.get("Content-Type") or UrlUtils.get_mime_type(url, mime_type) + return Response(content=response.content, media_type=media_type) @router.get("/global", summary="查询非敏感系统设置", response_model=schemas.Response) @@ -214,6 +280,12 @@ def get_logging(length: int = 50, logfile: str = "moviepilot.log", """ log_path = settings.LOG_PATH / logfile + if not SecurityUtils.is_safe_path(settings.LOG_PATH, log_path, allowed_suffixes={".log"}): + raise HTTPException(status_code=404, detail="Not Found") + + if not log_path.exists() or not log_path.is_file(): + raise HTTPException(status_code=404, detail="Not Found") + def log_generator(): # 读取文件末尾50行,不使用tailer模块 with open(log_path, 'r', encoding='utf-8') as f: diff --git a/app/core/config.py b/app/core/config.py index f3969ac8..445e71e5 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -4,10 +4,10 @@ import secrets import sys import threading from pathlib import Path -from typing import Optional, List, Any, Type, Tuple, Dict +from typing import Any, Dict, List, Optional, Tuple, Type from dotenv import set_key -from pydantic import BaseSettings, validator, BaseModel +from pydantic import BaseModel, BaseSettings, validator from app.log import logger from app.utils.system import SystemUtils @@ -197,6 +197,11 @@ class ConfigModel(BaseModel): BIG_MEMORY_MODE: bool = False # 全局图片缓存,将媒体图片缓存到本地 GLOBAL_IMAGE_CACHE: bool = False + # 允许的图片缓存域名 + SECURITY_IMAGE_DOMAINS: List[str] = ["image.tmdb.org", "static-mdb.v.geilijiasu.com", "doubanio.com", "lain.bgm.tv", + "raw.githubusercontent.com", "github.com"] + # 允许的图片文件后缀格式 + SECURITY_IMAGE_SUFFIXES: List[str] = [".jpg", ".jpeg", ".png", ".webp", ".gif", ".svg"] class Settings(BaseSettings, ConfigModel): diff --git a/app/core/security.py b/app/core/security.py 
index db73a306..c802c11b 100644 --- a/app/core/security.py +++ b/app/core/security.py @@ -130,8 +130,8 @@ def __set_or_refresh_resource_token_cookie(request: Request, response: Response, key=settings.PROJECT_NAME, value=resource_token, httponly=True, - secure=request.url.scheme == "https", - samesite="strict" + secure=request.url.scheme == "https", # 根据当前请求的协议设置 secure 属性 + samesite="lax" # 不同浏览器对 "Strict" 的处理可能不同,设置 SameSite 为 "Lax",以平衡安全性和兼容性 ) diff --git a/app/modules/plex/plex.py b/app/modules/plex/plex.py index d031c9d5..ee9c5adf 100644 --- a/app/modules/plex/plex.py +++ b/app/modules/plex/plex.py @@ -291,15 +291,15 @@ class Plex: query = {"X-Plex-Token": self._token} if image_type == "Poster": if item.thumb: - image_url = RequestUtils.combine_url(host=self._playhost, path=item.thumb, query=query) + image_url = UrlUtils.combine_url(host=self._playhost, path=item.thumb, query=query) else: # 默认使用art也就是Backdrop进行处理 if item.art: - image_url = RequestUtils.combine_url(host=self._playhost, path=item.art, query=query) + image_url = UrlUtils.combine_url(host=self._playhost, path=item.art, query=query) # 这里对episode进行特殊处理,实际上episode的Backdrop是Poster # 也有个别情况,比如机智的凡人小子episode就是Poster,因此这里把episode的优先级降低,默认还是取art if not image_url and item.TYPE == "episode" and item.thumb: - image_url = RequestUtils.combine_url(host=self._playhost, path=item.thumb, query=query) + image_url = UrlUtils.combine_url(host=self._playhost, path=item.thumb, query=query) else: if image_type == "Poster": images = self._plex.fetchItems(ekey=f"{ekey}/posters", @@ -825,7 +825,7 @@ class Plex: if not self._session: return try: - url = RequestUtils.adapt_request_url(host=self._host, endpoint=endpoint) + url = UrlUtils.adapt_request_url(host=self._host, endpoint=endpoint) kwargs.setdefault("headers", self.__get_request_headers()) kwargs.setdefault("raise_exception", True) request_method = getattr(RequestUtils(session=self._session), f"{method}_res", None) diff --git a/app/utils/http.py b/app/utils/http.py 
index b618c7ec..62859daa 100644 --- a/app/utils/http.py +++ b/app/utils/http.py @@ -1,13 +1,11 @@ -from typing import Union, Any, Optional -from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse +from typing import Any, Optional, Union import requests import urllib3 -from requests import Session, Response +from requests import Response, Session from urllib3.exceptions import InsecureRequestWarning from app.log import logger -from app.utils.url import UrlUtils urllib3.disable_warnings(InsecureRequestWarning) @@ -225,68 +223,4 @@ class RequestUtils: cookie_dict[cstr[0].strip()] = cstr[1].strip() if array: return [{"name": k, "value": v} for k, v in cookie_dict.items()] - return cookie_dict - - @staticmethod - def standardize_base_url(host: str) -> str: - """ - 标准化提供的主机地址,确保它以http://或https://开头,并且以斜杠(/)结尾 - :param host: 提供的主机地址字符串 - :return: 标准化后的主机地址字符串 - """ - if not host: - return host - if not host.endswith("/"): - host += "/" - if not host.startswith("http://") and not host.startswith("https://"): - host = "http://" + host - return host - - @staticmethod - def adapt_request_url(host: str, endpoint: str) -> Optional[str]: - """ - 基于传入的host,适配请求的URL,确保每个请求的URL是完整的,用于在发送请求前自动处理和修正请求的URL。 - :param host: 主机头 - :param endpoint: 端点 - :return: 完整的请求URL字符串 - """ - if not host and not endpoint: - return None - if endpoint.startswith(("http://", "https://")): - return endpoint - host = UrlUtils.standardize_base_url(host) - return urljoin(host, endpoint) if host else endpoint - - @staticmethod - def combine_url(host: str, path: Optional[str] = None, query: Optional[dict] = None) -> Optional[str]: - """ - 使用给定的主机头、路径和查询参数组合生成完整的URL。 - :param host: str, 主机头,例如 https://example.com - :param path: Optional[str], 包含路径和可能已经包含的查询参数的端点,例如 /path/to/resource?current=1 - :param query: Optional[dict], 可选,额外的查询参数,例如 {"key": "value"} - :return: str, 完整的请求URL字符串 - """ - try: - # 如果路径为空,则默认为 '/' - if path is None: - path = '/' - host = UrlUtils.standardize_base_url(host) 
class SecurityUtils:
    """
    Stateless helpers that validate user-influenced paths and URLs.

    All methods are static and return plain values; validation helpers never
    raise, they log at debug level and answer ``False`` instead.
    """

    # Ports implied by each scheme that is_safe_url accepts.
    _DEFAULT_PORTS = {"http": 80, "https": 443}

    @staticmethod
    def is_safe_path(base_path: Path, user_path: Path,
                     allowed_suffixes: Optional[Union[Set[str], List[str]]] = None) -> bool:
        """
        Check that ``user_path`` stays inside ``base_path`` and, optionally,
        that its file suffix is allowed. Guards against directory traversal.

        :param base_path: root directory that access is confined to
        :param user_path: candidate path to validate
        :param allowed_suffixes: permitted file suffixes including the dot
            (compared case-insensitively); ``None`` disables the suffix check
        :return: True when the resolved path equals or lies below the resolved
            base and the suffix is permitted; False otherwise, including when
            resolving either path fails
        """
        try:
            # resolve() makes both paths absolute and collapses symlinks and '..'
            base_resolved = base_path.resolve()
            user_resolved = user_path.resolve()

            # The candidate must be the base itself or one of its descendants
            if base_resolved != user_resolved and base_resolved not in user_resolved.parents:
                return False

            if allowed_suffixes is not None:
                # Normalise both sides so ".JPG" in the allow-list still matches
                normalized_suffixes = {str(suffix).lower() for suffix in allowed_suffixes}
                if user_path.suffix.lower() not in normalized_suffixes:
                    return False

            return True
        except Exception as e:
            logger.debug(f"Error occurred while validating paths: {e}")
            return False

    @staticmethod
    def _parse_allowed_domain(entry: str) -> Optional[tuple]:
        """
        Split one allow-list entry into ``(host, port)``.

        Accepts a bare host (``example.com``), ``host:port``, or a full base
        URL (``http://host:port/``). Returns None for empty or unparsable
        entries so that they can never match anything.
        """
        if not entry:
            return None
        text = str(entry).strip().lower()
        if not text:
            return None
        # Without a scheme urlparse would treat the host as a path, so prefix '//'
        if "://" not in text:
            text = f"//{text}"
        try:
            parsed = urlparse(text)
            host, port = parsed.hostname, parsed.port
        except ValueError:
            # Raised for malformed ports or brackets; ignore the entry
            return None
        if not host:
            return None
        return host, port

    @staticmethod
    def is_safe_url(url: str, allowed_domains: Union[Set[str], List[str]], strict: bool = True) -> bool:
        """
        Check that ``url`` is an http(s) URL whose host is on the allow-list.

        A host matches an entry when it equals the entry's host or is a
        sub-domain of it by whole labels (``img1.doubanio.com`` matches
        ``doubanio.com``; ``evildoubanio.com`` does not). Matching uses the
        parsed hostname, so credentials embedded in the URL are ignored.

        Ports: an entry with an explicit port only matches that effective
        port; an entry without a port only matches the scheme's default port
        (written explicitly or omitted).

        :param url: URL to validate
        :param allowed_domains: allowed hosts; each entry may be a bare host,
            ``host:port`` or a full base URL such as ``http://host:port/``
        :param strict: kept for interface compatibility (default True); both
            values apply the same whole-label matching described above
        :return: True when the URL is allowed; False otherwise, including
            when the URL cannot be parsed
        """
        try:
            parsed_url = urlparse(url)

            # Only http and https are accepted (urlparse lower-cases the scheme)
            scheme = parsed_url.scheme
            if scheme not in SecurityUtils._DEFAULT_PORTS:
                return False

            # hostname is lower-cased and stripped of userinfo and port
            hostname = parsed_url.hostname
            if not hostname:
                return False

            default_port = SecurityUtils._DEFAULT_PORTS[scheme]
            url_port = parsed_url.port

            for allowed_domain in allowed_domains:
                parsed_allowed = SecurityUtils._parse_allowed_domain(allowed_domain)
                if parsed_allowed is None:
                    continue
                allowed_host, allowed_port = parsed_allowed

                if allowed_port is None:
                    # Entry names no port: accept only the scheme's default port
                    port_matches = url_port is None or url_port == default_port
                else:
                    effective_port = default_port if url_port is None else url_port
                    port_matches = effective_port == allowed_port
                if not port_matches:
                    continue

                # Exact host, or sub-domain separated by a dot (whole-label match)
                if hostname == allowed_host or hostname.endswith(f".{allowed_host}"):
                    return True

            return False
        except Exception as e:
            logger.debug(f"Error occurred while validating URL: {e}")
            return False

    @staticmethod
    def sanitize_url_path(url: str, max_length: int = 120) -> str:
        """
        Turn the path component of ``url`` into a percent-encoded relative path.

        Only the path is used; host and query string are ignored. When the
        encoded path exceeds ``max_length`` it is replaced by
        ``compressed_<first 16 hex chars of its SHA-256>`` plus the original
        file suffix in lower case.

        :param url: URL whose path should be converted
        :param max_length: longest encoded path kept verbatim
        :return: the encoded (or compressed) path without a leading slash
        """
        # Drop the leading slash so the result can be joined under a base dir
        path = urlparse(url).path.lstrip("/")

        # Percent-encode characters that are not safe in a path
        safe_path = quote(path)

        if len(safe_path) > max_length:
            # Replace over-long paths with a short digest, keeping the suffix
            digest = sha256(safe_path.encode()).hexdigest()[:16]
            extension = Path(safe_path).suffix.lower()
            safe_path = f"compressed_{digest}{extension}"

        return safe_path