import bz2
import configparser
import datetime
import gzip
import lzma
import os
import shutil
import sqlite3
from collections import defaultdict
from urllib.parse import urljoin

import openpyxl
import pandas as pd
import requests
import urllib3
from lxml import etree

from test_srpm_up_down import gen_updown_test
# from generate_test_summary import generate_test_summary_excel

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Seconds to wait for the repository server before giving up.
REQUEST_TIMEOUT = 60

# Compression suffix -> function that opens the compressed file for reading.
DECOMPRESSORS = {
    ".bz2": bz2.open,
    ".xz": lzma.open,
    ".gz": gzip.open,
}


# ---------------------
# Download primary.sqlite and decompress it
# ---------------------
def Getsqlite_primary(baseurl):
    """Download and unpack the repo's primary.sqlite; return its local path, or None on failure."""
    head = {"User-Agent": "Mozilla/5.0"}
    # baseurl is expected to end with "/".
    path = f"{baseurl}repodata/"
    try:
        response = requests.get(path, headers=head, verify=False, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        html = etree.HTML(response.text)
        for item in html.xpath("//a"):
            href = item.get("href")
            if not href:
                # Anchors without an href cannot point at the database.
                continue
            for suffix, opener in DECOMPRESSORS.items():
                if not href.endswith("primary.sqlite" + suffix):
                    continue
                # urljoin avoids the double slash that '/'.join produced,
                # because path already ends with "/".
                sqlite_url = urljoin(path, href)
                print(sqlite_url)
                response = requests.get(sqlite_url, headers=head, verify=False, timeout=REQUEST_TIMEOUT)
                response.raise_for_status()
                un_path = "repotest.sqlite" + suffix
                with open(un_path, "wb") as compressed:
                    compressed.write(response.content)
                newfilepath = "repotest.sqlite"
                # Stream the decompressed bytes so both files are closed properly.
                with opener(un_path, "rb") as src, open(newfilepath, "wb") as dst:
                    shutil.copyfileobj(src, dst)
                return newfilepath
        print("Failed to find a primary.sqlite database file; please check the repository!")
        return None
    except Exception as e:
        print("An exception occurred:", e)
        return None


# ---------------------
# Fetch package information
# ---------------------
def get_package_info_from_db1(baseurl):
    db_path = Getsqlite_primary(baseurl)
    if db_path is None:
        # sqlite3.connect(None) would otherwise fail with an unhelpful TypeError.
        raise RuntimeError(f"Could not download primary.sqlite from {baseurl}")
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT pkgkey, name, arch, version, release, rpm_sourcerpm FROM packages")
        result = cursor.fetchall()
    finally:
        conn.close()

    pkg_info = {}
    for pkgkey, name, arch, version, release, rpm_sourcerpm in result:
        pkg_info[pkgkey] = {
            "pkgkey": pkgkey,
            "name": name,
            "arch": arch,
            "version": version,
            "release": release,
            "source": rpm_sourcerpm,
        }
    return pkg_info


def extract_srpm_nvr(pkg_info):
    """Return the set of source package NVRs (name-version-release)."""
    return {
        info['source'].replace('.src.rpm', '').replace('.nosrc.rpm', '')
        for info in pkg_info.values()
        if info['source']
    }


def extract_binary_nvr(pkg_info):
    """Return the set of binary package NVRs, including the architecture."""
    return {
        f"{info['name']}-{info['version']}-{info['release']}.{info['arch']}"
        for info in pkg_info.values()
    }


def extract_srpm_names_from_nvr_list(nvr_list):
    srpm_names = set()
    for nvr in nvr_list:
        # Split from the right: the package name itself may contain hyphens.
        parts = nvr.rsplit('-', 2)
        if len(parts) == 3:
            srpm_names.add(parts[0])
    return sorted(srpm_names)


def analyze_binary_distribution_all_repos(all_pkg_info_by_repo):
    # SRPM name -> repo name -> "version-release" -> set of binary package names
    srpm_versions = defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
    for repo_name, pkg_info in all_pkg_info_by_repo.items():
        for info in pkg_info.values():
            source_rpm = info['source']
            binary_name = info['name']
            try:
                srpm_base = source_rpm.replace('.src.rpm', '').replace('.nosrc.rpm', '')
                srpm_name, srpm_version, srpm_release = srpm_base.rsplit('-', 2)
                srpm_fullver = f"{srpm_version}-{srpm_release}"
            except (AttributeError, ValueError):
                # source is NULL, or it is not in name-version-release form.
                continue
            srpm_versions[srpm_name][repo_name][srpm_fullver].add(binary_name)

    srpm_change_flags = {}
    srpm_all_binaries = {}
    for srpm_name, repo_versions in srpm_versions.items():
        all_binary_counts = []
        all_binaries = set()
        for version_map in repo_versions.values():
            for binaries in version_map.values():
                all_binary_counts.append(len(binaries))
                all_binaries.update(binaries)
        # Flag the SRPM when its builds do not all ship the same number of binaries.
        srpm_change_flags[srpm_name] = len(set(all_binary_counts)) > 1
        srpm_all_binaries[srpm_name] = all_binaries

    columns = [
        "Repo", "SRPM Name", "SRPM Version", "Binary Count",
        "Binary Package List", "Binary NVR List", "Binary Count Changed",
        "Added Binaries", "Removed Binaries",
    ]
    records = []
    for srpm_name, repo_versions in srpm_versions.items():
        changed = srpm_change_flags[srpm_name]
        all_binaries = srpm_all_binaries[srpm_name]
        for repo_name, versions in repo_versions.items():
            pkg_info_map = all_pkg_info_by_repo[repo_name]
            for version, binaries in versions.items():
                # NOTE: binaries is always a subset of all_binaries, so "added" is
                # always empty; "removed" lists binaries that only other builds ship.
                added = sorted(binaries - all_binaries)
                removed = sorted(all_binaries - binaries)
                # "source" may be NULL in the database, hence the `or ""`.
                binary_nvrs = sorted(
                    f"{info['name']}-{info['version']}-{info['release']}.{info['arch']}"
                    for info in pkg_info_map.values()
                    if info["name"] in binaries
                    and (info["source"] or "").replace('.src.rpm', '').replace('.nosrc.rpm', '').endswith(version)
                )
                records.append({
                    "Repo": repo_name,
                    "SRPM Name": srpm_name,
                    "SRPM Version": version,
                    "Binary Count": len(binaries),
                    "Binary Package List": ' '.join(sorted(binaries)),
                    "Binary NVR List": ' '.join(binary_nvrs),
                    "Binary Count Changed": "Yes" if changed else "No",
                    "Added Binaries": ' '.join(added),
                    "Removed Binaries": ' '.join(removed),
                })
    # Passing columns keeps sort_values from raising KeyError when records is empty.
    df = pd.DataFrame(records, columns=columns)
    df.sort_values(by=["SRPM Name", "SRPM Version", "Repo"], inplace=True)
    return df


def write_test_comparison_sheet(wb, added_srpms, removed_srpms, added_bins, removed_bins):
    # Sheet title: "Test repo comparison". Headers: added / removed SRPMs and
    # binary NVRs, compared against the test repo only.
    sheet = wb.create_sheet("测试仓库对比")
    sheet.append(["新增 SRPM(仅测试仓库对比)", "新增 Binary NVR(仅测试仓库对比)"])
    for i in range(max(len(added_srpms), len(added_bins))):
        sheet.append([added_srpms[i] if i < len(added_srpms) else "",
                      added_bins[i] if i < len(added_bins) else ""])
    sheet.append([])
    sheet.append(["减少 SRPM(仅测试仓库对比)", "减少 Binary NVR(仅测试仓库对比)"])
    for i in range(max(len(removed_srpms), len(removed_bins))):
        sheet.append([removed_srpms[i] if i < len(removed_srpms) else "",
                      removed_bins[i] if i < len(removed_bins) else ""])


def write_filtered_analysis_sheet(analysis_df, target_srpms, worksheet):
    filtered_df = analysis_df[analysis_df["SRPM Name"].isin(target_srpms)]
    worksheet.append(list(filtered_df.columns))
    for _, row in filtered_df.iterrows():
        worksheet.append(list(row.values))


def write_analysis_to_excel(analysis_df, repo_name, worksheet, is_first=False):
    # analysis_df already has a "Repo" column; the extra leading column holds
    # the label passed in by the caller (for example "Merged").
    if is_first:
        worksheet.append(["Repo"] + list(analysis_df.columns))
    for _, row in analysis_df.iterrows():
        worksheet.append([repo_name] + list(row.values))


def write_Excel(pkg_info, repo_name, worksheet, is_first=False):
    # NOTE: the original file calls write_Excel without defining it, so
    # show_raw=True raised NameError. This is a minimal stand-in; the column
    # layout is an assumption, not the author's original format.
    if is_first:
        worksheet.append(["Repo", "pkgkey", "name", "arch", "version", "release", "source"])
    for info in pkg_info.values():
        worksheet.append([repo_name, info["pkgkey"], info["name"], info["arch"],
                          info["version"], info["release"], info["source"]])


def load_product_config():
    config = configparser.ConfigParser()
    config.read('config.ini', encoding='utf-8')
    all_products = {}
    for section in config.sections():
        if not config.has_option(section, 'baseurls'):
            continue
        # baseurls is a multi-line value with one "name = url" pair per line.
        baseurls_raw = config.get(section, 'baseurls', fallback='')
        baseurl_lines = [line.strip() for line in baseurls_raw.splitlines() if line.strip()]
        baseurl_dict = {}
        for line in baseurl_lines:
            if '=' in line:
                name, url = line.split('=', 1)
                baseurl_dict[name.strip()] = url.strip()
        filename = config.get(section, 'filename', fallback=section)
        updates_test = config.get(section, 'updates_test', fallback=None)
        all_products[section] = {
            "baseurls": baseurl_dict,
            "filename": filename,
            "updates_test": updates_test,
        }
    return all_products


def compare_updates_and_test(updates_url, test_url):
    new_info = get_package_info_from_db1(updates_url)
    test_info = get_package_info_from_db1(test_url)
    new_srpms = extract_srpm_nvr(new_info)
    test_srpms = extract_srpm_nvr(test_info)
    new_bins = extract_binary_nvr(new_info)
    test_bins = extract_binary_nvr(test_info)
    # "Added" means present only in the test repo; "removed" means present only in updates.
    added_srpms = sorted(test_srpms - new_srpms)
    removed_srpms = sorted(new_srpms - test_srpms)
    added_bins = sorted(test_bins - new_bins)
    removed_bins = sorted(new_bins - test_bins)
    focus_srpms = extract_srpm_names_from_nvr_list(added_srpms + removed_srpms)
    return added_srpms, removed_srpms, added_bins, removed_bins, focus_srpms


def run_for_product(product_name, baseurl_map, filename_prefix, updates_test_url, show_raw=False):
    print(f"\n🔍 Analyzing product: {product_name}")
    output_dir = os.path.join("output", product_name)
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M")
    # "分析报告" means "analysis report"; the file name is kept unchanged.
    excel_path = os.path.join(output_dir, f"{filename_prefix}-分析报告-{timestamp}.xlsx")
    pkglist_path = os.path.join(output_dir, f"Pkglist-{product_name}")

    wb = openpyxl.Workbook()
    if show_raw:
        raw_sheet = wb.create_sheet("仓库原始数据")  # raw repo data
    analysis_sheet = wb.create_sheet("仓库所有SRPM分析")  # analysis of all SRPMs
    focus_sheet_external = wb.create_sheet("外网仓库本周重点分析的SRPM")  # focus SRPMs, external repo
    focus_sheet_test = wb.create_sheet("测试仓库本周重点分析的SRPM")  # focus SRPMs, test repo
    # Drop the default empty sheet that Workbook() creates.
    wb.remove(wb.active)

    all_pkg_info_by_repo = {}
    for index, (repo_name, baseurl) in enumerate(baseurl_map.items()):
        info = get_package_info_from_db1(baseurl)
        all_pkg_info_by_repo[repo_name] = info
        if show_raw:
            write_Excel(info, repo_name, raw_sheet, is_first=(index == 0))

    analysis_df = analyze_binary_distribution_all_repos(all_pkg_info_by_repo)
    write_analysis_to_excel(analysis_df, "Merged", analysis_sheet, is_first=True)

    updates_url = baseurl_map.get("updates")
    base_url = baseurl_map.get("base")
    if updates_url and updates_test_url:
        added_srpms, removed_srpms, added_bins, removed_bins, focus_srpms = compare_updates_and_test(
            updates_url, updates_test_url)
        write_test_comparison_sheet(wb, added_srpms, removed_srpms, added_bins, removed_bins)
        with open(pkglist_path, 'w', encoding='utf-8') as f:
            for nvr in added_bins:
                f.write(nvr + '\n')

        # Reuse the metadata fetched above instead of downloading it again, and
        # tolerate a missing "base" repo rather than requesting an invalid URL.
        updates_info = all_pkg_info_by_repo["updates"]
        base_info = all_pkg_info_by_repo["base"] if base_url else {}

        ext_df = analyze_binary_distribution_all_repos({'base': base_info, 'updates': updates_info})
        write_filtered_analysis_sheet(ext_df, focus_srpms, focus_sheet_external)

        test_df = analyze_binary_distribution_all_repos({
            'base': base_info,
            'updates_test': get_package_info_from_db1(updates_test_url),
        })
        write_filtered_analysis_sheet(test_df, focus_srpms, focus_sheet_test)

    wb.save(excel_path)
    print(f"✅ {product_name} analysis finished, output file: {excel_path}")

    # Generate the install/remove test script (it takes no arguments).
    install_test_script_path = os.path.join(output_dir, "install_remove_test.sh")
    # The script contains non-ASCII text, so write it as UTF-8 explicitly.
    with open(install_test_script_path, "w", encoding="utf-8") as f:
        f.write(f"""#!/bin/bash
PACKAGE_LIST="Pkglist-{product_name}"
OUTPUT_FILE="{product_name}_install_results.csv"
LOG_FILE="{product_name}_install_test.log"

echo "Package,Install Status,Remove Status,Error" > "$OUTPUT_FILE"
echo "===== Install test log start =====" > "$LOG_FILE"

# Status values written to the CSV: 成功 = success, 失败 = failure.
while IFS= read -r package
do
    if [ -z "$package" ]; then
        continue
    fi

    echo "Installing: $package" | tee -a "$LOG_FILE"
    install_output=$(yum --setopt=timeout=300 --setopt=retries=10 install -y "$package" 2>&1)
    install_status=$?
    if [ $install_status -eq 0 ]; then
        install_result=成功
    else
        install_result=失败
    fi
    echo "$install_output" >> "$LOG_FILE"

    echo "Removing: $package" | tee -a "$LOG_FILE"
    remove_output=$(yum remove -y "$package" 2>&1)
    remove_status=$?
    if [ $remove_status -eq 0 ]; then
        remove_result=成功
    else
        remove_result=失败
    fi
    echo "$remove_output" >> "$LOG_FILE"

    if [ "$install_result" = "失败" ]; then
        error_msg="$install_output"
    elif [ "$remove_result" = "失败" ]; then
        error_msg="$remove_output"
    else
        error_msg=""
    fi

    # Double any embedded quotes so the error text stays a single CSV field.
    error_msg=${{error_msg//\\"/\\"\\"}}
    echo "$package,$install_result,$remove_result,\\"$error_msg\\"" >> "$OUTPUT_FILE"
    echo "--------------------------------" >> "$LOG_FILE"
done < "$PACKAGE_LIST"

echo "===== Install test log end =====" >> "$LOG_FILE"
echo "Test finished; results saved to $OUTPUT_FILE, log saved to $LOG_FILE"
""")
    os.chmod(install_test_script_path, 0o755)

    # Generate the SRPM upgrade/downgrade test scripts.
    success = gen_updown_test([excel_path], filter_choice="All", sheet_name="测试仓库本周重点分析的SRPM")
    if success:
        # gen_updown_test writes into the working directory; move its output
        # next to the report.
        result_prefixes = ("test_results_", "test_process_")
        for fname in os.listdir("."):
            is_script = fname.startswith("test_upgrade_downgrade_") and fname.endswith(".sh")
            if is_script or fname.startswith(result_prefixes):
                shutil.move(fname, os.path.join(output_dir, fname))

    # # Summarize the test results (install/remove + upgrade/downgrade)
    # generate_test_summary_excel(output_dir, product_name)


def main():
    products = load_product_config()
    for name, conf in products.items():
        run_for_product(
            product_name=name,
            baseurl_map=conf['baseurls'],
            filename_prefix=conf['filename'],
            updates_test_url=conf.get('updates_test'),
            show_raw=False,
        )


if __name__ == '__main__':
    main()