diff --git a/app/helper/memory.py b/app/helper/memory.py index bdf01596..51648b7f 100644 --- a/app/helper/memory.py +++ b/app/helper/memory.py @@ -6,6 +6,7 @@ import os import tracemalloc from datetime import datetime from typing import Optional, Dict, List, Tuple +from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError import psutil from pympler import muppy, summary, asizeof @@ -18,6 +19,11 @@ from app.schemas.types import EventType from app.utils.singleton import Singleton +class TimeoutException(Exception): + """超时异常""" + pass + + class MemoryHelper(metaclass=Singleton): """ 内存管理工具类,用于监控和优化内存使用 @@ -33,10 +39,41 @@ class MemoryHelper(metaclass=Singleton): # 保留的快照文件数量 self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT + # 性能优化参数 + self._max_objects_to_analyze = 50000 # 限制分析对象数量 + self._analysis_timeout = 30 # 分析超时时间(秒) + self._large_object_threshold = 1024 * 1024 # 大对象阈值(1MB) + + # 线程池用于超时控制 + self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="MemoryAnalysis") + # 启用tracemalloc以获得更详细的内存信息 if not tracemalloc.is_tracing(): tracemalloc.start(25) # 保留25个帧 + def _run_with_timeout(self, func, *args, **kwargs): + """ + 在超时限制下运行函数,使用ThreadPoolExecutor实现跨平台超时控制 + """ + try: + future = self._executor.submit(func, *args, **kwargs) + result = future.result(timeout=self._analysis_timeout) + return result + except FutureTimeoutError: + logger.warning(f"内存分析函数 {func.__name__} 超时 ({self._analysis_timeout}秒)") + return None + except Exception as e: + logger.error(f"内存分析函数 {func.__name__} 出错: {e}") + return None + + def __del__(self): + """析构函数,确保线程池正确关闭""" + try: + if hasattr(self, '_executor'): + self._executor.shutdown(wait=False) + except: + pass + @eventmanager.register(EventType.ConfigChanged) def handle_config_changed(self, event: Event): """ @@ -117,21 +154,21 @@ class MemoryHelper(metaclass=Singleton): # 第一步:写入基本信息和系统内存统计 self._write_system_memory_info(snapshot_file, memory_usage) - # 第二步:写入Python对象类型统计 - self._write_python_objects_info(snapshot_file) + # 第二步:写入Python对象类型统计(优化版本) + self._write_python_objects_info_optimized(snapshot_file) - # 第三步:分析并写入类实例内存使用情况 - self._append_class_analysis(snapshot_file) + # 第三步:分析并写入类实例内存使用情况(简化版本) + self._append_class_analysis_simple(snapshot_file) - # 第四步:分析并写入大内存变量详情 - self._append_variable_analysis(snapshot_file) + # 第四步:分析并写入大内存变量详情(优化版本) + self._append_variable_analysis_optimized(snapshot_file) # 第五步:分析内存泄漏和增长趋势 self._append_memory_leak_analysis(snapshot_file) logger.info(f"内存快照已保存: {snapshot_file}, 当前内存使用: {memory_usage / 1024 / 1024:.2f} MB") - # 清理过期的快照文件(保留最近30个) + # 清理过期的快照文件 self._cleanup_old_snapshots() except Exception as e: @@ -148,9 +185,6 @@ class MemoryHelper(metaclass=Singleton): # 获取系统总内存信息 system_memory = psutil.virtual_memory() - # 获取内存映射信息 - memory_maps = process.memory_maps() - with open(snapshot_file, 'w', encoding='utf-8') as f: f.write(f"内存快照时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write("=" * 80 + "\n") @@ -167,95 +201,42 @@ class MemoryHelper(metaclass=Singleton): f.write(f"进程文本段: {memory_info.text / 1024 / 1024:.2f} MB\n") f.write(f"进程数据段: {memory_info.data / 1024 / 1024:.2f} MB\n") - # 分析内存映射 - f.write("\n内存映射分析:\n") - f.write("-" * 80 + "\n") - memory_regions = self._analyze_memory_maps(memory_maps) - for region_type, size_mb in memory_regions.items(): - f.write(f"{region_type}: {size_mb:.2f} MB\n") - f.flush() - def _analyze_memory_maps(self, memory_maps) -> Dict[str, float]: + def _write_python_objects_info_optimized(self, snapshot_file): """ - 分析内存映射,按类型分类统计 - """ - regions = {} - for mmap in memory_maps: - size_mb = mmap.size / 1024 / 1024 - perms = mmap.perms - - if 'r' in perms and 'w' in perms: - region_type = "读写内存" - elif 'r' in perms and 'x' in perms: - region_type = "代码段" - elif 'r' in perms: - region_type = "只读内存" - else: - region_type = "其他内存" - - if region_type in regions: - regions[region_type] += size_mb - else: - regions[region_type] = size_mb - - return regions - - def _write_python_objects_info(self, snapshot_file): - """ - 写入Python对象类型统计信息 - """ - # 获取当前tracemalloc统计 - current, peak = tracemalloc.get_traced_memory() - - # 获取所有对象 - all_objects = muppy.get_objects() - sum1 = summary.summarize(all_objects) - - # 计算Python对象总内存 - python_total_mb = 0 - for line in summary.format_(sum1): - if '|' in line and line.strip() and not line.startswith('=') and not line.startswith('-'): - parts = line.split('|') - if len(parts) >= 3: - try: - size_str = parts[2].strip() - if 'MB' in size_str: - size_mb = float(size_str.replace('MB', '').strip()) - python_total_mb += size_mb - except: - pass - - with open(snapshot_file, 'a', encoding='utf-8') as f: - f.write("\n" + "=" * 80 + "\n") - f.write("Python内存使用情况:\n") - f.write("-" * 80 + "\n") - f.write(f"tracemalloc当前内存: {current / 1024 / 1024:.2f} MB\n") - f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n") - f.write(f"Python对象总内存: {python_total_mb:.2f} MB\n") - f.write(f"未统计内存(可能为C扩展): {self._get_unaccounted_memory():.2f} MB\n") - - f.write("\n对象类型统计:\n") - f.write("-" * 80 + "\n") - # 写入对象统计信息 - for line in summary.format_(sum1): - f.write(line + "\n") - - f.flush() - - def _get_unaccounted_memory(self) -> float: - """ - 计算未统计的内存(可能是C扩展、系统缓存等) + 优化的Python对象类型统计信息 """ try: - # 获取进程总内存 - process = psutil.Process() - total_memory = process.memory_info().rss / 1024 / 1024 # MB + # 获取当前tracemalloc统计 + current, peak = tracemalloc.get_traced_memory() - # 获取Python对象总内存 + # 限制分析的对象数量 all_objects = muppy.get_objects() - sum1 = summary.summarize(all_objects) + if len(all_objects) > self._max_objects_to_analyze: + # 随机采样,避免总是分析相同的对象 + import random + all_objects = random.sample(all_objects, self._max_objects_to_analyze) + logger.info(f"对象数量过多,采样分析 {self._max_objects_to_analyze} 个对象") + # 使用超时机制 + def analyze_objects(): + sum1 = summary.summarize(all_objects) + return sum1 + + sum1 = self._run_with_timeout(analyze_objects) + if sum1 is None: + with open(snapshot_file, 'a', encoding='utf-8') as f: + f.write("\n" + "=" * 80 + "\n") + f.write("Python内存使用情况:\n") + f.write("-" * 80 + "\n") + f.write(f"tracemalloc当前内存: {current / 1024 / 1024:.2f} MB\n") + f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n") + f.write("对象分析超时,跳过详细统计\n") + f.flush() + return + + # 计算Python对象总内存 python_total_mb = 0 for line in summary.format_(sum1): if '|' in line and line.strip() and not line.startswith('=') and not line.startswith('-'): @@ -268,10 +249,258 @@ class MemoryHelper(metaclass=Singleton): python_total_mb += size_mb except: pass + + with open(snapshot_file, 'a', encoding='utf-8') as f: + f.write("\n" + "=" * 80 + "\n") + f.write("Python内存使用情况:\n") + f.write("-" * 80 + "\n") + f.write(f"tracemalloc当前内存: {current / 1024 / 1024:.2f} MB\n") + f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n") + f.write(f"Python对象总内存: {python_total_mb:.2f} MB\n") + f.write(f"分析对象数量: {len(all_objects):,}\n") + + f.write("\n对象类型统计 (前20个):\n") + f.write("-" * 80 + "\n") + # 写入对象统计信息(限制行数) + line_count = 0 + for line in summary.format_(sum1): + if line_count >= 25: # 只显示前25行 + break + f.write(line + "\n") + line_count += 1 + + f.flush() + + except Exception as e: + logger.error(f"写入Python对象信息失败: {e}") + with open(snapshot_file, 'a', encoding='utf-8') as f: + f.write(f"\nPython对象分析失败: {e}\n") + f.flush() + + def _append_class_analysis_simple(self, snapshot_file): + """ + 简化的类实例内存使用情况分析 + """ + with open(snapshot_file, 'a', encoding='utf-8') as f: + f.write("\n" + "=" * 80 + "\n") + f.write("类实例内存使用情况 (简化版):\n") + f.write("-" * 80 + "\n") + f.write("正在分析中...\n") + f.flush() + + try: + logger.debug("开始分析类实例内存使用情况") - return max(0, total_memory - python_total_mb) - except: - return 0.0 + def analyze_classes(): + class_info = {} + processed_count = 0 + + # 获取所有对象(限制数量) + all_objects = muppy.get_objects() + if len(all_objects) > self._max_objects_to_analyze: + import random + all_objects = random.sample(all_objects, self._max_objects_to_analyze) + + for obj in all_objects: + try: + # 跳过类对象本身 + if isinstance(obj, type): + continue + + # 获取对象的类名 + obj_class = type(obj) + try: + if hasattr(obj_class, '__module__') and hasattr(obj_class, '__name__'): + class_name = f"{obj_class.__module__}.{obj_class.__name__}" + else: + class_name = str(obj_class) + except: + class_name = f"" + + # 只计算对象本身的大小,避免深度计算 + size_bytes = sys.getsizeof(obj) + if size_bytes < 100: # 跳过太小的对象 + continue + + size_mb = size_bytes / 1024 / 1024 + processed_count += 1 + + if class_name in class_info: + class_info[class_name]['size_mb'] += size_mb + class_info[class_name]['count'] += 1 + else: + class_info[class_name] = { + 'name': class_name, + 'size_mb': size_mb, + 'count': 1 + } + + except Exception: + continue + + # 按内存大小排序,只返回前50个 + sorted_classes = sorted(class_info.values(), key=lambda x: x['size_mb'], reverse=True) + return sorted_classes[:50] + + class_objects = self._run_with_timeout(analyze_classes) + + # 更新文件内容 + with open(snapshot_file, 'r', encoding='utf-8') as f: + content = f.read() + + content = content.replace("正在分析中...\n", "") + + with open(snapshot_file, 'w', encoding='utf-8') as f: + f.write(content) + + if class_objects: + for i, class_info in enumerate(class_objects, 1): + f.write(f"{i:3d}. {class_info['name']:<50} " + f"{class_info['size_mb']:>8.2f} MB ({class_info['count']} 个实例)\n") + else: + f.write("类实例分析超时或失败\n") + + f.flush() + + except Exception as e: + logger.error(f"获取类实例信息失败: {e}") + with open(snapshot_file, 'r', encoding='utf-8') as f: + content = f.read() + content = content.replace("正在分析中...\n", f"获取类实例信息失败: {e}\n") + with open(snapshot_file, 'w', encoding='utf-8') as f: + f.write(content) + f.flush() + + logger.debug("类实例分析已完成并写入") + + def _append_variable_analysis_optimized(self, snapshot_file): + """ + 优化的大内存变量详情分析 + """ + with open(snapshot_file, 'a', encoding='utf-8') as f: + f.write("\n" + "=" * 80 + "\n") + f.write("大内存变量详情 (优化版):\n") + f.write("-" * 80 + "\n") + f.write("正在分析中...\n") + f.flush() + + try: + logger.debug("开始分析大内存变量") + + def analyze_large_variables(): + large_vars = [] + processed_count = 0 + calculated_objects = set() + + # 获取所有对象(限制数量) + all_objects = muppy.get_objects() + if len(all_objects) > self._max_objects_to_analyze: + import random + all_objects = random.sample(all_objects, self._max_objects_to_analyze) + + for obj in all_objects: + # 跳过类对象 + if isinstance(obj, type): + continue + + # 跳过已经计算过的对象 + obj_id = id(obj) + if obj_id in calculated_objects: + continue + + try: + # 首先使用 sys.getsizeof 快速筛选 + shallow_size = sys.getsizeof(obj) + if shallow_size < self._large_object_threshold: # 只处理大于1MB的对象 + continue + + # 对于较大的对象,使用 asizeof 进行深度计算 + size_bytes = asizeof.asizeof(obj) + + # 只处理大于1MB的对象 + if size_bytes < self._large_object_threshold: + continue + + size_mb = size_bytes / 1024 / 1024 + processed_count += 1 + calculated_objects.add(obj_id) + + # 获取对象信息 + var_info = self._get_variable_info_simple(obj, size_mb) + if var_info: + large_vars.append(var_info) + + # 如果已经找到足够多的大对象,可以提前结束 + if len(large_vars) >= 50: # 限制数量 + break + + except Exception: + continue + + # 按内存大小排序并返回前30个 + large_vars.sort(key=lambda x: x['size_mb'], reverse=True) + return large_vars[:30] + + large_variables = self._run_with_timeout(analyze_large_variables) + + # 更新文件内容 + with open(snapshot_file, 'r', encoding='utf-8') as f: + content = f.read() + + content = content.replace("正在分析中...\n", "") + + with open(snapshot_file, 'w', encoding='utf-8') as f: + f.write(content) + + if large_variables: + for i, var_info in enumerate(large_variables, 1): + f.write( + f"{i:3d}. {var_info['name']:<30} {var_info['type']:<15} {var_info['size_mb']:>8.2f} MB\n") + else: + f.write("大内存变量分析超时或失败\n") + + f.flush() + + except Exception as e: + logger.error(f"获取大内存变量信息失败: {e}") + with open(snapshot_file, 'r', encoding='utf-8') as f: + content = f.read() + content = content.replace("正在分析中...\n", f"获取变量信息失败: {e}\n") + with open(snapshot_file, 'w', encoding='utf-8') as f: + f.write(content) + f.flush() + + logger.debug("大内存变量分析已完成并写入") + + def _get_variable_info_simple(self, obj, size_mb): + """ + 简化的变量信息获取 + """ + try: + obj_type = type(obj).__name__ + + # 简化的变量名获取 + var_name = f"{obj_type}_{id(obj)}" + + # 生成描述性信息 + if isinstance(obj, dict): + key_count = len(obj) + var_name = f"dict_{key_count}items_{id(obj)}" + elif isinstance(obj, (list, tuple, set)): + var_name = f"{obj_type}_{len(obj)}items_{id(obj)}" + elif isinstance(obj, str): + var_name = f"str_{len(obj)}chars_{id(obj)}" + elif hasattr(obj, '__class__') and hasattr(obj.__class__, '__name__'): + var_name = f"{obj.__class__.__name__}_{id(obj)}" + + return { + 'name': var_name, + 'type': obj_type, + 'size_mb': size_mb + } + + except Exception: + return None def _append_memory_leak_analysis(self, snapshot_file): """ @@ -287,15 +516,14 @@ class MemoryHelper(metaclass=Singleton): f.write(f"当前tracemalloc内存: {current / 1024 / 1024:.2f} MB\n") f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n") - # 获取内存分配统计 + # 获取内存分配统计(限制数量) try: - # 获取前10个内存分配最多的位置 snapshot = tracemalloc.take_snapshot() top_stats = snapshot.statistics('lineno') - f.write("\n内存分配最多的位置 (前10个):\n") + f.write("\n内存分配最多的位置 (前5个):\n") f.write("-" * 80 + "\n") - for i, stat in enumerate(top_stats[:10], 1): + for i, stat in enumerate(top_stats[:5], 1): f.write(f"{i:2d}. {stat.count:>8} 个对象, {stat.size / 1024 / 1024:>8.2f} MB\n") f.write(f" {stat.traceback.format()}\n") @@ -317,6 +545,21 @@ class MemoryHelper(metaclass=Singleton): logger.debug("内存泄漏分析已完成并写入") + def _cleanup_old_snapshots(self): + """ + 清理过期的内存快照文件,只保留最近的指定数量文件 + """ + try: + snapshot_files = list(self._memory_snapshot_dir.glob("memory_snapshot_*.txt")) + if len(snapshot_files) > self._keep_count: + # 按修改时间排序,删除最旧的文件 + snapshot_files.sort(key=lambda x: x.stat().st_mtime) + for old_file in snapshot_files[:-self._keep_count]: + old_file.unlink() + logger.debug(f"已删除过期内存快照: {old_file}") + except Exception as e: + logger.error(f"清理过期快照失败: {e}") + def create_detailed_memory_analysis(self): """ 创建详细的内存分析报告,专门用于诊断内存问题 @@ -334,14 +577,14 @@ class MemoryHelper(metaclass=Singleton): # 1. 系统级内存分析 self._write_detailed_system_analysis(f) - # 2. Python对象深度分析 - self._write_detailed_python_analysis(f) + # 2. Python对象深度分析(优化版) + self._write_detailed_python_analysis_optimized(f) - # 3. 内存映射详细分析 - self._write_detailed_memory_maps(f) + # 3. 内存映射详细分析(简化版) + self._write_detailed_memory_maps_simple(f) - # 4. 大对象分析 - self._write_detailed_large_objects(f) + # 4. 大对象分析(优化版) + self._write_detailed_large_objects_optimized(f) # 5. 内存泄漏检测 self._write_memory_leak_detection(f) @@ -387,39 +630,47 @@ class MemoryHelper(metaclass=Singleton): f.write("\n" + "=" * 100 + "\n\n") - def _write_detailed_python_analysis(self, f): + def _write_detailed_python_analysis_optimized(self, f): """ - 写入详细的Python对象分析 + 写入优化的Python对象分析 """ - f.write("2. Python对象深度分析\n") + f.write("2. Python对象深度分析 (优化版)\n") f.write("-" * 50 + "\n") # 强制垃圾回收 collected = gc.collect() f.write(f"垃圾回收清理对象数: {collected}\n\n") - # 获取所有对象 + # 获取所有对象(限制数量) all_objects = muppy.get_objects() + if len(all_objects) > self._max_objects_to_analyze: + import random + all_objects = random.sample(all_objects, self._max_objects_to_analyze) + f.write(f"对象数量过多,采样分析 {self._max_objects_to_analyze} 个对象\n\n") + f.write(f"总对象数: {len(all_objects):,}\n") - # 按类型统计 + # 按类型统计(简化版) type_stats = {} for obj in all_objects: - obj_type = type(obj).__name__ - if obj_type not in type_stats: - type_stats[obj_type] = {'count': 0, 'size': 0} - type_stats[obj_type]['count'] += 1 - type_stats[obj_type]['size'] += sys.getsizeof(obj) + try: + obj_type = type(obj).__name__ + if obj_type not in type_stats: + type_stats[obj_type] = {'count': 0, 'size': 0} + type_stats[obj_type]['count'] += 1 + type_stats[obj_type]['size'] += sys.getsizeof(obj) + except: + continue # 按大小排序 sorted_types = sorted(type_stats.items(), key=lambda x: x[1]['size'], reverse=True) - f.write("对象类型统计 (按内存大小排序):\n") + f.write("对象类型统计 (按内存大小排序,前15个):\n") f.write(f"{'类型':<20} {'数量':<10} {'总大小(MB)':<12} {'平均大小(B)':<12}\n") f.write("-" * 60 + "\n") total_python_memory = 0 - for obj_type, stats in sorted_types[:20]: # 只显示前20个 + for obj_type, stats in sorted_types[:15]: # 只显示前15个 size_mb = stats['size'] / 1024 / 1024 avg_size = stats['size'] / stats['count'] if stats['count'] > 0 else 0 total_python_memory += size_mb @@ -435,105 +686,103 @@ class MemoryHelper(metaclass=Singleton): f.write("\n" + "=" * 100 + "\n\n") - def _write_detailed_memory_maps(self, f): + def _write_detailed_memory_maps_simple(self, f): """ - 写入详细的内存映射分析 + 写入简化的内存映射分析 """ - f.write("3. 内存映射详细分析\n") + f.write("3. 内存映射分析 (简化版)\n") f.write("-" * 50 + "\n") - process = psutil.Process() - memory_maps = process.memory_maps() - - # 按权限分类 - perm_stats = {} - file_stats = {} - - for mmap in memory_maps: - size_mb = mmap.size / 1024 / 1024 - perms = mmap.perms + try: + process = psutil.Process() + memory_maps = process.memory_maps() - # 按权限统计 - if perms not in perm_stats: - perm_stats[perms] = {'count': 0, 'size': 0} - perm_stats[perms]['count'] += 1 - perm_stats[perms]['size'] += size_mb + # 按权限分类 + perm_stats = {} - # 按文件统计 - if mmap.path: - if mmap.path not in file_stats: - file_stats[mmap.path] = {'count': 0, 'size': 0} - file_stats[mmap.path]['count'] += 1 - file_stats[mmap.path]['size'] += size_mb - - f.write("按权限分类的内存映射:\n") - f.write(f"{'权限':<10} {'数量':<8} {'大小(MB)':<12}\n") - f.write("-" * 35 + "\n") - for perms, stats in sorted(perm_stats.items(), key=lambda x: x[1]['size'], reverse=True): - f.write(f"{perms:<10} {stats['count']:<8} {stats['size']:<12.2f}\n") - - f.write(f"\n按文件分类的内存映射 (前10个):\n") - f.write(f"{'文件路径':<50} {'大小(MB)':<12}\n") - f.write("-" * 70 + "\n") - for path, stats in sorted(file_stats.items(), key=lambda x: x[1]['size'], reverse=True)[:10]: - if len(path) > 47: - path = path[:44] + "..." - f.write(f"{path:<50} {stats['size']:<12.2f}\n") + for mmap in memory_maps: + size_mb = mmap.size / 1024 / 1024 + perms = mmap.perms + + # 按权限统计 + if perms not in perm_stats: + perm_stats[perms] = {'count': 0, 'size': 0} + perm_stats[perms]['count'] += 1 + perm_stats[perms]['size'] += size_mb + + f.write("按权限分类的内存映射:\n") + f.write(f"{'权限':<10} {'数量':<8} {'大小(MB)':<12}\n") + f.write("-" * 35 + "\n") + for perms, stats in sorted(perm_stats.items(), key=lambda x: x[1]['size'], reverse=True): + f.write(f"{perms:<10} {stats['count']:<8} {stats['size']:<12.2f}\n") + + except Exception as e: + f.write(f"内存映射分析失败: {e}\n") f.write("\n" + "=" * 100 + "\n\n") - def _write_detailed_large_objects(self, f): + def _write_detailed_large_objects_optimized(self, f): """ - 写入大对象详细分析 + 写入优化的大对象详细分析 """ - f.write("4. 大对象详细分析\n") + f.write("4. 大对象详细分析 (优化版)\n") f.write("-" * 50 + "\n") - all_objects = muppy.get_objects() - large_objects = [] - - for obj in all_objects: - try: - size = asizeof.asizeof(obj) - if size > 1024 * 1024: # 大于1MB的对象 - large_objects.append((obj, size)) - except: - continue - - # 按大小排序 - large_objects.sort(key=lambda x: x[1], reverse=True) - - f.write(f"大对象 (>1MB) 数量: {len(large_objects)}\n\n") - - for i, (obj, size) in enumerate(large_objects[:20], 1): # 只显示前20个 - size_mb = size / 1024 / 1024 - obj_type = type(obj).__name__ + def analyze_large_objects(): + large_objects = [] - f.write(f"{i:2d}. {obj_type} - {size_mb:.2f} MB\n") + # 获取所有对象(限制数量) + all_objects = muppy.get_objects() + if len(all_objects) > self._max_objects_to_analyze: + import random + all_objects = random.sample(all_objects, self._max_objects_to_analyze) - # 尝试获取更多信息 - try: - if isinstance(obj, dict): - f.write(f" 字典项数: {len(obj)}\n") - if obj: - sample_keys = list(obj.keys())[:3] - f.write(f" 示例键: {sample_keys}\n") - elif isinstance(obj, (list, tuple)): - f.write(f" 元素数量: {len(obj)}\n") - elif isinstance(obj, str): - f.write(f" 字符串长度: {len(obj)}\n") - if len(obj) > 100: - f.write(f" 内容预览: {obj[:100]}...\n") - else: - f.write(f" 内容: {obj}\n") - elif hasattr(obj, '__dict__'): - f.write(f" 属性数量: {len(obj.__dict__)}\n") - if hasattr(obj, '__class__'): - f.write(f" 类名: {obj.__class__.__name__}\n") - except: - pass + for obj in all_objects: + try: + # 快速筛选 + shallow_size = sys.getsizeof(obj) + if shallow_size < self._large_object_threshold: + continue + + # 深度计算 + size = asizeof.asizeof(obj) + if size > self._large_object_threshold: + large_objects.append((obj, size)) + + # 限制数量 + if len(large_objects) >= 20: + break + + except: + continue - f.write("\n") + return large_objects + + large_objects = self._run_with_timeout(analyze_large_objects) + + if large_objects: + f.write(f"大对象 (>1MB) 数量: {len(large_objects)}\n\n") + + for i, (obj, size) in enumerate(large_objects, 1): + size_mb = size / 1024 / 1024 + obj_type = type(obj).__name__ + + f.write(f"{i:2d}. {obj_type} - {size_mb:.2f} MB\n") + + # 简化的对象信息 + try: + if isinstance(obj, dict): + f.write(f" 字典项数: {len(obj)}\n") + elif isinstance(obj, (list, tuple)): + f.write(f" 元素数量: {len(obj)}\n") + elif isinstance(obj, str): + f.write(f" 字符串长度: {len(obj)}\n") + except: + pass + + f.write("\n") + else: + f.write("大对象分析超时或未找到大对象\n") f.write("=" * 100 + "\n\n") @@ -553,9 +802,9 @@ class MemoryHelper(metaclass=Singleton): snapshot = tracemalloc.take_snapshot() top_stats = snapshot.statistics('lineno') - f.write(f"\n内存分配最多的位置 (前15个):\n") + f.write(f"\n内存分配最多的位置 (前10个):\n") f.write("-" * 50 + "\n") - for i, stat in enumerate(top_stats[:15], 1): + for i, stat in enumerate(top_stats[:10], 1): f.write(f"{i:2d}. {stat.count:>8} 个对象, {stat.size / 1024 / 1024:>8.2f} MB\n") for line in stat.traceback.format(): f.write(f" {line}\n") @@ -579,312 +828,6 @@ class MemoryHelper(metaclass=Singleton): f.write("\n" + "=" * 100 + "\n\n") - def _append_class_analysis(self, snapshot_file): - """ - 分析并追加类实例内存使用情况 - """ - with open(snapshot_file, 'a', encoding='utf-8') as f: - f.write("\n" + "=" * 80 + "\n") - f.write("类实例内存使用情况 (按内存大小排序):\n") - f.write("-" * 80 + "\n") - f.write("正在分析中...\n") - # 立即刷新,让用户知道这部分开始了 - f.flush() - - try: - logger.debug("开始分析类实例内存使用情况") - class_objects = self._get_class_memory_usage() - - # 重新打开文件,移除"正在分析中..."并写入实际结果 - with open(snapshot_file, 'r', encoding='utf-8') as f: - content = f.read() - - # 替换"正在分析中..." - content = content.replace("正在分析中...\n", "") - - with open(snapshot_file, 'w', encoding='utf-8') as f: - f.write(content) - - if class_objects: - # 只显示前100个类 - for i, class_info in enumerate(class_objects[:100], 1): - f.write(f"{i:3d}. {class_info['name']:<50} " - f"{class_info['size_mb']:>8.2f} MB ({class_info['count']} 个实例)\n") - else: - f.write("未找到有效的类实例信息\n") - - f.flush() - - except Exception as e: - logger.error(f"获取类实例信息失败: {e}") - - # 即使出错也要更新文件 - with open(snapshot_file, 'r', encoding='utf-8') as f: - content = f.read() - - content = content.replace("正在分析中...\n", f"获取类实例信息失败: {e}\n") - - with open(snapshot_file, 'w', encoding='utf-8') as f: - f.write(content) - f.flush() - - logger.debug("类实例分析已完成并写入") - - def _append_variable_analysis(self, snapshot_file): - """ - 分析并追加大内存变量详情 - """ - with open(snapshot_file, 'a', encoding='utf-8') as f: - f.write("\n" + "=" * 80 + "\n") - f.write("大内存变量详情 (前100个):\n") - f.write("-" * 80 + "\n") - f.write("正在分析中...\n") - # 立即刷新,让用户知道这部分开始了 - f.flush() - - try: - logger.debug("开始分析大内存变量") - large_variables = self._get_large_variables(100) - - # 重新打开文件,移除"正在分析中..."并写入实际结果 - with open(snapshot_file, 'r', encoding='utf-8') as f: - content = f.read() - - # 替换最后的"正在分析中..." - content = content.replace("正在分析中...\n", "") - - with open(snapshot_file, 'w', encoding='utf-8') as f: - f.write(content) - - if large_variables: - for i, var_info in enumerate(large_variables, 1): - f.write( - f"{i:3d}. {var_info['name']:<30} {var_info['type']:<15} {var_info['size_mb']:>8.2f} MB\n") - else: - f.write("未找到大内存变量\n") - - f.flush() - - except Exception as e: - logger.error(f"获取大内存变量信息失败: {e}") - - # 即使出错也要更新文件 - with open(snapshot_file, 'r', encoding='utf-8') as f: - content = f.read() - - content = content.replace("正在分析中...\n", f"获取变量信息失败: {e}\n") - - with open(snapshot_file, 'w', encoding='utf-8') as f: - f.write(content) - f.flush() - - logger.debug("大内存变量分析已完成并写入") - - def _cleanup_old_snapshots(self): - """ - 清理过期的内存快照文件,只保留最近的指定数量文件 - """ - try: - snapshot_files = list(self._memory_snapshot_dir.glob("memory_snapshot_*.txt")) - if len(snapshot_files) > self._keep_count: - # 按修改时间排序,删除最旧的文件 - snapshot_files.sort(key=lambda x: x.stat().st_mtime) - for old_file in snapshot_files[:-self._keep_count]: - old_file.unlink() - logger.debug(f"已删除过期内存快照: {old_file}") - except Exception as e: - logger.error(f"清理过期快照失败: {e}") - - @staticmethod - def _get_class_memory_usage(): - """ - 获取所有类实例的内存使用情况,按内存大小排序 - """ - class_info = {} - processed_count = 0 - error_count = 0 - - # 获取所有对象 - all_objects = muppy.get_objects() - logger.debug(f"开始分析 {len(all_objects)} 个对象的类实例内存使用情况") - - for obj in all_objects: - try: - # 跳过类对象本身,统计类的实例 - if isinstance(obj, type): - continue - - # 获取对象的类名 - 这里可能会出错 - obj_class = type(obj) - - # 安全地获取类名 - try: - if hasattr(obj_class, '__module__') and hasattr(obj_class, '__name__'): - class_name = f"{obj_class.__module__}.{obj_class.__name__}" - else: - class_name = str(obj_class) - except Exception as e: - # 如果获取类名失败,使用简单的类型描述 - class_name = f"" - logger.debug(f"获取类名失败: {e}") - - # 计算对象本身的内存使用(不包括引用对象,避免重复计算) - size_bytes = sys.getsizeof(obj) - if size_bytes < 100: # 跳过太小的对象 - continue - - size_mb = size_bytes / 1024 / 1024 - processed_count += 1 - - if class_name in class_info: - class_info[class_name]['size_mb'] += size_mb - class_info[class_name]['count'] += 1 - else: - class_info[class_name] = { - 'name': class_name, - 'size_mb': size_mb, - 'count': 1 - } - - except Exception as e: - # 捕获所有可能的异常,包括SQLAlchemy、ORM等框架的异常 - error_count += 1 - if error_count <= 5: # 只记录前5个错误,避免日志过多 - logger.debug(f"分析对象时出错: {e}") - continue - - logger.debug(f"类实例分析完成: 处理了 {processed_count} 个对象, 遇到 {error_count} 个错误") - - # 按内存大小排序 - sorted_classes = sorted(class_info.values(), key=lambda x: x['size_mb'], reverse=True) - return sorted_classes - - def _get_large_variables(self, limit=100): - """ - 获取大内存变量信息,按内存大小排序 - 使用已计算对象集合避免重复计算 - """ - large_vars = [] - processed_count = 0 - calculated_objects = set() # 避免重复计算 - - # 获取所有对象 - all_objects = muppy.get_objects() - logger.debug(f"开始分析 {len(all_objects)} 个对象的内存使用情况") - - for obj in all_objects: - # 跳过类对象 - if isinstance(obj, type): - continue - - # 跳过已经计算过的对象 - obj_id = id(obj) - if obj_id in calculated_objects: - continue - - try: - # 首先使用 sys.getsizeof 快速筛选 - shallow_size = sys.getsizeof(obj) - if shallow_size < 1024: # 只处理大于1KB的对象 - continue - - # 对于较大的对象,使用 asizeof 进行深度计算 - size_bytes = asizeof.asizeof(obj) - - # 只处理大于10KB的对象,提高分析效率 - if size_bytes < 10240: - continue - - size_mb = size_bytes / 1024 / 1024 - processed_count += 1 - calculated_objects.add(obj_id) - - # 获取对象信息 - var_info = self._get_variable_info(obj, size_mb) - if var_info: - large_vars.append(var_info) - - # 如果已经找到足够多的大对象,可以提前结束 - if len(large_vars) >= limit * 2: # 多收集一些,后面排序筛选 - break - - except Exception as e: - # 更广泛的异常捕获 - logger.debug(f"分析对象失败: {e}") - continue - - logger.debug(f"处理了 {processed_count} 个大对象,找到 {len(large_vars)} 个有效变量") - - # 按内存大小排序并返回前N个 - large_vars.sort(key=lambda x: x['size_mb'], reverse=True) - return large_vars[:limit] - - def _get_variable_info(self, obj, size_mb): - """ - 获取变量的描述信息 - """ - try: - obj_type = type(obj).__name__ - - # 尝试获取变量名 - var_name = self._get_variable_name(obj) - - # 生成描述性信息 - if isinstance(obj, dict): - key_count = len(obj) - if key_count > 0: - sample_keys = list(obj.keys())[:3] - var_name += f" ({key_count}项, 键: {sample_keys})" - elif isinstance(obj, (list, tuple, set)): - var_name += f" ({len(obj)}个元素)" - elif isinstance(obj, str): - if len(obj) > 50: - var_name += f" (长度: {len(obj)}, 内容: '{obj[:50]}...')" - else: - var_name += f" ('{obj}')" - elif hasattr(obj, '__class__') and hasattr(obj.__class__, '__name__'): - if hasattr(obj, '__dict__'): - attr_count = len(obj.__dict__) - var_name += f" ({attr_count}个属性)" - - return { - 'name': var_name, - 'type': obj_type, - 'size_mb': size_mb - } - - except Exception as e: - logger.debug(f"获取变量信息失败: {e}") - return None - - @staticmethod - def _get_variable_name(obj): - """ - 尝试获取变量名 - """ - try: - # 尝试通过gc获取引用该对象的变量名 - referrers = gc.get_referrers(obj) - - for referrer in referrers: - if isinstance(referrer, dict): - # 检查是否在某个模块的全局变量中 - for name, value in referrer.items(): - if value is obj and isinstance(name, str): - return name - elif hasattr(referrer, '__dict__'): - # 检查是否在某个实例的属性中 - for name, value in referrer.__dict__.items(): - if value is obj and isinstance(name, str): - return f"{type(referrer).__name__}.{name}" - - # 如果找不到变量名,返回对象类型和id - return f"{type(obj).__name__}_{id(obj)}" - - except Exception as e: - logger.debug(f"获取变量名失败: {e}") - return f"{type(obj).__name__}_{id(obj)}" - def get_memory_summary(self) -> Dict[str, float]: """ 获取内存使用摘要 @@ -893,8 +836,12 @@ class MemoryHelper(metaclass=Singleton): process = psutil.Process() memory_info = process.memory_info() - # 获取Python对象总内存 + # 获取Python对象总内存(简化版) all_objects = muppy.get_objects() + if len(all_objects) > 10000: # 限制对象数量 + import random + all_objects = random.sample(all_objects, 10000) + sum1 = summary.summarize(all_objects) python_total_mb = 0