From 29a605f2656b1571c18aacacacb6f4cfd4adce4c Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 9 Jul 2025 08:57:22 +0000 Subject: [PATCH 1/3] Optimize memory analysis with timeout, sampling, and performance improvements Co-authored-by: jxxghp --- app/helper/memory.py | 963 ++++++++++++++++++++----------------------- 1 file changed, 455 insertions(+), 508 deletions(-) diff --git a/app/helper/memory.py b/app/helper/memory.py index bdf01596..4683afe4 100644 --- a/app/helper/memory.py +++ b/app/helper/memory.py @@ -4,8 +4,10 @@ import threading import time import os import tracemalloc +import signal from datetime import datetime from typing import Optional, Dict, List, Tuple +from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError import psutil from pympler import muppy, summary, asizeof @@ -18,6 +20,11 @@ from app.schemas.types import EventType from app.utils.singleton import Singleton +class TimeoutException(Exception): + """超时异常""" + pass + + class MemoryHelper(metaclass=Singleton): """ 内存管理工具类,用于监控和优化内存使用 @@ -33,10 +40,40 @@ class MemoryHelper(metaclass=Singleton): # 保留的快照文件数量 self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT + # 性能优化参数 + self._max_objects_to_analyze = 50000 # 限制分析对象数量 + self._analysis_timeout = 30 # 分析超时时间(秒) + self._large_object_threshold = 1024 * 1024 # 大对象阈值(1MB) + # 启用tracemalloc以获得更详细的内存信息 if not tracemalloc.is_tracing(): tracemalloc.start(25) # 保留25个帧 + def _timeout_handler(self, signum, frame): + """超时信号处理器""" + raise TimeoutException("内存分析超时") + + def _run_with_timeout(self, func, *args, **kwargs): + """在超时限制下运行函数""" + try: + # 设置信号处理器(仅在主线程中有效) + if threading.current_thread() is threading.main_thread(): + signal.signal(signal.SIGALRM, self._timeout_handler) + signal.alarm(self._analysis_timeout) + + try: + result = func(*args, **kwargs) + return result + finally: + if threading.current_thread() is threading.main_thread(): + signal.alarm(0) # 取消闹钟 + except TimeoutException: + logger.warning(f"内存分析函数 {func.__name__} 超时") + return None + except Exception as e: + logger.error(f"内存分析函数 {func.__name__} 出错: {e}") + return None + @eventmanager.register(EventType.ConfigChanged) def handle_config_changed(self, event: Event): """ @@ -117,21 +154,21 @@ class MemoryHelper(metaclass=Singleton): # 第一步:写入基本信息和系统内存统计 self._write_system_memory_info(snapshot_file, memory_usage) - # 第二步:写入Python对象类型统计 - self._write_python_objects_info(snapshot_file) + # 第二步:写入Python对象类型统计(优化版本) + self._write_python_objects_info_optimized(snapshot_file) - # 第三步:分析并写入类实例内存使用情况 - self._append_class_analysis(snapshot_file) + # 第三步:分析并写入类实例内存使用情况(简化版本) + self._append_class_analysis_simple(snapshot_file) - # 第四步:分析并写入大内存变量详情 - self._append_variable_analysis(snapshot_file) + # 第四步:分析并写入大内存变量详情(优化版本) + self._append_variable_analysis_optimized(snapshot_file) # 第五步:分析内存泄漏和增长趋势 self._append_memory_leak_analysis(snapshot_file) logger.info(f"内存快照已保存: {snapshot_file}, 当前内存使用: {memory_usage / 1024 / 1024:.2f} MB") - # 清理过期的快照文件(保留最近30个) + # 清理过期的快照文件 self._cleanup_old_snapshots() except Exception as e: @@ -148,9 +185,6 @@ class MemoryHelper(metaclass=Singleton): # 获取系统总内存信息 system_memory = psutil.virtual_memory() - # 获取内存映射信息 - memory_maps = process.memory_maps() - with open(snapshot_file, 'w', encoding='utf-8') as f: f.write(f"内存快照时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write("=" * 80 + "\n") @@ -167,95 +201,42 @@ class MemoryHelper(metaclass=Singleton): f.write(f"进程文本段: {memory_info.text / 1024 / 1024:.2f} MB\n") f.write(f"进程数据段: {memory_info.data / 1024 / 1024:.2f} MB\n") - # 分析内存映射 - f.write("\n内存映射分析:\n") - f.write("-" * 80 + "\n") - memory_regions = self._analyze_memory_maps(memory_maps) - for region_type, size_mb in memory_regions.items(): - f.write(f"{region_type}: {size_mb:.2f} MB\n") - f.flush() - def _analyze_memory_maps(self, memory_maps) -> Dict[str, float]: + def _write_python_objects_info_optimized(self, snapshot_file): """ - 分析内存映射,按类型分类统计 - """ - regions = {} - for mmap in memory_maps: - size_mb = mmap.size / 1024 / 1024 - perms = mmap.perms - - if 'r' in perms and 'w' in perms: - region_type = "读写内存" - elif 'r' in perms and 'x' in perms: - region_type = "代码段" - elif 'r' in perms: - region_type = "只读内存" - else: - region_type = "其他内存" - - if region_type in regions: - regions[region_type] += size_mb - else: - regions[region_type] = size_mb - - return regions - - def _write_python_objects_info(self, snapshot_file): - """ - 写入Python对象类型统计信息 - """ - # 获取当前tracemalloc统计 - current, peak = tracemalloc.get_traced_memory() - - # 获取所有对象 - all_objects = muppy.get_objects() - sum1 = summary.summarize(all_objects) - - # 计算Python对象总内存 - python_total_mb = 0 - for line in summary.format_(sum1): - if '|' in line and line.strip() and not line.startswith('=') and not line.startswith('-'): - parts = line.split('|') - if len(parts) >= 3: - try: - size_str = parts[2].strip() - if 'MB' in size_str: - size_mb = float(size_str.replace('MB', '').strip()) - python_total_mb += size_mb - except: - pass - - with open(snapshot_file, 'a', encoding='utf-8') as f: - f.write("\n" + "=" * 80 + "\n") - f.write("Python内存使用情况:\n") - f.write("-" * 80 + "\n") - f.write(f"tracemalloc当前内存: {current / 1024 / 1024:.2f} MB\n") - f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n") - f.write(f"Python对象总内存: {python_total_mb:.2f} MB\n") - f.write(f"未统计内存(可能为C扩展): {self._get_unaccounted_memory():.2f} MB\n") - - f.write("\n对象类型统计:\n") - f.write("-" * 80 + "\n") - # 写入对象统计信息 - for line in summary.format_(sum1): - f.write(line + "\n") - - f.flush() - - def _get_unaccounted_memory(self) -> float: - """ - 计算未统计的内存(可能是C扩展、系统缓存等) + 优化的Python对象类型统计信息 """ try: - # 获取进程总内存 - process = psutil.Process() - total_memory = process.memory_info().rss / 1024 / 1024 # MB + # 获取当前tracemalloc统计 + current, peak = tracemalloc.get_traced_memory() - # 获取Python对象总内存 + # 限制分析的对象数量 all_objects = muppy.get_objects() - sum1 = summary.summarize(all_objects) + if len(all_objects) > self._max_objects_to_analyze: + # 随机采样,避免总是分析相同的对象 + import random + all_objects = random.sample(all_objects, self._max_objects_to_analyze) + logger.info(f"对象数量过多,采样分析 {self._max_objects_to_analyze} 个对象") + # 使用超时机制 + def analyze_objects(): + sum1 = summary.summarize(all_objects) + return sum1 + + sum1 = self._run_with_timeout(analyze_objects) + if sum1 is None: + with open(snapshot_file, 'a', encoding='utf-8') as f: + f.write("\n" + "=" * 80 + "\n") + f.write("Python内存使用情况:\n") + f.write("-" * 80 + "\n") + f.write(f"tracemalloc当前内存: {current / 1024 / 1024:.2f} MB\n") + f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n") + f.write("对象分析超时,跳过详细统计\n") + f.flush() + return + + # 计算Python对象总内存 python_total_mb = 0 for line in summary.format_(sum1): if '|' in line and line.strip() and not line.startswith('=') and not line.startswith('-'): @@ -268,10 +249,258 @@ class MemoryHelper(metaclass=Singleton): python_total_mb += size_mb except: pass + + with open(snapshot_file, 'a', encoding='utf-8') as f: + f.write("\n" + "=" * 80 + "\n") + f.write("Python内存使用情况:\n") + f.write("-" * 80 + "\n") + f.write(f"tracemalloc当前内存: {current / 1024 / 1024:.2f} MB\n") + f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n") + f.write(f"Python对象总内存: {python_total_mb:.2f} MB\n") + f.write(f"分析对象数量: {len(all_objects):,}\n") + + f.write("\n对象类型统计 (前20个):\n") + f.write("-" * 80 + "\n") + # 写入对象统计信息(限制行数) + line_count = 0 + for line in summary.format_(sum1): + if line_count >= 25: # 只显示前25行 + break + f.write(line + "\n") + line_count += 1 + + f.flush() + + except Exception as e: + logger.error(f"写入Python对象信息失败: {e}") + with open(snapshot_file, 'a', encoding='utf-8') as f: + f.write(f"\nPython对象分析失败: {e}\n") + f.flush() + + def _append_class_analysis_simple(self, snapshot_file): + """ + 简化的类实例内存使用情况分析 + """ + with open(snapshot_file, 'a', encoding='utf-8') as f: + f.write("\n" + "=" * 80 + "\n") + f.write("类实例内存使用情况 (简化版):\n") + f.write("-" * 80 + "\n") + f.write("正在分析中...\n") + f.flush() + + try: + logger.debug("开始分析类实例内存使用情况") - return max(0, total_memory - python_total_mb) - except: - return 0.0 + def analyze_classes(): + class_info = {} + processed_count = 0 + + # 获取所有对象(限制数量) + all_objects = muppy.get_objects() + if len(all_objects) > self._max_objects_to_analyze: + import random + all_objects = random.sample(all_objects, self._max_objects_to_analyze) + + for obj in all_objects: + try: + # 跳过类对象本身 + if isinstance(obj, type): + continue + + # 获取对象的类名 + obj_class = type(obj) + try: + if hasattr(obj_class, '__module__') and hasattr(obj_class, '__name__'): + class_name = f"{obj_class.__module__}.{obj_class.__name__}" + else: + class_name = str(obj_class) + except: + class_name = f"" + + # 只计算对象本身的大小,避免深度计算 + size_bytes = sys.getsizeof(obj) + if size_bytes < 100: # 跳过太小的对象 + continue + + size_mb = size_bytes / 1024 / 1024 + processed_count += 1 + + if class_name in class_info: + class_info[class_name]['size_mb'] += size_mb + class_info[class_name]['count'] += 1 + else: + class_info[class_name] = { + 'name': class_name, + 'size_mb': size_mb, + 'count': 1 + } + + except Exception: + continue + + # 按内存大小排序,只返回前50个 + sorted_classes = sorted(class_info.values(), key=lambda x: x['size_mb'], reverse=True) + return sorted_classes[:50] + + class_objects = self._run_with_timeout(analyze_classes) + + # 更新文件内容 + with open(snapshot_file, 'r', encoding='utf-8') as f: + content = f.read() + + content = content.replace("正在分析中...\n", "") + + with open(snapshot_file, 'w', encoding='utf-8') as f: + f.write(content) + + if class_objects: + for i, class_info in enumerate(class_objects, 1): + f.write(f"{i:3d}. {class_info['name']:<50} " + f"{class_info['size_mb']:>8.2f} MB ({class_info['count']} 个实例)\n") + else: + f.write("类实例分析超时或失败\n") + + f.flush() + + except Exception as e: + logger.error(f"获取类实例信息失败: {e}") + with open(snapshot_file, 'r', encoding='utf-8') as f: + content = f.read() + content = content.replace("正在分析中...\n", f"获取类实例信息失败: {e}\n") + with open(snapshot_file, 'w', encoding='utf-8') as f: + f.write(content) + f.flush() + + logger.debug("类实例分析已完成并写入") + + def _append_variable_analysis_optimized(self, snapshot_file): + """ + 优化的大内存变量详情分析 + """ + with open(snapshot_file, 'a', encoding='utf-8') as f: + f.write("\n" + "=" * 80 + "\n") + f.write("大内存变量详情 (优化版):\n") + f.write("-" * 80 + "\n") + f.write("正在分析中...\n") + f.flush() + + try: + logger.debug("开始分析大内存变量") + + def analyze_large_variables(): + large_vars = [] + processed_count = 0 + calculated_objects = set() + + # 获取所有对象(限制数量) + all_objects = muppy.get_objects() + if len(all_objects) > self._max_objects_to_analyze: + import random + all_objects = random.sample(all_objects, self._max_objects_to_analyze) + + for obj in all_objects: + # 跳过类对象 + if isinstance(obj, type): + continue + + # 跳过已经计算过的对象 + obj_id = id(obj) + if obj_id in calculated_objects: + continue + + try: + # 首先使用 sys.getsizeof 快速筛选 + shallow_size = sys.getsizeof(obj) + if shallow_size < self._large_object_threshold: # 只处理大于1MB的对象 + continue + + # 对于较大的对象,使用 asizeof 进行深度计算 + size_bytes = asizeof.asizeof(obj) + + # 只处理大于1MB的对象 + if size_bytes < self._large_object_threshold: + continue + + size_mb = size_bytes / 1024 / 1024 + processed_count += 1 + calculated_objects.add(obj_id) + + # 获取对象信息 + var_info = self._get_variable_info_simple(obj, size_mb) + if var_info: + large_vars.append(var_info) + + # 如果已经找到足够多的大对象,可以提前结束 + if len(large_vars) >= 50: # 限制数量 + break + + except Exception: + continue + + # 按内存大小排序并返回前30个 + large_vars.sort(key=lambda x: x['size_mb'], reverse=True) + return large_vars[:30] + + large_variables = self._run_with_timeout(analyze_large_variables) + + # 更新文件内容 + with open(snapshot_file, 'r', encoding='utf-8') as f: + content = f.read() + + content = content.replace("正在分析中...\n", "") + + with open(snapshot_file, 'w', encoding='utf-8') as f: + f.write(content) + + if large_variables: + for i, var_info in enumerate(large_variables, 1): + f.write( + f"{i:3d}. {var_info['name']:<30} {var_info['type']:<15} {var_info['size_mb']:>8.2f} MB\n") + else: + f.write("大内存变量分析超时或失败\n") + + f.flush() + + except Exception as e: + logger.error(f"获取大内存变量信息失败: {e}") + with open(snapshot_file, 'r', encoding='utf-8') as f: + content = f.read() + content = content.replace("正在分析中...\n", f"获取变量信息失败: {e}\n") + with open(snapshot_file, 'w', encoding='utf-8') as f: + f.write(content) + f.flush() + + logger.debug("大内存变量分析已完成并写入") + + def _get_variable_info_simple(self, obj, size_mb): + """ + 简化的变量信息获取 + """ + try: + obj_type = type(obj).__name__ + + # 简化的变量名获取 + var_name = f"{obj_type}_{id(obj)}" + + # 生成描述性信息 + if isinstance(obj, dict): + key_count = len(obj) + var_name = f"dict_{key_count}items_{id(obj)}" + elif isinstance(obj, (list, tuple, set)): + var_name = f"{obj_type}_{len(obj)}items_{id(obj)}" + elif isinstance(obj, str): + var_name = f"str_{len(obj)}chars_{id(obj)}" + elif hasattr(obj, '__class__') and hasattr(obj.__class__, '__name__'): + var_name = f"{obj.__class__.__name__}_{id(obj)}" + + return { + 'name': var_name, + 'type': obj_type, + 'size_mb': size_mb + } + + except Exception: + return None def _append_memory_leak_analysis(self, snapshot_file): """ @@ -287,15 +516,14 @@ class MemoryHelper(metaclass=Singleton): f.write(f"当前tracemalloc内存: {current / 1024 / 1024:.2f} MB\n") f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n") - # 获取内存分配统计 + # 获取内存分配统计(限制数量) try: - # 获取前10个内存分配最多的位置 snapshot = tracemalloc.take_snapshot() top_stats = snapshot.statistics('lineno') - f.write("\n内存分配最多的位置 (前10个):\n") + f.write("\n内存分配最多的位置 (前5个):\n") f.write("-" * 80 + "\n") - for i, stat in enumerate(top_stats[:10], 1): + for i, stat in enumerate(top_stats[:5], 1): f.write(f"{i:2d}. {stat.count:>8} 个对象, {stat.size / 1024 / 1024:>8.2f} MB\n") f.write(f" {stat.traceback.format()}\n") @@ -317,6 +545,21 @@ class MemoryHelper(metaclass=Singleton): logger.debug("内存泄漏分析已完成并写入") + def _cleanup_old_snapshots(self): + """ + 清理过期的内存快照文件,只保留最近的指定数量文件 + """ + try: + snapshot_files = list(self._memory_snapshot_dir.glob("memory_snapshot_*.txt")) + if len(snapshot_files) > self._keep_count: + # 按修改时间排序,删除最旧的文件 + snapshot_files.sort(key=lambda x: x.stat().st_mtime) + for old_file in snapshot_files[:-self._keep_count]: + old_file.unlink() + logger.debug(f"已删除过期内存快照: {old_file}") + except Exception as e: + logger.error(f"清理过期快照失败: {e}") + def create_detailed_memory_analysis(self): """ 创建详细的内存分析报告,专门用于诊断内存问题 @@ -334,14 +577,14 @@ class MemoryHelper(metaclass=Singleton): # 1. 系统级内存分析 self._write_detailed_system_analysis(f) - # 2. Python对象深度分析 - self._write_detailed_python_analysis(f) + # 2. Python对象深度分析(优化版) + self._write_detailed_python_analysis_optimized(f) - # 3. 内存映射详细分析 - self._write_detailed_memory_maps(f) + # 3. 内存映射详细分析(简化版) + self._write_detailed_memory_maps_simple(f) - # 4. 大对象分析 - self._write_detailed_large_objects(f) + # 4. 大对象分析(优化版) + self._write_detailed_large_objects_optimized(f) # 5. 内存泄漏检测 self._write_memory_leak_detection(f) @@ -387,39 +630,47 @@ class MemoryHelper(metaclass=Singleton): f.write("\n" + "=" * 100 + "\n\n") - def _write_detailed_python_analysis(self, f): + def _write_detailed_python_analysis_optimized(self, f): """ - 写入详细的Python对象分析 + 写入优化的Python对象分析 """ - f.write("2. Python对象深度分析\n") + f.write("2. Python对象深度分析 (优化版)\n") f.write("-" * 50 + "\n") # 强制垃圾回收 collected = gc.collect() f.write(f"垃圾回收清理对象数: {collected}\n\n") - # 获取所有对象 + # 获取所有对象(限制数量) all_objects = muppy.get_objects() + if len(all_objects) > self._max_objects_to_analyze: + import random + all_objects = random.sample(all_objects, self._max_objects_to_analyze) + f.write(f"对象数量过多,采样分析 {self._max_objects_to_analyze} 个对象\n\n") + f.write(f"总对象数: {len(all_objects):,}\n") - # 按类型统计 + # 按类型统计(简化版) type_stats = {} for obj in all_objects: - obj_type = type(obj).__name__ - if obj_type not in type_stats: - type_stats[obj_type] = {'count': 0, 'size': 0} - type_stats[obj_type]['count'] += 1 - type_stats[obj_type]['size'] += sys.getsizeof(obj) + try: + obj_type = type(obj).__name__ + if obj_type not in type_stats: + type_stats[obj_type] = {'count': 0, 'size': 0} + type_stats[obj_type]['count'] += 1 + type_stats[obj_type]['size'] += sys.getsizeof(obj) + except: + continue # 按大小排序 sorted_types = sorted(type_stats.items(), key=lambda x: x[1]['size'], reverse=True) - f.write("对象类型统计 (按内存大小排序):\n") + f.write("对象类型统计 (按内存大小排序,前15个):\n") f.write(f"{'类型':<20} {'数量':<10} {'总大小(MB)':<12} {'平均大小(B)':<12}\n") f.write("-" * 60 + "\n") total_python_memory = 0 - for obj_type, stats in sorted_types[:20]: # 只显示前20个 + for obj_type, stats in sorted_types[:15]: # 只显示前15个 size_mb = stats['size'] / 1024 / 1024 avg_size = stats['size'] / stats['count'] if stats['count'] > 0 else 0 total_python_memory += size_mb @@ -435,105 +686,103 @@ class MemoryHelper(metaclass=Singleton): f.write("\n" + "=" * 100 + "\n\n") - def _write_detailed_memory_maps(self, f): + def _write_detailed_memory_maps_simple(self, f): """ - 写入详细的内存映射分析 + 写入简化的内存映射分析 """ - f.write("3. 内存映射详细分析\n") + f.write("3. 内存映射分析 (简化版)\n") f.write("-" * 50 + "\n") - process = psutil.Process() - memory_maps = process.memory_maps() - - # 按权限分类 - perm_stats = {} - file_stats = {} - - for mmap in memory_maps: - size_mb = mmap.size / 1024 / 1024 - perms = mmap.perms + try: + process = psutil.Process() + memory_maps = process.memory_maps() - # 按权限统计 - if perms not in perm_stats: - perm_stats[perms] = {'count': 0, 'size': 0} - perm_stats[perms]['count'] += 1 - perm_stats[perms]['size'] += size_mb + # 按权限分类 + perm_stats = {} - # 按文件统计 - if mmap.path: - if mmap.path not in file_stats: - file_stats[mmap.path] = {'count': 0, 'size': 0} - file_stats[mmap.path]['count'] += 1 - file_stats[mmap.path]['size'] += size_mb - - f.write("按权限分类的内存映射:\n") - f.write(f"{'权限':<10} {'数量':<8} {'大小(MB)':<12}\n") - f.write("-" * 35 + "\n") - for perms, stats in sorted(perm_stats.items(), key=lambda x: x[1]['size'], reverse=True): - f.write(f"{perms:<10} {stats['count']:<8} {stats['size']:<12.2f}\n") - - f.write(f"\n按文件分类的内存映射 (前10个):\n") - f.write(f"{'文件路径':<50} {'大小(MB)':<12}\n") - f.write("-" * 70 + "\n") - for path, stats in sorted(file_stats.items(), key=lambda x: x[1]['size'], reverse=True)[:10]: - if len(path) > 47: - path = path[:44] + "..." - f.write(f"{path:<50} {stats['size']:<12.2f}\n") + for mmap in memory_maps: + size_mb = mmap.size / 1024 / 1024 + perms = mmap.perms + + # 按权限统计 + if perms not in perm_stats: + perm_stats[perms] = {'count': 0, 'size': 0} + perm_stats[perms]['count'] += 1 + perm_stats[perms]['size'] += size_mb + + f.write("按权限分类的内存映射:\n") + f.write(f"{'权限':<10} {'数量':<8} {'大小(MB)':<12}\n") + f.write("-" * 35 + "\n") + for perms, stats in sorted(perm_stats.items(), key=lambda x: x[1]['size'], reverse=True): + f.write(f"{perms:<10} {stats['count']:<8} {stats['size']:<12.2f}\n") + + except Exception as e: + f.write(f"内存映射分析失败: {e}\n") f.write("\n" + "=" * 100 + "\n\n") - def _write_detailed_large_objects(self, f): + def _write_detailed_large_objects_optimized(self, f): """ - 写入大对象详细分析 + 写入优化的大对象详细分析 """ - f.write("4. 大对象详细分析\n") + f.write("4. 大对象详细分析 (优化版)\n") f.write("-" * 50 + "\n") - all_objects = muppy.get_objects() - large_objects = [] - - for obj in all_objects: - try: - size = asizeof.asizeof(obj) - if size > 1024 * 1024: # 大于1MB的对象 - large_objects.append((obj, size)) - except: - continue - - # 按大小排序 - large_objects.sort(key=lambda x: x[1], reverse=True) - - f.write(f"大对象 (>1MB) 数量: {len(large_objects)}\n\n") - - for i, (obj, size) in enumerate(large_objects[:20], 1): # 只显示前20个 - size_mb = size / 1024 / 1024 - obj_type = type(obj).__name__ + def analyze_large_objects(): + large_objects = [] - f.write(f"{i:2d}. {obj_type} - {size_mb:.2f} MB\n") + # 获取所有对象(限制数量) + all_objects = muppy.get_objects() + if len(all_objects) > self._max_objects_to_analyze: + import random + all_objects = random.sample(all_objects, self._max_objects_to_analyze) - # 尝试获取更多信息 - try: - if isinstance(obj, dict): - f.write(f" 字典项数: {len(obj)}\n") - if obj: - sample_keys = list(obj.keys())[:3] - f.write(f" 示例键: {sample_keys}\n") - elif isinstance(obj, (list, tuple)): - f.write(f" 元素数量: {len(obj)}\n") - elif isinstance(obj, str): - f.write(f" 字符串长度: {len(obj)}\n") - if len(obj) > 100: - f.write(f" 内容预览: {obj[:100]}...\n") - else: - f.write(f" 内容: {obj}\n") - elif hasattr(obj, '__dict__'): - f.write(f" 属性数量: {len(obj.__dict__)}\n") - if hasattr(obj, '__class__'): - f.write(f" 类名: {obj.__class__.__name__}\n") - except: - pass + for obj in all_objects: + try: + # 快速筛选 + shallow_size = sys.getsizeof(obj) + if shallow_size < self._large_object_threshold: + continue + + # 深度计算 + size = asizeof.asizeof(obj) + if size > self._large_object_threshold: + large_objects.append((obj, size)) + + # 限制数量 + if len(large_objects) >= 20: + break + + except: + continue - f.write("\n") + return large_objects + + large_objects = self._run_with_timeout(analyze_large_objects) + + if large_objects: + f.write(f"大对象 (>1MB) 数量: {len(large_objects)}\n\n") + + for i, (obj, size) in enumerate(large_objects, 1): + size_mb = size / 1024 / 1024 + obj_type = type(obj).__name__ + + f.write(f"{i:2d}. {obj_type} - {size_mb:.2f} MB\n") + + # 简化的对象信息 + try: + if isinstance(obj, dict): + f.write(f" 字典项数: {len(obj)}\n") + elif isinstance(obj, (list, tuple)): + f.write(f" 元素数量: {len(obj)}\n") + elif isinstance(obj, str): + f.write(f" 字符串长度: {len(obj)}\n") + except: + pass + + f.write("\n") + else: + f.write("大对象分析超时或未找到大对象\n") f.write("=" * 100 + "\n\n") @@ -553,9 +802,9 @@ class MemoryHelper(metaclass=Singleton): snapshot = tracemalloc.take_snapshot() top_stats = snapshot.statistics('lineno') - f.write(f"\n内存分配最多的位置 (前15个):\n") + f.write(f"\n内存分配最多的位置 (前10个):\n") f.write("-" * 50 + "\n") - for i, stat in enumerate(top_stats[:15], 1): + for i, stat in enumerate(top_stats[:10], 1): f.write(f"{i:2d}. {stat.count:>8} 个对象, {stat.size / 1024 / 1024:>8.2f} MB\n") for line in stat.traceback.format(): f.write(f" {line}\n") @@ -579,312 +828,6 @@ class MemoryHelper(metaclass=Singleton): f.write("\n" + "=" * 100 + "\n\n") - def _append_class_analysis(self, snapshot_file): - """ - 分析并追加类实例内存使用情况 - """ - with open(snapshot_file, 'a', encoding='utf-8') as f: - f.write("\n" + "=" * 80 + "\n") - f.write("类实例内存使用情况 (按内存大小排序):\n") - f.write("-" * 80 + "\n") - f.write("正在分析中...\n") - # 立即刷新,让用户知道这部分开始了 - f.flush() - - try: - logger.debug("开始分析类实例内存使用情况") - class_objects = self._get_class_memory_usage() - - # 重新打开文件,移除"正在分析中..."并写入实际结果 - with open(snapshot_file, 'r', encoding='utf-8') as f: - content = f.read() - - # 替换"正在分析中..." - content = content.replace("正在分析中...\n", "") - - with open(snapshot_file, 'w', encoding='utf-8') as f: - f.write(content) - - if class_objects: - # 只显示前100个类 - for i, class_info in enumerate(class_objects[:100], 1): - f.write(f"{i:3d}. {class_info['name']:<50} " - f"{class_info['size_mb']:>8.2f} MB ({class_info['count']} 个实例)\n") - else: - f.write("未找到有效的类实例信息\n") - - f.flush() - - except Exception as e: - logger.error(f"获取类实例信息失败: {e}") - - # 即使出错也要更新文件 - with open(snapshot_file, 'r', encoding='utf-8') as f: - content = f.read() - - content = content.replace("正在分析中...\n", f"获取类实例信息失败: {e}\n") - - with open(snapshot_file, 'w', encoding='utf-8') as f: - f.write(content) - f.flush() - - logger.debug("类实例分析已完成并写入") - - def _append_variable_analysis(self, snapshot_file): - """ - 分析并追加大内存变量详情 - """ - with open(snapshot_file, 'a', encoding='utf-8') as f: - f.write("\n" + "=" * 80 + "\n") - f.write("大内存变量详情 (前100个):\n") - f.write("-" * 80 + "\n") - f.write("正在分析中...\n") - # 立即刷新,让用户知道这部分开始了 - f.flush() - - try: - logger.debug("开始分析大内存变量") - large_variables = self._get_large_variables(100) - - # 重新打开文件,移除"正在分析中..."并写入实际结果 - with open(snapshot_file, 'r', encoding='utf-8') as f: - content = f.read() - - # 替换最后的"正在分析中..." - content = content.replace("正在分析中...\n", "") - - with open(snapshot_file, 'w', encoding='utf-8') as f: - f.write(content) - - if large_variables: - for i, var_info in enumerate(large_variables, 1): - f.write( - f"{i:3d}. {var_info['name']:<30} {var_info['type']:<15} {var_info['size_mb']:>8.2f} MB\n") - else: - f.write("未找到大内存变量\n") - - f.flush() - - except Exception as e: - logger.error(f"获取大内存变量信息失败: {e}") - - # 即使出错也要更新文件 - with open(snapshot_file, 'r', encoding='utf-8') as f: - content = f.read() - - content = content.replace("正在分析中...\n", f"获取变量信息失败: {e}\n") - - with open(snapshot_file, 'w', encoding='utf-8') as f: - f.write(content) - f.flush() - - logger.debug("大内存变量分析已完成并写入") - - def _cleanup_old_snapshots(self): - """ - 清理过期的内存快照文件,只保留最近的指定数量文件 - """ - try: - snapshot_files = list(self._memory_snapshot_dir.glob("memory_snapshot_*.txt")) - if len(snapshot_files) > self._keep_count: - # 按修改时间排序,删除最旧的文件 - snapshot_files.sort(key=lambda x: x.stat().st_mtime) - for old_file in snapshot_files[:-self._keep_count]: - old_file.unlink() - logger.debug(f"已删除过期内存快照: {old_file}") - except Exception as e: - logger.error(f"清理过期快照失败: {e}") - - @staticmethod - def _get_class_memory_usage(): - """ - 获取所有类实例的内存使用情况,按内存大小排序 - """ - class_info = {} - processed_count = 0 - error_count = 0 - - # 获取所有对象 - all_objects = muppy.get_objects() - logger.debug(f"开始分析 {len(all_objects)} 个对象的类实例内存使用情况") - - for obj in all_objects: - try: - # 跳过类对象本身,统计类的实例 - if isinstance(obj, type): - continue - - # 获取对象的类名 - 这里可能会出错 - obj_class = type(obj) - - # 安全地获取类名 - try: - if hasattr(obj_class, '__module__') and hasattr(obj_class, '__name__'): - class_name = f"{obj_class.__module__}.{obj_class.__name__}" - else: - class_name = str(obj_class) - except Exception as e: - # 如果获取类名失败,使用简单的类型描述 - class_name = f"" - logger.debug(f"获取类名失败: {e}") - - # 计算对象本身的内存使用(不包括引用对象,避免重复计算) - size_bytes = sys.getsizeof(obj) - if size_bytes < 100: # 跳过太小的对象 - continue - - size_mb = size_bytes / 1024 / 1024 - processed_count += 1 - - if class_name in class_info: - class_info[class_name]['size_mb'] += size_mb - class_info[class_name]['count'] += 1 - else: - class_info[class_name] = { - 'name': class_name, - 'size_mb': size_mb, - 'count': 1 - } - - except Exception as e: - # 捕获所有可能的异常,包括SQLAlchemy、ORM等框架的异常 - error_count += 1 - if error_count <= 5: # 只记录前5个错误,避免日志过多 - logger.debug(f"分析对象时出错: {e}") - continue - - logger.debug(f"类实例分析完成: 处理了 {processed_count} 个对象, 遇到 {error_count} 个错误") - - # 按内存大小排序 - sorted_classes = sorted(class_info.values(), key=lambda x: x['size_mb'], reverse=True) - return sorted_classes - - def _get_large_variables(self, limit=100): - """ - 获取大内存变量信息,按内存大小排序 - 使用已计算对象集合避免重复计算 - """ - large_vars = [] - processed_count = 0 - calculated_objects = set() # 避免重复计算 - - # 获取所有对象 - all_objects = muppy.get_objects() - logger.debug(f"开始分析 {len(all_objects)} 个对象的内存使用情况") - - for obj in all_objects: - # 跳过类对象 - if isinstance(obj, type): - continue - - # 跳过已经计算过的对象 - obj_id = id(obj) - if obj_id in calculated_objects: - continue - - try: - # 首先使用 sys.getsizeof 快速筛选 - shallow_size = sys.getsizeof(obj) - if shallow_size < 1024: # 只处理大于1KB的对象 - continue - - # 对于较大的对象,使用 asizeof 进行深度计算 - size_bytes = asizeof.asizeof(obj) - - # 只处理大于10KB的对象,提高分析效率 - if size_bytes < 10240: - continue - - size_mb = size_bytes / 1024 / 1024 - processed_count += 1 - calculated_objects.add(obj_id) - - # 获取对象信息 - var_info = self._get_variable_info(obj, size_mb) - if var_info: - large_vars.append(var_info) - - # 如果已经找到足够多的大对象,可以提前结束 - if len(large_vars) >= limit * 2: # 多收集一些,后面排序筛选 - break - - except Exception as e: - # 更广泛的异常捕获 - logger.debug(f"分析对象失败: {e}") - continue - - logger.debug(f"处理了 {processed_count} 个大对象,找到 {len(large_vars)} 个有效变量") - - # 按内存大小排序并返回前N个 - large_vars.sort(key=lambda x: x['size_mb'], reverse=True) - return large_vars[:limit] - - def _get_variable_info(self, obj, size_mb): - """ - 获取变量的描述信息 - """ - try: - obj_type = type(obj).__name__ - - # 尝试获取变量名 - var_name = self._get_variable_name(obj) - - # 生成描述性信息 - if isinstance(obj, dict): - key_count = len(obj) - if key_count > 0: - sample_keys = list(obj.keys())[:3] - var_name += f" ({key_count}项, 键: {sample_keys})" - elif isinstance(obj, (list, tuple, set)): - var_name += f" ({len(obj)}个元素)" - elif isinstance(obj, str): - if len(obj) > 50: - var_name += f" (长度: {len(obj)}, 内容: '{obj[:50]}...')" - else: - var_name += f" ('{obj}')" - elif hasattr(obj, '__class__') and hasattr(obj.__class__, '__name__'): - if hasattr(obj, '__dict__'): - attr_count = len(obj.__dict__) - var_name += f" ({attr_count}个属性)" - - return { - 'name': var_name, - 'type': obj_type, - 'size_mb': size_mb - } - - except Exception as e: - logger.debug(f"获取变量信息失败: {e}") - return None - - @staticmethod - def _get_variable_name(obj): - """ - 尝试获取变量名 - """ - try: - # 尝试通过gc获取引用该对象的变量名 - referrers = gc.get_referrers(obj) - - for referrer in referrers: - if isinstance(referrer, dict): - # 检查是否在某个模块的全局变量中 - for name, value in referrer.items(): - if value is obj and isinstance(name, str): - return name - elif hasattr(referrer, '__dict__'): - # 检查是否在某个实例的属性中 - for name, value in referrer.__dict__.items(): - if value is obj and isinstance(name, str): - return f"{type(referrer).__name__}.{name}" - - # 如果找不到变量名,返回对象类型和id - return f"{type(obj).__name__}_{id(obj)}" - - except Exception as e: - logger.debug(f"获取变量名失败: {e}") - return f"{type(obj).__name__}_{id(obj)}" - def get_memory_summary(self) -> Dict[str, float]: """ 获取内存使用摘要 @@ -893,8 +836,12 @@ class MemoryHelper(metaclass=Singleton): process = psutil.Process() memory_info = process.memory_info() - # 获取Python对象总内存 + # 获取Python对象总内存(简化版) all_objects = muppy.get_objects() + if len(all_objects) > 10000: # 限制对象数量 + import random + all_objects = random.sample(all_objects, 10000) + sum1 = summary.summarize(all_objects) python_total_mb = 0 From 028d18826a2f340fa26383611d9170966606078a Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 9 Jul 2025 09:38:06 +0000 Subject: [PATCH 2/3] Refactor memory analysis with ThreadPoolExecutor for cross-platform timeout Co-authored-by: jxxghp --- MEMORY_ANALYSIS_FIX.md | 112 +++++++++++++++++++++++++++++++ app/helper/memory.py | 38 +++++------ test_timeout_fix.py | 148 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 279 insertions(+), 19 deletions(-) create mode 100644 MEMORY_ANALYSIS_FIX.md create mode 100644 test_timeout_fix.py diff --git a/MEMORY_ANALYSIS_FIX.md b/MEMORY_ANALYSIS_FIX.md new file mode 100644 index 00000000..23289027 --- /dev/null +++ b/MEMORY_ANALYSIS_FIX.md @@ -0,0 +1,112 @@ +# 内存分析功能修复说明 + +## 问题描述 + +原始的内存分析功能存在严重的性能问题,导致: +1. CPU占用100%,系统卡死 +2. 超时机制在守护线程中失效 +3. 跨平台兼容性问题(Windows不支持signal.SIGALRM) + +## 修复内容 + +### 1. 超时机制重构 + +**问题**: +- 使用 `signal.SIGALRM` 只在主线程中有效 +- 在守护线程中无法正常工作 +- Windows系统不支持 `signal.SIGALRM` + +**解决方案**: +- 使用 `concurrent.futures.ThreadPoolExecutor` 替代信号机制 +- 实现真正的跨平台超时控制 +- 在守护线程中也能正常工作 + +```python +# 修复前(有问题) +def _run_with_timeout(self, func, *args, **kwargs): + signal.signal(signal.SIGALRM, self._timeout_handler) + signal.alarm(self._analysis_timeout) + # 只在主线程中有效 + +# 修复后(正确) +def _run_with_timeout(self, func, *args, **kwargs): + future = self._executor.submit(func, *args, **kwargs) + result = future.result(timeout=self._analysis_timeout) + # 跨平台,所有线程都有效 +``` + +### 2. 性能优化 + +**限制分析对象数量**: +- 设置 `_max_objects_to_analyze = 50000` +- 超过限制时使用随机采样 +- 避免分析数百万个对象 + +**优化大对象分析**: +- 提高大对象阈值到1MB +- 限制分析数量到30个 +- 使用简化的信息获取方法 + +**简化内存映射分析**: +- 移除复杂的文件路径分析 +- 只保留基本权限分类 +- 减少数据处理量 + +### 3. 资源管理 + +**线程池管理**: +- 添加 `__del__` 方法确保线程池正确关闭 +- 使用单线程池避免资源竞争 +- 设置线程名称便于调试 + +**文件操作优化**: +- 减少文件读写次数 +- 使用更高效的文件更新方式 +- 添加异常处理和错误恢复 + +## 配置建议 + +为了进一步控制内存分析的影响,建议在配置文件中设置: + +```env +# 默认关闭内存分析,需要时手动开启 +MEMORY_ANALYSIS=false + +# 增加快照间隔到10分钟 +MEMORY_SNAPSHOT_INTERVAL=10 + +# 减少保留的快照数量 +MEMORY_SNAPSHOT_KEEP_COUNT=5 +``` + +## 测试验证 + +创建了测试脚本 `test_timeout_fix.py` 来验证修复效果: + +1. **守护线程超时测试**:验证超时机制在守护线程中的工作情况 +2. **性能测试**:监控CPU使用率,确保不会导致系统卡死 +3. **并发测试**:测试多个线程同时进行内存分析 + +## 预期效果 + +修复后的内存分析功能应该: +- ✅ 不会导致CPU占用100% +- ✅ 不会造成系统卡死 +- ✅ 在守护线程中正常工作 +- ✅ 跨平台兼容(Windows/Linux/macOS) +- ✅ 有合理的超时保护机制 +- ✅ 保持核心分析功能完整 + +## 使用建议 + +1. **生产环境**:建议默认关闭内存分析功能 +2. **调试环境**:可以开启进行内存问题诊断 +3. **性能监控**:定期检查内存使用情况,避免内存泄漏 +4. **日志监控**:关注内存分析相关的日志信息 + +## 注意事项 + +1. 内存分析仍然会消耗一定的CPU和内存资源 +2. 建议在系统负载较低时进行详细分析 +3. 如果仍然遇到性能问题,可以进一步调整超时时间和对象数量限制 +4. 定期清理内存快照文件,避免占用过多磁盘空间 \ No newline at end of file diff --git a/app/helper/memory.py b/app/helper/memory.py index 4683afe4..51648b7f 100644 --- a/app/helper/memory.py +++ b/app/helper/memory.py @@ -4,7 +4,6 @@ import threading import time import os import tracemalloc -import signal from datetime import datetime from typing import Optional, Dict, List, Tuple from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError @@ -45,35 +44,36 @@ class MemoryHelper(metaclass=Singleton): self._analysis_timeout = 30 # 分析超时时间(秒) self._large_object_threshold = 1024 * 1024 # 大对象阈值(1MB) + # 线程池用于超时控制 + self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="MemoryAnalysis") + # 启用tracemalloc以获得更详细的内存信息 if not tracemalloc.is_tracing(): tracemalloc.start(25) # 保留25个帧 - def _timeout_handler(self, signum, frame): - """超时信号处理器""" - raise TimeoutException("内存分析超时") - def _run_with_timeout(self, func, *args, **kwargs): - """在超时限制下运行函数""" + """ + 在超时限制下运行函数,使用ThreadPoolExecutor实现跨平台超时控制 + """ try: - # 设置信号处理器(仅在主线程中有效) - if threading.current_thread() is threading.main_thread(): - signal.signal(signal.SIGALRM, self._timeout_handler) - signal.alarm(self._analysis_timeout) - - try: - result = func(*args, **kwargs) - return result - finally: - if threading.current_thread() is threading.main_thread(): - signal.alarm(0) # 取消闹钟 - except TimeoutException: - logger.warning(f"内存分析函数 {func.__name__} 超时") + future = self._executor.submit(func, *args, **kwargs) + result = future.result(timeout=self._analysis_timeout) + return result + except FutureTimeoutError: + logger.warning(f"内存分析函数 {func.__name__} 超时 ({self._analysis_timeout}秒)") return None except Exception as e: logger.error(f"内存分析函数 {func.__name__} 出错: {e}") return None + def __del__(self): + """析构函数,确保线程池正确关闭""" + try: + if hasattr(self, '_executor'): + self._executor.shutdown(wait=False) + except: + pass + @eventmanager.register(EventType.ConfigChanged) def handle_config_changed(self, event: Event): """ diff --git a/test_timeout_fix.py b/test_timeout_fix.py new file mode 100644 index 00000000..f816e947 --- /dev/null +++ b/test_timeout_fix.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +""" +测试修复后的超时机制 +验证ThreadPoolExecutor超时机制在守护线程中的工作情况 +""" + +import time +import threading +import psutil +from app.helper.memory import MemoryHelper + + +def test_timeout_in_daemon_thread(): + """测试在守护线程中的超时机制""" + print("=== 测试守护线程中的超时机制 ===") + + def long_running_task(): + """模拟长时间运行的任务""" + print("开始长时间运行的任务...") + time.sleep(60) # 模拟60秒的长时间任务 + return "任务完成" + + def worker(): + """守护线程工作函数""" + print(f"守护线程 {threading.current_thread().name} 开始工作") + + memory_helper = MemoryHelper() + + # 测试超时机制 + print("测试超时机制...") + start_time = time.time() + result = memory_helper._run_with_timeout(long_running_task) + end_time = time.time() + + print(f"任务执行时间: {end_time - start_time:.2f}秒") + print(f"任务结果: {result}") + + # 测试内存分析功能 + print("测试内存分析功能...") + summary = memory_helper.get_memory_summary() + print(f"内存摘要: {summary}") + + # 创建守护线程 + daemon_thread = threading.Thread(target=worker, daemon=True, name="TestDaemonThread") + daemon_thread.start() + + # 等待线程完成或超时 + daemon_thread.join(timeout=40) # 给40秒时间 + + if daemon_thread.is_alive(): + print("守护线程仍在运行,但主线程继续执行") + else: + print("守护线程已完成") + + +def test_memory_analysis_performance(): + """测试内存分析性能""" + print("\n=== 测试内存分析性能 ===") + + memory_helper = MemoryHelper() + + # 监控CPU使用率 + def monitor_cpu(): + cpu_samples = [] + for i in range(10): + cpu_percent = psutil.cpu_percent(interval=1) + cpu_samples.append(cpu_percent) + print(f"CPU使用率: {cpu_percent:.1f}%") + return sum(cpu_samples) / len(cpu_samples) + + print("测试前CPU使用率:") + avg_cpu_before = monitor_cpu() + + # 执行内存分析 + print("\n执行内存分析...") + start_time = time.time() + + # 测试详细内存分析(应该会超时) + analysis_file = memory_helper.create_detailed_memory_analysis() + + end_time = time.time() + print(f"内存分析耗时: {end_time - start_time:.2f}秒") + + if analysis_file: + print(f"分析报告已保存: {analysis_file}") + else: + print("内存分析超时或失败") + + print("\n测试后CPU使用率:") + avg_cpu_after = monitor_cpu() + + print(f"\n性能对比:") + print(f"测试前平均CPU: {avg_cpu_before:.1f}%") + print(f"测试后平均CPU: {avg_cpu_after:.1f}%") + + +def test_concurrent_analysis(): + """测试并发内存分析""" + print("\n=== 测试并发内存分析 ===") + + def analysis_worker(worker_id): + """分析工作线程""" + print(f"工作线程 {worker_id} 开始") + memory_helper = MemoryHelper() + + # 执行内存摘要 + summary = memory_helper.get_memory_summary() + print(f"工作线程 {worker_id} 内存摘要: {summary}") + + # 执行垃圾回收 + collected = memory_helper.force_garbage_collection() + print(f"工作线程 {worker_id} 垃圾回收: {collected} 个对象") + + print(f"工作线程 {worker_id} 完成") + + # 创建多个工作线程 + threads = [] + for i in range(3): + thread = threading.Thread(target=analysis_worker, args=(i,), daemon=True) + threads.append(thread) + thread.start() + + # 等待所有线程完成 + for thread in threads: + thread.join(timeout=30) + + print("所有工作线程已完成") + + +def main(): + """主测试函数""" + print("超时机制修复测试") + print("=" * 50) + + # 测试1: 守护线程中的超时机制 + test_timeout_in_daemon_thread() + + # 测试2: 内存分析性能 + test_memory_analysis_performance() + + # 测试3: 并发分析 + test_concurrent_analysis() + + print("\n测试完成!") + + +if __name__ == "__main__": + main() \ No newline at end of file From f67db38a2537718beb890f3237e4f645d9ca2de7 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 9 Jul 2025 09:43:34 +0000 Subject: [PATCH 3/3] Fix memory analysis performance and timeout issues across platforms Co-authored-by: jxxghp --- MEMORY_ANALYSIS_FIX.md | 112 ------------------------------- test_timeout_fix.py | 148 ----------------------------------------- 2 files changed, 260 deletions(-) delete mode 100644 MEMORY_ANALYSIS_FIX.md delete mode 100644 test_timeout_fix.py diff --git a/MEMORY_ANALYSIS_FIX.md b/MEMORY_ANALYSIS_FIX.md deleted file mode 100644 index 23289027..00000000 --- a/MEMORY_ANALYSIS_FIX.md +++ /dev/null @@ -1,112 +0,0 @@ -# 内存分析功能修复说明 - -## 问题描述 - -原始的内存分析功能存在严重的性能问题,导致: -1. CPU占用100%,系统卡死 -2. 超时机制在守护线程中失效 -3. 跨平台兼容性问题(Windows不支持signal.SIGALRM) - -## 修复内容 - -### 1. 超时机制重构 - -**问题**: -- 使用 `signal.SIGALRM` 只在主线程中有效 -- 在守护线程中无法正常工作 -- Windows系统不支持 `signal.SIGALRM` - -**解决方案**: -- 使用 `concurrent.futures.ThreadPoolExecutor` 替代信号机制 -- 实现真正的跨平台超时控制 -- 在守护线程中也能正常工作 - -```python -# 修复前(有问题) -def _run_with_timeout(self, func, *args, **kwargs): - signal.signal(signal.SIGALRM, self._timeout_handler) - signal.alarm(self._analysis_timeout) - # 只在主线程中有效 - -# 修复后(正确) -def _run_with_timeout(self, func, *args, **kwargs): - future = self._executor.submit(func, *args, **kwargs) - result = future.result(timeout=self._analysis_timeout) - # 跨平台,所有线程都有效 -``` - -### 2. 性能优化 - -**限制分析对象数量**: -- 设置 `_max_objects_to_analyze = 50000` -- 超过限制时使用随机采样 -- 避免分析数百万个对象 - -**优化大对象分析**: -- 提高大对象阈值到1MB -- 限制分析数量到30个 -- 使用简化的信息获取方法 - -**简化内存映射分析**: -- 移除复杂的文件路径分析 -- 只保留基本权限分类 -- 减少数据处理量 - -### 3. 资源管理 - -**线程池管理**: -- 添加 `__del__` 方法确保线程池正确关闭 -- 使用单线程池避免资源竞争 -- 设置线程名称便于调试 - -**文件操作优化**: -- 减少文件读写次数 -- 使用更高效的文件更新方式 -- 添加异常处理和错误恢复 - -## 配置建议 - -为了进一步控制内存分析的影响,建议在配置文件中设置: - -```env -# 默认关闭内存分析,需要时手动开启 -MEMORY_ANALYSIS=false - -# 增加快照间隔到10分钟 -MEMORY_SNAPSHOT_INTERVAL=10 - -# 减少保留的快照数量 -MEMORY_SNAPSHOT_KEEP_COUNT=5 -``` - -## 测试验证 - -创建了测试脚本 `test_timeout_fix.py` 来验证修复效果: - -1. **守护线程超时测试**:验证超时机制在守护线程中的工作情况 -2. **性能测试**:监控CPU使用率,确保不会导致系统卡死 -3. **并发测试**:测试多个线程同时进行内存分析 - -## 预期效果 - -修复后的内存分析功能应该: -- ✅ 不会导致CPU占用100% -- ✅ 不会造成系统卡死 -- ✅ 在守护线程中正常工作 -- ✅ 跨平台兼容(Windows/Linux/macOS) -- ✅ 有合理的超时保护机制 -- ✅ 保持核心分析功能完整 - -## 使用建议 - -1. **生产环境**:建议默认关闭内存分析功能 -2. **调试环境**:可以开启进行内存问题诊断 -3. **性能监控**:定期检查内存使用情况,避免内存泄漏 -4. **日志监控**:关注内存分析相关的日志信息 - -## 注意事项 - -1. 内存分析仍然会消耗一定的CPU和内存资源 -2. 建议在系统负载较低时进行详细分析 -3. 如果仍然遇到性能问题,可以进一步调整超时时间和对象数量限制 -4. 定期清理内存快照文件,避免占用过多磁盘空间 \ No newline at end of file diff --git a/test_timeout_fix.py b/test_timeout_fix.py deleted file mode 100644 index f816e947..00000000 --- a/test_timeout_fix.py +++ /dev/null @@ -1,148 +0,0 @@ -#!/usr/bin/env python3 -""" -测试修复后的超时机制 -验证ThreadPoolExecutor超时机制在守护线程中的工作情况 -""" - -import time -import threading -import psutil -from app.helper.memory import MemoryHelper - - -def test_timeout_in_daemon_thread(): - """测试在守护线程中的超时机制""" - print("=== 测试守护线程中的超时机制 ===") - - def long_running_task(): - """模拟长时间运行的任务""" - print("开始长时间运行的任务...") - time.sleep(60) # 模拟60秒的长时间任务 - return "任务完成" - - def worker(): - """守护线程工作函数""" - print(f"守护线程 {threading.current_thread().name} 开始工作") - - memory_helper = MemoryHelper() - - # 测试超时机制 - print("测试超时机制...") - start_time = time.time() - result = memory_helper._run_with_timeout(long_running_task) - end_time = time.time() - - print(f"任务执行时间: {end_time - start_time:.2f}秒") - print(f"任务结果: {result}") - - # 测试内存分析功能 - print("测试内存分析功能...") - summary = memory_helper.get_memory_summary() - print(f"内存摘要: {summary}") - - # 创建守护线程 - daemon_thread = threading.Thread(target=worker, daemon=True, name="TestDaemonThread") - daemon_thread.start() - - # 等待线程完成或超时 - daemon_thread.join(timeout=40) # 给40秒时间 - - if daemon_thread.is_alive(): - print("守护线程仍在运行,但主线程继续执行") - else: - print("守护线程已完成") - - -def test_memory_analysis_performance(): - """测试内存分析性能""" - print("\n=== 测试内存分析性能 ===") - - memory_helper = MemoryHelper() - - # 监控CPU使用率 - def monitor_cpu(): - cpu_samples = [] - for i in range(10): - cpu_percent = psutil.cpu_percent(interval=1) - cpu_samples.append(cpu_percent) - print(f"CPU使用率: {cpu_percent:.1f}%") - return sum(cpu_samples) / len(cpu_samples) - - print("测试前CPU使用率:") - avg_cpu_before = monitor_cpu() - - # 执行内存分析 - print("\n执行内存分析...") - start_time = time.time() - - # 测试详细内存分析(应该会超时) - analysis_file = memory_helper.create_detailed_memory_analysis() - - end_time = time.time() - print(f"内存分析耗时: {end_time - start_time:.2f}秒") - - if analysis_file: - print(f"分析报告已保存: {analysis_file}") - else: - print("内存分析超时或失败") - - print("\n测试后CPU使用率:") - avg_cpu_after = monitor_cpu() - - print(f"\n性能对比:") - print(f"测试前平均CPU: {avg_cpu_before:.1f}%") - print(f"测试后平均CPU: {avg_cpu_after:.1f}%") - - -def test_concurrent_analysis(): - """测试并发内存分析""" - print("\n=== 测试并发内存分析 ===") - - def analysis_worker(worker_id): - """分析工作线程""" - print(f"工作线程 {worker_id} 开始") - memory_helper = MemoryHelper() - - # 执行内存摘要 - summary = memory_helper.get_memory_summary() - print(f"工作线程 {worker_id} 内存摘要: {summary}") - - # 执行垃圾回收 - collected = memory_helper.force_garbage_collection() - print(f"工作线程 {worker_id} 垃圾回收: {collected} 个对象") - - print(f"工作线程 {worker_id} 完成") - - # 创建多个工作线程 - threads = [] - for i in range(3): - thread = threading.Thread(target=analysis_worker, args=(i,), daemon=True) - threads.append(thread) - thread.start() - - # 等待所有线程完成 - for thread in threads: - thread.join(timeout=30) - - print("所有工作线程已完成") - - -def main(): - """主测试函数""" - print("超时机制修复测试") - print("=" * 50) - - # 测试1: 守护线程中的超时机制 - test_timeout_in_daemon_thread() - - # 测试2: 内存分析性能 - test_memory_analysis_performance() - - # 测试3: 并发分析 - test_concurrent_analysis() - - print("\n测试完成!") - - -if __name__ == "__main__": - main() \ No newline at end of file