feat(security): enhance image URL and domain validation

This commit is contained in:
InfinityPacer
2024-10-14 01:33:07 +08:00
parent efb624259a
commit 422474b4b7
5 changed files with 218 additions and 83 deletions

View File

@@ -1,55 +1,16 @@
from typing import List, Any
from typing import Any, List
import requests
from fastapi import APIRouter, Depends, Response
from fastapi import APIRouter, Depends
from app import schemas
from app.chain.douban import DoubanChain
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.security import verify_token, verify_resource_token
from app.core.security import verify_token
from app.schemas import MediaType
from app.utils.http import RequestUtils
router = APIRouter()
@router.get("/img", summary="豆瓣图片代理")
def douban_img(imgurl: str, _: schemas.TokenPayload = Depends(verify_resource_token)) -> Any:
    """
    Douban image proxy.

    Downloads ``imgurl`` with a Douban ``Referer`` header and returns the bytes
    as ``image/jpeg``. When ``settings.GLOBAL_IMAGE_CACHE`` is enabled, the image
    is stored under ``settings.CACHE_PATH / 'images'`` at a location derived from
    the URL path, and later requests for the same path are served from that file.

    :param imgurl: URL of the image to fetch
    :param _: token payload produced by ``verify_resource_token``; not used in the body
    :return: a ``Response`` carrying the image bytes, or ``None`` when ``imgurl`` is
             empty, the download yields no response, or the cache location derived
             from the URL falls outside the image cache directory
    """

    def __download_image(url: str) -> requests.Response:
        # Douban's image hosts are requested with a movie.douban.com Referer.
        return RequestUtils(headers={
            'Referer': "https://movie.douban.com/"
        }, ua=settings.USER_AGENT).get_res(url=url)

    if not imgurl:
        return None

    # Without the global cache this is a plain pass-through proxy.
    if not settings.GLOBAL_IMAGE_CACHE:
        response = __download_image(imgurl)
        if response:
            return Response(content=response.content, media_type="image/jpeg")
        return None

    # Everything after "scheme://host/" becomes the relative cache location.
    url_path = "/".join(imgurl.split('/')[3:])
    images_root = (settings.CACHE_PATH / 'images').resolve()
    cache_path = (images_root / url_path).resolve()
    # The URL is caller-controlled: ".." segments or an empty first segment
    # (which yields an absolute path) would otherwise let the request read or
    # overwrite files outside the cache directory. Refuse such locations, and
    # also the cache root itself (URL without a path).
    if images_root not in cache_path.parents:
        return None

    # Cache hit: answer from disk.
    if cache_path.exists():
        return Response(content=cache_path.read_bytes(), media_type="image/jpeg")

    # Cache miss: download, store, then answer.
    response = __download_image(imgurl)
    if not response:
        return None
    # exist_ok avoids failing when a concurrent request created the directory first.
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    with open(cache_path, 'wb') as f:
        f.write(response.content)
    return Response(content=response.content, media_type="image/jpeg")
@router.get("/person/{person_id}", summary="人物详情", response_model=schemas.MediaPerson)
def douban_person(person_id: int,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:

View File

@@ -1,9 +1,13 @@
import io
import json
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Union
import tailer
from PIL import Image
from fastapi import APIRouter, Depends, HTTPException, Response
from fastapi.responses import StreamingResponse
@@ -16,16 +20,19 @@ from app.core.security import verify_apitoken, verify_resource_token, verify_tok
from app.db.models import User
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser
from app.helper.mediaserver import MediaServerHelper
from app.helper.message import MessageHelper
from app.helper.progress import ProgressHelper
from app.helper.rule import RuleHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.monitor import Monitor
from app.scheduler import Scheduler
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.security import SecurityUtils
from app.utils.system import SystemUtils
from app.utils.url import UrlUtils
from version import APP_VERSION
router = APIRouter()
@@ -38,14 +45,36 @@ def proxy_img(imgurl: str, proxy: bool = False,
图片代理,可选是否使用代理服务器
"""
if not imgurl:
return None
if proxy:
response = RequestUtils(ua=settings.USER_AGENT, proxies=settings.PROXY).get_res(url=imgurl)
else:
response = RequestUtils(ua=settings.USER_AGENT).get_res(url=imgurl)
if response:
return Response(content=response.content, media_type="image/jpeg")
return None
raise HTTPException(status_code=404, detail="Not Found")
# 媒体服务器添加图片代理支持
hosts = [config.config.get("host") for config in MediaServerHelper().get_configs().values() if
config and config.config and config.config.get("host")]
allowed_domains = set(settings.SECURITY_IMAGE_DOMAINS) | set(hosts)
# 验证URL安全性
if not SecurityUtils.is_safe_url(imgurl, allowed_domains, strict=True):
raise HTTPException(status_code=404, detail="Not Found")
referer = "https://movie.douban.com/" if "doubanio.com" in imgurl else None
proxies = settings.PROXY if proxy else None
mime_type = "image/jpeg"
response = RequestUtils(ua=settings.USER_AGENT, proxies=proxies, referer=referer).get_res(url=imgurl)
if not response:
logger.debug(f"Failed to fetch image from URL: {imgurl}")
raise HTTPException(status_code=502, detail="Failed to fetch the image from the remote server.")
# 验证下载的内容是否为有效图片
try:
Image.open(io.BytesIO(response.content)).verify()
except Exception as e:
logger.debug(f"Invalid image format for URL {imgurl}: {e}")
raise HTTPException(status_code=400, detail="Invalid image format.")
# 获取 MIME 类型
mime_type = response.headers.get("Content-Type") or UrlUtils.get_mime_type(imgurl, mime_type)
return Response(content=response.content, media_type=mime_type)
@router.get("/cache/image", summary="图片缓存")
@@ -53,27 +82,64 @@ def cache_img(url: str, _: schemas.TokenPayload = Depends(verify_resource_token)
"""
本地缓存图片文件
"""
# 获取Url中除域名外的路径
url_path = "/".join(url.split('/')[3:])
# 生成缓存文件路径
cache_path = settings.CACHE_PATH / 'images' / url_path
# 豆瓣设置Referer
referer = None
if 'doubanio.com' in url:
referer = "https://movie.douban.com/"
# 如果缓存文件不存在,下载图片并保存
if not cache_path.exists():
response = RequestUtils(ua=settings.USER_AGENT, referer=referer).get_res(url=url)
if response:
if not cache_path.parent.exists():
cache_path.parent.mkdir(parents=True)
with open(cache_path, 'wb') as f:
f.write(response.content)
return Response(content=response.content, media_type="image/jpeg")
else:
return None
else:
return Response(content=cache_path.read_bytes(), media_type="image/jpeg")
# 如果没有启用全局图片缓存,则默认使用图片代理的方案
if not settings.GLOBAL_IMAGE_CACHE:
return proxy_img(imgurl=url)
if not url:
raise HTTPException(status_code=404, detail="Not Found")
# 验证URL安全性
if not SecurityUtils.is_safe_url(url, settings.SECURITY_IMAGE_DOMAINS):
raise HTTPException(status_code=404, detail="Not Found")
# 生成缓存路径
url_path = SecurityUtils.sanitize_url_path(url)
cache_path = settings.CACHE_PATH / "images" / url_path
# 确保缓存路径和文件类型合法
if not SecurityUtils.is_safe_path(settings.CACHE_PATH, cache_path, settings.SECURITY_IMAGE_SUFFIXES):
raise HTTPException(status_code=404, detail="Not Found")
referer = "https://movie.douban.com/" if "doubanio.com" in url else None
mime_type = "image/jpeg"
# 如果缓存文件已存在,直接读取并返回
if cache_path.exists():
try:
content = cache_path.read_bytes()
return Response(content=content, media_type=UrlUtils.get_mime_type(cache_path, mime_type))
except Exception as e:
logger.debug(f"Failed to read cache file {cache_path}: {e}")
raise HTTPException(status_code=400, detail="Internal Server Error")
# 请求远程图片
response = RequestUtils(ua=settings.USER_AGENT, referer=referer).get_res(url=url)
if not response:
raise HTTPException(status_code=502, detail="Failed to fetch the image from the remote server")
# 创建父目录并保存图片
if not cache_path.parent.exists():
cache_path.parent.mkdir(parents=True, exist_ok=True)
try:
with tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file:
tmp_file.write(response.content)
temp_path = Path(tmp_file.name)
temp_path.rename(cache_path)
except Exception as e:
logger.debug(f"Failed to write cache file {cache_path}: {e}")
raise HTTPException(status_code=400, detail="Internal Server Error")
# 验证下载的内容是否为有效图片
try:
Image.open(io.BytesIO(response.content)).verify()
except Exception as e:
logger.debug(f"Invalid image format for URL {url}: {e}")
raise HTTPException(status_code=400, detail="Invalid image format")
media_type = response.headers.get("Content-Type") or UrlUtils.get_mime_type(url, mime_type)
return Response(content=response.content, media_type=media_type)
@router.get("/global", summary="查询非敏感系统设置", response_model=schemas.Response)

View File

@@ -4,10 +4,10 @@ import secrets
import sys
import threading
from pathlib import Path
from typing import Optional, List, Any, Type, Tuple, Dict
from typing import Any, Dict, List, Optional, Tuple, Type
from dotenv import set_key
from pydantic import BaseSettings, validator, BaseModel
from pydantic import BaseModel, BaseSettings, validator
from app.log import logger
from app.utils.system import SystemUtils
@@ -197,6 +197,11 @@ class ConfigModel(BaseModel):
BIG_MEMORY_MODE: bool = False
# 全局图片缓存,将媒体图片缓存到本地
GLOBAL_IMAGE_CACHE: bool = False
# 允许的图片缓存域名
SECURITY_IMAGE_DOMAINS: List[str] = ["image.tmdb.org", "static-mdb.v.geilijiasu.com", "doubanio.com", "lain.bgm.tv",
"raw.githubusercontent.com", "github.com"]
# 允许的图片文件后缀格式
SECURITY_IMAGE_SUFFIXES: List[str] = [".jpg", ".jpeg", ".png", ".webp", ".gif", ".svg"]
class Settings(BaseSettings, ConfigModel):

View File

@@ -1,11 +1,16 @@
from hashlib import sha256
from pathlib import Path
from typing import Optional, Set
from typing import List, Optional, Set, Union
from urllib.parse import quote, urlparse
from app.log import logger
class SecurityUtils:
@staticmethod
def is_safe_path(base_path: Path, user_path: Path, allowed_suffixes: Optional[Set[str]] = None) -> bool:
def is_safe_path(base_path: Path, user_path: Path,
allowed_suffixes: Optional[Union[Set[str], List[str]]] = None) -> bool:
"""
验证用户提供的路径是否在基准目录内,并检查文件类型是否合法,防止目录遍历攻击
@@ -24,13 +29,85 @@ class SecurityUtils:
if base_path_resolved != user_path_resolved and base_path_resolved not in user_path_resolved.parents:
return False
# 如果指定了 allowed_suffixes,进一步检查文件后缀
if allowed_suffixes and user_path.is_file() and user_path.suffix not in allowed_suffixes:
return False
if allowed_suffixes is not None:
allowed_suffixes = set(allowed_suffixes)
if user_path.suffix.lower() not in allowed_suffixes:
return False
# 所有检查通过
return True
except Exception as e:
# 捕获并记录路径解析时的异常
print(f"Error occurred while resolving paths: {e}")
logger.debug(f"Error occurred while validating paths: {e}")
return False
@staticmethod
def is_safe_url(url: str, allowed_domains: Union[Set[str], List[str]], strict: bool = True) -> bool:
    """
    Check that a URL uses http/https and that its network location is allow-listed.

    The comparison is made on the lower-cased ``netloc`` of the URL, which
    includes the port when one is present, so an allow-list entry must carry the
    same port to match such a URL. In both modes a host matches when it equals an
    allowed entry or is a subdomain of it.

    :param url: URL to validate
    :param allowed_domains: allowed domains; an entry may include a port and may
                            also be written as a full URL (``http://host:8096``),
                            in which case only its network location is used.
                            Empty entries are ignored
    :param strict: when True (the default) the host is compared against each
                   entry by whole dot-separated labels; when False an exact or
                   ``"." + entry`` string-suffix comparison is used
    :return: True when the URL is well formed and allow-listed, otherwise False
             (also False when validation itself raises)
    """
    try:
        parsed_url = urlparse(url)
        # Only plain web URLs are acceptable; this also rejects a missing scheme.
        if parsed_url.scheme not in {"http", "https"}:
            return False
        netloc = parsed_url.netloc.lower()
        if not netloc:
            return False
        # urlparse keeps userinfo ("user@") and backslashes inside netloc, while
        # an HTTP client may derive a different host from the same string.
        # Refuse these forms rather than guess which host will be contacted.
        if "@" in netloc or "\\" in netloc:
            return False

        # Normalise the allow-list once: lower-case, reduce URL-style entries to
        # their netloc, and drop empty entries (an empty entry would otherwise
        # match any host written with a trailing dot).
        normalized_domains = set()
        for entry in allowed_domains:
            entry = entry.strip().lower()
            if "://" in entry:
                entry = urlparse(entry).netloc
            if entry:
                normalized_domains.add(entry)

        netloc_labels = netloc.split(".")
        for allowed_domain in normalized_domains:
            if strict:
                # Label-wise suffix comparison: "img1.doubanio.com" matches
                # "doubanio.com", "notdoubanio.com" does not.
                allowed_labels = allowed_domain.split(".")
                if netloc_labels[-len(allowed_labels):] == allowed_labels:
                    return True
            elif netloc == allowed_domain or netloc.endswith(f".{allowed_domain}"):
                return True
        return False
    except Exception as e:
        # Fail closed: anything unexpected means the URL is not trusted.
        logger.debug(f"Error occurred while validating URL: {e}")
        return False
@staticmethod
def sanitize_url_path(url: str, max_length: int = 120) -> str:
    """
    Turn the path component of a URL into a percent-encoded relative path.

    Leading slashes are removed and the rest is percent-encoded with
    ``urllib.parse.quote``. If the encoded path is longer than ``max_length`` it
    is replaced by ``compressed_<hash><ext>``, where ``<hash>`` is the first 16
    hex digits of the SHA-256 of the encoded path and ``<ext>`` is its
    lower-cased file suffix (empty when there is none).

    :param url: URL whose path should be converted
    :param max_length: longest encoded path that is returned unchanged
    :return: the encoded path, or its compressed replacement
    """
    encoded = quote(urlparse(url).path.lstrip("/"))

    # Short enough: use the encoded path as is.
    if len(encoded) <= max_length:
        return encoded

    # Too long: swap in a fixed-size digest but keep the extension.
    digest = sha256(encoded.encode()).hexdigest()[:16]
    extension = Path(encoded).suffix.lower()
    return f"compressed_{digest}{extension}"

View File

@@ -1,5 +1,7 @@
from typing import Optional
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
import mimetypes
from pathlib import Path
from typing import Optional, Union
from urllib.parse import parse_qs, urlencode, urljoin, urlparse, urlunparse
from app.log import logger
@@ -69,3 +71,27 @@ class UrlUtils:
except Exception as e:
logger.debug(f"Error combining URL: {e}")
return None
@staticmethod
def get_mime_type(path_or_url: Union[str, Path], default_type: str = "application/octet-stream") -> str:
    """
    Guess the MIME type of a file path or URL.

    :param path_or_url: file path (``Path``) or URL / path string
    :param default_type: value returned when no type can be guessed
    :return: the guessed MIME type, or ``default_type`` when guessing fails or
             raises
    """
    try:
        # mimetypes is handed a plain string, so Path objects are converted first.
        target = str(path_or_url) if isinstance(path_or_url, Path) else path_or_url
        guessed = mimetypes.guess_type(target)[0]
        return guessed if guessed else default_type
    except Exception as e:
        logger.debug(f"Error get_mime_type: {e}")
        return default_type