# -*- coding: utf-8 -*-
"""compat.py -- repository compatibility checks.

Runs kylin_pkgchecker against the repositories listed in the ``repolist``
config file and writes the failing packages into an Excel report.
"""
import concurrent.futures
import configparser
import json
import os
import shutil
import subprocess
import time
from itertools import zip_longest

import pandas as pd
from openpyxl import load_workbook
config = configparser.ConfigParser()
config.read('repolist')

RESULT_DIR = "/root/result/common/"    # shared output dir for the cross-section checks
RESULT_DIR1 = "/root/result/migrate/"  # per-section/arch work dirs used by test_task
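
# A hypothetical sketch of the expected ``repolist`` INI layout, inferred from
# the config.get() calls below (section names match the ``sections`` list;
# URLs are placeholders, not real repositories):
#
#   [sp1]
#   baseurl = http://repo.example.com/sp1/base
#   updateurl = http://repo.example.com/sp1/updates
#   testurl = http://repo.example.com/sp1/test
#   arch = x86_64,aarch64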


def prepare_dirs(dirs):
    if os.path.exists(dirs):
        shutil.rmtree(dirs)
    os.makedirs(dirs)


def test_task(section, arch):
    """Run a level-2 repo check for one config section and architecture."""
    baseurl = config.get(section, 'baseurl')
    updateurl = config.get(section, 'updateurl')
    testurl = config.get(section, 'testurl')  # read but not used below
    origurl = ','.join([baseurl, updateurl])
    work_dir = RESULT_DIR1 + section + '/' + arch
    prepare_dirs(work_dir)
    cmd = "/usr/bin/kylin_pkgchecker -output %s repo -l 2 -o %s -t %s -a %s" % (
        work_dir, updateurl, origurl, arch)
    popen = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=True,
        text=True
    )
    stdout, stderr = popen.communicate()


# Product lines covered by the migration checks; test_repo_update tests each
# entry against every later one.
sections = ['sp1', 'sp2', 'sp3', 'sp3-2403']


def test_repo_update(index):
    """Run level-3 checks from sections[index] to every later section."""
    start_index = index + 1
    orig_arch_list = config.get(sections[index], 'arch').split(',')
    orig_baseurl = config.get(sections[index], 'baseurl')
    orig_updateurl = config.get(sections[index], 'updateurl')
    orig_testurl = ','.join([orig_baseurl, orig_updateurl])
    for j in range(start_index, len(sections)):
        target_baseurl = config.get(sections[j], 'baseurl')
        target_updateurl = config.get(sections[j], 'updateurl')
        target_testurl = ','.join([target_baseurl, target_updateurl])
        target_arch_list = config.get(sections[j], 'arch').split(',')
        # Only test architectures that both product lines support.
        arch_list = list(set(orig_arch_list) & set(target_arch_list))
        for arch in arch_list:
            work_dir = RESULT_DIR  # all pairwise runs share the common result dir
            cmd = "/usr/bin/kylin_pkgchecker -output %s repo -l 3 -o %s -t %s -a %s" % (
                work_dir, target_testurl, orig_testurl, arch)
            popen = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                shell=True,
                text=True
            )
            stdout, stderr = popen.communicate()


def test_repo_migrations():
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(test_repo_update, i) for i in range(len(sections))]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]


def parser_config(section):
    arch = config.get(section, 'arch')
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(test_task, section, a) for a in arch.split(',')]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]


def get_current_date():
    timestamp = time.time()
    localtime = time.localtime(timestamp)
    now = time.strftime("%Y%m%d", localtime)
    return now


def get_max_len():
    # Unused placeholder; the max length is computed inline in parser_repo_json.
    pass


def write_excel(data):
    """Write the failure report; ``data`` holds six parallel column lists:
    [product, arch, dependency failures, compatibility failures,
    origin repo URLs, target repo URLs]."""
    # Column headers: product, architecture, dependency failure,
    # compatibility failure, source repo, target repo.
    col = ["产品", "架构", "依赖失败", "兼容失败", "源仓库", "目标仓库"]
    # data[3] is de-duplicated; the other columns are taken as-is and
    # padded with None to the longest column.
    whitedata = list(zip_longest(data[0], data[1], data[2], list(set(data[3])),
                                 data[4], data[5], fillvalue=None))
    now = get_current_date()
    # "仓库兼容性检测" = "repository compatibility check"
    excel_file = "仓库兼容性检测%s.xlsx" % now
    sheet_name = "失败列表"  # "failure list"
    df = pd.DataFrame(whitedata, columns=col)
    df.to_excel(
        excel_file,
        sheet_name=sheet_name,
        index=False,
        startrow=0,
        engine="openpyxl"
    )


def append_sheet_excel(data):
    """Append extra rows to today's report (currently unused; assumes
    write_excel has already produced the file)."""
    excel_file = "仓库兼容性检测%s.xlsx" % get_current_date()  # inferred from write_excel
    book = load_workbook(excel_file)
    sheet = book['失败列表']
    for row in data:  # each item in data is one row of cell values
        sheet.append(row)
    book.save(excel_file)


def parser_repo_json():
    """Collect failing packages from repo.json result files and write the report.

    Result codes in repo.json:
      compatibility checks: {"compatible": 0, "NA": 1, "incompatible": 2, "pending analysis": 3}
      dependency checks:    {"complete": 0, "missing": 1, "NA": "NA"}
    """
    # Packages with missing dependencies.
    require_failed = []
    # Packages that failed any compatibility check.
    compat_failed = []
    data_list = []
    filename = "repo.json"
    for dirpath, dirnames, files in os.walk("/var/www/html/output/20250425/result/"):
        if filename in files:
            outfile = os.path.join(dirpath, filename)
            with open(outfile) as f:
                data = json.load(f)
            for k, v in data.items():
                if k == 'repo_data':
                    for r in v:
                        if r["otherRequiresRet"] == 1:
                            require_failed.append(r["name"])
                        if (r["fileRet"] == 2 or r["cmdRet"] == 2
                                or r["serviceRet"] == 2 or r["configRet"] == 2
                                or r["abiRet"] == 2 or r["apiRet"] == 2):
                            compat_failed.append(r["name"])
                if k == 'summary_data':
                    arch = v["sys_arch"]
                    origin_repo_url = v["origin_repo"]
                    target_repo_url = v["target_repo"]
            # Pad the constant columns to the longest failure list.
            maxlen = max(len(require_failed), len(compat_failed))
            archlist = [arch] * maxlen
            origin_repo_url_list = [origin_repo_url] * maxlen
            target_repo_url_list = [target_repo_url] * maxlen
            product = ["Kylin-v7"] * maxlen
            data_list.append(product)
            data_list.append(archlist)
            data_list.append(require_failed)
            data_list.append(compat_failed)
            data_list.append(origin_repo_url_list)
            data_list.append(target_repo_url_list)
    print(data_list)
    write_excel(data_list)


def test_single_repo():
    sections = config.sections()  # note: shadows the module-level ``sections`` list
    prepare_dirs(RESULT_DIR)
    for section in sections:
        parser_config(section)


if __name__ == "__main__":
    test_single_repo()
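    # test_repo_migrations() and parser_repo_json() are defined above but not
    # invoked here; presumably they are run from a separate entry point.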