# -*- coding: utf-8 -*-
"""compat.py

Drive ``/usr/bin/kylin_pkgchecker`` over the repositories listed in the
``repolist`` INI file and summarise the checker's ``repo.json`` output in an
Excel workbook.

Each section of ``repolist`` is read for the options ``arch`` (comma separated),
``baseurl`` and ``updateurl``.
"""
import concurrent.futures
import configparser
import json
import logging
import os
import re
import shutil
import subprocess
import threading
import time
from datetime import datetime
from itertools import zip_longest

import pandas as pd

logger = logging.getLogger(__name__)

config = configparser.ConfigParser()
config.read('repolist')

RESULT_DIR = "/root/result/common/"
RESULT_DIR1 = "/root/result/migrate/"

PKGCHECKER = "/usr/bin/kylin_pkgchecker"
REPO_JSON = "repo.json"
SHEET_NAME = "失败列表"
DEFAULT_RESULT_ROOT = "/var/www/html/output/20250425/result/"

# Per-package result fields of repo.json; the value 2 marks a failure
# ({"兼容":0,"NA":1,"不兼容":2,"待分析":3}).
_COMPAT_RESULT_KEYS = (
    "fileRet",
    "cmdRet",
    "serviceRet",
    "configRet",
    "abiRet",
    "apiRet",
)

sections = ['sp1', 'sp2', 'sp3', 'sp3-2403']


def _split_csv(value):
    """Split a comma separated option value, dropping blanks and whitespace."""
    return [item.strip() for item in value.split(',') if item.strip()]


def _excel_file_name():
    """Return the name of today's report workbook."""
    return "仓库兼容性检测%s.xlsx" % get_current_date()


def _run_pkgchecker(work_dir, level, o_repo, t_repo, arch):
    """Run kylin_pkgchecker once and return the CompletedProcess (or None).

    The command is passed as an argument list without a shell, so characters
    such as ``&`` or ``?`` inside repository URLs are not interpreted.

    :param work_dir: value for ``-output``
    :param level: value for ``-l``
    :param o_repo: value for ``-o``
    :param t_repo: value for ``-t``
    :param arch: value for ``-a``
    :return: ``subprocess.CompletedProcess``, or ``None`` when the checker
        could not be started
    """
    cmd = [
        PKGCHECKER,
        "-output", work_dir,
        "repo",
        "-l", str(level),
        "-o", o_repo,
        "-t", t_repo,
        "-a", arch,
    ]
    try:
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=False,
        )
    except OSError as err:
        # A missing or non-executable checker must not abort the other tasks.
        logger.error("cannot start %s: %s", PKGCHECKER, err)
        return None
    if proc.returncode != 0:
        logger.warning(
            "%s exited with %d: %s",
            " ".join(cmd), proc.returncode, proc.stderr.strip(),
        )
    return proc


def prepare_dirs(dirs):
    """Create ``dirs`` as an empty directory, removing any existing tree."""
    if os.path.exists(dirs):
        shutil.rmtree(dirs)
    os.makedirs(dirs)


def test_task(section, arch):
    """Run the level-2 check for one ``repolist`` section and architecture.

    The working directory ``RESULT_DIR1/<section>/<arch>`` is recreated empty
    before the checker runs.

    :param section: section name in ``repolist``
    :param arch: architecture passed to the checker with ``-a``
    """
    baseurl = config.get(section, 'baseurl')
    updateurl = config.get(section, 'updateurl')
    origurl = ','.join([baseurl, updateurl])
    work_dir = os.path.join(RESULT_DIR1, section, arch)
    prepare_dirs(work_dir)
    _run_pkgchecker(work_dir, 2, updateurl, origurl, arch)


def test_repo_update(index):
    """Run the level-3 check of ``sections[index]`` against every later section.

    Only architectures configured for both sections are checked. All runs
    write to ``RESULT_DIR``, which is not cleared here.

    :param index: position of the starting release in ``sections``
    """
    orig_section = sections[index]
    orig_archs = set(_split_csv(config.get(orig_section, 'arch')))
    orig_testurl = ','.join([
        config.get(orig_section, 'baseurl'),
        config.get(orig_section, 'updateurl'),
    ])

    for target_section in sections[index + 1:]:
        target_testurl = ','.join([
            config.get(target_section, 'baseurl'),
            config.get(target_section, 'updateurl'),
        ])
        target_archs = set(_split_csv(config.get(target_section, 'arch')))
        # sorted() only makes the run order deterministic.
        for arch in sorted(orig_archs & target_archs):
            _run_pkgchecker(RESULT_DIR, 3, target_testurl, orig_testurl, arch)


def test_repo_migrations():
    """Run ``test_repo_update`` for every entry of ``sections``, two at a time."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [
            executor.submit(test_repo_update, index)
            for index in range(len(sections))
        ]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # re-raise any exception from the worker


def parser_config(section):
    """Run ``test_task`` for each architecture of ``section``, two at a time."""
    archs = _split_csv(config.get(section, 'arch'))
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(test_task, section, a) for a in archs]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # re-raise any exception from the worker


def get_current_date():
    """Return the current local date formatted as ``YYYYMMDD``."""
    return time.strftime("%Y%m%d")


def get_max_len():
    """Placeholder; not implemented."""
    pass


def write_excel(data):
    """Write the failure columns to today's report workbook.

    :param data: six column lists in the order product, arch, dependency
        failures, compatibility failures, origin repo, target repo. Columns
        may differ in length; short ones are padded with ``None``. The
        compatibility column is de-duplicated, keeping first-seen order.
    """
    col = ["产品", "架构", "依赖失败", "兼容失败", "源仓库", "目标仓库"]
    compat_unique = list(dict.fromkeys(data[3]))
    rows = list(zip_longest(
        data[0], data[1], data[2], compat_unique, data[4], data[5],
        fillvalue=None,
    ))
    df = pd.DataFrame(rows, columns=col)
    df.to_excel(
        _excel_file_name(),
        sheet_name=SHEET_NAME,
        index=False,
        startrow=0,
        engine="openpyxl",
    )


def append_sheet_excel(data, excel_file=None):
    """Append ``data`` below the last used row of the failure sheet.

    NOTE(review): the previous body called an unimported ``load_workbook()``
    without a file name and never wrote anything. This version assumes the
    target is the workbook produced by ``write_excel`` -- confirm.

    Requires a pandas release that supports ``if_sheet_exists="overlay"``.

    :param data: anything accepted by ``pd.DataFrame``; written without
        header and index
    :param excel_file: workbook to extend; defaults to today's report file
    """
    if excel_file is None:
        excel_file = _excel_file_name()
    append_df = pd.DataFrame(data)
    with pd.ExcelWriter(
        excel_file, engine="openpyxl", mode="a", if_sheet_exists="overlay"
    ) as writer:
        # max_row is 1-based and startrow 0-based, so this is the first free row.
        last_row = writer.book[SHEET_NAME].max_row
        append_df.to_excel(
            writer,
            sheet_name=SHEET_NAME,
            index=False,
            header=False,
            startrow=last_row,
        )


def _collect_repo_failures(result_root):
    """Scan ``result_root`` for repo.json files and build the report columns.

    Failures from all files are accumulated; arch and repository URLs come
    from the last ``summary_data`` read and stay ``None`` if none was found.

    :return: ``[product, arch, require_failed, compat_failed, origin, target]``
    """
    # Packages with missing dependencies.
    requirefaild = []
    # Packages that failed a compatibility check.
    compatfaild = []
    arch = origin_repo_url = target_repo_url = None

    for dirpath, _dirnames, files in os.walk(result_root):
        if REPO_JSON not in files:
            continue
        with open(os.path.join(dirpath, REPO_JSON), encoding="utf-8") as f:
            data = json.load(f)

        for pkg in data.get('repo_data', []):
            if pkg.get("otherRequiresRet") == 1:
                requirefaild.append(pkg["name"])
            if any(pkg.get(key) == 2 for key in _COMPAT_RESULT_KEYS):
                compatfaild.append(pkg["name"])

        summary = data.get('summary_data')
        if summary is not None:
            arch = summary.get("sys_arch")
            origin_repo_url = summary.get("origin_repo")
            target_repo_url = summary.get("target_repo")

    maxlen = max(len(requirefaild), len(compatfaild))
    return [
        ["Kylin-v7"] * maxlen,
        [arch] * maxlen,
        requirefaild,
        compatfaild,
        [origin_repo_url] * maxlen,
        [target_repo_url] * maxlen,
    ]


def parser_repo_json(result_root=DEFAULT_RESULT_ROOT):
    """Collect failed packages from repo.json files and write the Excel report.

    {"兼容":0,"NA":1,"不兼容":2,"待分析":3}
    {"依赖完整":0,"依赖缺失":1,"NA":"NA"}

    :param result_root: directory tree searched for ``repo.json`` files
    :return: None
    """
    data_list = _collect_repo_failures(result_root)
    print(data_list)
    write_excel(data_list)


def test_single_repo():
    """Reset ``RESULT_DIR`` and check every section found in ``repolist``."""
    section_names = config.sections()
    prepare_dirs(RESULT_DIR)
    for section in section_names:
        parser_config(section)


# These entry points are named test_* but are not pytest tests; opt them out of
# collection in case this module is star-imported into a test module.
for _entry in (test_task, test_repo_update, test_repo_migrations,
               test_single_repo):
    _entry.__test__ = False
del _entry


if __name__ == "__main__":
    test_single_repo()