From a18040ccfade5eca0b63e8e0fd74072df344729b Mon Sep 17 00:00:00 2001
From: jxxghp
Date: Sun, 8 Jun 2025 18:54:35 +0800
Subject: [PATCH] add pympler

---
Review notes (ignored when the patch is applied):
 - stop_monitoring() now interrupts the monitor thread through a threading.Event instead of
   leaving it inside time.sleep(); a stop/start cycle can no longer leave two loops running.
 - Snapshot pruning no longer turns into a no-op when MEMORY_SNAPSHOT_KEEP_COUNT is 0.
 - Summary sizes of 1 GB and above are parsed instead of failing get_current_memory_info().
 - Leading whitespace of context lines was reconstructed; the blob id in the memory.py
   "index" line was not recomputed for the revised content.

 app/core/config.py                |   6 +
 app/helper/memory.py              | 346 ++++++++++++++++++++++++++++++
 app/startup/lifecycle.py          |   5 +
 app/startup/memory_initializer.py |  15 ++
 requirements.in                   |   1 +
 5 files changed, 373 insertions(+)
 create mode 100644 app/helper/memory.py
 create mode 100644 app/startup/memory_initializer.py

diff --git a/app/core/config.py b/app/core/config.py
index 24afc0ea..fca27ad6 100644
--- a/app/core/config.py
+++ b/app/core/config.py
@@ -247,6 +247,12 @@ class ConfigModel(BaseModel):
     REPO_GITHUB_TOKEN: Optional[str] = None
     # 大内存模式
     BIG_MEMORY_MODE: bool = False
+    # Whether memory monitoring is enabled
+    MEMORY_ANALYSIS: bool = False
+    # Memory snapshot interval (minutes)
+    MEMORY_SNAPSHOT_INTERVAL: int = 5
+    # Number of memory snapshot files to keep
+    MEMORY_SNAPSHOT_KEEP_COUNT: int = 30
     # 全局图片缓存,将媒体图片缓存到本地
     GLOBAL_IMAGE_CACHE: bool = False
     # 是否启用编码探测的性能模式
diff --git a/app/helper/memory.py b/app/helper/memory.py
new file mode 100644
index 00000000..5c960c5f
--- /dev/null
+++ b/app/helper/memory.py
@@ -0,0 +1,346 @@
+import heapq
+import threading
+from datetime import datetime
+from typing import Optional
+
+import psutil
+from pympler import muppy, summary, asizeof
+
+from app.core.config import settings
+from app.core.event import eventmanager, Event
+from app.log import logger
+from app.schemas import ConfigChangeEventData
+from app.schemas.types import EventType
+from app.utils.singleton import Singleton
+
+
+class MemoryHelper(metaclass=Singleton):
+    """
+    Memory monitoring helper that periodically writes memory snapshots of the current process
+
+    While ``settings.MEMORY_ANALYSIS`` is enabled, a daemon thread writes a report (process RSS,
+    pympler per-type summary, largest objects) into ``LOG_PATH/memory_snapshots`` at the
+    configured interval and prunes old snapshot files.
+    """
+
+    # Size units expected in pympler's formatted summary, smallest first (powers of 1024)
+    _SIZE_UNITS = ("B", "KB", "MB", "GB", "TB", "EB")
+
+    def __init__(self):
+        # Snapshot interval in seconds (the setting is in minutes)
+        self._check_interval = settings.MEMORY_SNAPSHOT_INTERVAL * 60
+        self._monitoring = False
+        self._monitor_thread: Optional[threading.Thread] = None
+        # Wakes up and terminates the monitor loop; replaced by a fresh event on every start
+        self._stop_event = threading.Event()
+        # Directory the snapshot files are written to
+        self._memory_snapshot_dir = settings.LOG_PATH / "memory_snapshots"
+        # Number of snapshot files to keep
+        self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT
+
+    @eventmanager.register(EventType.ConfigChanged)
+    def handle_config_changed(self, event: Event):
+        """
+        Handle a configuration change: refresh the cached settings and restart monitoring
+        :param event: configuration change event
+        """
+        if not event:
+            return
+        event_data: ConfigChangeEventData = event.event_data
+        if not event_data:
+            return
+        if event_data.key not in ['MEMORY_ANALYSIS', 'MEMORY_SNAPSHOT_INTERVAL', 'MEMORY_SNAPSHOT_KEEP_COUNT']:
+            return
+
+        # Refresh the cached settings
+        if event_data.key == 'MEMORY_SNAPSHOT_INTERVAL':
+            self._check_interval = settings.MEMORY_SNAPSHOT_INTERVAL * 60
+        elif event_data.key == 'MEMORY_SNAPSHOT_KEEP_COUNT':
+            self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT
+        # Restart so the change takes effect; start_monitoring() is a no-op while disabled
+        self.stop_monitoring()
+        self.start_monitoring()
+
+    def start_monitoring(self):
+        """
+        Start the background monitor thread
+
+        Does nothing when ``settings.MEMORY_ANALYSIS`` is off or monitoring is already running.
+        """
+        if not settings.MEMORY_ANALYSIS:
+            return
+        if self._monitoring:
+            return
+
+        # Make sure the snapshot directory exists
+        self._memory_snapshot_dir.mkdir(parents=True, exist_ok=True)
+
+        # Use a fresh event per run: a previous loop that is still finishing a snapshot keeps
+        # its own, already set, event and therefore cannot be revived by this restart
+        stop_event = threading.Event()
+        self._stop_event = stop_event
+        self._monitoring = True
+        self._monitor_thread = threading.Thread(target=self._monitor_loop, args=(stop_event,), daemon=True)
+        self._monitor_thread.start()
+        logger.info("内存监控已启动")
+
+    def stop_monitoring(self):
+        """
+        Stop the background monitor thread
+
+        The loop is woken up immediately instead of sleeping out its interval; the call waits at
+        most 5 seconds for the thread to exit (a snapshot in progress is not interrupted).
+        """
+        self._monitoring = False
+        # Interrupt the wait between two snapshots
+        self._stop_event.set()
+        if self._monitor_thread:
+            self._monitor_thread.join(timeout=5)
+            self._monitor_thread = None
+
+        logger.info("内存监控已停止")
+
+    def _monitor_loop(self, stop_event: Optional[threading.Event] = None):
+        """
+        Body of the monitor thread: take a snapshot, wait one interval, repeat
+        :param stop_event: event that ends this loop; defaults to the helper's current event
+        """
+        if stop_event is None:
+            stop_event = self._stop_event
+        logger.info("内存监控循环开始")
+        while not stop_event.is_set():
+            try:
+                self._create_memory_snapshot()
+                # Never wait less than a minute, even if the interval setting is zero or negative
+                wait_seconds = max(self._check_interval, 60)
+            except Exception as e:
+                logger.error(f"内存监控出错: {e}")
+                # Retry one minute after an error
+                wait_seconds = 60
+            # Interruptible sleep: wait() returns True as soon as a stop is requested
+            if stop_event.wait(wait_seconds):
+                break
+
+        logger.info("内存监控循环结束")
+
+    def _create_memory_snapshot(self):
+        """
+        Write a memory snapshot of the current process to a new file
+
+        Failures are logged, not raised.
+        """
+        try:
+            now = datetime.now()
+            snapshot_file = self._memory_snapshot_dir / f"memory_snapshot_{now.strftime('%Y%m%d_%H%M%S')}.txt"
+
+            # Summarize the live objects by type
+            all_objects = muppy.get_objects()
+            sum1 = summary.summarize(all_objects)
+            # Drop the object list before _get_largest_objects() collects its own
+            del all_objects
+
+            # Resident set size of the current process
+            memory_usage_mb = psutil.Process().memory_info().rss / 1024 / 1024
+
+            with open(snapshot_file, 'w', encoding='utf-8') as f:
+                f.write(f"内存快照时间: {now.strftime('%Y-%m-%d %H:%M:%S')}\n")
+                f.write(f"当前进程内存使用: {memory_usage_mb:.2f} MB\n")
+                f.write("=" * 80 + "\n")
+                f.write("对象类型统计:\n")
+                f.write("-" * 80 + "\n")
+
+                # Per-type statistics
+                for line in summary.format_(sum1):
+                    f.write(line + "\n")
+
+                # Largest individual objects
+                f.write("\n" + "=" * 80 + "\n")
+                f.write("最大内存占用对象详情:\n")
+                f.write("-" * 80 + "\n")
+
+                for i, obj_info in enumerate(self._get_largest_objects(10), 1):
+                    f.write(f"{i:2d}. {obj_info['type']:<30} {obj_info['size_mb']:>8.2f} MB - {obj_info['description']}\n")
+
+            logger.info(f"内存快照已保存: {snapshot_file}, 当前内存使用: {memory_usage_mb:.2f} MB")
+
+            # Prune old snapshot files
+            self._cleanup_old_snapshots()
+
+        except Exception as e:
+            logger.error(f"创建内存快照失败: {e}")
+
+    def _cleanup_old_snapshots(self):
+        """
+        Delete the oldest snapshot files so that only the newest ``_keep_count`` remain
+        """
+        try:
+            snapshot_files = list(self._memory_snapshot_dir.glob("memory_snapshot_*.txt"))
+            # Always keep at least the newest file; this also avoids slicing with ``[:-0]``,
+            # which selects nothing and would let the files pile up when the setting is 0
+            excess = len(snapshot_files) - max(self._keep_count, 1)
+            if excess <= 0:
+                return
+            # Oldest first, by modification time
+            snapshot_files.sort(key=lambda x: x.stat().st_mtime)
+            for old_file in snapshot_files[:excess]:
+                old_file.unlink()
+                logger.debug(f"已删除过期内存快照: {old_file}")
+        except Exception as e:
+            logger.error(f"清理过期快照失败: {e}")
+
+    def _get_largest_objects(self, top_n: int = 20) -> list:
+        """
+        Collect the objects with the largest memory footprint
+        :param top_n: number of objects to return
+        :return: list of dicts with the keys type, size_mb, size_bytes and description, largest first
+        """
+        try:
+            # Measure every object; only those above 1 KB are candidates
+            sized_objects = []
+            for obj in muppy.get_objects():
+                try:
+                    size = asizeof.asizeof(obj)
+                except Exception:
+                    # Some objects cannot be sized, skip them
+                    continue
+                if size > 1024:
+                    sized_objects.append((size, obj))
+
+            # Pick the top N first so that descriptions are only built for objects that are reported
+            largest = heapq.nlargest(top_n, sized_objects, key=lambda item: item[0])
+
+            result = []
+            for size, obj in largest:
+                obj_type = type(obj).__name__
+                obj_module = getattr(type(obj), '__module__', 'unknown')
+                result.append({
+                    'type': f"{obj_module}.{obj_type}" if obj_module != 'builtins' else obj_type,
+                    'size_mb': size / 1024 / 1024,
+                    'size_bytes': size,
+                    'description': self._generate_object_description(obj)
+                })
+            return result
+
+        except Exception as e:
+            logger.error(f"获取最大对象信息失败: {e}")
+            return []
+
+    @staticmethod
+    def _generate_object_description(obj) -> str:
+        """
+        Build a short, human-readable description of an object
+        :param obj: object to describe
+        :return: description string
+        """
+        try:
+            # The description depends on the kind of object
+            if isinstance(obj, (list, tuple)):
+                length = len(obj)
+                first_type = type(obj[0]).__name__ if obj else 'empty'
+                return f"长度={length}, 示例元素类型={first_type}"
+
+            elif isinstance(obj, dict):
+                length = len(obj)
+                if obj:
+                    first_key = next(iter(obj))
+                    key_type = type(first_key).__name__
+                    return f"键值对数={length}, 示例键类型={key_type}"
+                return f"键值对数={length}, 示例键类型=empty"
+
+            elif isinstance(obj, (set, frozenset)):
+                length = len(obj)
+                if obj:
+                    first_item = next(iter(obj))
+                    item_type = type(first_item).__name__
+                    return f"元素数={length}, 示例元素类型={item_type}"
+                return f"元素数={length}, 示例元素类型=empty"
+
+            elif isinstance(obj, str):
+                length = len(obj)
+                preview = obj[:50] + '...' if length > 50 else obj
+                return f"长度={length}, 内容='{preview}'"
+
+            elif hasattr(obj, '__name__'):
+                return f"名称={obj.__name__}"
+
+            elif hasattr(obj, '__dict__'):
+                attrs_count = len(obj.__dict__)
+                return f"实例属性数={attrs_count}"
+
+            else:
+                obj_str = str(obj)[:50]
+                return f"对象={obj_str}"
+
+        except Exception as e:
+            return f"无法获取描述:{e}"
+
+    @classmethod
+    def _parse_size_mb(cls, size_str: str) -> float:
+        """
+        Convert a formatted size such as "12.50 KB" to megabytes
+        :param size_str: number followed by one of the units in ``_SIZE_UNITS``
+        :return: size in MB, 0.0 if the text cannot be parsed
+        """
+        text = size_str.strip()
+        # Check the two-letter units before plain "B", which is a suffix of all of them
+        for exponent in range(len(cls._SIZE_UNITS) - 1, -1, -1):
+            unit = cls._SIZE_UNITS[exponent]
+            if text.endswith(unit):
+                try:
+                    # 1024 ** exponent bytes per unit, divided by 1024 ** 2 bytes per MB
+                    return float(text[:-len(unit)]) * 1024.0 ** (exponent - 2)
+                except ValueError:
+                    return 0.0
+        return 0.0
+
+    def get_current_memory_info(self) -> dict:
+        """
+        Collect the current memory usage of the process
+        :return: dict with current_memory_mb, object_count, top_objects and largest_objects,
+                 or ``{"error": message}`` on failure
+        """
+        try:
+            # Resident set size of the current process
+            current_memory = psutil.Process().memory_info().rss
+
+            # Per-type statistics
+            all_objects = muppy.get_objects()
+            sum1 = summary.summarize(all_objects)
+
+            memory_info = {
+                "current_memory_mb": round(current_memory / 1024 / 1024, 2),
+                "object_count": len(all_objects),
+                "top_objects": [],
+                "largest_objects": []
+            }
+            # Drop the object list before _get_largest_objects() collects its own
+            del all_objects
+
+            # Parse the formatted table; rows look like "type_name | count | size"
+            for line in summary.format_(sum1):
+                parts = line.split('|')
+                if len(parts) < 3 or line.startswith('=') or line.startswith(' types'):
+                    continue
+                type_name = parts[0].strip()
+                count_str = parts[1].strip()
+                # The count column of the header and separator rows is not numeric
+                if not type_name or not count_str.isdigit():
+                    continue
+
+                memory_info["top_objects"].append({
+                    "type": type_name,
+                    "count": count_str,
+                    "size_mb": round(self._parse_size_mb(parts[2]), 2)
+                })
+
+                # Only the first 10 valid rows are reported
+                if len(memory_info["top_objects"]) >= 10:
+                    break
+
+            # Largest individual objects
+            memory_info["largest_objects"] = self._get_largest_objects(10)
+
+            return memory_info
+        except Exception as e:
+            logger.error(f"获取内存信息失败: {e}")
+            return {"error": str(e)}
diff --git a/app/startup/lifecycle.py b/app/startup/lifecycle.py
index 1225e381..079d66b2 100644
--- a/app/startup/lifecycle.py
+++ b/app/startup/lifecycle.py
@@ -6,6 +6,7 @@ from fastapi import FastAPI
 from app.chain.system import SystemChain
 from app.core.config import global_vars
 from app.startup.command_initializer import init_command, stop_command, restart_command
+from app.startup.memory_initializer import init_memory_manager, stop_memory_manager
 from app.startup.modules_initializer import init_modules, stop_modules
 from app.startup.monitor_initializer import stop_monitor, init_monitor
 from app.startup.plugins_initializer import init_plugins, stop_plugins, sync_plugins
@@ -47,6 +48,8 @@ async def lifespan(app: FastAPI):
     init_command()
     # 初始化工作流
     init_workflow()
+    # Start memory monitoring
+    init_memory_manager()
     # 插件同步到本地
     sync_plugins_task = asyncio.create_task(init_plugin_system())
     try:
@@ -64,6 +67,8 @@ async def lifespan(app: FastAPI):
             pass
         except Exception as e:
             print(str(e))
+        # Stop memory monitoring
+        stop_memory_manager()
         # 停止工作流
         stop_workflow()
         # 停止命令
diff --git a/app/startup/memory_initializer.py b/app/startup/memory_initializer.py
new file mode 100644
index 00000000..80719184
--- /dev/null
+++ b/app/startup/memory_initializer.py
@@ -0,0 +1,15 @@
+from app.helper.memory import MemoryHelper
+
+
+def init_memory_manager():
+    """
+    Start the memory monitor
+    """
+    MemoryHelper().start_monitoring()
+
+
+def stop_memory_manager():
+    """
+    Stop the memory monitor
+    """
+    MemoryHelper().stop_monitoring()
diff --git a/requirements.in b/requirements.in
index 099bfd1d..867ecf94 100644
--- a/requirements.in
+++ b/requirements.in
@@ -70,3 +70,4 @@ cf_clearance~=0.31.0
 oss2~=2.19.1
 tqdm~=4.67.1
 setuptools~=78.1.0
+pympler~=0.9