remove monitoring

This commit is contained in:
jxxghp
2025-09-04 11:23:22 +08:00
parent 48aeb98bf1
commit 89b0ea0bf1
9 changed files with 3 additions and 1047 deletions

View File

@@ -2,7 +2,7 @@ from fastapi import APIRouter
from app.api.endpoints import login, user, webhook, message, site, subscribe, \
media, douban, search, plugin, tmdb, history, system, download, dashboard, \
transfer, mediaserver, bangumi, storage, discover, recommend, workflow, torrent, monitoring
transfer, mediaserver, bangumi, storage, discover, recommend, workflow, torrent
api_router = APIRouter()
api_router.include_router(login.router, prefix="/login", tags=["login"])
@@ -28,4 +28,3 @@ api_router.include_router(discover.router, prefix="/discover", tags=["discover"]
api_router.include_router(recommend.router, prefix="/recommend", tags=["recommend"])
api_router.include_router(workflow.router, prefix="/workflow", tags=["workflow"])
api_router.include_router(torrent.router, prefix="/torrent", tags=["torrent"])
api_router.include_router(monitoring.router, prefix="/monitoring", tags=["monitoring"])

View File

@@ -1,409 +0,0 @@
from typing import Any, List
from fastapi import APIRouter, Depends, Query
from fastapi.responses import HTMLResponse
from app import schemas
from app.core.security import verify_apitoken
from app.monitoring import monitor, get_metrics_response
from app.schemas.monitoring import (
PerformanceSnapshot,
EndpointStats,
ErrorRequest,
MonitoringOverview
)
router = APIRouter()
@router.get("/overview", summary="获取监控概览", response_model=schemas.MonitoringOverview)
def get_overview(_: str = Depends(verify_apitoken)) -> Any:
"""
获取完整的监控概览信息
"""
# 获取性能快照
performance = monitor.get_performance_snapshot()
# 获取最活跃端点
top_endpoints = monitor.get_top_endpoints(limit=10)
# 获取最近错误
recent_errors = monitor.get_recent_errors(limit=20)
# 检查告警
alerts = monitor.check_alerts()
return MonitoringOverview(
performance=PerformanceSnapshot(
timestamp=performance.timestamp,
cpu_usage=performance.cpu_usage,
memory_usage=performance.memory_usage,
active_requests=performance.active_requests,
request_rate=performance.request_rate,
avg_response_time=performance.avg_response_time,
error_rate=performance.error_rate,
slow_requests=performance.slow_requests
),
top_endpoints=[EndpointStats(**endpoint) for endpoint in top_endpoints],
recent_errors=[ErrorRequest(**error) for error in recent_errors],
alerts=alerts
)
@router.get("/performance", summary="获取性能快照", response_model=schemas.PerformanceSnapshot)
def get_performance(_: str = Depends(verify_apitoken)) -> Any:
"""
获取当前性能快照
"""
snapshot = monitor.get_performance_snapshot()
return PerformanceSnapshot(
timestamp=snapshot.timestamp,
cpu_usage=snapshot.cpu_usage,
memory_usage=snapshot.memory_usage,
active_requests=snapshot.active_requests,
request_rate=snapshot.request_rate,
avg_response_time=snapshot.avg_response_time,
error_rate=snapshot.error_rate,
slow_requests=snapshot.slow_requests
)
@router.get("/endpoints", summary="获取端点统计", response_model=List[schemas.EndpointStats])
def get_endpoints(
limit: int = Query(10, ge=1, le=50, description="返回的端点数量"),
_: str = Depends(verify_apitoken)
) -> Any:
"""
获取最活跃的API端点统计
"""
endpoints = monitor.get_top_endpoints(limit=limit)
return [EndpointStats(**endpoint) for endpoint in endpoints]
@router.get("/errors", summary="获取错误请求", response_model=List[schemas.ErrorRequest])
def get_errors(
limit: int = Query(20, ge=1, le=100, description="返回的错误数量"),
_: str = Depends(verify_apitoken)
) -> Any:
"""
获取最近的错误请求记录
"""
errors = monitor.get_recent_errors(limit=limit)
return [ErrorRequest(**error) for error in errors]
@router.get("/alerts", summary="获取告警信息", response_model=List[str])
def get_alerts(_: str = Depends(verify_apitoken)) -> Any:
"""
获取当前告警信息
"""
return monitor.check_alerts()
@router.get("/metrics", summary="Prometheus指标")
def get_prometheus_metrics(_: str = Depends(verify_apitoken)) -> Any:
"""
获取Prometheus格式的监控指标
"""
return get_metrics_response()
@router.get("/dashboard", summary="监控仪表板", response_class=HTMLResponse)
def get_dashboard(_: str = Depends(verify_apitoken)) -> Any:
"""
获取实时监控仪表板HTML页面
"""
return HTMLResponse(content="""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>MoviePilot 性能监控仪表板</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
margin: 0;
padding: 20px;
background-color: #f5f5f5;
}
.container {
max-width: 1200px;
margin: 0 auto;
}
.header {
text-align: center;
margin-bottom: 30px;
color: #333;
}
.metrics-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
gap: 20px;
margin-bottom: 30px;
}
.metric-card {
background: white;
padding: 20px;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
text-align: center;
}
.metric-value {
font-size: 2em;
font-weight: bold;
color: #2196F3;
}
.metric-label {
color: #666;
margin-top: 5px;
}
.chart-container {
background: white;
padding: 20px;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
margin-bottom: 20px;
}
.alerts {
background: #fff3cd;
border: 1px solid #ffeaa7;
border-radius: 5px;
padding: 15px;
margin-bottom: 20px;
}
.alert-item {
color: #856404;
margin: 5px 0;
}
.refresh-btn {
background: #2196F3;
color: white;
border: none;
padding: 10px 20px;
border-radius: 5px;
cursor: pointer;
margin-bottom: 20px;
}
.refresh-btn:hover {
background: #1976D2;
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🎬 MoviePilot 性能监控仪表板</h1>
<button class="refresh-btn" onclick="refreshData()">刷新数据</button>
</div>
<div id="alerts" class="alerts" style="display: none;">
<h3>⚠️ 告警信息</h3>
<div id="alerts-list"></div>
</div>
<div class="metrics-grid">
<div class="metric-card">
<div class="metric-value" id="cpu-usage">--</div>
<div class="metric-label">CPU使用率 (%)</div>
</div>
<div class="metric-card">
<div class="metric-value" id="memory-usage">--</div>
<div class="metric-label">内存使用率 (%)</div>
</div>
<div class="metric-card">
<div class="metric-value" id="active-requests">--</div>
<div class="metric-label">活跃请求数</div>
</div>
<div class="metric-card">
<div class="metric-value" id="request-rate">--</div>
<div class="metric-label">请求率 (req/min)</div>
</div>
<div class="metric-card">
<div class="metric-value" id="avg-response-time">--</div>
<div class="metric-label">平均响应时间 (s)</div>
</div>
<div class="metric-card">
<div class="metric-value" id="error-rate">--</div>
<div class="metric-label">错误率 (%)</div>
</div>
</div>
<div class="chart-container">
<h3>📊 性能趋势</h3>
<canvas id="performanceChart" width="400" height="200"></canvas>
</div>
<div class="chart-container">
<h3>🔥 最活跃端点</h3>
<canvas id="endpointsChart" width="400" height="200"></canvas>
</div>
</div>
<script>
let performanceChart, endpointsChart;
let performanceData = {
labels: [],
cpu: [],
memory: [],
requests: []
};
// 初始化图表
function initCharts() {
const ctx1 = document.getElementById('performanceChart').getContext('2d');
performanceChart = new Chart(ctx1, {
type: 'line',
data: {
labels: performanceData.labels,
datasets: [{
label: 'CPU使用率 (%)',
data: performanceData.cpu,
borderColor: '#2196F3',
backgroundColor: 'rgba(33, 150, 243, 0.1)',
tension: 0.4
}, {
label: '内存使用率 (%)',
data: performanceData.memory,
borderColor: '#4CAF50',
backgroundColor: 'rgba(76, 175, 80, 0.1)',
tension: 0.4
}, {
label: '活跃请求数',
data: performanceData.requests,
borderColor: '#FF9800',
backgroundColor: 'rgba(255, 152, 0, 0.1)',
tension: 0.4
}]
},
options: {
responsive: true,
scales: {
y: {
beginAtZero: true
}
}
}
});
const ctx2 = document.getElementById('endpointsChart').getContext('2d');
endpointsChart = new Chart(ctx2, {
type: 'bar',
data: {
labels: [],
datasets: [{
label: '请求数',
data: [],
backgroundColor: 'rgba(33, 150, 243, 0.8)'
}]
},
options: {
responsive: true,
scales: {
y: {
beginAtZero: true
}
}
}
});
}
// 更新性能数据
function updatePerformanceData(data) {
const now = new Date().toLocaleTimeString();
performanceData.labels.push(now);
performanceData.cpu.push(data.performance.cpu_usage);
performanceData.memory.push(data.performance.memory_usage);
performanceData.requests.push(data.performance.active_requests);
// 保持最近20个数据点
if (performanceData.labels.length > 20) {
performanceData.labels.shift();
performanceData.cpu.shift();
performanceData.memory.shift();
performanceData.requests.shift();
}
// 更新图表
performanceChart.data.labels = performanceData.labels;
performanceChart.data.datasets[0].data = performanceData.cpu;
performanceChart.data.datasets[1].data = performanceData.memory;
performanceChart.data.datasets[2].data = performanceData.requests;
performanceChart.update();
// 更新端点图表
const endpointLabels = data.top_endpoints.map(e => e.endpoint.substring(0, 20));
const endpointData = data.top_endpoints.map(e => e.count);
endpointsChart.data.labels = endpointLabels;
endpointsChart.data.datasets[0].data = endpointData;
endpointsChart.update();
}
// 更新指标显示
function updateMetrics(data) {
document.getElementById('cpu-usage').textContent = data.performance.cpu_usage.toFixed(1);
document.getElementById('memory-usage').textContent = data.performance.memory_usage.toFixed(1);
document.getElementById('active-requests').textContent = data.performance.active_requests;
document.getElementById('request-rate').textContent = data.performance.request_rate.toFixed(0);
document.getElementById('avg-response-time').textContent = data.performance.avg_response_time.toFixed(3);
document.getElementById('error-rate').textContent = (data.performance.error_rate * 100).toFixed(2);
}
// 更新告警
function updateAlerts(alerts) {
const alertsDiv = document.getElementById('alerts');
const alertsList = document.getElementById('alerts-list');
if (alerts.length > 0) {
alertsDiv.style.display = 'block';
alertsList.innerHTML = alerts.map(alert =>
`<div class="alert-item">⚠️ ${alert}</div>`
).join('');
} else {
alertsDiv.style.display = 'none';
}
}
// 获取URL中的token参数
function getTokenFromUrl() {
const urlParams = new URLSearchParams(window.location.search);
return urlParams.get('token');
}
// 刷新数据
async function refreshData() {
try {
const token = getTokenFromUrl();
if (!token) {
console.error('未找到token参数');
return;
}
const response = await fetch(`/api/v1/monitoring/overview?token=${token}`);
if (response.ok) {
const data = await response.json();
updateMetrics(data);
updatePerformanceData(data);
updateAlerts(data.alerts);
}
} catch (error) {
console.error('获取监控数据失败:', error);
}
}
// 页面加载完成后初始化
document.addEventListener('DOMContentLoaded', function() {
initCharts();
refreshData();
// 每5秒自动刷新
setInterval(refreshData, 5000);
});
</script>
</body>
</html>
""")

View File

@@ -360,8 +360,6 @@ class ConfigModel(BaseModel):
# ==================== 性能配置 ====================
# 大内存模式
BIG_MEMORY_MODE: bool = False
# FastApi性能监控
PERFORMANCE_MONITOR_ENABLE: bool = False
# 是否启用编码探测的性能模式
ENCODING_DETECTION_PERFORMANCE_MODE: bool = True
# 编码探测的最低置信度阈值

View File

@@ -21,7 +21,7 @@ from app.core.config import settings
from app.core.event import eventmanager, Event
from app.db.plugindata_oper import PluginDataOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.plugin import PluginHelper, PluginMemoryMonitor
from app.helper.plugin import PluginHelper
from app.helper.sites import SitesHelper # noqa
from app.log import logger
from app.schemas.types import EventType, SystemConfigKey
@@ -98,8 +98,6 @@ class PluginManager(metaclass=Singleton):
self._config_key: str = "plugin.%s"
# 监听器
self._observer: Observer = None
# 内存监控器
self._memory_monitor = PluginMemoryMonitor()
# 开发者模式监测插件修改
if settings.DEV or settings.PLUGIN_AUTO_RELOAD:
self.__start_monitor()
@@ -865,28 +863,6 @@ class PluginManager(metaclass=Singleton):
"""
return list(self._running_plugins.keys())
def get_plugin_memory_stats(self, pid: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取插件内存统计信息
:param pid: 插件ID为空则获取所有插件
:return: 内存统计信息列表
"""
if pid:
plugin_instance = self._running_plugins.get(pid)
if plugin_instance:
return [self._memory_monitor.get_plugin_memory_usage(pid, plugin_instance)]
else:
return []
else:
return self._memory_monitor.get_all_plugins_memory_usage(self._running_plugins)
def clear_plugin_memory_cache(self, pid: Optional[str] = None):
"""
清除插件内存统计缓存
:param pid: 插件ID为空则清除所有缓存
"""
self._memory_monitor.clear_cache(pid)
def get_online_plugins(self, force: bool = False) -> List[schemas.Plugin]:
"""
获取所有在线插件信息

View File

@@ -3,7 +3,6 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import ORJSONResponse
from app.core.config import settings
from app.monitoring import setup_prometheus_metrics
from app.startup.lifecycle import lifespan
@@ -27,9 +26,6 @@ def create_app() -> FastAPI:
allow_headers=["*"],
)
# 设置性能监控
setup_prometheus_metrics(_app)
return _app

View File

@@ -4,11 +4,10 @@ import json
import shutil
import site
import sys
import time
import traceback
import zipfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Set, Callable, Awaitable, Any
from typing import Dict, List, Optional, Tuple, Set, Callable, Awaitable
import aiofiles
import aioshutil
@@ -25,7 +24,6 @@ from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils, AsyncRequestUtils
from app.utils.memory import MemoryCalculator
from app.utils.singleton import WeakSingleton
from app.utils.system import SystemUtils
from app.utils.url import UrlUtils
@@ -1593,87 +1591,3 @@ class PluginHelper(metaclass=WeakSingleton):
except Exception as e:
logger.error(f"解压 Release 压缩包失败:{e}")
return False, f"解压 Release 压缩包失败:{e}"
class PluginMemoryMonitor:
"""
插件内存监控器
"""
def __init__(self):
self._calculator = MemoryCalculator()
self._cache = {}
self._cache_ttl = 300 # 缓存5分钟
def get_plugin_memory_usage(self, plugin_id: str, plugin_instance: Any) -> Dict[str, Any]:
"""
获取插件内存使用情况
:param plugin_id: 插件ID
:param plugin_instance: 插件实例
:return: 内存使用信息
"""
# 检查缓存
if self._is_cache_valid(plugin_id):
return self._cache[plugin_id]
# 计算内存使用
memory_info = self._calculator.calculate_object_memory(plugin_instance)
# 添加插件信息
result = {
'plugin_id': plugin_id,
'plugin_name': getattr(plugin_instance, 'plugin_name', 'Unknown'),
'plugin_version': getattr(plugin_instance, 'plugin_version', 'Unknown'),
'timestamp': time.time(),
**memory_info
}
# 更新缓存
self._cache[plugin_id] = result
return result
def get_all_plugins_memory_usage(self, plugins: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
获取所有插件的内存使用情况
:param plugins: 插件实例字典
:return: 内存使用信息列表
"""
results = []
for plugin_id, plugin_instance in plugins.items():
if plugin_instance:
try:
memory_info = self.get_plugin_memory_usage(plugin_id, plugin_instance)
results.append(memory_info)
except Exception as e:
logger.error(f"获取插件 {plugin_id} 内存使用情况失败:{str(e)}")
results.append({
'plugin_id': plugin_id,
'plugin_name': getattr(plugin_instance, 'plugin_name', 'Unknown'),
'error': str(e),
'total_memory_bytes': 0,
'total_memory_mb': 0,
'object_count': 0,
'calculation_time_ms': 0
})
# 按内存使用量排序
results.sort(key=lambda x: x.get('total_memory_bytes', 0), reverse=True)
return results
def _is_cache_valid(self, plugin_id: str) -> bool:
"""
检查缓存是否有效
"""
if plugin_id not in self._cache:
return False
return time.time() - self._cache[plugin_id]['timestamp'] < self._cache_ttl
def clear_cache(self, plugin_id: Optional[str] = None):
"""
清除缓存
:param plugin_id: 插件ID为空则清除所有缓存
"""
if plugin_id:
self._cache.pop(plugin_id, None)
else:
self._cache.clear()

View File

@@ -1,338 +0,0 @@
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Any
import psutil
from fastapi import Request, Response
from fastapi.responses import PlainTextResponse
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from prometheus_fastapi_instrumentator import Instrumentator
from app.core.config import settings
from app.log import logger
@dataclass
class RequestMetrics:
"""
请求指标数据类
"""
path: str
method: str
status_code: int
response_time: float
timestamp: datetime
client_ip: str
user_agent: str
@dataclass
class PerformanceSnapshot:
"""
性能快照数据类
"""
timestamp: datetime
cpu_usage: float
memory_usage: float
active_requests: int
request_rate: float
avg_response_time: float
error_rate: float
slow_requests: int
class FastAPIMonitor:
"""
FastAPI性能监控器
"""
def __init__(self, max_history: int = 1000, window_size: int = 60):
self.max_history = max_history
self.window_size = window_size # 秒
# 请求历史记录
self.request_history: deque = deque(maxlen=max_history)
# 实时统计
self.active_requests = 0
self.total_requests = 0
self.error_requests = 0
self.slow_requests = 0 # 响应时间超过1秒的请求
# 时间窗口统计
self.window_requests: deque = deque(maxlen=window_size)
self.window_response_times: deque = deque(maxlen=window_size)
# 线程锁
self._lock = threading.Lock()
# 性能阈值
self.slow_request_threshold = 1.0 # 1秒
self.error_threshold = 0.05 # 5%
self.cpu_threshold = 80.0 # 80%
self.memory_threshold = 80.0 # 80%
# 告警状态
self.alerts: List[str] = []
logger.debug("FastAPI性能监控器已初始化")
def record_request(self, request: Request, response: Response, response_time: float):
"""
记录请求指标
"""
with self._lock:
# 创建请求指标
metrics = RequestMetrics(
path=str(request.url.path),
method=request.method,
status_code=response.status_code,
response_time=response_time,
timestamp=datetime.now(),
client_ip=request.client.host if request.client else "unknown",
user_agent=request.headers.get("user-agent", "unknown")
)
# 添加到历史记录
self.request_history.append(metrics)
# 更新统计
self.total_requests += 1
if response.status_code >= 400:
self.error_requests += 1
if response_time > self.slow_request_threshold:
self.slow_requests += 1
# 添加到时间窗口
self.window_requests.append(metrics)
self.window_response_times.append(response_time)
def start_request(self):
"""
开始处理请求
"""
with self._lock:
self.active_requests += 1
def end_request(self):
"""
结束处理请求
"""
with self._lock:
self.active_requests = max(0, self.active_requests - 1)
def get_performance_snapshot(self) -> PerformanceSnapshot:
"""
获取性能快照
"""
with self._lock:
now = datetime.now()
# 计算请求率(每分钟)
recent_requests = [
req for req in self.window_requests
if now - req.timestamp < timedelta(seconds=self.window_size)
]
request_rate = len(recent_requests) / (self.window_size / 60)
# 计算平均响应时间
recent_response_times = [
rt for rt in self.window_response_times
if len(self.window_response_times) > 0
]
avg_response_time = sum(recent_response_times) / len(recent_response_times) if recent_response_times else 0
# 计算错误率
error_rate = self.error_requests / self.total_requests if self.total_requests > 0 else 0
# 系统资源使用率
cpu_usage = psutil.cpu_percent(interval=0.1)
memory_usage = psutil.virtual_memory().percent
return PerformanceSnapshot(
timestamp=now,
cpu_usage=cpu_usage,
memory_usage=memory_usage,
active_requests=self.active_requests,
request_rate=request_rate,
avg_response_time=avg_response_time,
error_rate=error_rate,
slow_requests=self.slow_requests
)
def get_top_endpoints(self, limit: int = 10) -> List[Dict[str, Any]]:
"""
获取最活跃的端点
"""
with self._lock:
endpoint_stats = defaultdict(lambda: {
'count': 0,
'total_time': 0,
'errors': 0,
'avg_time': 0.0
})
for req in self.request_history:
key = f"{req.method} {req.path}"
endpoint_stats[key]['count'] += 1
endpoint_stats[key]['total_time'] += req.response_time
if req.status_code >= 400:
endpoint_stats[key]['errors'] += 1
# 计算平均时间
for stats in endpoint_stats.values():
if stats['count'] > 0:
stats['avg_time'] = stats['total_time'] / stats['count']
# 按请求数量排序
sorted_endpoints = sorted(
[{'endpoint': k, **v} for k, v in endpoint_stats.items()],
key=lambda x: x['count'],
reverse=True
)
return sorted_endpoints[:limit]
def get_recent_errors(self, limit: int = 20) -> List[Dict[str, Any]]:
"""
获取最近的错误请求
"""
with self._lock:
errors = [
{
'timestamp': req.timestamp.isoformat(),
'method': req.method,
'path': req.path,
'status_code': req.status_code,
'response_time': req.response_time,
'client_ip': req.client_ip
}
for req in self.request_history
if req.status_code >= 400
]
return errors[-limit:]
def check_alerts(self) -> List[str]:
"""
检查告警条件
"""
snapshot = self.get_performance_snapshot()
alerts = []
if snapshot.error_rate > self.error_threshold:
alerts.append(f"错误率过高: {snapshot.error_rate:.2%}")
if snapshot.cpu_usage > self.cpu_threshold:
alerts.append(f"CPU使用率过高: {snapshot.cpu_usage:.1f}%")
if snapshot.memory_usage > self.memory_threshold:
alerts.append(f"内存使用率过高: {snapshot.memory_usage:.1f}%")
if snapshot.avg_response_time > self.slow_request_threshold:
alerts.append(f"平均响应时间过长: {snapshot.avg_response_time:.2f}s")
if snapshot.request_rate > 1000: # 每分钟1000请求
alerts.append(f"请求率过高: {snapshot.request_rate:.0f} req/min")
self.alerts = alerts
return alerts
# 全局监控实例
monitor = FastAPIMonitor()
def setup_prometheus_metrics(app):
"""
设置Prometheus指标
"""
if not settings.PERFORMANCE_MONITOR_ENABLE:
return
# 创建Prometheus指标
request_counter = Counter(
"http_requests_total",
"Total number of HTTP requests",
["method", "endpoint", "status"]
)
request_duration = Histogram(
"http_request_duration_seconds",
"HTTP request duration in seconds",
["method", "endpoint"]
)
active_requests = Gauge(
"http_active_requests",
"Number of active HTTP requests"
)
# 自定义指标收集函数
def custom_metrics(request: Request, response: Response, response_time: float):
request_counter.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
request_duration.labels(
method=request.method,
endpoint=request.url.path
).observe(response_time)
active_requests.set(monitor.active_requests)
# 设置Prometheus监控
Instrumentator().instrument(app).expose(app, include_in_schema=False, should_gzip=True)
# 添加自定义指标
@app.middleware("http")
async def monitor_middleware(request: Request, call_next):
start_time = time.time()
# 开始请求
monitor.start_request()
try:
response = await call_next(request)
response_time = time.time() - start_time
# 记录请求指标
monitor.record_request(request, response, response_time)
# 更新Prometheus指标
custom_metrics(request, response, response_time)
return response
except Exception as e:
response_time = time.time() - start_time
logger.error(f"请求处理异常: {e}")
# 创建错误响应
response = Response(
content=str(e),
status_code=500,
media_type="text/plain"
)
# 记录错误请求
monitor.record_request(request, response, response_time)
return response
finally:
# 结束请求
monitor.end_request()
def get_metrics_response():
"""
获取Prometheus指标响应
"""
return PlainTextResponse(
generate_latest(),
media_type=CONTENT_TYPE_LATEST
)

View File

@@ -1,178 +0,0 @@
import sys
import time
from collections import deque
from typing import Any, Dict, Set
from app.log import logger
class MemoryCalculator:
"""
内存计算器,用于递归计算对象的内存占用
"""
def __init__(self):
# 缓存已计算的对象ID避免重复计算
self._calculated_ids: Set[int] = set()
# 最大递归深度,防止无限递归
self._max_depth = 10
# 最大对象数量,防止计算过多对象
self._max_objects = 10000
def calculate_object_memory(self, obj: Any, max_depth: int = None, max_objects: int = None) -> Dict[str, Any]:
"""
计算对象的内存占用
:param obj: 要计算的对象
:param max_depth: 最大递归深度
:param max_objects: 最大对象数量
:return: 内存统计信息
"""
if max_depth is None:
max_depth = self._max_depth
if max_objects is None:
max_objects = self._max_objects
# 重置缓存
self._calculated_ids.clear()
start_time = time.time()
object_details = []
try:
# 递归计算内存
memory_info = self._calculate_recursive(obj, depth=0, max_depth=max_depth,
max_objects=max_objects, object_count=0)
total_memory = memory_info['total_memory']
object_count = memory_info['object_count']
object_details = memory_info['object_details']
except Exception as e:
logger.error(f"计算对象内存时出错:{str(e)}")
total_memory = 0
object_count = 0
calculation_time = time.time() - start_time
return {
'total_memory_bytes': total_memory,
'total_memory_mb': round(total_memory / (1024 * 1024), 2),
'object_count': object_count,
'calculation_time_ms': round(calculation_time * 1000, 2),
'object_details': object_details[:10] # 只返回前10个最大的对象
}
def _calculate_recursive(self, obj: Any, depth: int, max_depth: int,
max_objects: int, object_count: int) -> Dict[str, Any]:
"""
递归计算对象内存
"""
if depth > max_depth or object_count > max_objects:
return {
'total_memory': 0,
'object_count': object_count,
'object_details': []
}
total_memory = 0
object_details = []
# 获取对象ID避免重复计算
obj_id = id(obj)
if obj_id in self._calculated_ids:
return {
'total_memory': 0,
'object_count': object_count,
'object_details': []
}
self._calculated_ids.add(obj_id)
object_count += 1
try:
# 计算对象本身的内存
obj_memory = sys.getsizeof(obj)
total_memory += obj_memory
# 记录大对象
if obj_memory > 1024: # 大于1KB的对象
object_details.append({
'type': type(obj).__name__,
'memory_bytes': obj_memory,
'memory_mb': round(obj_memory / (1024 * 1024), 2),
'depth': depth
})
# 递归计算容器对象的内容
if depth < max_depth:
container_memory = self._calculate_container_memory(
obj, depth + 1, max_depth, max_objects, object_count
)
total_memory += container_memory['total_memory']
object_count = container_memory['object_count']
object_details.extend(container_memory['object_details'])
except Exception as e:
logger.debug(f"计算对象 {type(obj).__name__} 内存时出错:{str(e)}")
return {
'total_memory': total_memory,
'object_count': object_count,
'object_details': object_details
}
def _calculate_container_memory(self, obj: Any, depth: int, max_depth: int,
max_objects: int, object_count: int) -> Dict[str, Any]:
"""
计算容器对象的内存
"""
total_memory = 0
object_details = []
try:
# 处理不同类型的容器
if isinstance(obj, (list, tuple, deque)):
for item in obj:
if object_count > max_objects:
break
item_memory = self._calculate_recursive(item, depth, max_depth, max_objects, object_count)
total_memory += item_memory['total_memory']
object_count = item_memory['object_count']
object_details.extend(item_memory['object_details'])
elif isinstance(obj, dict):
for key, value in obj.items():
if object_count > max_objects:
break
# 计算key的内存
key_memory = self._calculate_recursive(key, depth, max_depth, max_objects, object_count)
total_memory += key_memory['total_memory']
object_count = key_memory['object_count']
object_details.extend(key_memory['object_details'])
# 计算value的内存
value_memory = self._calculate_recursive(value, depth, max_depth, max_objects, object_count)
total_memory += value_memory['total_memory']
object_count = value_memory['object_count']
object_details.extend(value_memory['object_details'])
elif hasattr(obj, '__dict__'):
# 处理有__dict__属性的对象
for attr_name, attr_value in obj.__dict__.items():
if object_count > max_objects:
break
# 跳过一些特殊属性
if attr_name.startswith('_') and attr_name not in ['_calculated_ids']:
continue
attr_memory = self._calculate_recursive(attr_value, depth, max_depth, max_objects, object_count)
total_memory += attr_memory['total_memory']
object_count = attr_memory['object_count']
object_details.extend(attr_memory['object_details'])
except Exception as e:
logger.debug(f"计算容器对象 {type(obj).__name__} 内存时出错:{str(e)}")
return {
'total_memory': total_memory,
'object_count': object_count,
'object_details': object_details
}

View File

@@ -77,6 +77,4 @@ pympler~=1.1
smbprotocol~=1.15.0
setproctitle~=1.3.6
httpx[socks]~=0.28.1
prometheus-client~=0.22.1
prometheus-fastapi-instrumentator~=7.1.0
orjson~=3.11.3