Files
REPO_TEST/repodata/checker.py
2025-07-23 20:41:13 +08:00

252 lines
8.3 KiB
Python

# -*- coding:utf-8 -*-
# 仓库检查器模块
import os
import logging
from typing import List, Dict, Any, Optional, Tuple
import concurrent.futures
import time
from common.logger import get_logger
from common.config import ConfigManager
from repodata.parser import RepoDataParser
# 获取日志记录器
logger = get_logger(__name__)
class RepoChecker:
    """Repository checker.

    Verifies that the RPM list published on a repository's HTML index page
    is consistent with the RPM list recorded in its repodata sqlite database,
    and writes the outcome to per-run log files under ``output_dir``.
    """

    def __init__(self, output_dir: str = "./results"):
        """
        Initialize the checker and create/truncate its log files.

        Args:
            output_dir: Directory where result and failure logs are written.
        """
        self.output_dir = output_dir
        # exist_ok=True avoids the check-then-create race of the previous
        # os.path.exists() + os.makedirs() pair.
        os.makedirs(output_dir, exist_ok=True)
        # Aggregate log (every test) and failure-only log.
        self.overall_log_file = os.path.join(output_dir, "repo_rpmlist_check_results.log")
        self.fail_log_file = os.path.join(output_dir, "repo_rpmlist_check_fail.log")
        self._init_log_files()

    def _init_log_files(self):
        """Create (or truncate) both log files and write their section headers."""
        with open(self.overall_log_file, 'w', encoding='utf-8') as f:
            f.write("=== 整体测试结果 ===\n")
        with open(self.fail_log_file, 'w', encoding='utf-8') as f:
            f.write("=== 失败记录 ===\n")

    def check_repository(self, repo_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Check a single repository configuration.

        For every (architecture, repo_type) combination in the config, compare
        the RPM list parsed from the repo's HTML index against the list read
        from its sqlite metadata, and log the comparison.

        Args:
            repo_config: Mapping with keys "product", "base_url",
                "architectures" and "repo_types".

        Returns:
            Mapping of "<product>_<arch>_<repo_type>" to a result dict
            containing "passed" plus either comparison details or an
            "error" message.
        """
        product = repo_config["product"]
        base_url = repo_config["base_url"]
        results: Dict[str, Any] = {}
        for arch in repo_config["architectures"]:
            for repo_type in repo_config["repo_types"]:
                # Build the full repository URL; "8U8" products carry an
                # extra "/os/" path segment.
                if "8U8" not in product:
                    repo_url = f"{base_url}{repo_type}/{arch}/"
                else:
                    repo_url = f"{base_url}{repo_type}/{arch}/os/"
                logger.info(f"开始测试: {product} {arch} {repo_type}")
                logger.info(f"测试仓库地址: {repo_url}")
                test_key = f"{product}_{arch}_{repo_type}"
                try:
                    parser = RepoDataParser(repo_url)
                    # RPM list as published on the HTML index page.
                    html = parser.get_html_content()
                    if not html:
                        raise RuntimeError("获取HTML内容失败")
                    rpm_list_html = parser.parse_rpm_list_from_html(html)
                    # RPM list recorded in the repodata sqlite database.
                    sqlite_path = parser.download_and_extract_sqlite()
                    if not sqlite_path:
                        raise RuntimeError("获取数据库文件失败")
                    rpm_list_sqlite = parser.get_rpm_list_from_sqlite(sqlite_path)
                    # Compare the two lists and record the outcome.
                    comparison_result = parser.compare_rpm_lists(rpm_list_html, rpm_list_sqlite)
                    formatted_result = parser.format_comparison_result(comparison_result)
                    test_passed = comparison_result["is_identical"]
                    results[test_key] = {
                        "product": product,
                        "arch": arch,
                        "repo_type": repo_type,
                        "repo_url": repo_url,
                        "passed": test_passed,
                        "result": formatted_result,
                        "html_count": comparison_result["list1_count"],
                        "sqlite_count": comparison_result["list2_count"],
                    }
                    self._write_to_overall_log(product, arch, repo_type, repo_url, formatted_result)
                    # Failed comparisons also go to the failure-only log.
                    if not test_passed:
                        self._write_to_fail_log(product, arch, repo_type, repo_url, formatted_result)
                except Exception as e:
                    error_msg = f"处理仓库出错: {str(e)}"
                    # logger.exception keeps the traceback, unlike logger.error.
                    logger.exception(error_msg)
                    results[test_key] = {
                        "product": product,
                        "arch": arch,
                        "repo_type": repo_type,
                        "repo_url": repo_url,
                        "passed": False,
                        "error": error_msg,
                    }
                    self._write_error_to_logs(product, arch, repo_type, repo_url, error_msg)
        return results

    def check_all_repositories(self, repo_configs: List[Dict[str, Any]], max_workers: int = 5) -> Dict[str, Any]:
        """
        Check all repositories in parallel.

        Args:
            repo_configs: List of repository configurations (see
                :meth:`check_repository`).
            max_workers: Maximum number of worker threads.

        Returns:
            Merged results from all repository checks.
        """
        all_results: Dict[str, Any] = {}
        start_time = time.time()
        # Fan the checks out over a thread pool (the work is I/O-bound).
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_config = {executor.submit(self.check_repository, config): config for config in repo_configs}
            for future in concurrent.futures.as_completed(future_to_config):
                config = future_to_config[future]
                try:
                    all_results.update(future.result())
                except Exception as e:
                    logger.error(f"检查仓库 {config['product']} 时出错: {str(e)}")
        duration = time.time() - start_time
        # Summarize pass/fail counts and elapsed time.
        total_tests = len(all_results)
        passed_tests = sum(1 for result in all_results.values() if result.get("passed", False))
        summary = f"""
====== 测试总结 ======
总测试数: {total_tests}
通过测试: {passed_tests}
失败测试: {total_tests - passed_tests}
测试耗时: {duration:.2f}
"""
        logger.info(summary)
        # Append the summary to the overall log as well.
        with open(self.overall_log_file, 'a', encoding='utf-8') as f:
            f.write(summary)
        return all_results

    def _write_to_overall_log(self, product: str, arch: str, repo_type: str, repo_url: str, result_lines: List[str]):
        """Append one test's formatted result lines to the overall log."""
        with open(self.overall_log_file, 'a', encoding='utf-8') as f:
            f.write(f"\n=== {product} {arch} {repo_type} ===\n")
            f.write(f"Repository URL: {repo_url}\n")
            for line in result_lines:
                f.write(line + '\n')
            f.write("\n")

    def _write_to_fail_log(self, product: str, arch: str, repo_type: str, repo_url: str, result_lines: List[str]):
        """Append a failed test's result lines (minus pass markers) to the failure log."""
        with open(self.fail_log_file, 'a', encoding='utf-8') as f:
            f.write(f"\n=== {product} {arch} {repo_type} ===\n")
            f.write(f"Repository URL: {repo_url}\n")
            for line in result_lines:
                # Skip lines that merely announce a pass ("测试通过").
                if "测试通过" not in line:
                    f.write(line + '\n')
            f.write("\n")

    def _write_error_to_logs(self, product: str, arch: str, repo_type: str, repo_url: str, error_msg: str):
        """Record an error in the overall log, the failure log, and a dedicated error file."""
        # Identical header + error entry goes to both shared logs.
        for log_path in (self.overall_log_file, self.fail_log_file):
            with open(log_path, 'a', encoding='utf-8') as f:
                f.write(f"\n=== {product} {arch} {repo_type} ===\n")
                f.write(f"Repository URL: {repo_url}\n")
                f.write(f"ERROR: {error_msg}\n")
        # Plus a standalone per-test error file for quick inspection.
        error_log_file = os.path.join(self.output_dir, f"error_{product}_{arch}_{repo_type}.log")
        with open(error_log_file, 'w', encoding='utf-8') as f:
            f.write(f"Error processing {repo_url}:\n{error_msg}")