add pympler

This commit is contained in:
jxxghp
2025-06-08 18:54:35 +08:00
parent 0835a75503
commit a18040ccfa
5 changed files with 340 additions and 0 deletions

View File

@@ -247,6 +247,12 @@ class ConfigModel(BaseModel):
REPO_GITHUB_TOKEN: Optional[str] = None
# 大内存模式
BIG_MEMORY_MODE: bool = False
# 是否启用内存监控
MEMORY_ANALYSIS: bool = False
# 内存快照间隔(分钟)
MEMORY_SNAPSHOT_INTERVAL: int = 5
# 保留的内存快照文件数量
MEMORY_SNAPSHOT_KEEP_COUNT: int = 30
# 全局图片缓存,将媒体图片缓存到本地
GLOBAL_IMAGE_CACHE: bool = False
# 是否启用编码探测的性能模式

313
app/helper/memory.py Normal file
View File

@@ -0,0 +1,313 @@
import threading
import time
from datetime import datetime
from typing import Optional
import psutil
from pympler import muppy, summary, asizeof
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.schemas import ConfigChangeEventData
from app.schemas.types import EventType
from app.utils.singleton import Singleton
class MemoryHelper(metaclass=Singleton):
"""
内存管理工具类,用于监控和优化内存使用
"""
def __init__(self):
# 检查间隔(秒) - 从配置获取默认5分钟
self._check_interval = settings.MEMORY_SNAPSHOT_INTERVAL * 60
self._monitoring = False
self._monitor_thread: Optional[threading.Thread] = None
# 内存快照保存目录
self._memory_snapshot_dir = settings.LOG_PATH / "memory_snapshots"
# 保留的快照文件数量
self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件,更新内存监控设置
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in ['MEMORY_ANALYSIS', 'MEMORY_SNAPSHOT_INTERVAL', 'MEMORY_SNAPSHOT_KEEP_COUNT']:
return
# 更新配置
if event_data.key == 'MEMORY_SNAPSHOT_INTERVAL':
self._check_interval = settings.MEMORY_SNAPSHOT_INTERVAL * 60
elif event_data.key == 'MEMORY_SNAPSHOT_KEEP_COUNT':
self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT
self.stop_monitoring()
self.start_monitoring()
def start_monitoring(self):
"""
开始内存监控
"""
if not settings.MEMORY_ANALYSIS:
return
if self._monitoring:
return
# 创建内存快照目录
self._memory_snapshot_dir.mkdir(parents=True, exist_ok=True)
# 初始化内存分析器
self._monitoring = True
self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self._monitor_thread.start()
logger.info("内存监控已启动")
def stop_monitoring(self):
"""
停止内存监控
"""
self._monitoring = False
if self._monitor_thread:
self._monitor_thread.join(timeout=5)
logger.info("内存监控已停止")
def _monitor_loop(self):
"""
内存监控循环
"""
logger.info("内存监控循环开始")
while self._monitoring:
try:
# 生成内存快照
self._create_memory_snapshot()
time.sleep(self._check_interval)
except Exception as e:
logger.error(f"内存监控出错: {e}")
# 出错后等待1分钟再继续
time.sleep(60)
logger.info("内存监控循环结束")
def _create_memory_snapshot(self):
"""
创建内存快照并保存到文件
"""
try:
# 获取当前时间戳
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
snapshot_file = self._memory_snapshot_dir / f"memory_snapshot_{timestamp}.txt"
# 获取当前进程的内存使用情况
all_objects = muppy.get_objects()
sum1 = summary.summarize(all_objects)
# 获取系统内存使用情况
memory_usage = psutil.Process().memory_info().rss
# 写入内存快照文件
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(f"内存快照时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"当前进程内存使用: {memory_usage / 1024 / 1024:.2f} MB\n")
f.write("=" * 80 + "\n")
f.write("对象类型统计:\n")
f.write("-" * 80 + "\n")
# 写入对象统计信息
for line in summary.format_(sum1):
f.write(line + "\n")
# 添加最大对象信息
f.write("\n" + "=" * 80 + "\n")
f.write("最大内存占用对象详情:\n")
f.write("-" * 80 + "\n")
largest_objects = self._get_largest_objects()
for i, obj_info in enumerate(largest_objects[:10], 1):
f.write(f"{i:2d}. {obj_info['type']:<30} {obj_info['size_mb']:>8.2f} MB - {obj_info['description']}\n")
logger.info(f"内存快照已保存: {snapshot_file}, 当前内存使用: {memory_usage / 1024 / 1024:.2f} MB")
# 清理过期的快照文件保留最近30个
self._cleanup_old_snapshots()
except Exception as e:
logger.error(f"创建内存快照失败: {e}")
def _cleanup_old_snapshots(self):
"""
清理过期的内存快照文件,只保留最近的指定数量文件
"""
try:
snapshot_files = list(self._memory_snapshot_dir.glob("memory_snapshot_*.txt"))
if len(snapshot_files) > self._keep_count:
# 按修改时间排序,删除最旧的文件
snapshot_files.sort(key=lambda x: x.stat().st_mtime)
for old_file in snapshot_files[:-self._keep_count]:
old_file.unlink()
logger.debug(f"已删除过期内存快照: {old_file}")
except Exception as e:
logger.error(f"清理过期快照失败: {e}")
def _get_largest_objects(self, top_n: int = 20) -> list:
"""
获取内存占用最大的对象列表
:param top_n: 返回前N个最大对象
:return: 对象信息列表
"""
try:
# 获取所有对象
all_objects = muppy.get_objects()
# 计算每个对象的大小并收集信息
object_sizes = []
for obj in all_objects:
try:
# 使用asizeof计算对象真实大小
size = asizeof.asizeof(obj)
if size > 1024: # 只关注大于1KB的对象
obj_type = type(obj).__name__
obj_module = getattr(type(obj), '__module__', 'unknown')
# 生成对象描述
description = self._generate_object_description(obj)
object_sizes.append({
'size': size,
'type': f"{obj_module}.{obj_type}" if obj_module != 'builtins' else obj_type,
'description': description
})
except (TypeError, AttributeError, RuntimeError):
# 某些对象可能无法计算大小,跳过
continue
# 按大小排序并取前N个
object_sizes.sort(key=lambda x: x['size'], reverse=True)
# 转换为所需格式
return [
{
'type': obj_info['type'],
'size_mb': obj_info['size'] / 1024 / 1024,
'size_bytes': obj_info['size'],
'description': obj_info['description']
}
for obj_info in object_sizes[:top_n]
]
except Exception as e:
logger.error(f"获取最大对象信息失败: {e}")
return []
@staticmethod
def _generate_object_description(obj) -> str:
"""
生成对象的描述信息
:param obj: 要描述的对象
:return: 对象描述字符串
"""
try:
# 根据对象类型生成不同的描述
if isinstance(obj, (list, tuple)):
length = len(obj)
first_type = type(obj[0]).__name__ if obj else 'empty'
return f"长度={length}, 示例元素类型={first_type}"
elif isinstance(obj, dict):
length = len(obj)
if obj:
first_key = next(iter(obj))
key_type = type(first_key).__name__
return f"键值对数={length}, 示例键类型={key_type}"
return f"键值对数={length}, 示例键类型=empty"
elif isinstance(obj, set):
length = len(obj)
if obj:
first_item = next(iter(obj))
item_type = type(first_item).__name__
return f"元素数={length}, 示例元素类型={item_type}"
return f"元素数={length}, 示例元素类型=empty"
elif isinstance(obj, str):
length = len(obj)
preview = obj[:50] + '...' if length > 50 else obj
return f"长度={length}, 内容='{preview}'"
elif hasattr(obj, '__name__'):
return f"名称={obj.__name__}"
elif hasattr(obj, '__dict__'):
attrs_count = len(obj.__dict__)
return f"实例属性数={attrs_count}"
else:
obj_str = str(obj)[:50]
return f"对象={obj_str}"
except Exception as e:
return f"无法获取描述:{e}"
def get_current_memory_info(self) -> dict:
"""
获取当前内存使用信息
:return: 内存使用信息字典
"""
try:
# 获取当前进程内存使用
current_memory = psutil.Process().memory_info().rss
# 获取对象统计
all_objects = muppy.get_objects()
sum1 = summary.summarize(all_objects)
memory_info = {
"current_memory_mb": round(current_memory / 1024 / 1024, 2),
"object_count": len(all_objects),
"top_objects": [],
"largest_objects": []
}
# 解析对象统计信息
formatted_lines = list(summary.format_(sum1))
for line in formatted_lines:
parts = line.split('|')
if len(parts) >= 3 and not line.startswith('=') and not line.startswith(' types'):
# 解析格式: "type_name | count | size"
type_name = parts[0].strip()
count_str = parts[1].strip()
size_str = parts[2].strip()
# 跳过表头和分隔线
if type_name and count_str.isdigit():
# 解析大小(可能包含单位)
size_mb = 0
if 'MB' in size_str:
size_mb = float(size_str.replace('MB', '').strip())
elif 'KB' in size_str:
size_mb = float(size_str.replace('KB', '').strip()) / 1024
elif 'B' in size_str:
size_mb = float(size_str.replace('B', '').strip()) / 1024 / 1024
memory_info["top_objects"].append({
"type": type_name,
"count": count_str,
"size_mb": round(size_mb, 2)
})
# 只取前10个有效对象
if len(memory_info["top_objects"]) >= 10:
break
# 获取最大对象信息
memory_info["largest_objects"] = self._get_largest_objects(10)
return memory_info
except Exception as e:
logger.error(f"获取内存信息失败: {e}")
return {"error": str(e)}

View File

@@ -6,6 +6,7 @@ from fastapi import FastAPI
from app.chain.system import SystemChain
from app.core.config import global_vars
from app.startup.command_initializer import init_command, stop_command, restart_command
from app.startup.memory_initializer import init_memory_manager, stop_memory_manager
from app.startup.modules_initializer import init_modules, stop_modules
from app.startup.monitor_initializer import stop_monitor, init_monitor
from app.startup.plugins_initializer import init_plugins, stop_plugins, sync_plugins
@@ -47,6 +48,8 @@ async def lifespan(app: FastAPI):
init_command()
# 初始化工作流
init_workflow()
# 初始化内存管理
init_memory_manager()
# 插件同步到本地
sync_plugins_task = asyncio.create_task(init_plugin_system())
try:
@@ -64,6 +67,8 @@ async def lifespan(app: FastAPI):
pass
except Exception as e:
print(str(e))
# 停止内存管理器
stop_memory_manager()
# 停止工作流
stop_workflow()
# 停止命令

View File

@@ -0,0 +1,15 @@
from app.helper.memory import MemoryHelper
def init_memory_manager():
"""
初始化内存监控器
"""
MemoryHelper().start_monitoring()
def stop_memory_manager():
"""
停止内存监控器
"""
MemoryHelper().stop_monitoring()

View File

@@ -70,3 +70,4 @@ cf_clearance~=0.31.0
oss2~=2.19.1
tqdm~=4.67.1
setuptools~=78.1.0
pympler~=0.9