# -*- coding:utf-8 -*-
# Repository data parsing module

import os
import sqlite3
import logging
import xml.etree.ElementTree as ET
import time
from typing import List, Optional, Dict, Any
from bs4 import BeautifulSoup
from urllib.parse import unquote

from common.http_utils import HttpClient
from common.file_utils import FileUtils

# Module-level logger
logger = logging.getLogger(__name__)

# XML namespaces used by repodata metadata
NAMESPACES = {
    'common': 'http://linux.duke.edu/metadata/common',
    'rpm': 'http://linux.duke.edu/metadata/rpm',
    'repo': 'http://linux.duke.edu/metadata/repo'
}


class RepoDataParser:
    """
    Repository data parser for RPM repository metadata.

    Supports fetching package lists from an HTML directory listing, a
    primary SQLite database, and the repodata primary XML file.
    """

    def __init__(self, base_url: str):
        """
        Initialize the parser.

        Args:
            base_url: Base URL of the repository
        """
        self.base_url = base_url
        self.http_client = HttpClient()
        self.file_utils = FileUtils()
        self.cache_dir = os.path.join(os.getcwd(), ".cache")

        # Make sure the cache directory exists
        if not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir)

    def get_html_content(self, path: str = "Packages/") -> Optional[str]:
        """
        Fetch HTML content from the repository.

        Args:
            path: Relative path, "Packages/" by default

        Returns:
            The HTML content, or None on failure
        """
        url = HttpClient.join_url(self.base_url, path)
        logger.debug(f"Fetching HTML content: {url}")

        response = self.http_client.get(url)
        if not response:
            logger.warning(f"Failed to fetch HTML content: {url}")
            return None

        return response.text

    def parse_rpm_list_from_html(self, html: str) -> List[str]:
        """
        Parse the RPM list out of an HTML directory listing.

        Args:
            html: HTML content

        Returns:
            List of RPM file names
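
        Example (a minimal doctest-style sketch; the URL is hypothetical
        and is only needed here to construct the parser):
            >>> parser = RepoDataParser("https://repo.example.com/os/")
            >>> parser.parse_rpm_list_from_html(
            ...     '<a href="bash-5.1-2.x86_64.rpm">bash</a>')
            ['bash-5.1-2.x86_64.rpm']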
        """
        if not html:
            logger.warning("HTML content is empty; nothing to parse")
            return []

        rpmlist = []
        try:
            soup = BeautifulSoup(html, "html.parser")
            raw_links = [item.get('href') for item in soup.find_all('a')]

            # Keep only .rpm files and undo URL encoding
            for link in raw_links:
                if link and link.lower().endswith(".rpm"):
                    # Undo URL encoding (e.g. %2B -> +)
                    link = unquote(link)
                    rpmlist.append(link)

            logger.info(f"Parsed {len(rpmlist)} RPM files from HTML")
            return rpmlist

        except Exception as e:
            logger.error(f"Failed to parse HTML content: {str(e)}")
            return []

    def download_and_extract_sqlite(self) -> Optional[str]:
        """
        Download and decompress the primary SQLite database.

        Returns:
            Path to the decompressed database file, or None on failure
        """
        repodata_url = HttpClient.join_url(self.base_url, "repodata/")
        response = self.http_client.get(repodata_url)

        if not response:
            logger.error(f"Failed to fetch the repodata directory: {repodata_url}")
            return None

        html = response.text
        soup = BeautifulSoup(html, "html.parser")
        links = [item.get('href') for item in soup.find_all('a')]

        # Build the cache file name
        cache_key = self.base_url.replace('://', '_').replace('/', '_').replace(':', '_')
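        # e.g. "https://repo.example.com/os/" -> "https_repo.example.com_os_"
        # (hypothetical URL, shown only to illustrate the transformation)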
        sqlite_file_path = os.path.join(self.cache_dir, f"{cache_key}_primary.sqlite")

        # Reuse the cached file if it exists and is under 24 hours old
        if os.path.exists(sqlite_file_path):
            file_age = time.time() - os.path.getmtime(sqlite_file_path)
            if file_age < 86400:  # 24 hours = 86400 seconds
                logger.info(f"Using cached SQLite file: {sqlite_file_path}")
                return sqlite_file_path

        # Try each supported database file format
        for link in links:
            if not link:
                continue

            # BZ2 format
            if link.endswith("primary.sqlite.bz2"):
                sqlite_url = HttpClient.join_url(repodata_url, link)
                logger.info(f"Found BZ2 database file: {sqlite_url}")

                # Download the file
                sqlite_bz2_path = os.path.join(self.cache_dir, f"{cache_key}_primary.sqlite.bz2")
                if not self.http_client.download_file(sqlite_url, sqlite_bz2_path):
                    continue

                # Decompress it
                if not self.file_utils.extract_bz2(sqlite_bz2_path, sqlite_file_path):
                    continue

                return sqlite_file_path

            # XZ format
            elif link.endswith("primary.sqlite.xz"):
                sqlite_url = HttpClient.join_url(repodata_url, link)
                logger.info(f"Found XZ database file: {sqlite_url}")

                # Download the file
                sqlite_xz_path = os.path.join(self.cache_dir, f"{cache_key}_primary.sqlite.xz")
                if not self.http_client.download_file(sqlite_url, sqlite_xz_path):
                    continue

                # Decompress it
                if not self.file_utils.extract_xz(sqlite_xz_path, sqlite_file_path):
                    continue

                return sqlite_file_path

            # GZ format
            elif link.endswith("primary.sqlite.gz"):
                sqlite_url = HttpClient.join_url(repodata_url, link)
                logger.info(f"Found GZ database file: {sqlite_url}")

                # Download the file
                sqlite_gz_path = os.path.join(self.cache_dir, f"{cache_key}_primary.sqlite.gz")
                if not self.http_client.download_file(sqlite_url, sqlite_gz_path):
                    continue

                # Decompress it
                if not self.file_utils.extract_gz(sqlite_gz_path, sqlite_file_path):
                    continue

                return sqlite_file_path

        logger.error(f"No supported database file found: {repodata_url}")
        return None

    def get_rpm_list_from_sqlite(self, sqlite_path: str) -> List[str]:
        """
        Read the RPM list from a SQLite database.

        Args:
            sqlite_path: Path to the SQLite database file

        Returns:
            List of RPM file names
        """
        if not os.path.exists(sqlite_path):
            logger.error(f"SQLite database file does not exist: {sqlite_path}")
            return []

        rpmlist = []
        conn = None
        try:
            # Connect to the database
            conn = sqlite3.connect(sqlite_path)
            cursor = conn.cursor()

            # Query the package information
            cursor.execute("""
                SELECT name, version, release, epoch, arch
                FROM packages
                ORDER BY name
            """)

            # Build the RPM file names
            for row in cursor:
                name, version, release, epoch, arch = row

                # Handle the epoch
                epoch_str = "" if not epoch or epoch == "0" else f"{epoch}:"
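                # e.g. ("bash", "5.1", "2", "0", "x86_64") -> "bash-5.1-2.x86_64.rpm",
                # while epoch "1" would yield "bash-1:5.1-2.x86_64.rpm" (hypothetical
                # package). Note that on-disk RPM file names normally omit the epoch,
                # so epoch-bearing entries built here may not match an HTML listing.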
                rpm = f"{name}-{epoch_str}{version}-{release}.{arch}.rpm"
                rpmlist.append(rpm)

            logger.info(f"Got {len(rpmlist)} RPM files from SQLite")
            return rpmlist

        except Exception as e:
            logger.error(f"Failed to get the RPM list from SQLite: {str(e)}")
            return []

        finally:
            # Close the connection even if the query fails
            if conn is not None:
                conn.close()

    def download_repodata_xml(self) -> Optional[str]:
        """
        Download the repodata primary XML file.

        Returns:
            Path to the downloaded file, or None on failure
        """
        # First fetch repomd.xml
        repomd_url = HttpClient.join_url(self.base_url, "repodata/repomd.xml")
        logger.info(f"Trying to download repomd.xml: {repomd_url}")

        # Build the cache file name
        cache_key = self.base_url.replace('://', '_').replace('/', '_').replace(':', '_')
        repomd_path = os.path.join(self.cache_dir, f"{cache_key}_repomd.xml")

        # Download repomd.xml
        if not self.http_client.download_file(repomd_url, repomd_path):
            logger.error(f"Failed to download repomd.xml: {repomd_url}")
            return None

        # Parse repomd.xml to find the location of primary.xml
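        # repomd.xml describes each metadata file with entries like
        # (sketch, abbreviated; the href value is illustrative):
        #   <data type="primary">
        #     <location href="repodata/xxxx-primary.xml.gz"/>
        #   </data>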
        try:
            tree = ET.parse(repomd_path)
            root = tree.getroot()

            # Look up the location of primary.xml
            primary_location = None
            for data_element in root.findall('.//repo:data', NAMESPACES):
                if data_element.get('type') == 'primary':
                    location_element = data_element.find('.//repo:location', NAMESPACES)
                    if location_element is not None:
                        primary_location = location_element.get('href')
                        break

            if not primary_location:
                logger.error("Could not find the location of primary.xml in repomd.xml")
                return None

            # Download primary.xml
            primary_url = HttpClient.join_url(self.base_url, primary_location)
            logger.info(f"Found primary.xml location: {primary_url}")

            # Check whether it is compressed
            primary_path = os.path.join(self.cache_dir, f"{cache_key}_primary.xml")
            compressed_path = None

            if primary_location.endswith('.gz'):
                compressed_path = os.path.join(self.cache_dir, f"{cache_key}_primary.xml.gz")
                if not self.http_client.download_file(primary_url, compressed_path):
                    logger.error(f"Failed to download primary.xml.gz: {primary_url}")
                    return None
                if not self.file_utils.extract_gz(compressed_path, primary_path):
                    logger.error("Failed to decompress primary.xml.gz")
                    return None
            elif primary_location.endswith('.bz2'):
                compressed_path = os.path.join(self.cache_dir, f"{cache_key}_primary.xml.bz2")
                if not self.http_client.download_file(primary_url, compressed_path):
                    logger.error(f"Failed to download primary.xml.bz2: {primary_url}")
                    return None
                if not self.file_utils.extract_bz2(compressed_path, primary_path):
                    logger.error("Failed to decompress primary.xml.bz2")
                    return None
            elif primary_location.endswith('.xz'):
                compressed_path = os.path.join(self.cache_dir, f"{cache_key}_primary.xml.xz")
                if not self.http_client.download_file(primary_url, compressed_path):
                    logger.error(f"Failed to download primary.xml.xz: {primary_url}")
                    return None
                if not self.file_utils.extract_xz(compressed_path, primary_path):
                    logger.error("Failed to decompress primary.xml.xz")
                    return None
            else:
                # Download the uncompressed XML directly
                if not self.http_client.download_file(primary_url, primary_path):
                    logger.error(f"Failed to download primary.xml: {primary_url}")
                    return None

            return primary_path

        except Exception as e:
            logger.error(f"Error while processing repomd.xml: {str(e)}")
            return None

    def get_rpm_list_from_xml(self, xml_path: str) -> List[str]:
        """
        Read the RPM list from a primary.xml file.

        Args:
            xml_path: Path to the primary.xml file

        Returns:
            List of RPM file names
        """
        if not os.path.exists(xml_path):
            logger.error(f"XML file does not exist: {xml_path}")
            return []

        rpmlist = []
        try:
            # Parse the XML file
            tree = ET.parse(xml_path)
            root = tree.getroot()

            # Walk all package elements
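            # Each entry in primary.xml looks roughly like (sketch, abbreviated):
            #   <package type="rpm">
            #     <name>bash</name>
            #     <arch>x86_64</arch>
            #     <version epoch="0" ver="5.1" rel="2"/>
            #   </package>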
            for pkg_element in root.findall('.//common:package', NAMESPACES):
                if pkg_element.get('type') != 'rpm':
                    continue

                # Package name
                name_element = pkg_element.find('.//common:name', NAMESPACES)
                if name_element is None:
                    continue
                name = name_element.text

                # Version information
                version_element = pkg_element.find('.//common:version', NAMESPACES)
                if version_element is None:
                    continue

                epoch = version_element.get('epoch', '0')
                version = version_element.get('ver', '')
                release = version_element.get('rel', '')

                # Architecture
                arch_element = pkg_element.find('.//common:arch', NAMESPACES)
                if arch_element is None:
                    continue
                arch = arch_element.text

                # Handle the epoch (see the note in get_rpm_list_from_sqlite)
                epoch_str = "" if not epoch or epoch == "0" else f"{epoch}:"

                # Build the RPM file name
                rpm = f"{name}-{epoch_str}{version}-{release}.{arch}.rpm"
                rpmlist.append(rpm)

            logger.info(f"Got {len(rpmlist)} RPM files from XML")
            return rpmlist

        except Exception as e:
            logger.error(f"Failed to get the RPM list from XML: {str(e)}")
            return []

    def compare_rpm_lists(self, list1: List[str], list2: List[str]) -> Dict[str, Any]:
        """
        Compare two RPM lists and report their differences.

        Args:
            list1: First RPM list
            list2: Second RPM list

        Returns:
            Comparison result dictionary
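
        Example (a minimal doctest-style sketch with hypothetical names
        and URL):
            >>> parser = RepoDataParser("https://repo.example.com/os/")
            >>> r = parser.compare_rpm_lists(
            ...     ["a-1.0-1.x86_64.rpm"],
            ...     ["a-1.0-1.x86_64.rpm", "b-2.0-1.noarch.rpm"])
            >>> r["is_identical"], r["only_in_list2"]
            (False, ['b-2.0-1.noarch.rpm'])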
        """
        # Normalize an RPM name by stripping the .rpm suffix (defined for
        # future use; the comparison below currently uses the raw names)
        def normalize_rpm_name(rpm: str) -> str:
            # Strip the .rpm suffix
            if rpm.lower().endswith('.rpm'):
                rpm = rpm[:-4]
            return rpm

        # Convert to sets for comparison
        set1 = set(list1)
        set2 = set(list2)

        # Compute the differences
        only_in_list1 = sorted(set1 - set2)
        only_in_list2 = sorted(set2 - set1)

        # Check whether the lists are identical
        is_identical = len(only_in_list1) == 0 and len(only_in_list2) == 0

        # Build the result
        result = {
            "is_identical": is_identical,
            "only_in_list1": only_in_list1,
            "only_in_list2": only_in_list2,
            "list1_count": len(list1),
            "list2_count": len(list2),
            "common_count": len(set1.intersection(set2))
        }

        return result

    def format_comparison_result(self, result: Dict[str, Any]) -> List[str]:
        """
        Format a comparison result.

        Args:
            result: Comparison result dictionary

        Returns:
            Formatted lines of text
        """
        output_lines = []

        # Statistics
        output_lines.append(f"Packages in list 1: {result['list1_count']}")
        output_lines.append(f"Packages in list 2: {result['list2_count']}")
        output_lines.append(f"Packages in common: {result.get('common_count', 0)}")
        output_lines.append("")

        if result["is_identical"]:
            output_lines.append("The two package lists are identical")
            output_lines.append("#####[TEST PASSED]#####")
            return output_lines

        # Report items unique to the first list
        if result["only_in_list1"]:
            output_lines.append(f"Packages only in the first list ({len(result['only_in_list1'])}):")
            output_lines.extend(result["only_in_list1"])
        else:
            output_lines.append("No packages exist only in the first list.")

        output_lines.append("")

        # Report items unique to the second list
        if result["only_in_list2"]:
            output_lines.append(f"Packages only in the second list ({len(result['only_in_list2'])}):")
            output_lines.extend(result["only_in_list2"])
        else:
            output_lines.append("No packages exist only in the second list.")

        return output_lines

    def get_all_rpm_sources(self) -> Dict[str, List[str]]:
        """
        Collect the RPM lists from every available source.

        Returns:
            Dictionary of RPM lists keyed by source
        """
        results = {}

        # 1. RPM list from HTML
        html = self.get_html_content()
        if html:
            rpm_list_html = self.parse_rpm_list_from_html(html)
            results["html"] = rpm_list_html

        # 2. RPM list from SQLite
        sqlite_path = self.download_and_extract_sqlite()
        if sqlite_path:
            rpm_list_sqlite = self.get_rpm_list_from_sqlite(sqlite_path)
            results["sqlite"] = rpm_list_sqlite

        # 3. RPM list from XML
        xml_path = self.download_repodata_xml()
        if xml_path:
            rpm_list_xml = self.get_rpm_list_from_xml(xml_path)
            results["xml"] = rpm_list_xml

        return results

    def compare_all_sources(self) -> Dict[str, Any]:
        """
        Compare the RPM lists from all sources pairwise.

        Returns:
            Comparison results
        """
        sources = self.get_all_rpm_sources()

        if not sources:
            logger.error("No RPM list source was found")
            return {"error": "No RPM list source was found"}

        results = {}

        # Compare the HTML and SQLite sources when both exist
        if "html" in sources and "sqlite" in sources:
            html_vs_sqlite = self.compare_rpm_lists(sources["html"], sources["sqlite"])
            results["html_vs_sqlite"] = html_vs_sqlite

        # Compare the HTML and XML sources when both exist
        if "html" in sources and "xml" in sources:
            html_vs_xml = self.compare_rpm_lists(sources["html"], sources["xml"])
            results["html_vs_xml"] = html_vs_xml

        # Compare the SQLite and XML sources when both exist
        if "sqlite" in sources and "xml" in sources:
            sqlite_vs_xml = self.compare_rpm_lists(sources["sqlite"], sources["xml"])
            results["sqlite_vs_xml"] = sqlite_vs_xml

        return results
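

# Usage sketch, assuming a reachable repository URL (the one below is
# hypothetical) and that the project's common.http_utils / common.file_utils
# modules are importable:
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    parser = RepoDataParser("https://repo.example.com/os/x86_64/")
    comparisons = parser.compare_all_sources()
    for pair, result in comparisons.items():
        if pair == "error":
            continue
        print(f"== {pair} ==")
        for line in parser.format_comparison_result(result):
            print(line)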