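"""Repository metadata comparison tool.

Downloads the primary.sqlite metadata for each repository configured in config.ini,
compares the SRPM and binary package sets between the public repos and the updates-test
repo, writes the analysis to an Excel workbook, and generates install/remove and SRPM
upgrade/downgrade test scripts.
"""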
from collections import defaultdict
import bz2
import lzma
import shutil
import sqlite3
import requests
from lxml import etree
import configparser
import datetime
import pandas as pd
import openpyxl
import gzip
import os
from openpyxl.styles import Font, PatternFill
import urllib3
from test_srpm_up_down import gen_updown_test
#from generate_test_summary import generate_test_summary_excel
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ---------------------
# Download primary.sqlite and decompress it
# ---------------------
def Getsqlite_primary(baseurl):
    """Download the primary.sqlite metadata from <baseurl>repodata/ and decompress it.

    Returns the path of the decompressed sqlite file, or None if no primary database was found.
    """
    head = {
        "User-Agent": "Mozilla/5.0"
    }
    path = f"{baseurl}repodata/"
    try:
        response = requests.get(path, headers=head, verify=False)
        html = etree.HTML(response.text)
        a_links = html.xpath("//a")
        for item in a_links:
            item1 = item.get('href')
            if item1.endswith("primary.sqlite.bz2"):
                sqlite_url = path + item1
                print(sqlite_url)
                response = requests.get(sqlite_url, headers=head, verify=False)
                un_path = "repotest.sqlite.bz2"
                with open(un_path, 'wb') as code:
                    code.write(response.content)
                newfilepath = "repotest.sqlite"
                with bz2.BZ2File(un_path) as src, open(newfilepath, 'wb') as dst:
                    shutil.copyfileobj(src, dst)
                return newfilepath
            elif item1.endswith("primary.sqlite.xz"):
                sqlite_url = path + item1
                print(sqlite_url)
                response = requests.get(sqlite_url, headers=head, verify=False)
                un_path = "repotest.sqlite.xz"
                with open(un_path, 'wb') as code:
                    code.write(response.content)
                with lzma.open(un_path, 'rb') as src, open("repotest.sqlite", 'wb') as dst:
                    shutil.copyfileobj(src, dst)
                return "repotest.sqlite"
            elif item1.endswith("primary.sqlite.gz"):
                sqlite_url = path + item1
                response = requests.get(sqlite_url, headers=head, verify=False)
                un_path = "repotest.sqlite.gz"
                with open(un_path, 'wb') as code:
                    code.write(response.content)
                with gzip.open(un_path, 'rb') as src, open("repotest.sqlite", 'wb') as dst:
                    shutil.copyfileobj(src, dst)
                return "repotest.sqlite"
        print("Failed to fetch the primary sqlite database file, please check the repo URL!")
        return None
    except Exception as e:
        print("Exception occurred:", e)
        return None
# ---------------------
# Fetch package info from the database
# ---------------------
def get_package_info_from_db1(baseurl):
    """Return {pkgkey: package info dict} for every package listed in the repo's primary.sqlite."""
    db_path = Getsqlite_primary(baseurl)
    if db_path is None:
        raise RuntimeError(f"Could not download primary.sqlite from {baseurl}")
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("SELECT pkgkey, name, arch, version, release, rpm_sourcerpm FROM packages")
    result = cursor.fetchall()
    conn.close()
    pkg_info = {}
    for pkgkey, name, arch, version, release, rpm_sourcerpm in result:
        pkg_info[pkgkey] = {
            "pkgkey": pkgkey,
            "name": name,
            "arch": arch,
            "version": version,
            "release": release,
            "source": rpm_sourcerpm,
        }
    return pkg_info
def extract_srpm_nvr(pkg_info):
    """Source RPM NVRs (name-version-release) with the .src.rpm/.nosrc.rpm suffix stripped."""
    return set(
        info['source'].replace('.src.rpm', '').replace('.nosrc.rpm', '')
        for info in pkg_info.values() if info['source']
    )
def extract_binary_nvr(pkg_info):
    """Binary package NVRAs in the form name-version-release.arch."""
    return set(f"{info['name']}-{info['version']}-{info['release']}.{info['arch']}" for info in pkg_info.values())
def extract_srpm_names_from_nvr_list(nvr_list):
    """Strip version-release from SRPM NVRs and return the sorted unique source package names."""
    srpm_names = set()
    for nvr in nvr_list:
        parts = nvr.rsplit('-', 2)
        if len(parts) == 3:
            srpm_names.add(parts[0])
    return sorted(srpm_names)
def analyze_binary_distribution_all_repos(all_pkg_info_by_repo):
    """Group binary packages by source RPM and version across all repos, and flag SRPMs whose
    binary package count differs between versions/repos."""
    # srpm_versions[srpm_name][repo_name][version-release] -> set of binary package names
    srpm_versions = defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
    for repo_name, pkg_info in all_pkg_info_by_repo.items():
        for info in pkg_info.values():
            source_rpm = info['source']
            binary_name = info['name']
            try:
                srpm_base = source_rpm.replace('.src.rpm', '').replace('.nosrc.rpm', '')
                srpm_name, srpm_version, srpm_release = srpm_base.rsplit('-', 2)
                srpm_fullver = f"{srpm_version}-{srpm_release}"
            except Exception:
                continue
            srpm_versions[srpm_name][repo_name][srpm_fullver].add(binary_name)
    srpm_change_flags = {}
    srpm_all_binaries = {}
    for srpm_name, repo_versions in srpm_versions.items():
        all_binary_counts = []
        all_binaries = set()
        for version_map in repo_versions.values():
            for binaries in version_map.values():
                all_binary_counts.append(len(binaries))
                all_binaries.update(binaries)
        # Flag the SRPM if any version/repo produces a different number of binaries.
        srpm_change_flags[srpm_name] = len(set(all_binary_counts)) > 1
        srpm_all_binaries[srpm_name] = all_binaries
    records = []
    for srpm_name, repo_versions in srpm_versions.items():
        changed = srpm_change_flags[srpm_name]
        all_binaries = srpm_all_binaries[srpm_name]
        for repo_name, versions in repo_versions.items():
            pkg_info_map = all_pkg_info_by_repo[repo_name]
            for version, binaries in versions.items():
                # "Removed" = binaries built by some other version/repo of this SRPM but missing here.
                # "Added" is computed against the union of all binaries, so it is empty by construction.
                added = sorted(binaries - all_binaries)
                removed = sorted(all_binaries - binaries)
                binary_nvrs = sorted([
                    f"{info['name']}-{info['version']}-{info['release']}.{info['arch']}"
                    for info in pkg_info_map.values()
                    if info["name"] in binaries and
                    info["source"].replace('.src.rpm', '').replace('.nosrc.rpm', '').endswith(version)
                ])
                records.append({
                    "Repo": repo_name,
                    "SRPM Name": srpm_name,
                    "SRPM Version": version,
                    "Binary Count": len(binaries),
                    "Binary Package List": ' '.join(sorted(binaries)),
                    "Binary NVR List": ' '.join(binary_nvrs),
                    "Binary Count Changed": "Yes" if changed else "No",
                    "Added Binaries": ' '.join(added),
                    "Removed Binaries": ' '.join(removed),
                })
    df = pd.DataFrame(records)
    if not df.empty:
        df.sort_values(by=["SRPM Name", "SRPM Version", "Repo"], inplace=True)
    return df
def write_test_comparison_sheet(wb, added_srpms, removed_srpms, added_bins, removed_bins):
    """Write the updates vs. updates-test comparison (added/removed SRPMs and binary NVRs) to its own sheet."""
    sheet = wb.create_sheet("测试仓库对比")
    sheet.append(["新增 SRPM仅测试仓库对比", "新增 Binary NVR仅测试仓库对比"])
    for i in range(max(len(added_srpms), len(added_bins))):
        sheet.append([added_srpms[i] if i < len(added_srpms) else "", added_bins[i] if i < len(added_bins) else ""])
    sheet.append([])
    sheet.append(["减少 SRPM仅测试仓库对比", "减少 Binary NVR仅测试仓库对比"])
    for i in range(max(len(removed_srpms), len(removed_bins))):
        sheet.append([removed_srpms[i] if i < len(removed_srpms) else "", removed_bins[i] if i < len(removed_bins) else ""])
def write_filtered_analysis_sheet(analysis_df, target_srpms, worksheet):
    """Write only the analysis rows whose SRPM name is in target_srpms."""
    filtered_df = analysis_df[analysis_df["SRPM Name"].isin(target_srpms)]
    worksheet.append(list(filtered_df.columns))
    for _, row in filtered_df.iterrows():
        worksheet.append(list(row.values))
def write_analysis_to_excel(analysis_df, repo_name, worksheet, is_first=False):
    """Append the full analysis DataFrame to a worksheet, prefixing each row with repo_name."""
    if is_first:
        worksheet.append(["Repo"] + list(analysis_df.columns))
    for _, row in analysis_df.iterrows():
        worksheet.append([repo_name] + list(row.values))
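# NOTE: run_for_product() calls write_Excel() to fill the raw-data sheet, but that helper is
# neither defined nor imported in this file. The definition below is an assumed minimal sketch,
# not the original implementation: it simply dumps each repo's raw package rows, mirroring the
# row layout used by write_analysis_to_excel(). Replace it with the real helper if one exists.
def write_Excel(pkg_info, repo_name, worksheet, is_first=False):
    if is_first:
        worksheet.append(["Repo", "Name", "Arch", "Version", "Release", "Source RPM"])
    for info in pkg_info.values():
        worksheet.append([repo_name, info["name"], info["arch"], info["version"], info["release"], info["source"]])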
def load_product_config():
    """Read config.ini and return {section: {"baseurls": {...}, "filename": ..., "updates_test": ...}}."""
    config = configparser.ConfigParser()
    config.read('config.ini', encoding='utf-8')
    all_products = {}
    for section in config.sections():
        if not config.has_option(section, 'baseurls'):
            continue
        baseurls_raw = config.get(section, 'baseurls', fallback='')
        baseurl_lines = [line.strip() for line in baseurls_raw.splitlines() if line.strip()]
        baseurl_dict = {}
        for line in baseurl_lines:
            if '=' in line:
                name, url = line.split('=', 1)
                baseurl_dict[name.strip()] = url.strip()
        filename = config.get(section, 'filename', fallback=section)
        updates_test = config.get(section, 'updates_test', fallback=None)
        all_products[section] = {
            "baseurls": baseurl_dict,
            "filename": filename,
            "updates_test": updates_test
        }
    return all_products
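# For reference, load_product_config() expects config.ini to look roughly like the example
# below: one section per product, a multi-line "baseurls" option of "name = url" pairs
# (run_for_product() looks up the "base" and "updates" keys), plus optional "filename" and
# "updates_test" options. Section names and URLs here are illustrative, not from a real config;
# each URL should end with "/" because Getsqlite_primary() appends "repodata/" to it.
#
# [ProductA]
# filename = ProductA
# updates_test = https://repo.example.com/ProductA/updates-test/
# baseurls =
#     base = https://repo.example.com/ProductA/base/
#     updates = https://repo.example.com/ProductA/updates/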
def compare_updates_and_test(updates_url, test_url):
    """Compare the public updates repo with the updates-test repo.

    Returns the SRPM NVRs and binary NVRs present only in the test repo (added) or only in the
    updates repo (removed), plus the SRPM names to focus on.
    """
    new_info = get_package_info_from_db1(updates_url)
    test_info = get_package_info_from_db1(test_url)
    new_srpms = extract_srpm_nvr(new_info)
    test_srpms = extract_srpm_nvr(test_info)
    new_bins = extract_binary_nvr(new_info)
    test_bins = extract_binary_nvr(test_info)
    added_srpms = sorted(test_srpms - new_srpms)
    removed_srpms = sorted(new_srpms - test_srpms)
    added_bins = sorted(test_bins - new_bins)
    removed_bins = sorted(new_bins - test_bins)
    focus_srpms = extract_srpm_names_from_nvr_list(added_srpms + removed_srpms)
    return added_srpms, removed_srpms, added_bins, removed_bins, focus_srpms
def run_for_product(product_name, baseurl_map, filename_prefix, updates_test_url, show_raw=False):
    """Analyze one product: build the Excel report, the package list, and the test scripts."""
    print(f"\n🔍 Analyzing product: {product_name}")
    output_dir = os.path.join("output", product_name)
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M")
    excel_path = os.path.join(output_dir, f"{filename_prefix}-分析报告-{timestamp}.xlsx")
    pkglist_path = os.path.join(output_dir, f"Pkglist-{product_name}")
    wb = openpyxl.Workbook()
    if show_raw:
        raw_sheet = wb.create_sheet("仓库原始数据")
    analysis_sheet = wb.create_sheet("仓库所有SRPM分析")
    focus_sheet_external = wb.create_sheet("外网仓库本周重点分析的SRPM")
    focus_sheet_test = wb.create_sheet("测试仓库本周重点分析的SRPM")
    wb.remove(wb.active)
    all_pkg_info_by_repo = {}
    if show_raw:
        is_first_raw = True
        for repo_name, baseurl in baseurl_map.items():
            info = get_package_info_from_db1(baseurl)
            all_pkg_info_by_repo[repo_name] = info
            write_Excel(info, repo_name, raw_sheet, is_first=is_first_raw)
            is_first_raw = False
    else:
        for repo_name, baseurl in baseurl_map.items():
            all_pkg_info_by_repo[repo_name] = get_package_info_from_db1(baseurl)
    analysis_df = analyze_binary_distribution_all_repos(all_pkg_info_by_repo)
    write_analysis_to_excel(analysis_df, "Merged", analysis_sheet, is_first=True)
    updates_url = baseurl_map.get("updates")
    base_url = baseurl_map.get("base")
    if updates_url and updates_test_url:
        added_srpms, removed_srpms, added_bins, removed_bins, focus_srpms = compare_updates_and_test(updates_url, updates_test_url)
        write_test_comparison_sheet(wb, added_srpms, removed_srpms, added_bins, removed_bins)
        # Write the binaries that exist only in the test repo to the package list file.
        with open(pkglist_path, 'w') as f:
            for nvr in added_bins:
                f.write(nvr + '\n')
        # Focus analysis for the external repos (base + updates).
        ext_info = {
            'base': get_package_info_from_db1(base_url),
            'updates': get_package_info_from_db1(updates_url)
        }
        ext_df = analyze_binary_distribution_all_repos(ext_info)
        write_filtered_analysis_sheet(ext_df, focus_srpms, focus_sheet_external)
        # Focus analysis for the test repos (base + updates-test).
        test_info = {
            'base': get_package_info_from_db1(base_url),
            'updates_test': get_package_info_from_db1(updates_test_url)
        }
        test_df = analyze_binary_distribution_all_repos(test_info)
        write_filtered_analysis_sheet(test_df, focus_srpms, focus_sheet_test)
    wb.save(excel_path)
    print(f"{product_name} analysis finished, output file: {excel_path}")
    # ✅ Generate the install/remove test script (takes no arguments).
    install_test_script_path = os.path.join(output_dir, "install_remove_test.sh")
    with open(install_test_script_path, "w") as f:
        f.write(f"""#!/bin/bash
PACKAGE_LIST=Pkglist-{product_name}
OUTPUT_FILE={product_name}_install_results.csv
LOG_FILE={product_name}_install_test.log
echo "Package,Install Status,Remove Status,Error" > "$OUTPUT_FILE"
echo "===== 安装测试日志开始 =====" > "$LOG_FILE"
while IFS= read -r package
do
    if [ -z "$package" ]; then
        continue
    fi
    echo "正在安装: $package" | tee -a "$LOG_FILE"
    install_output=$(yum --setopt=timeout=300 --setopt=retries=10 install -y "$package" 2>&1)
    install_status=$?
    if [ $install_status -eq 0 ]; then
        install_result=成功
    else
        install_result=失败
    fi
    echo "$install_output" >> "$LOG_FILE"
    echo "正在卸载: $package" | tee -a "$LOG_FILE"
    remove_output=$(yum remove -y "$package" 2>&1)
    remove_status=$?
    if [ $remove_status -eq 0 ]; then
        remove_result=成功
    else
        remove_result=失败
    fi
    echo "$remove_output" >> "$LOG_FILE"
    if [ "$install_result" = "失败" ]; then
        error_msg="$install_output"
    elif [ "$remove_result" = "失败" ]; then
        error_msg="$remove_output"
    else
        error_msg=""
    fi
    echo "$package,$install_result,$remove_result,\\"$error_msg\\"" >> "$OUTPUT_FILE"
    echo "--------------------------------" >> "$LOG_FILE"
done < "$PACKAGE_LIST"
echo "===== 安装测试日志结束 =====" >> "$LOG_FILE"
echo "测试完成,结果保存到 $OUTPUT_FILE,过程日志保存到 $LOG_FILE"
""")
    os.chmod(install_test_script_path, 0o755)
    # 🔧 Generate the SRPM upgrade/downgrade test scripts.
    success = gen_updown_test([excel_path], filter_choice="All", sheet_name="测试仓库本周重点分析的SRPM")
    if success:
        # Move the generated scripts and result files into the product's output directory.
        for fname in os.listdir("."):
            if fname.startswith("test_upgrade_downgrade_") and fname.endswith(".sh"):
                shutil.move(fname, os.path.join(output_dir, fname))
            elif fname.startswith("test_results_") or fname.startswith("test_process_"):
                shutil.move(fname, os.path.join(output_dir, fname))
    # # ✅ Summarize test results (install/remove + upgrade/downgrade)
    # generate_test_summary_excel(output_dir, product_name)
def main():
    products = load_product_config()
    for name, conf in products.items():
        run_for_product(
            product_name=name,
            baseurl_map=conf['baseurls'],
            filename_prefix=conf['filename'],
            updates_test_url=conf.get('updates_test'),
            show_raw=False
        )
if __name__ == '__main__':
    main()