# -*- coding:utf-8 -*-
# Repository checker module
import os
import logging
from typing import List, Dict, Any, Optional, Tuple
import concurrent.futures
import time

from common.logger import get_logger
from common.config import ConfigManager
from repodata.parser import RepoDataParser

# Module-level logger
logger = get_logger(__name__)


class RepoChecker:
    """Repository checker that verifies the consistency of repository data."""

    def __init__(self, output_dir: str = "./results"):
        """
        Initialize the checker.

        Args:
            output_dir: Directory where result and log files are written.
        """
        self.output_dir = output_dir
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        # Paths of the result log files
        self.overall_log_file = os.path.join(output_dir, "repo_rpmlist_check_results.log")
        self.fail_log_file = os.path.join(output_dir, "repo_rpmlist_check_fail.log")

        # Create (or truncate) the log files
        self._init_log_files()

    def _init_log_files(self):
        """Create or truncate the result log files."""
        with open(self.overall_log_file, 'w', encoding='utf-8') as f:
            f.write("=== Overall test results ===\n")

        with open(self.fail_log_file, 'w', encoding='utf-8') as f:
            f.write("=== Failure records ===\n")

    def check_repository(self, repo_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Check a single repository configuration.

        Args:
            repo_config: Repository configuration.

        Returns:
            Check results keyed by "<product>_<arch>_<repo_type>".
        """
        product = repo_config["product"]
        base_url = repo_config["base_url"]
        results = {}

        for arch in repo_config["architectures"]:
            for repo_type in repo_config["repo_types"]:
                # Build the full repository URL
                if "8U8" not in product:
                    repo_url = f"{base_url}{repo_type}/{arch}/"
                else:
                    repo_url = f"{base_url}{repo_type}/{arch}/os/"

                # Per-combination output file path (defined here but not used below)
                output_file = os.path.join(self.output_dir,
                                           f"rpm_check_{product}_{arch}_{repo_type}.txt")

                logger.info(f"Starting check: {product} {arch} {repo_type}")
                logger.info(f"Repository URL under test: {repo_url}")

                try:
                    # Create the repodata parser
                    parser = RepoDataParser(repo_url)

                    # RPM list as published in the repository's HTML index
                    html = parser.get_html_content()
                    if not html:
                        raise Exception("Failed to fetch HTML content")
                    rpm_list_html = parser.parse_rpm_list_from_html(html)

                    # RPM list as recorded in the repodata SQLite database
                    sqlite_path = parser.download_and_extract_sqlite()
                    if not sqlite_path:
                        raise Exception("Failed to fetch the SQLite database file")
                    rpm_list_sqlite = parser.get_rpm_list_from_sqlite(sqlite_path)

                    # Compare the two lists
                    comparison_result = parser.compare_rpm_lists(rpm_list_html, rpm_list_sqlite)
                    formatted_result = parser.format_comparison_result(comparison_result)

                    # Record the result
                    test_passed = comparison_result["is_identical"]
                    test_key = f"{product}_{arch}_{repo_type}"
                    results[test_key] = {
                        "product": product,
                        "arch": arch,
                        "repo_type": repo_type,
                        "repo_url": repo_url,
                        "passed": test_passed,
                        "result": formatted_result,
                        "html_count": comparison_result["list1_count"],
                        "sqlite_count": comparison_result["list2_count"]
                    }

                    # Write the result to the overall log
                    self._write_to_overall_log(product, arch, repo_type, repo_url, formatted_result)

                    # If the check failed, also record it in the failure log
                    if not test_passed:
                        self._write_to_fail_log(product, arch, repo_type, repo_url, formatted_result)

                except Exception as e:
                    error_msg = f"Error while processing repository: {str(e)}"
                    logger.error(error_msg)

                    # Record the error
                    test_key = f"{product}_{arch}_{repo_type}"
                    results[test_key] = {
                        "product": product,
                        "arch": arch,
                        "repo_type": repo_type,
                        "repo_url": repo_url,
                        "passed": False,
                        "error": error_msg
                    }

                    # Write the error to the log files
                    self._write_error_to_logs(product, arch, repo_type, repo_url, error_msg)

        return results

    def check_all_repositories(self, repo_configs: List[Dict[str, Any]],
                               max_workers: int = 5) -> Dict[str, Any]:
        """
        Check all repositories in parallel.

        Args:
            repo_configs: List of repository configurations.
            max_workers: Maximum number of worker threads.

        Returns:
            All check results.
        """
        all_results = {}
        start_time = time.time()

        # Run the checks in parallel using a thread pool
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit one check task per repository configuration
            future_to_config = {executor.submit(self.check_repository, config): config
                                for config in repo_configs}

            # Collect results as the futures complete
            for future in concurrent.futures.as_completed(future_to_config):
                config = future_to_config[future]
                try:
                    result = future.result()
                    all_results.update(result)
                except Exception as e:
                    logger.error(f"Error while checking repository {config['product']}: {str(e)}")

        end_time = time.time()
        duration = end_time - start_time

        # Build the summary
        total_tests = len(all_results)
        passed_tests = sum(1 for result in all_results.values() if result.get("passed", False))

        summary = f"""
====== Test summary ======
Total tests:  {total_tests}
Passed tests: {passed_tests}
Failed tests: {total_tests - passed_tests}
Elapsed time: {duration:.2f} seconds
"""
        logger.info(summary)

        # Append the summary to the overall log
        with open(self.overall_log_file, 'a', encoding='utf-8') as f:
            f.write(summary)

        return all_results

    def _write_to_overall_log(self, product: str, arch: str, repo_type: str,
                              repo_url: str, result_lines: List[str]):
        """Append one repository's result to the overall log."""
        with open(self.overall_log_file, 'a', encoding='utf-8') as f:
            f.write(f"\n=== {product} {arch} {repo_type} ===\n")
            f.write(f"Repository URL: {repo_url}\n")
            for line in result_lines:
                f.write(line + '\n')
            f.write("\n")

    def _write_to_fail_log(self, product: str, arch: str, repo_type: str,
                           repo_url: str, result_lines: List[str]):
        """Append one repository's result to the failure log."""
        with open(self.fail_log_file, 'a', encoding='utf-8') as f:
            f.write(f"\n=== {product} {arch} {repo_type} ===\n")
            f.write(f"Repository URL: {repo_url}\n")
            for line in result_lines:
                # Keep only failing lines; the Chinese literal "测试通过" ("test passed")
                # must match the wording emitted by format_comparison_result.
                if "测试通过" not in line:
                    f.write(line + '\n')
            f.write("\n")

    def _write_error_to_logs(self, product: str, arch: str, repo_type: str,
                             repo_url: str, error_msg: str):
        """Write an error message to the log files."""
        # Overall log
        with open(self.overall_log_file, 'a', encoding='utf-8') as f:
            f.write(f"\n=== {product} {arch} {repo_type} ===\n")
            f.write(f"Repository URL: {repo_url}\n")
            f.write(f"ERROR: {error_msg}\n")

        # Failure log
        with open(self.fail_log_file, 'a', encoding='utf-8') as f:
            f.write(f"\n=== {product} {arch} {repo_type} ===\n")
            f.write(f"Repository URL: {repo_url}\n")
            f.write(f"ERROR: {error_msg}\n")

        # Separate per-repository error log
        error_log_file = os.path.join(self.output_dir, f"error_{product}_{arch}_{repo_type}.log")
        with open(error_log_file, 'w', encoding='utf-8') as f:
            f.write(f"Error processing {repo_url}: {error_msg}")
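

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). The repo_config entry below is
# hypothetical: its shape (product / base_url / architectures / repo_types)
# is inferred from how check_repository() reads the configuration, and the
# product name and base URL are placeholders rather than real mirrors.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    example_configs = [
        {
            "product": "ExampleOS-1.0",
            "base_url": "https://repo.example.com/ExampleOS-1.0/",
            "architectures": ["x86_64", "aarch64"],
            "repo_types": ["everything", "update"],
        },
    ]

    checker = RepoChecker(output_dir="./results")
    results = checker.check_all_repositories(example_configs, max_workers=4)

    # Print a one-line status per checked product/arch/repo_type combination.
    for key, item in results.items():
        status = "PASS" if item.get("passed") else "FAIL"
        print(f"{status}  {key}  {item.get('repo_url', '')}")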