# -*- coding:utf-8 -*- # 仓库数据解析模块 import os import sqlite3 import logging import xml.etree.ElementTree as ET import time from typing import List, Optional, Dict, Any, Tuple, Set from bs4 import BeautifulSoup from urllib.parse import unquote from common.http_utils import HttpClient from common.file_utils import FileUtils # 获取日志记录器 logger = logging.getLogger(__name__) # XML命名空间 NAMESPACES = { 'common': 'http://linux.duke.edu/metadata/common', 'rpm': 'http://linux.duke.edu/metadata/rpm', 'repo': 'http://linux.duke.edu/metadata/repo' } class RepoDataParser: """ 仓库数据解析器,用于解析RPM仓库数据 支持从HTML页面、SQLite数据库和repodata.xml文件中获取软件包列表 """ def __init__(self, base_url: str): """ 初始化解析器 Args: base_url: 仓库基础URL """ self.base_url = base_url self.http_client = HttpClient() self.file_utils = FileUtils() self.cache_dir = os.path.join(os.getcwd(), ".cache") # 确保缓存目录存在 if not os.path.exists(self.cache_dir): os.makedirs(self.cache_dir) def get_html_content(self, path: str = "Packages/") -> Optional[str]: """ 获取HTML内容 Args: path: 相对路径,默认为"Packages/" Returns: HTML内容,失败返回None """ url = HttpClient.join_url(self.base_url, path) logger.debug(f"获取HTML内容: {url}") response = self.http_client.get(url) if not response: logger.warning(f"获取HTML内容失败: {url}") return None return response.text def parse_rpm_list_from_html(self, html: str) -> List[str]: """ 从HTML中解析RPM列表 Args: html: HTML内容 Returns: RPM列表 """ if not html: logger.warning("HTML内容为空,无法解析") return [] rpmlist = [] try: soup = BeautifulSoup(html, "html.parser") raw_links = [item.get('href') for item in soup.find_all('a')] # 筛选出rpm文件并修正URL编码问题 for link in raw_links: if link and link.lower().endswith(".rpm"): # 修正URL编码问题 link = unquote(link) rpmlist.append(link) logger.info(f"从HTML解析到{len(rpmlist)}个RPM文件") return rpmlist except Exception as e: logger.error(f"解析HTML内容失败: {str(e)}") return [] def download_and_extract_sqlite(self) -> Optional[str]: """ 下载并解压数据库文件 Returns: 解压后的数据库文件路径,失败返回None """ repodata_url = HttpClient.join_url(self.base_url, "repodata/") response = self.http_client.get(repodata_url) if not response: logger.error(f"获取repodata目录失败: {repodata_url}") return None html = response.text soup = BeautifulSoup(html, "html.parser") links = [item.get('href') for item in soup.find_all('a')] # 生成缓存文件名 cache_key = self.base_url.replace('://', '_').replace('/', '_').replace(':', '_') sqlite_file_path = os.path.join(self.cache_dir, f"{cache_key}_primary.sqlite") # 如果缓存文件存在且不超过24小时,直接返回 if os.path.exists(sqlite_file_path): file_age = time.time() - os.path.getmtime(sqlite_file_path) if file_age < 86400: # 24小时 = 86400秒 logger.info(f"使用缓存的SQLite文件: {sqlite_file_path}") return sqlite_file_path # 尝试各种数据库文件格式 for link in links: if not link: continue # 处理BZ2格式 if link.endswith("primary.sqlite.bz2"): sqlite_url = HttpClient.join_url(repodata_url, link) logger.info(f"发现BZ2数据库文件: {sqlite_url}") # 下载文件 sqlite_bz2_path = os.path.join(self.cache_dir, f"{cache_key}_primary.sqlite.bz2") if not self.http_client.download_file(sqlite_url, sqlite_bz2_path): continue # 解压文件 if not self.file_utils.extract_bz2(sqlite_bz2_path, sqlite_file_path): continue return sqlite_file_path # 处理XZ格式 elif link.endswith("primary.sqlite.xz"): sqlite_url = HttpClient.join_url(repodata_url, link) logger.info(f"发现XZ数据库文件: {sqlite_url}") # 下载文件 sqlite_xz_path = os.path.join(self.cache_dir, f"{cache_key}_primary.sqlite.xz") if not self.http_client.download_file(sqlite_url, sqlite_xz_path): continue # 解压文件 if not self.file_utils.extract_xz(sqlite_xz_path, sqlite_file_path): continue return sqlite_file_path # 处理GZ格式 elif link.endswith("primary.sqlite.gz"): sqlite_url = HttpClient.join_url(repodata_url, link) logger.info(f"发现GZ数据库文件: {sqlite_url}") # 下载文件 sqlite_gz_path = os.path.join(self.cache_dir, f"{cache_key}_primary.sqlite.gz") if not self.http_client.download_file(sqlite_url, sqlite_gz_path): continue # 解压文件 if not self.file_utils.extract_gz(sqlite_gz_path, sqlite_file_path): continue return sqlite_file_path logger.error(f"未找到支持的数据库文件: {repodata_url}") return None def get_rpm_list_from_sqlite(self, sqlite_path: str) -> List[str]: """ 从SQLite数据库获取RPM列表 Args: sqlite_path: SQLite数据库文件路径 Returns: RPM列表 """ if not os.path.exists(sqlite_path): logger.error(f"SQLite数据库文件不存在: {sqlite_path}") return [] rpmlist = [] try: # 连接数据库 conn = sqlite3.connect(sqlite_path) cursor = conn.cursor() # 查询软件包信息 cursor.execute(""" SELECT name, version, release, epoch, arch FROM packages ORDER BY name """) # 生成RPM文件名 for row in cursor: name, version, release, epoch, arch = row # 处理epoch epoch_str = "" if not epoch or epoch == "0" else f"{epoch}:" rpm = f"{name}-{epoch_str}{version}-{release}.{arch}.rpm" rpmlist.append(rpm) logger.info(f"从SQLite获取到{len(rpmlist)}个RPM文件") # 关闭连接 cursor.close() conn.close() return rpmlist except Exception as e: logger.error(f"从SQLite获取RPM列表失败: {str(e)}") return [] def download_repodata_xml(self) -> Optional[str]: """ 下载repodata.xml文件 Returns: 下载的文件路径,失败返回None """ # 首先尝试获取repomd.xml repomd_url = HttpClient.join_url(self.base_url, "repodata/repomd.xml") logger.info(f"尝试下载repomd.xml: {repomd_url}") # 生成缓存文件名 cache_key = self.base_url.replace('://', '_').replace('/', '_').replace(':', '_') repomd_path = os.path.join(self.cache_dir, f"{cache_key}_repomd.xml") # 下载repomd.xml if not self.http_client.download_file(repomd_url, repomd_path): logger.error(f"下载repomd.xml失败: {repomd_url}") return None # 解析repomd.xml以找到primary.xml的位置 try: tree = ET.parse(repomd_path) root = tree.getroot() # 查找primary.xml的位置 primary_location = None for data_element in root.findall('.//{http://linux.duke.edu/metadata/repo}data'): if data_element.get('type') == 'primary': location_element = data_element.find('.//{http://linux.duke.edu/metadata/repo}location') if location_element is not None: primary_location = location_element.get('href') break if not primary_location: logger.error("在repomd.xml中未找到primary.xml的位置") return None # 下载primary.xml primary_url = HttpClient.join_url(self.base_url, primary_location) logger.info(f"找到primary.xml位置: {primary_url}") # 检查是否是压缩文件 primary_path = os.path.join(self.cache_dir, f"{cache_key}_primary.xml") compressed_path = None if primary_location.endswith('.gz'): compressed_path = os.path.join(self.cache_dir, f"{cache_key}_primary.xml.gz") if not self.http_client.download_file(primary_url, compressed_path): logger.error(f"下载primary.xml.gz失败: {primary_url}") return None if not self.file_utils.extract_gz(compressed_path, primary_path): logger.error(f"解压primary.xml.gz失败") return None elif primary_location.endswith('.bz2'): compressed_path = os.path.join(self.cache_dir, f"{cache_key}_primary.xml.bz2") if not self.http_client.download_file(primary_url, compressed_path): logger.error(f"下载primary.xml.bz2失败: {primary_url}") return None if not self.file_utils.extract_bz2(compressed_path, primary_path): logger.error(f"解压primary.xml.bz2失败") return None elif primary_location.endswith('.xz'): compressed_path = os.path.join(self.cache_dir, f"{cache_key}_primary.xml.xz") if not self.http_client.download_file(primary_url, compressed_path): logger.error(f"下载primary.xml.xz失败: {primary_url}") return None if not self.file_utils.extract_xz(compressed_path, primary_path): logger.error(f"解压primary.xml.xz失败") return None else: # 直接下载未压缩的XML if not self.http_client.download_file(primary_url, primary_path): logger.error(f"下载primary.xml失败: {primary_url}") return None return primary_path except Exception as e: logger.error(f"处理repomd.xml时出错: {str(e)}") return None def get_rpm_list_from_xml(self, xml_path: str) -> List[str]: """ 从primary.xml文件获取RPM列表 Args: xml_path: primary.xml文件路径 Returns: RPM列表 """ if not os.path.exists(xml_path): logger.error(f"XML文件不存在: {xml_path}") return [] rpmlist = [] try: # 解析XML文件 tree = ET.parse(xml_path) root = tree.getroot() # 查找所有包元素 for pkg_element in root.findall('.//{http://linux.duke.edu/metadata/common}package'): if pkg_element.get('type') != 'rpm': continue # 获取包名 name_element = pkg_element.find('.//{http://linux.duke.edu/metadata/common}name') if name_element is None: continue name = name_element.text # 获取版本信息 version_element = pkg_element.find('.//{http://linux.duke.edu/metadata/common}version') if version_element is None: continue epoch = version_element.get('epoch', '0') version = version_element.get('ver', '') release = version_element.get('rel', '') # 获取架构 arch_element = pkg_element.find('.//{http://linux.duke.edu/metadata/common}arch') if arch_element is None: continue arch = arch_element.text # 处理epoch epoch_str = "" if not epoch or epoch == "0" else f"{epoch}:" # 生成RPM文件名 rpm = f"{name}-{epoch_str}{version}-{release}.{arch}.rpm" rpmlist.append(rpm) logger.info(f"从XML获取到{len(rpmlist)}个RPM文件") return rpmlist except Exception as e: logger.error(f"从XML获取RPM列表失败: {str(e)}") return [] def compare_rpm_lists(self, list1: List[str], list2: List[str]) -> Dict[str, Any]: """ 比较两个RPM列表的差异 Args: list1: 第一个RPM列表 list2: 第二个RPM列表 Returns: 比较结果 """ # 标准化RPM名称(去除版本信息,只保留包名) def normalize_rpm_name(rpm: str) -> str: # 移除.rpm后缀 if rpm.lower().endswith('.rpm'): rpm = rpm[:-4] return rpm # 转换为集合以便比较 set1 = set(list1) set2 = set(list2) # 计算差异 only_in_list1 = sorted(set1 - set2) only_in_list2 = sorted(set2 - set1) # 检查是否完全相同 is_identical = len(only_in_list1) == 0 and len(only_in_list2) == 0 # 构建结果 result = { "is_identical": is_identical, "only_in_list1": only_in_list1, "only_in_list2": only_in_list2, "list1_count": len(list1), "list2_count": len(list2), "common_count": len(set1.intersection(set2)) } return result def format_comparison_result(self, result: Dict[str, Any]) -> List[str]: """ 格式化比较结果 Args: result: 比较结果字典 Returns: 格式化后的文本行列表 """ output_lines = [] # 添加统计信息 output_lines.append(f"列表1软件包数量: {result['list1_count']}") output_lines.append(f"列表2软件包数量: {result['list2_count']}") output_lines.append(f"共同软件包数量: {result.get('common_count', 0)}") output_lines.append("") if result["is_identical"]: output_lines.append("两个软件包列表完全相同") output_lines.append("#####【测试通过】#####") return output_lines # 报告第一个列表独有的项目 if result["only_in_list1"]: output_lines.append(f"仅在第一个列表中存在的软件包({len(result['only_in_list1'])}个):") output_lines.extend(result["only_in_list1"]) else: output_lines.append("没有仅在第一个列表中存在的软件包。") output_lines.append("") # 报告第二个列表独有的项目 if result["only_in_list2"]: output_lines.append(f"仅在第二个列表中存在的软件包({len(result['only_in_list2'])}个):") output_lines.extend(result["only_in_list2"]) else: output_lines.append("没有仅在第二个列表中存在的软件包。") return output_lines def get_all_rpm_sources(self) -> Dict[str, List[str]]: """ 获取所有可用的RPM列表来源 Returns: 包含不同来源RPM列表的字典 """ results = {} # 1. 从HTML获取RPM列表 html = self.get_html_content() if html: rpm_list_html = self.parse_rpm_list_from_html(html) results["html"] = rpm_list_html # 2. 从SQLite获取RPM列表 sqlite_path = self.download_and_extract_sqlite() if sqlite_path: rpm_list_sqlite = self.get_rpm_list_from_sqlite(sqlite_path) results["sqlite"] = rpm_list_sqlite # 3. 从XML获取RPM列表 xml_path = self.download_repodata_xml() if xml_path: rpm_list_xml = self.get_rpm_list_from_xml(xml_path) results["xml"] = rpm_list_xml return results def compare_all_sources(self) -> Dict[str, Any]: """ 比较所有来源的RPM列表 Returns: 比较结果 """ sources = self.get_all_rpm_sources() if not sources: logger.error("没有找到任何RPM列表来源") return {"error": "没有找到任何RPM列表来源"} results = {} # 如果有HTML和SQLite来源,比较它们 if "html" in sources and "sqlite" in sources: html_vs_sqlite = self.compare_rpm_lists(sources["html"], sources["sqlite"]) results["html_vs_sqlite"] = html_vs_sqlite # 如果有HTML和XML来源,比较它们 if "html" in sources and "xml" in sources: html_vs_xml = self.compare_rpm_lists(sources["html"], sources["xml"]) results["html_vs_xml"] = html_vs_xml # 如果有SQLite和XML来源,比较它们 if "sqlite" in sources and "xml" in sources: sqlite_vs_xml = self.compare_rpm_lists(sources["sqlite"], sources["xml"]) results["sqlite_vs_xml"] = sqlite_vs_xml return results