95 lines
3.6 KiB
Python
95 lines
3.6 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
|
||
import os
|
||
import glob
|
||
import xml.etree.ElementTree as ET
|
||
import tempfile
|
||
import requests
|
||
|
||
class YumRepoParser:
    """Read yum ``.repo`` configuration files and repository package metadata.

    The class offers two independent operations:

    * :meth:`get_all_repos` collects the ``baseurl`` / ``mirrorlist`` /
      ``metalink`` values found in the configured ``.repo`` files.
    * :meth:`parse_repo_metadata` reads ``repodata/primary.xml`` of a single
      repository and splits its packages into source and binary packages.
    """

    # Keys of a .repo file whose value is reported by get_all_repos().
    _URL_KEYS = ('baseurl', 'mirrorlist', 'metalink')

    def __init__(self, config_dir='/etc/yum.repos.d/', config_files=None):
        """Remember where the ``.repo`` files live.

        Args:
            config_dir: Directory searched for ``*.repo`` files when
                ``config_files`` is not supplied.
            config_files: Explicit list of ``.repo`` file paths. When it is
                ``None`` or empty, the files are discovered in ``config_dir``.
        """
        self.config_dir = config_dir
        self.config_files = config_files or self._discover_config_files()

    def _discover_config_files(self):
        """Return the ``*.repo`` files found in ``self.config_dir``, sorted."""
        pattern = os.path.join(self.config_dir, '*.repo')
        # glob order is filesystem dependent; sort for reproducible output.
        return sorted(glob.glob(pattern))

    @staticmethod
    def _local_name(tag):
        """Return an element tag without its ``{namespace}`` prefix."""
        if not isinstance(tag, str):
            # Comments / processing instructions carry a callable as tag.
            return ''
        return tag.rpartition('}')[2]

    def _child(self, element, local_name):
        """Return the first direct child named ``local_name`` in any namespace."""
        for child in element:
            if self._local_name(child.tag) == local_name:
                return child
        return None

    def _load_primary_root(self, repo_path_or_url):
        """Return the root element of the repository's ``primary.xml``."""
        if repo_path_or_url.startswith(('http://', 'https://')):
            primary_xml_url = repo_path_or_url.rstrip('/') + '/repodata/primary.xml'
            try:
                resp = requests.get(primary_xml_url, timeout=30)
                resp.raise_for_status()
            except requests.RequestException as e:
                raise RuntimeError(
                    f"下载远程 primary.xml 失败: {primary_xml_url}, 原因: {e}"
                ) from e
            # Parse straight from memory: no temporary file to leak when
            # the download or the XML parsing fails.
            return ET.fromstring(resp.content)

        primary_xml_path = os.path.join(repo_path_or_url, 'repodata', 'primary.xml')
        if not os.path.exists(primary_xml_path):
            raise FileNotFoundError(f"未找到 primary.xml: {repo_path_or_url}")
        return ET.parse(primary_xml_path).getroot()

    def parse_repo_metadata(self, repo_path_or_url):
        """Parse ``repodata/primary.xml`` of one repository.

        Both a local directory and a remote ``http://`` / ``https://`` base
        URL are accepted. Elements are matched by local name, so the file
        may or may not declare an XML namespace.

        A package counts as a source package when its ``type`` attribute is
        ``src`` or when its ``<arch>`` child contains ``src``; every other
        package is reported as binary.

        Args:
            repo_path_or_url: Repository root (the parent of ``repodata``).

        Returns:
            A ``(src_packages, bin_packages)`` tuple. Each entry is a dict
            with the keys ``name`` and ``location``; either value is ``None``
            when the corresponding element is missing.

        Raises:
            RuntimeError: The remote ``primary.xml`` could not be downloaded.
            FileNotFoundError: The local ``primary.xml`` does not exist.
            xml.etree.ElementTree.ParseError: The file is not valid XML.
        """
        root = self._load_primary_root(repo_path_or_url)

        src_packages = []
        bin_packages = []

        for package in root.iter():
            # The root element itself is never treated as a package.
            if package is root or self._local_name(package.tag) != 'package':
                continue

            name_el = self._child(package, 'name')
            name = None if name_el is None else (name_el.text or '')

            location_el = self._child(package, 'location')
            location = None if location_el is None else location_el.get('href')

            arch_el = self._child(package, 'arch')
            arch = '' if arch_el is None else (arch_el.text or '').strip()

            entry = {'name': name, 'location': location}
            if package.get('type') == 'src' or arch == 'src':
                src_packages.append(entry)
            else:
                bin_packages.append(entry)

        return src_packages, bin_packages

    def get_all_repos(self):
        """Collect repository locations from every configured ``.repo`` file.

        Lines of the form ``baseurl=...``, ``mirrorlist=...`` and
        ``metalink=...`` are recognised, with optional whitespace around
        ``=``. Commented-out lines and empty values are ignored.

        Returns:
            The values in file order, across all files in
            ``self.config_files``.
        """
        repos = []
        for cfg in self.config_files:
            with open(cfg, 'r', encoding='utf-8') as f:
                for line in f:
                    key, sep, value = line.partition('=')
                    if not sep or key.strip() not in self._URL_KEYS:
                        continue
                    url = value.strip()
                    if url:
                        repos.append(url)
        return repos
|
||
|
||
# Example usage: list every configured repository and count its packages.
if __name__ == "__main__":
    repo_parser = YumRepoParser()
    for location in repo_parser.get_all_repos():
        # Each location is handed to parse_repo_metadata as-is; a failure for
        # one repository is reported and the loop moves on to the next one.
        try:
            sources, binaries = repo_parser.parse_repo_metadata(location)
            print(f"仓库: {location}")
            print(f"源码包: {len(sources)},二进制包: {len(binaries)}")
        except Exception as err:
            print(f"解析失败: {location},原因: {err}")