from collections import defaultdict
import bz2
import configparser
import datetime
import gzip
import lzma
import shutil
import sqlite3

import openpyxl
import pandas as pd
import requests
import urllib3
from lxml import etree
from openpyxl.styles import Font, PatternFill

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


# ---------------------
# Download primary.sqlite and decompress it
# ---------------------
def Getsqlite_primary(baseurl):
    """
    Fetch the compressed primary database from repodata, download it and decompress it.
    Expected baseurl format, e.g.:
    baseurl=https://update.cs2c.com.cn/NS/V10/8U6/os/adv/lic/AppStream/x86_64/os/
    """
    head = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                      "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
    }
    path = f"{baseurl}repodata/"
    try:
        # List the repodata directory and look for the primary.sqlite archive link.
        response = requests.get(path, headers=head, verify=False)
        html = etree.HTML(response.text)
        a_links = html.xpath("//a")
        for item in a_links:
            href = item.get('href')
            if not href:
                continue
            if href.endswith("primary.sqlite.bz2"):
                sqlite_url = '/'.join([path, href])
                print(sqlite_url)
                response = requests.get(sqlite_url, headers=head, verify=False)
                un_path = "repotest.sqlite.bz2"
                with open(un_path, 'wb') as archive:
                    archive.write(response.content)
                newfilepath = "repotest.sqlite"
                with bz2.BZ2File(un_path) as src, open(newfilepath, 'wb') as dst:
                    shutil.copyfileobj(src, dst)
                return newfilepath
            elif href.endswith("primary.sqlite.xz"):
                sqlite_url = '/'.join([path, href])
                print(sqlite_url)
                response = requests.get(sqlite_url, headers=head, verify=False)
                un_path = "repotest.sqlite.xz"
                with open(un_path, 'wb') as archive:
                    archive.write(response.content)
                with lzma.open(un_path, 'rb') as src, open("repotest.sqlite", 'wb') as dst:
                    shutil.copyfileobj(src, dst)
                return "repotest.sqlite"
            elif href.endswith("primary.sqlite.gz"):
                sqlite_url = '/'.join([path, href])
                print(sqlite_url)
                response = requests.get(sqlite_url, headers=head, verify=False)
                un_path = "repotest.sqlite.gz"
                with open(un_path, 'wb') as archive:
                    archive.write(response.content)
                with gzip.open(un_path, 'rb') as src, open("repotest.sqlite", 'wb') as dst:
                    shutil.copyfileobj(src, dst)
                return "repotest.sqlite"
        print("Failed to find a primary.sqlite database file in repodata, please check!")
        return None
    except Exception as e:
        print("Exception occurred:", e)
        return None


# ---------------------
# Fetch package info
# ---------------------
def get_package_info_from_db1(baseurl):
    db_path = Getsqlite_primary(baseurl)
    if db_path is None:
        raise RuntimeError(f"Could not download the primary.sqlite database for {baseurl}")
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("SELECT pkgkey, name, arch, version, release, rpm_sourcerpm FROM packages")
    result = cursor.fetchall()
    conn.close()
    pkg_info = {}
    for pkgkey, name, arch, version, release, rpm_sourcerpm in result:
        pkg_info[pkgkey] = {
            "pkgkey": pkgkey,
            "name": name,
            "arch": arch,
            "version": version,
            "release": release,
            "source": rpm_sourcerpm,
        }
    return pkg_info
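
# Illustrative only: the mapping returned by get_package_info_from_db1 is keyed by
# pkgkey, with one dict per binary package. The URL and values below are made-up
# placeholders, not data from a real repository.
#
#   pkg_info = get_package_info_from_db1("https://example.com/repo/x86_64/os/")
#   # pkg_info[42] == {
#   #     "pkgkey": 42,
#   #     "name": "bash",
#   #     "arch": "x86_64",
#   #     "version": "5.1.8",
#   #     "release": "6.example",
#   #     "source": "bash-5.1.8-6.example.src.rpm",
#   # }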

# ---------------------
# Extract SRPM names / NVRs
# ---------------------
def extract_srpm_nvr(pkg_info):
    return set(info['source'].replace('.src.rpm', '').replace('.nosrc.rpm', '')
               for info in pkg_info.values() if info['source'])


def extract_binary_nvr(pkg_info):
    return set(f"{info['name']}-{info['version']}-{info['release']}.{info['arch']}"
               for info in pkg_info.values())


def extract_srpm_names_from_nvr_list(nvr_list):
    srpm_names = set()
    for nvr in nvr_list:
        # name-version-release: split from the right so dashes inside the name survive.
        parts = nvr.rsplit('-', 2)
        if len(parts) == 3:
            srpm_names.add(parts[0])
    return sorted(srpm_names)


# ---------------------
# Difference analysis (multiple repos)
# ---------------------
def analyze_binary_distribution_all_repos(all_pkg_info_by_repo):
    # srpm name -> repo name -> srpm version-release -> set of binary package names
    srpm_versions = defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
    for repo_name, pkg_info in all_pkg_info_by_repo.items():
        for info in pkg_info.values():
            source_rpm = info['source']
            binary_name = info['name']
            try:
                srpm_base = source_rpm.replace('.src.rpm', '').replace('.nosrc.rpm', '')
                srpm_name, srpm_version, srpm_release = srpm_base.rsplit('-', 2)
                srpm_fullver = f"{srpm_version}-{srpm_release}"
            except Exception:
                continue
            srpm_versions[srpm_name][repo_name][srpm_fullver].add(binary_name)

    # Flag SRPMs whose binary count differs across repos/versions and collect the
    # union of all binaries ever built from each SRPM.
    srpm_change_flags = {}
    srpm_all_binaries = {}
    for srpm_name, repo_versions in srpm_versions.items():
        all_binary_counts = []
        all_binaries = set()
        for version_map in repo_versions.values():
            for binaries in version_map.values():
                all_binary_counts.append(len(binaries))
                all_binaries.update(binaries)
        srpm_change_flags[srpm_name] = len(set(all_binary_counts)) > 1
        srpm_all_binaries[srpm_name] = all_binaries

    records = []
    for srpm_name, repo_versions in srpm_versions.items():
        changed = srpm_change_flags[srpm_name]
        all_binaries = srpm_all_binaries[srpm_name]
        for repo_name, versions in repo_versions.items():
            pkg_info_map = all_pkg_info_by_repo[repo_name]
            for version, binaries in versions.items():
                added = sorted(binaries - all_binaries)
                removed = sorted(all_binaries - binaries)
                binary_nvrs = sorted([
                    f"{info['name']}-{info['version']}-{info['release']}.{info['arch']}"
                    for info in pkg_info_map.values()
                    if info["name"] in binaries
                    and info["source"].replace('.src.rpm', '').replace('.nosrc.rpm', '').endswith(version)
                ])
                records.append({
                    "Repo": repo_name,
                    "SRPM Name": srpm_name,
                    "SRPM Version": version,
                    "Binary Count": len(binaries),
                    "Binary Package List": ' '.join(sorted(binaries)),
                    "Binary NVR List": ' '.join(binary_nvrs),
                    "Binary Count Changed": "Yes" if changed else "No",
                    "Added Binaries": ' '.join(added),
                    "Removed Binaries": ' '.join(removed),
                })

    df = pd.DataFrame(records)
    df.sort_values(by=["SRPM Name", "SRPM Version", "Repo"], inplace=True)
    return df


# ---------------------
# Compare against the test repo and return the key SRPMs
# ---------------------
def compare_updates_and_test(updates_url, test_url):
    new_info = get_package_info_from_db1(updates_url)
    test_info = get_package_info_from_db1(test_url)
    new_srpms = extract_srpm_nvr(new_info)
    test_srpms = extract_srpm_nvr(test_info)
    new_bins = extract_binary_nvr(new_info)
    test_bins = extract_binary_nvr(test_info)
    added_srpms = sorted(test_srpms - new_srpms)
    removed_srpms = sorted(new_srpms - test_srpms)
    added_bins = sorted(test_bins - new_bins)
    removed_bins = sorted(new_bins - test_bins)
    focus_srpms = extract_srpm_names_from_nvr_list(added_srpms + removed_srpms)
    return added_srpms, removed_srpms, added_bins, removed_bins, focus_srpms


def write_test_comparison_sheet(wb, added_srpms, removed_srpms, added_bins, removed_bins):
    sheet = wb.create_sheet("Test repo comparison")
    sheet.append(["Added SRPMs (test repo comparison only)", "Added binary NVRs (test repo comparison only)"])
    for i in range(max(len(added_srpms), len(added_bins))):
        sheet.append([added_srpms[i] if i < len(added_srpms) else "",
                      added_bins[i] if i < len(added_bins) else ""])
    sheet.append([])
    sheet.append(["Removed SRPMs (test repo comparison only)", "Removed binary NVRs (test repo comparison only)"])
    for i in range(max(len(removed_srpms), len(removed_bins))):
        sheet.append([removed_srpms[i] if i < len(removed_srpms) else "",
                      removed_bins[i] if i < len(removed_bins) else ""])


# ---------------------
# Write analysis sheets
# ---------------------
def write_Excel(pkg_info, repo_name, worksheet, is_first=False):
    # Raw-data writer used by run_for_product when show_raw=True: one row per binary
    # package, prefixed with the repo name (column layout follows the pkg_info dicts).
    columns = ["pkgkey", "name", "arch", "version", "release", "source"]
    if is_first:
        worksheet.append(["Repo"] + columns)
    for info in pkg_info.values():
        worksheet.append([repo_name] + [info[col] for col in columns])


def write_filtered_analysis_sheet(analysis_df, target_srpms, worksheet):
    filtered_df = analysis_df[analysis_df["SRPM Name"].isin(target_srpms)]
    worksheet.append(list(filtered_df.columns))
    for _, row in filtered_df.iterrows():
        worksheet.append(list(row.values))


def write_analysis_to_excel(analysis_df, repo_name, worksheet, is_first=False):
    if is_first:
        worksheet.append(["Repo"] + list(analysis_df.columns))
    for _, row in analysis_df.iterrows():
        worksheet.append([repo_name] + list(row.values))


# ---------------------
# Configuration loading
# ---------------------
def load_product_config():
    config = configparser.ConfigParser()
    config.read('config.ini', encoding='utf-8')
    all_products = {}
    for section in config.sections():
        if not config.has_option(section, 'baseurls'):
            continue
        baseurls_raw = config.get(section, 'baseurls', fallback='')
        baseurl_lines = [line.strip() for line in baseurls_raw.splitlines() if line.strip()]
        baseurl_dict = {}
        for line in baseurl_lines:
            if '=' in line:
                name, url = line.split('=', 1)
                baseurl_dict[name.strip()] = url.strip()
        filename = config.get(section, 'filename', fallback=section)
        updates_test = config.get(section, 'updates_test', fallback=None)
        all_products[section] = {
            "baseurls": baseurl_dict,
            "filename": filename,
            "updates_test": updates_test
        }
    return all_products
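
# Illustrative config.ini layout expected by load_product_config. The section name
# and URLs are placeholders; the "base"/"updates" repo keys and the "updates_test"
# option are the names run_for_product looks up.
#
#   [ProductA]
#   baseurls =
#       base=https://example.com/ProductA/base/x86_64/os/
#       updates=https://example.com/ProductA/updates/x86_64/os/
#   filename = ProductA-repo-report
#   updates_test = https://test.example.com/ProductA/updates-test/x86_64/os/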
worksheet.append(["Repo"] + list(analysis_df.columns)) for _, row in analysis_df.iterrows(): worksheet.append([repo_name] + list(row.values)) # --------------------- # 配置读取 # --------------------- def load_product_config(): config = configparser.ConfigParser() config.read('config.ini', encoding='utf-8') all_products = {} for section in config.sections(): if not config.has_option(section, 'baseurls'): continue baseurls_raw = config.get(section, 'baseurls', fallback='') baseurl_lines = [line.strip() for line in baseurls_raw.splitlines() if line.strip()] baseurl_dict = {} for line in baseurl_lines: if '=' in line: name, url = line.split('=', 1) baseurl_dict[name.strip()] = url.strip() filename = config.get(section, 'filename', fallback=section) updates_test = config.get(section, 'updates_test', fallback=None) all_products[section] = { "baseurls": baseurl_dict, "filename": filename, "updates_test": updates_test } return all_products # --------------------- # 主执行逻辑 # --------------------- def run_for_product(product_name, baseurl_map, filename_prefix, updates_test_url, show_raw=False): print(f"\n🔍 正在分析产品:{product_name}") wb = openpyxl.Workbook() if show_raw: raw_sheet = wb.create_sheet("仓库原始数据") analysis_sheet = wb.create_sheet("仓库所有SRPM分析") focus_sheet_external = wb.create_sheet("外网仓库本周重点分析的SRPM") focus_sheet_test = wb.create_sheet("测试仓库本周重点分析的SRPM") wb.remove(wb.active) all_pkg_info_by_repo = {} if show_raw: is_first_raw = True for repo_name, baseurl in baseurl_map.items(): info = get_package_info_from_db1(baseurl) all_pkg_info_by_repo[repo_name] = info write_Excel(info, repo_name, raw_sheet, is_first=is_first_raw) is_first_raw = False else: for repo_name, baseurl in baseurl_map.items(): all_pkg_info_by_repo[repo_name] = get_package_info_from_db1(baseurl) analysis_df = analyze_binary_distribution_all_repos(all_pkg_info_by_repo) write_analysis_to_excel(analysis_df, "Merged", analysis_sheet, is_first=True) updates_url = baseurl_map.get("updates") base_url = baseurl_map.get("base") if updates_url and updates_test_url: added_srpms, removed_srpms, added_bins, removed_bins, focus_srpms = compare_updates_and_test(updates_url, updates_test_url) write_test_comparison_sheet(wb, added_srpms, removed_srpms, added_bins, removed_bins) # 外网重点分析:base + updates ext_info = { 'base': get_package_info_from_db1(base_url), 'updates': get_package_info_from_db1(updates_url) } ext_df = analyze_binary_distribution_all_repos(ext_info) write_filtered_analysis_sheet(ext_df, focus_srpms, focus_sheet_external) # 测试重点分析:base + updates_test test_info = { 'base': get_package_info_from_db1(base_url), 'updates_test': get_package_info_from_db1(updates_test_url) } test_df = analyze_binary_distribution_all_repos(test_info) write_filtered_analysis_sheet(test_df, focus_srpms, focus_sheet_test) timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M") output_file = f"{filename_prefix}-{timestamp}.xlsx" wb.save(output_file) print(f"✅ {product_name} 分析完成,输出文件:{output_file}") # --------------------- # 主函数 # --------------------- def main(): products = load_product_config() for name, conf in products.items(): run_for_product( product_name=name, baseurl_map=conf['baseurls'], filename_prefix=conf['filename'], updates_test_url=conf.get('updates_test'), show_raw=False ) if __name__ == '__main__': main()