# -*- coding: utf-8 -*-
# REPO_TEST/repodata/parser.py -- repository data parsing module
import os
import sqlite3
import logging
import xml.etree.ElementTree as ET
import time
from typing import List, Optional, Dict, Any, Tuple, Set
from bs4 import BeautifulSoup
from urllib.parse import unquote
from common.http_utils import HttpClient
from common.file_utils import FileUtils
# Module-level logger
logger = logging.getLogger(__name__)
# XML namespaces used in repodata metadata
NAMESPACES = {
    'common': 'http://linux.duke.edu/metadata/common',
    'rpm': 'http://linux.duke.edu/metadata/rpm',
    'repo': 'http://linux.duke.edu/metadata/repo'
}
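
# For reference: the repomd.xml consumed by download_repodata_xml() below
# typically looks roughly like this (digests and exact hrefs vary by
# repository; the href shown is a placeholder):
#
#   <repomd xmlns="http://linux.duke.edu/metadata/repo">
#     <data type="primary">
#       <location href="repodata/...-primary.xml.gz"/>
#     </data>
#     ...
#   </repomd>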
class RepoDataParser:
    """
    Repository data parser for RPM repository metadata.
    Supports building package lists from HTML directory listings,
    SQLite databases, and repodata XML files (repomd.xml/primary.xml).
    """
    def __init__(self, base_url: str):
        """
        Initialize the parser.

        Args:
            base_url: Base URL of the repository.
        """
        self.base_url = base_url
        self.http_client = HttpClient()
        self.file_utils = FileUtils()
        self.cache_dir = os.path.join(os.getcwd(), ".cache")
        # Make sure the cache directory exists
        os.makedirs(self.cache_dir, exist_ok=True)
    def get_html_content(self, path: str = "Packages/") -> Optional[str]:
        """
        Fetch HTML content from the repository.

        Args:
            path: Relative path, defaults to "Packages/".

        Returns:
            The HTML content, or None on failure.
        """
        url = HttpClient.join_url(self.base_url, path)
        logger.debug(f"Fetching HTML content: {url}")
        response = self.http_client.get(url)
        if not response:
            logger.warning(f"Failed to fetch HTML content: {url}")
            return None
        return response.text
    def parse_rpm_list_from_html(self, html: str) -> List[str]:
        """
        Parse the RPM file list from an HTML directory listing.

        Args:
            html: HTML content.

        Returns:
            List of RPM file names.
        """
        if not html:
            logger.warning("HTML content is empty, nothing to parse")
            return []
        rpmlist = []
        try:
            soup = BeautifulSoup(html, "html.parser")
            raw_links = [item.get('href') for item in soup.find_all('a')]
            # Keep only .rpm links and undo URL percent-encoding
            for link in raw_links:
                if link and link.lower().endswith(".rpm"):
                    rpmlist.append(unquote(link))
            logger.info(f"Parsed {len(rpmlist)} RPM files from HTML")
            return rpmlist
        except Exception as e:
            logger.error(f"Failed to parse HTML content: {str(e)}")
            return []
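    # For illustration: a typical directory-listing row looks like
    # <a href="bash-5.1.8-6.el9.x86_64.rpm">bash-5.1.8-6.el9.x86_64.rpm</a>
    # (hypothetical package and version), and a percent-encoded href such as
    # "libstdc%2B%2B-11.2.1-9.el9.x86_64.rpm" is decoded by unquote() to
    # "libstdc++-11.2.1-9.el9.x86_64.rpm".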
    def download_and_extract_sqlite(self) -> Optional[str]:
        """
        Download and decompress the primary SQLite database.

        Returns:
            Path to the decompressed database file, or None on failure.
        """
        repodata_url = HttpClient.join_url(self.base_url, "repodata/")
        response = self.http_client.get(repodata_url)
        if not response:
            logger.error(f"Failed to fetch repodata directory: {repodata_url}")
            return None
        html = response.text
        soup = BeautifulSoup(html, "html.parser")
        links = [item.get('href') for item in soup.find_all('a')]
        # Derive a cache file name from the base URL
        cache_key = self.base_url.replace('://', '_').replace('/', '_').replace(':', '_')
        sqlite_file_path = os.path.join(self.cache_dir, f"{cache_key}_primary.sqlite")
        # Reuse the cached file if it is less than 24 hours old
        if os.path.exists(sqlite_file_path):
            file_age = time.time() - os.path.getmtime(sqlite_file_path)
            if file_age < 86400:  # 24 hours = 86400 seconds
                logger.info(f"Using cached SQLite file: {sqlite_file_path}")
                return sqlite_file_path
        # Try each supported database compression format
        extractors = {
            "primary.sqlite.bz2": self.file_utils.extract_bz2,
            "primary.sqlite.xz": self.file_utils.extract_xz,
            "primary.sqlite.gz": self.file_utils.extract_gz,
        }
        for link in links:
            if not link:
                continue
            for suffix, extract in extractors.items():
                if not link.endswith(suffix):
                    continue
                sqlite_url = HttpClient.join_url(repodata_url, link)
                logger.info(f"Found database file: {sqlite_url}")
                compressed_path = os.path.join(self.cache_dir, f"{cache_key}_{suffix}")
                # Download the archive, then decompress it to the cache path
                if not self.http_client.download_file(sqlite_url, compressed_path):
                    continue
                if not extract(compressed_path, sqlite_file_path):
                    continue
                return sqlite_file_path
        logger.error(f"No supported database file found: {repodata_url}")
        return None
    def get_rpm_list_from_sqlite(self, sqlite_path: str) -> List[str]:
        """
        Build the RPM list from the primary SQLite database.

        Args:
            sqlite_path: Path to the SQLite database file.

        Returns:
            List of RPM file names.
        """
        if not os.path.exists(sqlite_path):
            logger.error(f"SQLite database file does not exist: {sqlite_path}")
            return []
        rpmlist = []
        conn = None
        try:
            conn = sqlite3.connect(sqlite_path)
            cursor = conn.cursor()
            # Query the package metadata
            cursor.execute("""
                SELECT name, version, release, epoch, arch
                FROM packages
                ORDER BY name
            """)
            # Reconstruct the RPM file names
            for name, version, release, epoch, arch in cursor:
                # Omit the epoch prefix when it is empty or "0"
                epoch_str = "" if not epoch or epoch == "0" else f"{epoch}:"
                rpmlist.append(f"{name}-{epoch_str}{version}-{release}.{arch}.rpm")
            logger.info(f"Got {len(rpmlist)} RPM files from SQLite")
            return rpmlist
        except Exception as e:
            logger.error(f"Failed to get RPM list from SQLite: {str(e)}")
            return []
        finally:
            # Always close the connection, even on error
            if conn is not None:
                conn.close()
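    # For illustration: a row ("bash", "5.1.8", "6.el9", "0", "x86_64") --
    # versions here are hypothetical -- is rendered as
    # "bash-5.1.8-6.el9.x86_64.rpm". A nonzero epoch such as "2" would yield
    # "bash-2:5.1.8-6.el9.x86_64.rpm"; note that on-disk RPM file names
    # conventionally omit the epoch, so such entries may not match an HTML
    # directory listing.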
    def download_repodata_xml(self) -> Optional[str]:
        """
        Download primary.xml, located via repomd.xml.

        Returns:
            Path to the downloaded primary.xml, or None on failure.
        """
        # First fetch repomd.xml, the index of the repodata files
        repomd_url = HttpClient.join_url(self.base_url, "repodata/repomd.xml")
        logger.info(f"Trying to download repomd.xml: {repomd_url}")
        # Derive a cache file name from the base URL
        cache_key = self.base_url.replace('://', '_').replace('/', '_').replace(':', '_')
        repomd_path = os.path.join(self.cache_dir, f"{cache_key}_repomd.xml")
        # Download repomd.xml
        if not self.http_client.download_file(repomd_url, repomd_path):
            logger.error(f"Failed to download repomd.xml: {repomd_url}")
            return None
        # Parse repomd.xml to locate primary.xml
        try:
            tree = ET.parse(repomd_path)
            root = tree.getroot()
            primary_location = None
            for data_element in root.findall('.//repo:data', NAMESPACES):
                if data_element.get('type') == 'primary':
                    location_element = data_element.find('.//repo:location', NAMESPACES)
                    if location_element is not None:
                        primary_location = location_element.get('href')
                    break
            if not primary_location:
                logger.error("Location of primary.xml not found in repomd.xml")
                return None
            # Download primary.xml
            primary_url = HttpClient.join_url(self.base_url, primary_location)
            logger.info(f"Found primary.xml location: {primary_url}")
            primary_path = os.path.join(self.cache_dir, f"{cache_key}_primary.xml")
            # Handle each supported compression format
            extractors = {
                '.gz': self.file_utils.extract_gz,
                '.bz2': self.file_utils.extract_bz2,
                '.xz': self.file_utils.extract_xz,
            }
            for suffix, extract in extractors.items():
                if primary_location.endswith(suffix):
                    compressed_path = os.path.join(self.cache_dir, f"{cache_key}_primary.xml{suffix}")
                    if not self.http_client.download_file(primary_url, compressed_path):
                        logger.error(f"Failed to download primary.xml{suffix}: {primary_url}")
                        return None
                    if not extract(compressed_path, primary_path):
                        logger.error(f"Failed to decompress primary.xml{suffix}")
                        return None
                    break
            else:
                # No known compression suffix: download the plain XML directly
                if not self.http_client.download_file(primary_url, primary_path):
                    logger.error(f"Failed to download primary.xml: {primary_url}")
                    return None
            return primary_path
        except Exception as e:
            logger.error(f"Error while processing repomd.xml: {str(e)}")
            return None
    def get_rpm_list_from_xml(self, xml_path: str) -> List[str]:
        """
        Build the RPM list from a primary.xml file.

        Args:
            xml_path: Path to the primary.xml file.

        Returns:
            List of RPM file names.
        """
        if not os.path.exists(xml_path):
            logger.error(f"XML file does not exist: {xml_path}")
            return []
        rpmlist = []
        try:
            # Parse the XML file
            tree = ET.parse(xml_path)
            root = tree.getroot()
            # Walk every package element
            for pkg_element in root.findall('.//common:package', NAMESPACES):
                if pkg_element.get('type') != 'rpm':
                    continue
                # Package name
                name_element = pkg_element.find('.//common:name', NAMESPACES)
                if name_element is None:
                    continue
                name = name_element.text
                # Version information
                version_element = pkg_element.find('.//common:version', NAMESPACES)
                if version_element is None:
                    continue
                epoch = version_element.get('epoch', '0')
                version = version_element.get('ver', '')
                release = version_element.get('rel', '')
                # Architecture
                arch_element = pkg_element.find('.//common:arch', NAMESPACES)
                if arch_element is None:
                    continue
                arch = arch_element.text
                # Omit the epoch prefix when it is empty or "0"
                epoch_str = "" if not epoch or epoch == "0" else f"{epoch}:"
                # Reconstruct the RPM file name
                rpmlist.append(f"{name}-{epoch_str}{version}-{release}.{arch}.rpm")
            logger.info(f"Got {len(rpmlist)} RPM files from XML")
            return rpmlist
        except Exception as e:
            logger.error(f"Failed to get RPM list from XML: {str(e)}")
            return []
    def compare_rpm_lists(self, list1: List[str], list2: List[str]) -> Dict[str, Any]:
        """
        Compare two RPM lists and report the differences.

        Args:
            list1: First RPM list.
            list2: Second RPM list.

        Returns:
            Comparison result dictionary.
        """
        # Convert to sets for comparison
        set1 = set(list1)
        set2 = set(list2)
        # Compute the differences
        only_in_list1 = sorted(set1 - set2)
        only_in_list2 = sorted(set2 - set1)
        # The lists are identical when neither side has unique entries
        is_identical = not only_in_list1 and not only_in_list2
        # Build the result
        result = {
            "is_identical": is_identical,
            "only_in_list1": only_in_list1,
            "only_in_list2": only_in_list2,
            "list1_count": len(list1),
            "list2_count": len(list2),
            "common_count": len(set1 & set2)
        }
        return result
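    # For illustration, with hypothetical file names:
    #
    #   compare_rpm_lists(["a-1-1.noarch.rpm", "b-1-1.noarch.rpm"],
    #                     ["b-1-1.noarch.rpm"])
    #   -> {"is_identical": False,
    #       "only_in_list1": ["a-1-1.noarch.rpm"],
    #       "only_in_list2": [],
    #       "list1_count": 2, "list2_count": 1, "common_count": 1}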
    def format_comparison_result(self, result: Dict[str, Any]) -> List[str]:
        """
        Format a comparison result for display.

        Args:
            result: Comparison result dictionary.

        Returns:
            List of formatted text lines.
        """
        output_lines = []
        # Summary statistics
        output_lines.append(f"Packages in list 1: {result['list1_count']}")
        output_lines.append(f"Packages in list 2: {result['list2_count']}")
        output_lines.append(f"Packages in common: {result.get('common_count', 0)}")
        output_lines.append("")
        if result["is_identical"]:
            output_lines.append("The two package lists are identical")
            output_lines.append("#####[TEST PASSED]#####")
            return output_lines
        # Report entries unique to the first list
        if result["only_in_list1"]:
            output_lines.append(f"Packages only in the first list ({len(result['only_in_list1'])}):")
            output_lines.extend(result["only_in_list1"])
        else:
            output_lines.append("No packages exist only in the first list.")
        output_lines.append("")
        # Report entries unique to the second list
        if result["only_in_list2"]:
            output_lines.append(f"Packages only in the second list ({len(result['only_in_list2'])}):")
            output_lines.extend(result["only_in_list2"])
        else:
            output_lines.append("No packages exist only in the second list.")
        return output_lines
    def get_all_rpm_sources(self) -> Dict[str, List[str]]:
        """
        Collect RPM lists from every available source.

        Returns:
            Dictionary of RPM lists keyed by source name.
        """
        results = {}
        # 1. RPM list from the HTML directory listing
        html = self.get_html_content()
        if html:
            results["html"] = self.parse_rpm_list_from_html(html)
        # 2. RPM list from the SQLite database
        sqlite_path = self.download_and_extract_sqlite()
        if sqlite_path:
            results["sqlite"] = self.get_rpm_list_from_sqlite(sqlite_path)
        # 3. RPM list from primary.xml
        xml_path = self.download_repodata_xml()
        if xml_path:
            results["xml"] = self.get_rpm_list_from_xml(xml_path)
        return results
    def compare_all_sources(self) -> Dict[str, Any]:
        """
        Compare the RPM lists from all available sources pairwise.

        Returns:
            Comparison results keyed by source pair.
        """
        sources = self.get_all_rpm_sources()
        if not sources:
            logger.error("No RPM list source found")
            return {"error": "No RPM list source found"}
        results = {}
        # Compare each pair of sources that is actually available
        for a, b in (("html", "sqlite"), ("html", "xml"), ("sqlite", "xml")):
            if a in sources and b in sources:
                results[f"{a}_vs_{b}"] = self.compare_rpm_lists(sources[a], sources[b])
        return results
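
# A minimal usage sketch, assuming a repository that serves both a Packages/
# listing and a repodata/ directory; the URL below is a placeholder, not a
# real mirror.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    parser = RepoDataParser("https://example.com/repo/")  # hypothetical URL
    comparisons = parser.compare_all_sources()
    for pair, result in comparisons.items():
        if pair == "error":
            # compare_all_sources() returns {"error": ...} when no source works
            print(result)
            continue
        print(f"== {pair} ==")
        for line in parser.format_comparison_result(result):
            print(line)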