1
0
mirror of https://github.com/Oreomeow/VIP.git synced 2026-02-03 10:33:15 +08:00
Files
vip/Scripts/py/deleteDuplicateTasksImplement.py
sourcery-ai[bot] 6f437ddef5 ♻️ 'Refactored by Sourcery' (#14)
Co-authored-by: Sourcery AI <>
2022-10-08 14:43:43 +08:00

112 lines
3.1 KiB
Python

# -*- coding:utf-8 -*-
"""
cron: 20 0-23/2 * * *
new Env('清理重复任务');
"""
import json
import os
import sys
import time
import requests
ip = "localhost"
def loadSend():
print("加载推送功能")
global send
cur_path = os.path.abspath(os.path.dirname(__file__))
sys.path.append(cur_path)
if os.path.exists(f"{cur_path}/deleteDuplicateTasksNotify.py"):
try:
from deleteDuplicateTasksNotify import send
except Exception:
print("加载通知服务失败~")
headers = {
"Accept": "application/json",
"Authorization": "Basic YWRtaW46YWRtaW4=",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36",
}
def getTaskList():
t = round(time.time() * 1000)
url = "http://%s:5700/api/crons?searchValue=&t=%d" % (ip, t)
response = requests.get(url=url, headers=headers)
responseContent = json.loads(response.content.decode("utf-8"))
return responseContent["data"] if responseContent["code"] == 200 else []
def getDuplicate(taskList):
wholeNames = {}
duplicateID = []
for task in taskList:
if task["name"] in wholeNames:
duplicateID.append(task["_id"])
else:
wholeNames[task["name"]] = 1
return duplicateID
def getData(duplicateID):
rawData = "["
for count, id in enumerate(duplicateID):
rawData += '"%s"' % id
if count < len(duplicateID) - 1:
rawData += ", "
rawData += "]"
return rawData
def deleteDuplicateTasks(duplicateID):
t = round(time.time() * 1000)
url = "http://%s:5700/api/crons?t=%d" % (ip, t)
data = json.dumps(duplicateID)
headers["Content-Type"] = "application/json;charset=UTF-8"
response = requests.delete(url=url, headers=headers, data=data)
msg = json.loads(response.content.decode("utf-8"))
if msg["code"] != 200:
print(f"出错!,错误信息为:{msg}")
else:
print("成功删除重复任务")
def loadToken():
# cur_path = os.path.abspath(os.path.dirname(__file__))
# send("当前路径:",cur_path)
try:
with open("/ql/config/auth.json", "r", encoding="utf-8") as f:
data = json.load(f)
except Exception:
# pass
send("无法获取token", "")
return data["token"]
if __name__ == "__main__":
print("开始!")
loadSend()
# 直接从 /ql/config/auth.json中读取当前token
token = loadToken()
# send("成功获取token!","")
headers["Authorization"] = f"Bearer {token}"
taskList = getTaskList()
# 如果仍旧是空的,则报警
if len(taskList) == 0:
print("无法获取taskList!")
duplicateID = getDuplicate(taskList)
before = "清除前数量为:%d" % len(taskList)
print(before)
after = "清除重复任务后,数量为:%d" % (len(taskList) - len(duplicateID))
print(after)
if len(duplicateID) == 0:
print("没有重复任务")
else:
deleteDuplicateTasks(duplicateID)
send("清理成功", "\n%s\n%s" % (before, after))
# print("清理结束!")