252 lines
8.3 KiB
Python
252 lines
8.3 KiB
Python
# -*- coding:utf-8 -*-
|
|
# 仓库检查器模块
|
|
|
|
import os
|
|
import logging
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
import concurrent.futures
|
|
import time
|
|
|
|
from common.logger import get_logger
|
|
from common.config import ConfigManager
|
|
from repodata.parser import RepoDataParser
|
|
|
|
# 获取日志记录器
|
|
# Module-level logger, obtained from the project's get_logger helper and keyed by this module's name.
logger = get_logger(__name__)
|
|
|
|
class RepoChecker:
    """Check repositories by comparing two views of their RPM list.

    For every product / architecture / repo-type combination, the RPM list
    parsed from the repository's HTML page is compared with the RPM list read
    from the repository's sqlite metadata. Results are appended to an overall
    log, failures are additionally appended to a failure log, and processing
    errors also get a dedicated ``error_<product>_<arch>_<repo_type>.log``.
    """

    def __init__(self, output_dir: str = "./results"):
        """Create the output directory and start fresh log files.

        Args:
            output_dir: Directory that receives all log files. It is created
                (including missing parents) when it does not exist yet.
        """
        self.output_dir = output_dir
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(output_dir, exist_ok=True)

        self.overall_log_file = os.path.join(output_dir, "repo_rpmlist_check_results.log")
        self.fail_log_file = os.path.join(output_dir, "repo_rpmlist_check_fail.log")

        self._init_log_files()

    def _init_log_files(self):
        """Truncate (or create) both log files and write their title lines."""
        with open(self.overall_log_file, 'w', encoding='utf-8') as f:
            f.write("=== 整体测试结果 ===\n\n")

        with open(self.fail_log_file, 'w', encoding='utf-8') as f:
            f.write("=== 失败记录 ===\n\n")

    @staticmethod
    def _build_repo_url(base_url: str, product: str, repo_type: str, arch: str) -> str:
        """Return the repository URL for one repo-type / architecture pair.

        Products whose name contains ``8U8`` get an extra ``os/`` path segment.
        """
        if "8U8" in product:
            return f"{base_url}{repo_type}/{arch}/os/"
        return f"{base_url}{repo_type}/{arch}/"

    @staticmethod
    def _section_header(product: str, arch: str, repo_type: str, repo_url: str) -> str:
        """Return the header shared by every log section."""
        return f"\n=== {product} {arch} {repo_type} ===\nRepository URL: {repo_url}\n"

    @staticmethod
    def _append_text(path: str, text: str) -> None:
        """Append ``text`` to the UTF-8 file at ``path`` with a single write call."""
        with open(path, 'a', encoding='utf-8') as f:
            f.write(text)

    def _run_comparison(self, product: str, arch: str, repo_type: str, repo_url: str) -> Dict[str, Any]:
        """Compare the HTML and sqlite RPM lists of one repository and log the outcome.

        Returns:
            The result record for this repository.

        Raises:
            RuntimeError: If the HTML content or the sqlite file could not be
                obtained. Anything raised by the parser propagates as well.
        """
        parser = RepoDataParser(repo_url)

        # RPM list as published on the HTML page.
        html = parser.get_html_content()
        if not html:
            raise RuntimeError("获取HTML内容失败")
        rpm_list_html = parser.parse_rpm_list_from_html(html)

        # RPM list as recorded in the sqlite metadata.
        sqlite_path = parser.download_and_extract_sqlite()
        if not sqlite_path:
            raise RuntimeError("获取数据库文件失败")
        rpm_list_sqlite = parser.get_rpm_list_from_sqlite(sqlite_path)

        comparison_result = parser.compare_rpm_lists(rpm_list_html, rpm_list_sqlite)
        formatted_result = parser.format_comparison_result(comparison_result)
        test_passed = comparison_result["is_identical"]

        record = {
            "product": product,
            "arch": arch,
            "repo_type": repo_type,
            "repo_url": repo_url,
            "passed": test_passed,
            "result": formatted_result,
            "html_count": comparison_result["list1_count"],
            "sqlite_count": comparison_result["list2_count"],
        }

        self._write_to_overall_log(product, arch, repo_type, repo_url, formatted_result)
        if not test_passed:
            self._write_to_fail_log(product, arch, repo_type, repo_url, formatted_result)

        return record

    def check_repository(self, repo_config: Dict[str, Any]) -> Dict[str, Any]:
        """Check every architecture / repo-type combination of one repository config.

        Args:
            repo_config: Mapping with the keys ``product``, ``base_url``,
                ``architectures`` and ``repo_types``.

        Returns:
            Mapping of ``"<product>_<arch>_<repo_type>"`` to a result record.
            Records of failed runs carry ``"passed": False`` and an ``"error"``
            message instead of the comparison details.

        Raises:
            KeyError: If ``repo_config`` lacks one of the required keys.
        """
        product = repo_config["product"]
        base_url = repo_config["base_url"]

        results = {}

        for arch in repo_config["architectures"]:
            for repo_type in repo_config["repo_types"]:
                repo_url = self._build_repo_url(base_url, product, repo_type, arch)
                test_key = f"{product}_{arch}_{repo_type}"

                logger.info(f"开始测试: {product} {arch} {repo_type}")
                logger.info(f"测试仓库地址: {repo_url}")

                try:
                    results[test_key] = self._run_comparison(product, arch, repo_type, repo_url)
                except Exception as e:
                    # Boundary handler: one broken repository must not stop the
                    # remaining combinations, so the failure is recorded instead.
                    error_msg = f"处理仓库出错: {str(e)}"
                    logger.error(error_msg)

                    results[test_key] = {
                        "product": product,
                        "arch": arch,
                        "repo_type": repo_type,
                        "repo_url": repo_url,
                        "passed": False,
                        "error": error_msg,
                    }

                    self._write_error_to_logs(product, arch, repo_type, repo_url, error_msg)

        return results

    def check_all_repositories(self, repo_configs: List[Dict[str, Any]], max_workers: int = 5) -> Dict[str, Any]:
        """Check all repository configs in a thread pool and write a summary.

        Args:
            repo_configs: Repository configurations, one per product.
            max_workers: Maximum number of worker threads.

        Returns:
            The merged result records of all configurations.
        """
        all_results = {}
        start_time = time.time()

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_config = {
                executor.submit(self.check_repository, config): config
                for config in repo_configs
            }

            for future in concurrent.futures.as_completed(future_to_config):
                config = future_to_config[future]
                try:
                    all_results.update(future.result())
                except Exception as e:
                    # .get() instead of indexing: the worker may have failed
                    # precisely because 'product' is missing, and a KeyError
                    # raised here would abort the whole run.
                    product = config.get("product", "<unknown>")
                    logger.error(f"检查仓库 {product} 时出错: {str(e)}")

        duration = time.time() - start_time

        total_tests = len(all_results)
        passed_tests = sum(1 for result in all_results.values() if result.get("passed", False))

        summary = (
            "\n"
            "====== 测试总结 ======\n"
            f"总测试数: {total_tests}\n"
            f"通过测试: {passed_tests}\n"
            f"失败测试: {total_tests - passed_tests}\n"
            f"测试耗时: {duration:.2f} 秒\n"
        )

        logger.info(summary)
        self._append_text(self.overall_log_file, summary)

        return all_results

    def _write_to_overall_log(self, product: str, arch: str, repo_type: str, repo_url: str, result_lines: List[str]):
        """Append one result section (header plus every result line) to the overall log."""
        # The section is composed first and written with one call so that
        # sections produced by different worker threads are less likely to
        # interleave than with many small writes.
        body = "".join(line + '\n' for line in result_lines)
        section = self._section_header(product, arch, repo_type, repo_url) + body + "\n"
        self._append_text(self.overall_log_file, section)

    def _write_to_fail_log(self, product: str, arch: str, repo_type: str, repo_url: str, result_lines: List[str]):
        """Append one result section to the failure log, omitting lines that contain "测试通过"."""
        body = "".join(line + '\n' for line in result_lines if "测试通过" not in line)
        section = self._section_header(product, arch, repo_type, repo_url) + body + "\n"
        self._append_text(self.fail_log_file, section)

    def _write_error_to_logs(self, product: str, arch: str, repo_type: str, repo_url: str, error_msg: str):
        """Record a processing error in the overall log, the failure log and a dedicated error file."""
        section = self._section_header(product, arch, repo_type, repo_url) + f"ERROR: {error_msg}\n\n"

        self._append_text(self.overall_log_file, section)
        self._append_text(self.fail_log_file, section)

        # The dedicated error file is overwritten, not appended to.
        error_log_file = os.path.join(self.output_dir, f"error_{product}_{arch}_{repo_type}.log")
        with open(error_log_file, 'w', encoding='utf-8') as f:
            f.write(f"Error processing {repo_url}:\n{error_msg}")
|