diff --git a/app/api/apiv1.py b/app/api/apiv1.py index 6aa5c9ce..8c1c4b8c 100644 --- a/app/api/apiv1.py +++ b/app/api/apiv1.py @@ -1,8 +1,8 @@ from fastapi import APIRouter -from app.api.endpoints import login, user, site, message, webhook, subscribe, \ +from app.api.endpoints import login, user, webhook, message, site, subscribe, \ media, douban, search, plugin, tmdb, history, system, download, dashboard, \ - transfer, mediaserver, bangumi, storage, discover, recommend, workflow, torrent + transfer, mediaserver, bangumi, storage, discover, recommend, workflow, torrent, monitoring api_router = APIRouter() api_router.include_router(login.router, prefix="/login", tags=["login"]) @@ -28,3 +28,4 @@ api_router.include_router(discover.router, prefix="/discover", tags=["discover"] api_router.include_router(recommend.router, prefix="/recommend", tags=["recommend"]) api_router.include_router(workflow.router, prefix="/workflow", tags=["workflow"]) api_router.include_router(torrent.router, prefix="/torrent", tags=["torrent"]) +api_router.include_router(monitoring.router, prefix="/monitoring", tags=["monitoring"]) diff --git a/app/api/endpoints/monitoring.py b/app/api/endpoints/monitoring.py new file mode 100644 index 00000000..bc5cabfc --- /dev/null +++ b/app/api/endpoints/monitoring.py @@ -0,0 +1,409 @@ +from typing import Any, List + +from fastapi import APIRouter, Depends, Query +from fastapi.responses import HTMLResponse + +from app import schemas +from app.core.security import verify_apitoken +from app.monitoring import monitor, get_metrics_response +from app.schemas.monitoring import ( + PerformanceSnapshot, + EndpointStats, + ErrorRequest, + MonitoringOverview +) + +router = APIRouter() + + +@router.get("/overview", summary="获取监控概览", response_model=schemas.MonitoringOverview) +def get_overview(_: str = Depends(verify_apitoken)) -> Any: + """ + 获取完整的监控概览信息 + """ + # 获取性能快照 + performance = monitor.get_performance_snapshot() + + # 获取最活跃端点 + top_endpoints = monitor.get_top_endpoints(limit=10) + + # 获取最近错误 + recent_errors = monitor.get_recent_errors(limit=20) + + # 检查告警 + alerts = monitor.check_alerts() + + return MonitoringOverview( + performance=PerformanceSnapshot( + timestamp=performance.timestamp, + cpu_usage=performance.cpu_usage, + memory_usage=performance.memory_usage, + active_requests=performance.active_requests, + request_rate=performance.request_rate, + avg_response_time=performance.avg_response_time, + error_rate=performance.error_rate, + slow_requests=performance.slow_requests + ), + top_endpoints=[EndpointStats(**endpoint) for endpoint in top_endpoints], + recent_errors=[ErrorRequest(**error) for error in recent_errors], + alerts=alerts + ) + + +@router.get("/performance", summary="获取性能快照", response_model=schemas.PerformanceSnapshot) +def get_performance(_: str = Depends(verify_apitoken)) -> Any: + """ + 获取当前性能快照 + """ + snapshot = monitor.get_performance_snapshot() + return PerformanceSnapshot( + timestamp=snapshot.timestamp, + cpu_usage=snapshot.cpu_usage, + memory_usage=snapshot.memory_usage, + active_requests=snapshot.active_requests, + request_rate=snapshot.request_rate, + avg_response_time=snapshot.avg_response_time, + error_rate=snapshot.error_rate, + slow_requests=snapshot.slow_requests + ) + + +@router.get("/endpoints", summary="获取端点统计", response_model=List[schemas.EndpointStats]) +def get_endpoints( + limit: int = Query(10, ge=1, le=50, description="返回的端点数量"), + _: str = Depends(verify_apitoken) +) -> Any: + """ + 获取最活跃的API端点统计 + """ + endpoints = monitor.get_top_endpoints(limit=limit) + return [EndpointStats(**endpoint) for endpoint in endpoints] + + +@router.get("/errors", summary="获取错误请求", response_model=List[schemas.ErrorRequest]) +def get_errors( + limit: int = Query(20, ge=1, le=100, description="返回的错误数量"), + _: str = Depends(verify_apitoken) +) -> Any: + """ + 获取最近的错误请求记录 + """ + errors = monitor.get_recent_errors(limit=limit) + return [ErrorRequest(**error) for error in errors] + + +@router.get("/alerts", summary="获取告警信息", response_model=List[str]) +def get_alerts(_: str = Depends(verify_apitoken)) -> Any: + """ + 获取当前告警信息 + """ + return monitor.check_alerts() + + +@router.get("/metrics", summary="Prometheus指标") +def get_prometheus_metrics(_: str = Depends(verify_apitoken)) -> Any: + """ + 获取Prometheus格式的监控指标 + """ + return get_metrics_response() + + +@router.get("/dashboard", summary="监控仪表板", response_class=HTMLResponse) +def get_dashboard(_: str = Depends(verify_apitoken)) -> Any: + """ + 获取实时监控仪表板HTML页面 + """ + return HTMLResponse(content=""" + + + + + + MoviePilot 性能监控仪表板 + + + + +
+
+

🎬 MoviePilot 性能监控仪表板

+ +
+ + + +
+
+
--
+
CPU使用率 (%)
+
+
+
--
+
内存使用率 (%)
+
+
+
--
+
活跃请求数
+
+
+
--
+
请求率 (req/min)
+
+
+
--
+
平均响应时间 (s)
+
+
+
--
+
错误率 (%)
+
+
+ +
+

📊 性能趋势

+ +
+ +
+

🔥 最活跃端点

+ +
+
+ + + + + """) diff --git a/app/core/config.py b/app/core/config.py index 2fd87bd2..fc4360a9 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -280,6 +280,8 @@ class ConfigModel(BaseModel): REPO_GITHUB_TOKEN: Optional[str] = None # 大内存模式 BIG_MEMORY_MODE: bool = False + # FastApi性能监控 + PERFORMANCE_MONITOR_ENABLE: bool = False # 全局图片缓存,将媒体图片缓存到本地 GLOBAL_IMAGE_CACHE: bool = False # 是否启用编码探测的性能模式 diff --git a/app/factory.py b/app/factory.py index c4c43c1c..6fec4dac 100644 --- a/app/factory.py +++ b/app/factory.py @@ -2,6 +2,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from app.core.config import settings +from app.monitoring import setup_prometheus_metrics from app.startup.lifecycle import lifespan @@ -24,6 +25,9 @@ def create_app() -> FastAPI: allow_headers=["*"], ) + # 设置性能监控 + setup_prometheus_metrics(_app) + return _app diff --git a/app/monitoring.py b/app/monitoring.py new file mode 100644 index 00000000..b17ee65b --- /dev/null +++ b/app/monitoring.py @@ -0,0 +1,340 @@ +import threading +import time +from collections import defaultdict, deque +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Dict, List, Any + +import psutil +from fastapi import Request, Response +from fastapi.responses import PlainTextResponse +from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST +from prometheus_fastapi_instrumentator import Instrumentator + +from app.core.config import settings +from app.log import logger + + +@dataclass +class RequestMetrics: + """ + 请求指标数据类 + """ + path: str + method: str + status_code: int + response_time: float + timestamp: datetime + client_ip: str + user_agent: str + + +@dataclass +class PerformanceSnapshot: + """ + 性能快照数据类 + """ + timestamp: datetime + cpu_usage: float + memory_usage: float + active_requests: int + request_rate: float + avg_response_time: float + error_rate: float + slow_requests: int + + +class FastAPIMonitor: + """ + FastAPI性能监控器 + """ + + def __init__(self, max_history: int = 1000, window_size: int = 60): + self.max_history = max_history + self.window_size = window_size # 秒 + + # 请求历史记录 + self.request_history: deque = deque(maxlen=max_history) + + # 实时统计 + self.active_requests = 0 + self.total_requests = 0 + self.error_requests = 0 + self.slow_requests = 0 # 响应时间超过1秒的请求 + + # 时间窗口统计 + self.window_requests: deque = deque(maxlen=window_size) + self.window_response_times: deque = deque(maxlen=window_size) + + # 线程锁 + self._lock = threading.Lock() + + # 性能阈值 + self.slow_request_threshold = 1.0 # 1秒 + self.error_threshold = 0.05 # 5% + self.cpu_threshold = 80.0 # 80% + self.memory_threshold = 80.0 # 80% + + # 告警状态 + self.alerts: List[str] = [] + + logger.info("FastAPI性能监控器已初始化") + + def record_request(self, request: Request, response: Response, response_time: float): + """ + 记录请求指标 + """ + with self._lock: + # 创建请求指标 + metrics = RequestMetrics( + path=str(request.url.path), + method=request.method, + status_code=response.status_code, + response_time=response_time, + timestamp=datetime.now(), + client_ip=request.client.host if request.client else "unknown", + user_agent=request.headers.get("user-agent", "unknown") + ) + + # 添加到历史记录 + self.request_history.append(metrics) + + # 更新统计 + self.total_requests += 1 + if response.status_code >= 400: + self.error_requests += 1 + if response_time > self.slow_request_threshold: + self.slow_requests += 1 + + # 添加到时间窗口 + self.window_requests.append(metrics) + self.window_response_times.append(response_time) + + def start_request(self): + """ + 开始处理请求 + """ + with self._lock: + self.active_requests += 1 + + def end_request(self): + """ + 结束处理请求 + """ + with self._lock: + self.active_requests = max(0, self.active_requests - 1) + + def get_performance_snapshot(self) -> PerformanceSnapshot: + """ + 获取性能快照 + """ + with self._lock: + now = datetime.now() + + # 计算请求率(每分钟) + recent_requests = [ + req for req in self.window_requests + if now - req.timestamp < timedelta(seconds=self.window_size) + ] + request_rate = len(recent_requests) / (self.window_size / 60) + + # 计算平均响应时间 + recent_response_times = [ + rt for rt in self.window_response_times + if len(self.window_response_times) > 0 + ] + avg_response_time = sum(recent_response_times) / len(recent_response_times) if recent_response_times else 0 + + # 计算错误率 + error_rate = self.error_requests / self.total_requests if self.total_requests > 0 else 0 + + # 系统资源使用率 + cpu_usage = psutil.cpu_percent(interval=0.1) + memory_usage = psutil.virtual_memory().percent + + return PerformanceSnapshot( + timestamp=now, + cpu_usage=cpu_usage, + memory_usage=memory_usage, + active_requests=self.active_requests, + request_rate=request_rate, + avg_response_time=avg_response_time, + error_rate=error_rate, + slow_requests=self.slow_requests + ) + + def get_top_endpoints(self, limit: int = 10) -> List[Dict[str, Any]]: + """ + 获取最活跃的端点 + """ + with self._lock: + endpoint_stats = defaultdict(lambda: { + 'count': 0, + 'total_time': 0, + 'errors': 0, + 'avg_time': 0 + }) + + for req in self.request_history: + key = f"{req.method} {req.path}" + endpoint_stats[key]['count'] += 1 + endpoint_stats[key]['total_time'] += req.response_time + if req.status_code >= 400: + endpoint_stats[key]['errors'] += 1 + + # 计算平均时间 + for stats in endpoint_stats.values(): + if stats['count'] > 0: + stats['avg_time'] = stats['total_time'] / stats['count'] + + # 按请求数量排序 + sorted_endpoints = sorted( + [{'endpoint': k, **v} for k, v in endpoint_stats.items()], + key=lambda x: x['count'], + reverse=True + ) + + return sorted_endpoints[:limit] + + def get_recent_errors(self, limit: int = 20) -> List[Dict[str, Any]]: + """ + 获取最近的错误请求 + """ + with self._lock: + errors = [ + { + 'timestamp': req.timestamp.isoformat(), + 'method': req.method, + 'path': req.path, + 'status_code': req.status_code, + 'response_time': req.response_time, + 'client_ip': req.client_ip + } + for req in self.request_history + if req.status_code >= 400 + ] + return errors[-limit:] + + def check_alerts(self) -> List[str]: + """ + 检查告警条件 + """ + snapshot = self.get_performance_snapshot() + alerts = [] + + if snapshot.error_rate > self.error_threshold: + alerts.append(f"错误率过高: {snapshot.error_rate:.2%}") + + if snapshot.cpu_usage > self.cpu_threshold: + alerts.append(f"CPU使用率过高: {snapshot.cpu_usage:.1f}%") + + if snapshot.memory_usage > self.memory_threshold: + alerts.append(f"内存使用率过高: {snapshot.memory_usage:.1f}%") + + if snapshot.avg_response_time > self.slow_request_threshold: + alerts.append(f"平均响应时间过长: {snapshot.avg_response_time:.2f}s") + + if snapshot.request_rate > 1000: # 每分钟1000请求 + alerts.append(f"请求率过高: {snapshot.request_rate:.0f} req/min") + + self.alerts = alerts + return alerts + + +# 全局监控实例 +monitor = FastAPIMonitor() + + +def setup_prometheus_metrics(app): + """ + 设置Prometheus指标 + """ + + if not settings.PERFORMANCE_MONITOR_ENABLE: + return + + # 创建Prometheus指标 + request_counter = Counter( + "http_requests_total", + "Total number of HTTP requests", + ["method", "endpoint", "status"] + ) + + request_duration = Histogram( + "http_request_duration_seconds", + "HTTP request duration in seconds", + ["method", "endpoint"] + ) + + active_requests = Gauge( + "http_active_requests", + "Number of active HTTP requests" + ) + + # 自定义指标收集函数 + def custom_metrics(request: Request, response: Response, response_time: float): + request_counter.labels( + method=request.method, + endpoint=request.url.path, + status=response.status_code + ).inc() + + request_duration.labels( + method=request.method, + endpoint=request.url.path + ).observe(response_time) + + active_requests.set(monitor.active_requests) + + # 设置Prometheus监控 + Instrumentator().instrument(app).expose(app, include_in_schema=False, should_gzip=True) + + # 添加自定义指标 + @app.middleware("http") + async def monitor_middleware(request: Request, call_next): + start_time = time.time() + + # 开始请求 + monitor.start_request() + + try: + response = await call_next(request) + response_time = time.time() - start_time + + # 记录请求指标 + monitor.record_request(request, response, response_time) + + # 更新Prometheus指标 + custom_metrics(request, response, response_time) + + return response + except Exception as e: + response_time = time.time() - start_time + logger.error(f"请求处理异常: {e}") + + # 创建错误响应 + response = Response( + content=str(e), + status_code=500, + media_type="text/plain" + ) + + # 记录错误请求 + monitor.record_request(request, response, response_time) + + return response + finally: + # 结束请求 + monitor.end_request() + + logger.info("Prometheus指标监控已设置") + + +def get_metrics_response(): + """ + 获取Prometheus指标响应 + """ + return PlainTextResponse( + generate_latest(), + media_type=CONTENT_TYPE_LATEST + ) diff --git a/app/schemas/__init__.py b/app/schemas/__init__.py index 0a627bbe..69bba4f8 100644 --- a/app/schemas/__init__.py +++ b/app/schemas/__init__.py @@ -1,23 +1,24 @@ -from .token import * -from .user import * -from .response import * -from .site import * -from .subscribe import * from .context import * -from .servarr import * -from .servcookie import * -from .plugin import * -from .history import * from .dashboard import * +from .download import * +from .event import * +from .exception import * +from .file import * +from .history import * from .mediaserver import * from .message import * -from .tmdb import * -from .transfer import * +from .monitoring import * +from .plugin import * +from .response import * from .rule import * +from .servarr import * +from .servcookie import * +from .site import * +from .subscribe import * from .system import * -from .file import * -from .exception import * from .system import * -from .event import * +from .tmdb import * +from .token import * +from .transfer import * +from .user import * from .workflow import * -from .download import * diff --git a/app/schemas/event.py b/app/schemas/event.py index 7fe3fddc..30fd0616 100644 --- a/app/schemas/event.py +++ b/app/schemas/event.py @@ -3,7 +3,8 @@ from typing import Optional, Dict, Any, List, Set, Callable from pydantic import BaseModel, Field, root_validator -from app.schemas import MessageChannel, FileItem +from app.schemas.message import MessageChannel +from app.schemas.file import FileItem class Event(BaseModel): diff --git a/app/schemas/monitoring.py b/app/schemas/monitoring.py new file mode 100644 index 00000000..85fe59f0 --- /dev/null +++ b/app/schemas/monitoring.py @@ -0,0 +1,76 @@ +from datetime import datetime +from typing import List + +from pydantic import BaseModel + + +class RequestMetrics(BaseModel): + """ + 请求指标模型 + """ + path: str + method: str + status_code: int + response_time: float + timestamp: datetime + client_ip: str + user_agent: str + + +class PerformanceSnapshot(BaseModel): + """ + 性能快照模型 + """ + timestamp: datetime + cpu_usage: float + memory_usage: float + active_requests: int + request_rate: float + avg_response_time: float + error_rate: float + slow_requests: int + + +class EndpointStats(BaseModel): + """ + 端点统计模型 + """ + endpoint: str + count: int + total_time: float + errors: int + avg_time: float + + +class ErrorRequest(BaseModel): + """ + 错误请求模型 + """ + timestamp: str + method: str + path: str + status_code: int + response_time: float + client_ip: str + + +class MonitoringOverview(BaseModel): + """ + 监控概览模型 + """ + performance: PerformanceSnapshot + top_endpoints: List[EndpointStats] + recent_errors: List[ErrorRequest] + alerts: List[str] + + +class MonitoringConfig(BaseModel): + """ + 监控配置模型 + """ + slow_request_threshold: float = 1.0 + error_threshold: float = 0.05 + cpu_threshold: float = 80.0 + memory_threshold: float = 80.0 + max_history: int = 1000 + window_size: int = 60 diff --git a/app/utils/system.py b/app/utils/system.py index 1b3848ae..a711d3a2 100644 --- a/app/utils/system.py +++ b/app/utils/system.py @@ -494,11 +494,11 @@ class SystemUtils: time.sleep(1) # 等待1秒 # 获取1秒后的网络统计 net_io_2 = psutil.net_io_counters() - + # 计算1秒内的流量变化 upload_speed = net_io_2.bytes_sent - net_io_1.bytes_sent download_speed = net_io_2.bytes_recv - net_io_1.bytes_recv - + return [upload_speed, download_speed] @staticmethod diff --git a/requirements.in b/requirements.in index 2e9a297b..41af8815 100644 --- a/requirements.in +++ b/requirements.in @@ -76,4 +76,6 @@ setuptools~=78.1.0 pympler~=1.1 smbprotocol~=1.15.0 setproctitle~=1.3.6 -httpx~=0.28.1 \ No newline at end of file +httpx~=0.28.1 +prometheus-client~=0.22.1 +prometheus-fastapi-instrumentator~=7.1.0 \ No newline at end of file