REPO_TEST/script/04_package_tps_test/compare.py
from collections import defaultdict
import bz2
import lzma
import shutil
import sqlite3
import requests
from lxml import etree
import configparser
import datetime
import pandas as pd
import openpyxl
import gzip
from openpyxl.styles import Font, PatternFill
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ---------------------
# Download primary.sqlite and decompress it
# ---------------------
def Getsqlite_primary(baseurl):
    """
    Fetch the compressed primary database listed under repodata/ and decompress it.
    URL format, e.g. baseurl=https://update.cs2c.com.cn/NS/V10/8U6/os/adv/lic/AppStream/x86_64/os/
    """
    head = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
    }
    path = f"{baseurl}repodata/"
    try:
        response = requests.get(path, headers=head, verify=False)
        html = etree.HTML(response.text)
        a_links = html.xpath("//a")
        for item in a_links:
            href = item.get('href')
            if href.endswith("primary.sqlite.bz2"):
                sqlite_url = path + href
                print(sqlite_url)
                response = requests.get(sqlite_url, headers=head, verify=False)
                un_path = "repotest.sqlite.bz2"
                with open(un_path, 'wb') as code:
                    code.write(response.content)
                with bz2.open(un_path, 'rb') as src, open("repotest.sqlite", 'wb') as dst:
                    shutil.copyfileobj(src, dst)
                return "repotest.sqlite"
            elif href.endswith("primary.sqlite.xz"):
                sqlite_url = path + href
                response = requests.get(sqlite_url, headers=head, verify=False)
                un_path = "repotest.sqlite.xz"
                with open(un_path, 'wb') as code:
                    code.write(response.content)
                with lzma.open(un_path, 'rb') as src, open("repotest.sqlite", 'wb') as dst:
                    shutil.copyfileobj(src, dst)
                return "repotest.sqlite"
            elif href.endswith("primary.sqlite.gz"):
                sqlite_url = path + href
                print(sqlite_url)
                response = requests.get(sqlite_url, headers=head, verify=False)
                un_path = "repotest.sqlite.gz"
                with open(un_path, 'wb') as code:
                    code.write(response.content)
                with gzip.open(un_path, 'rb') as src, open("repotest.sqlite", 'wb') as dst:
                    shutil.copyfileobj(src, dst)
                return "repotest.sqlite"
        print("Failed to fetch the primary database file, please check!")
        return None
    except Exception as e:
        print("Exception occurred:", e)
        return None
# ---------------------
# Fetch package info from the primary database
# ---------------------
def get_package_info_from_db1(baseurl):
    db_path = Getsqlite_primary(baseurl)
    if db_path is None:
        # Download failed; return an empty mapping instead of crashing on sqlite3.connect(None)
        return {}
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("SELECT pkgkey, name, arch, version, release, rpm_sourcerpm FROM packages")
    result = cursor.fetchall()
    conn.close()
    pkg_info = {}
    for pkgkey, name, arch, version, release, rpm_sourcerpm in result:
        pkg_info[pkgkey] = {
            "pkgkey": pkgkey,
            "name": name,
            "arch": arch,
            "version": version,
            "release": release,
            "source": rpm_sourcerpm,
        }
    return pkg_info
# ---------------------
# Extract SRPM names / NVRs
# ---------------------
def extract_srpm_nvr(pkg_info):
    return set(info['source'].replace('.src.rpm', '').replace('.nosrc.rpm', '') for info in pkg_info.values() if info['source'])
def extract_binary_nvr(pkg_info):
    return set(f"{info['name']}-{info['version']}-{info['release']}.{info['arch']}" for info in pkg_info.values())
def extract_srpm_names_from_nvr_list(nvr_list):
    srpm_names = set()
    for nvr in nvr_list:
        parts = nvr.rsplit('-', 2)
        if len(parts) == 3:
            srpm_names.add(parts[0])
    return sorted(srpm_names)
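# Illustrative example (not part of the original script): rsplit('-', 2) keeps a
# package name intact even when it contains dashes. A hypothetical NVR
# "python3-requests-2.25.1-7.ky10" splits into
# ("python3-requests", "2.25.1", "7.ky10"), so the focus SRPM name is "python3-requests".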
# ---------------------
# Difference analysis (across multiple repositories)
# ---------------------
def analyze_binary_distribution_all_repos(all_pkg_info_by_repo):
    # srpm_versions[srpm_name][repo_name][srpm_fullver] -> set of binary package names
    srpm_versions = defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
    for repo_name, pkg_info in all_pkg_info_by_repo.items():
        for info in pkg_info.values():
            source_rpm = info['source']
            binary_name = info['name']
            try:
                srpm_base = source_rpm.replace('.src.rpm', '').replace('.nosrc.rpm', '')
                srpm_name, srpm_version, srpm_release = srpm_base.rsplit('-', 2)
                srpm_fullver = f"{srpm_version}-{srpm_release}"
            except Exception:
                continue
            srpm_versions[srpm_name][repo_name][srpm_fullver].add(binary_name)
    srpm_change_flags = {}
    srpm_all_binaries = {}
    for srpm_name, repo_versions in srpm_versions.items():
        all_binary_counts = []
        all_binaries = set()
        for version_map in repo_versions.values():
            for binaries in version_map.values():
                all_binary_counts.append(len(binaries))
                all_binaries.update(binaries)
        srpm_change_flags[srpm_name] = len(set(all_binary_counts)) > 1
        srpm_all_binaries[srpm_name] = all_binaries
    records = []
    for srpm_name, repo_versions in srpm_versions.items():
        changed = srpm_change_flags[srpm_name]
        all_binaries = srpm_all_binaries[srpm_name]
        for repo_name, versions in repo_versions.items():
            pkg_info_map = all_pkg_info_by_repo[repo_name]
            for version, binaries in versions.items():
                added = sorted(binaries - all_binaries)
                removed = sorted(all_binaries - binaries)
                binary_nvrs = sorted([
                    f"{info['name']}-{info['version']}-{info['release']}.{info['arch']}"
                    for info in pkg_info_map.values()
                    if info["name"] in binaries and
                    info["source"].replace('.src.rpm', '').replace('.nosrc.rpm', '').endswith(version)
                ])
                records.append({
                    "Repo": repo_name,
                    "SRPM Name": srpm_name,
                    "SRPM Version": version,
                    "Binary Count": len(binaries),
                    "Binary Package List": ' '.join(sorted(binaries)),
                    "Binary NVR List": ' '.join(binary_nvrs),
                    "Binary Count Changed": "Yes" if changed else "No",
                    "Added Binaries": ' '.join(added),
                    "Removed Binaries": ' '.join(removed),
                })
    df = pd.DataFrame(records)
    df.sort_values(by=["SRPM Name", "SRPM Version", "Repo"], inplace=True)
    return df
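# Illustrative (hypothetical) result: if SRPM "foo-1.0-1" builds "foo" and "foo-devel"
# in repo "base" but only "foo" in repo "updates", the DataFrame would contain:
#   Repo=base,    SRPM Name=foo, SRPM Version=1.0-1, Binary Count=2, Binary Count Changed=Yes
#   Repo=updates, SRPM Name=foo, SRPM Version=1.0-1, Binary Count=1, Binary Count Changed=Yes, Removed Binaries="foo-devel"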
# ---------------------
# Compare the updates repo with the test repo and return the focus SRPMs
# ---------------------
def compare_updates_and_test(updates_url, test_url):
    new_info = get_package_info_from_db1(updates_url)
    test_info = get_package_info_from_db1(test_url)
    new_srpms = extract_srpm_nvr(new_info)
    test_srpms = extract_srpm_nvr(test_info)
    new_bins = extract_binary_nvr(new_info)
    test_bins = extract_binary_nvr(test_info)
    added_srpms = sorted(test_srpms - new_srpms)
    removed_srpms = sorted(new_srpms - test_srpms)
    added_bins = sorted(test_bins - new_bins)
    removed_bins = sorted(new_bins - test_bins)
    focus_srpms = extract_srpm_names_from_nvr_list(added_srpms + removed_srpms)
    return added_srpms, removed_srpms, added_bins, removed_bins, focus_srpms
def write_test_comparison_sheet(wb, added_srpms, removed_srpms, added_bins, removed_bins):
    sheet = wb.create_sheet("Test repo comparison")
    sheet.append(["Added SRPMs (test repo comparison)", "Added Binary NVRs (test repo comparison)"])
    for i in range(max(len(added_srpms), len(added_bins))):
        sheet.append([added_srpms[i] if i < len(added_srpms) else "", added_bins[i] if i < len(added_bins) else ""])
    sheet.append([])
    sheet.append(["Removed SRPMs (test repo comparison)", "Removed Binary NVRs (test repo comparison)"])
    for i in range(max(len(removed_srpms), len(removed_bins))):
        sheet.append([removed_srpms[i] if i < len(removed_srpms) else "", removed_bins[i] if i < len(removed_bins) else ""])
# ---------------------
# Write analysis sheets
# ---------------------
def write_filtered_analysis_sheet(analysis_df, target_srpms, worksheet):
    filtered_df = analysis_df[analysis_df["SRPM Name"].isin(target_srpms)]
    worksheet.append(list(filtered_df.columns))
    for _, row in filtered_df.iterrows():
        worksheet.append(list(row.values))
def write_analysis_to_excel(analysis_df, repo_name, worksheet, is_first=False):
    if is_first:
        worksheet.append(["Repo"] + list(analysis_df.columns))
    for _, row in analysis_df.iterrows():
        worksheet.append([repo_name] + list(row.values))
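# write_Excel is called by run_for_product when show_raw=True but is not defined in this
# excerpt. The following is a minimal sketch of what it presumably does (dump the raw
# per-repo package info into the raw-data worksheet), not the original implementation.
def write_Excel(pkg_info, repo_name, worksheet, is_first=False):
    columns = ["name", "arch", "version", "release", "source"]
    if is_first:
        # Write the header row only once, mirroring write_analysis_to_excel
        worksheet.append(["Repo"] + columns)
    for info in pkg_info.values():
        worksheet.append([repo_name] + [info[col] for col in columns])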
# ---------------------
# Configuration loading
# ---------------------
def load_product_config():
    config = configparser.ConfigParser()
    config.read('config.ini', encoding='utf-8')
    all_products = {}
    for section in config.sections():
        if not config.has_option(section, 'baseurls'):
            continue
        baseurls_raw = config.get(section, 'baseurls', fallback='')
        baseurl_lines = [line.strip() for line in baseurls_raw.splitlines() if line.strip()]
        baseurl_dict = {}
        for line in baseurl_lines:
            if '=' in line:
                name, url = line.split('=', 1)
                baseurl_dict[name.strip()] = url.strip()
        filename = config.get(section, 'filename', fallback=section)
        updates_test = config.get(section, 'updates_test', fallback=None)
        all_products[section] = {
            "baseurls": baseurl_dict,
            "filename": filename,
            "updates_test": updates_test
        }
    return all_products
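# Example config.ini layout that load_product_config expects (the section name, repo
# labels, and URLs below are placeholders, not taken from the original file):
#
#   [ProductA]
#   baseurls =
#       base = https://example.com/ProductA/os/
#       updates = https://example.com/ProductA/updates/
#   filename = ProductA-repo-compare
#   updates_test = https://example.com/ProductA/updates-test/
#
# Note that run_for_product specifically looks up the "base" and "updates" labels.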
# ---------------------
# Main execution logic
# ---------------------
def run_for_product(product_name, baseurl_map, filename_prefix, updates_test_url, show_raw=False):
    print(f"\n🔍 Analyzing product: {product_name}")
    wb = openpyxl.Workbook()
    if show_raw:
        raw_sheet = wb.create_sheet("Raw repo data")
    analysis_sheet = wb.create_sheet("All-repo SRPM analysis")
    focus_sheet_external = wb.create_sheet("External repo focus SRPMs")
    focus_sheet_test = wb.create_sheet("Test repo focus SRPMs")
    wb.remove(wb.active)
    all_pkg_info_by_repo = {}
    if show_raw:
        is_first_raw = True
        for repo_name, baseurl in baseurl_map.items():
            info = get_package_info_from_db1(baseurl)
            all_pkg_info_by_repo[repo_name] = info
            write_Excel(info, repo_name, raw_sheet, is_first=is_first_raw)
            is_first_raw = False
    else:
        for repo_name, baseurl in baseurl_map.items():
            all_pkg_info_by_repo[repo_name] = get_package_info_from_db1(baseurl)
    analysis_df = analyze_binary_distribution_all_repos(all_pkg_info_by_repo)
    write_analysis_to_excel(analysis_df, "Merged", analysis_sheet, is_first=True)
    updates_url = baseurl_map.get("updates")
    base_url = baseurl_map.get("base")
    if updates_url and updates_test_url:
        added_srpms, removed_srpms, added_bins, removed_bins, focus_srpms = compare_updates_and_test(updates_url, updates_test_url)
        write_test_comparison_sheet(wb, added_srpms, removed_srpms, added_bins, removed_bins)
        # External focus analysis: base + updates
        ext_info = {
            'base': get_package_info_from_db1(base_url),
            'updates': get_package_info_from_db1(updates_url)
        }
        ext_df = analyze_binary_distribution_all_repos(ext_info)
        write_filtered_analysis_sheet(ext_df, focus_srpms, focus_sheet_external)
        # Test focus analysis: base + updates_test
        test_info = {
            'base': get_package_info_from_db1(base_url),
            'updates_test': get_package_info_from_db1(updates_test_url)
        }
        test_df = analyze_binary_distribution_all_repos(test_info)
        write_filtered_analysis_sheet(test_df, focus_srpms, focus_sheet_test)
    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M")
    output_file = f"{filename_prefix}-{timestamp}.xlsx"
    wb.save(output_file)
    print(f"{product_name} analysis complete, output file: {output_file}")
# ---------------------
# Main entry point
# ---------------------
def main():
    products = load_product_config()
    for name, conf in products.items():
        run_for_product(
            product_name=name,
            baseurl_map=conf['baseurls'],
            filename_prefix=conf['filename'],
            updates_test_url=conf.get('updates_test'),
            show_raw=False
        )
if __name__ == '__main__':
    main()