From 6f437ddef505bc1fa31555788dc3ac5bb9bbef24 Mon Sep 17 00:00:00 2001 From: "sourcery-ai[bot]" <58596630+sourcery-ai[bot]@users.noreply.github.com> Date: Sat, 8 Oct 2022 14:43:43 +0800 Subject: [PATCH] :recycle: 'Refactored by Sourcery' (#14) Co-authored-by: Sourcery AI <> --- Scripts/py/EUserv_extend.py | 167 ++++++++++---------- Scripts/py/FNplus.py | 4 +- Scripts/py/Wechat-Timed-Message.py | 24 +-- Scripts/py/airport.py | 16 +- Scripts/py/deleteDuplicateTasksImplement.py | 19 +-- Scripts/py/deleteDuplicateTasksNotify.py | 25 ++- Scripts/py/disable.py | 22 +-- Scripts/py/getBean.py | 13 +- Scripts/py/getcookie.py | 49 +++--- Scripts/py/hostloc2tg_api.py | 41 ++--- Scripts/py/jdcar.py | 17 +- Scripts/py/mimotion.py | 27 ++-- Scripts/py/mimotion2.py | 3 +- Scripts/py/netease.py | 32 ++-- Scripts/py/sendNotify.py | 19 +-- Scripts/py/tg_channel_downloader.py | 42 +++-- Scripts/py/win10-PyAuto-deploy.py | 10 +- 17 files changed, 232 insertions(+), 298 deletions(-) diff --git a/Scripts/py/EUserv_extend.py b/Scripts/py/EUserv_extend.py index 07f9e77..ceb91d3 100644 --- a/Scripts/py/EUserv_extend.py +++ b/Scripts/py/EUserv_extend.py @@ -126,9 +126,8 @@ def login_retry(*args, **kwargs): sess_id, session = func(username, password) if sess_id != "-1": return sess_id, session - else: - if number == max_retry: - return sess_id, session + if number == max_retry: + return sess_id, session else: return ret, ret_session @@ -154,8 +153,7 @@ def captcha_solver(captcha_image_url: str, session: requests.session) -> dict: "data": str(encoded_string)[2:-1], } r = requests.post(url=url, json=data) - j = json.loads(r.text) - return j + return json.loads(r.text) def handle_captcha_solved_result(solved: dict) -> str: @@ -175,26 +173,25 @@ def handle_captcha_solved_result(solved: dict) -> str: log("[Captcha Solver] You are using your own apikey.") text = solved_text operators = ["X", "x", "+", "-"] - if any(x in text for x in operators): - for operator in operators: - operator_pos = text.find(operator) - if operator == "x" or operator == "X": - operator = "*" - if operator_pos != -1: - left_part = text[:operator_pos] - right_part = text[operator_pos + 1:] - if left_part.isdigit() and right_part.isdigit(): - return eval( - "{left} {operator} {right}".format( - left=left_part, operator=operator, right=right_part - ) - ) - else: - # Because these symbols("X", "x", "+", "-") do not appear at the same time, - # it just contains an arithmetic symbol. - return text - else: + if all(x not in text for x in operators): return text + for operator in operators: + operator_pos = text.find(operator) + if operator in ["x", "X"]: + operator = "*" + if operator_pos != -1: + left_part = text[:operator_pos] + right_part = text[operator_pos + 1:] + return ( + eval( + "{left} {operator} {right}".format( + left=left_part, operator=operator, right=right_part + ) + ) + if left_part.isdigit() and right_part.isdigit() + else text + ) + else: print(solved) raise KeyError("Failed to find parsed results.") @@ -208,15 +205,13 @@ def get_captcha_solver_usage() -> dict: "apikey": APIKEY, } r = requests.get(url=url, params=params) - j = json.loads(r.text) - return j + return json.loads(r.text) @login_retry(max_retry=LOGIN_MAX_RETRY_COUNT) def login(username: str, password: str) -> (str, requests.session): headers = {"user-agent": user_agent, "origin": "https://www.euserv.com"} url = "https://support.euserv.com/index.iphp" - captcha_image_url = "https://support.euserv.com/securimage_show.php" session = requests.Session() sess = session.get(url, headers=headers) @@ -237,58 +232,56 @@ def login(username: str, password: str) -> (str, requests.session): f.raise_for_status() if ( - f.text.find("Hello") == -1 - and f.text.find("Confirm or change your customer data here") == -1 + f.text.find("Hello") != -1 + or f.text.find("Confirm or change your customer data here") != -1 ): - if ( + return sess_id, session + if ( f.text.find( "To finish the login process please solve the following captcha." ) == -1 ): - return "-1", session - else: - log("[Captcha Solver] 进行验证码识别...") - solved_result = captcha_solver(captcha_image_url, session) - captcha_code = handle_captcha_solved_result(solved_result) - log("[Captcha Solver] 识别的验证码是: {}".format(captcha_code)) + return "-1", session + log("[Captcha Solver] 进行验证码识别...") + captcha_image_url = "https://support.euserv.com/securimage_show.php" + solved_result = captcha_solver(captcha_image_url, session) + captcha_code = handle_captcha_solved_result(solved_result) + log(f"[Captcha Solver] 识别的验证码是: {captcha_code}") - if CHECK_CAPTCHA_SOLVER_USAGE: - usage = get_captcha_solver_usage() - log( - "[Captcha Solver] current date {0} api usage count: {1}".format( - usage[0]["date"], usage[0]["count"] - ) - ) - - f2 = session.post( - url, - headers=headers, - data={ - "subaction": "login", - "sess_id": sess_id, - "captcha_code": captcha_code, - }, + if CHECK_CAPTCHA_SOLVER_USAGE: + usage = get_captcha_solver_usage() + log( + "[Captcha Solver] current date {0} api usage count: {1}".format( + usage[0]["date"], usage[0]["count"] ) - if ( - f2.text.find( - "To finish the login process please solve the following captcha." - ) - == -1 - ): - log("[Captcha Solver] 验证通过") - return sess_id, session - else: - log("[Captcha Solver] 验证失败") - return "-1", session + ) - else: + f2 = session.post( + url, + headers=headers, + data={ + "subaction": "login", + "sess_id": sess_id, + "captcha_code": captcha_code, + }, + ) + if ( + f2.text.find( + "To finish the login process please solve the following captcha." + ) + == -1 + ): + log("[Captcha Solver] 验证通过") return sess_id, session + else: + log("[Captcha Solver] 验证失败") + return "-1", session def get_servers(sess_id: str, session: requests.session) -> {}: d = {} - url = "https://support.euserv.com/index.iphp?sess_id=" + sess_id + url = f"https://support.euserv.com/index.iphp?sess_id={sess_id}" headers = {"user-agent": user_agent, "origin": "https://www.euserv.com"} f = session.get(url=url, headers=headers) f.raise_for_status() @@ -297,16 +290,15 @@ def get_servers(sess_id: str, session: requests.session) -> {}: "#kc2_order_customer_orders_tab_content_1 .kc2_order_table.kc2_content_table tr" ): server_id = tr.select(".td-z1-sp1-kc") - if not len(server_id) == 1: + if len(server_id) != 1: continue flag = ( - True - if tr.select(".td-z1-sp2-kc .kc2_order_action_container")[0] + tr.select(".td-z1-sp2-kc .kc2_order_action_container")[0] .get_text() .find("Contract extension possible from") == -1 - else False ) + d[server_id[0].get_text()] = flag return d @@ -337,7 +329,7 @@ def renew( } f = session.post(url, headers=headers, data=data) f.raise_for_status() - if not json.loads(f.text)["rs"] == "success": + if json.loads(f.text)["rs"] != "success": return False token = json.loads(f.text)["token"]["value"] data = { @@ -358,7 +350,7 @@ def check(sess_id: str, session: requests.session): for key, val in d.items(): if val: flag = False - log("[EUserv] ServerID: %s Renew Failed!" % key) + log(f"[EUserv] ServerID: {key} Renew Failed!") if flag: log("[EUserv] ALL Work Done! Enjoy~") @@ -368,7 +360,7 @@ def check(sess_id: str, session: requests.session): def coolpush(): c = "EUserv续费日志\n\n" + desp data = json.dumps({"c": c}) - url = "https://push.xuthus.cc/" + COOL_PUSH_MODE + "/" + COOL_PUSH_SKEY + url = f"https://push.xuthus.cc/{COOL_PUSH_MODE}/{COOL_PUSH_SKEY}" response = requests.post(url, data=data) if response.status_code != 200: print("酷推 推送失败") @@ -397,7 +389,7 @@ def server_chan(): ("text", "EUserv续费日志"), ("desp", desp) ) - response = requests.post("https://sc.ftqq.com/" + SCKEY + ".send", data=data) + response = requests.post(f"https://sc.ftqq.com/{SCKEY}.send", data=data) if response.status_code != 200: print("Server酱 推送失败") else: @@ -410,7 +402,10 @@ def telegram(): ("chat_id", TG_USER_ID), ("text", "EUserv续费日志\n\n" + desp) ) - response = requests.post("https://" + TG_API_HOST + "/bot" + TG_BOT_TOKEN + "/sendMessage", data=data) + response = requests.post( + f"https://{TG_API_HOST}/bot{TG_BOT_TOKEN}/sendMessage", data=data + ) + if response.status_code != 200: print("Telegram Bot 推送失败") else: @@ -419,8 +414,16 @@ def telegram(): # wecomchan https://github.com/easychen/wecomchan def wecomchan(): - response = requests.get(WECOMCHAN_DOMAIN + "wecomchan?sendkey=" + WECOMCHAN_SEND_KEY + "&msg_type=text" + "&to_user=" + - WECOMCHAN_TO_USER + "&msg=" + "EUserv续费日志\n\n" + desp) + response = requests.get( + ( + ( + f"{WECOMCHAN_DOMAIN}wecomchan?sendkey={WECOMCHAN_SEND_KEY}&msg_type=text&to_user={WECOMCHAN_TO_USER}&msg=" + + "EUserv续费日志\n\n" + ) + + desp + ) + ) + if response.status_code != 200: print("wecomchan 推送失败") else: @@ -462,10 +465,10 @@ def email(): ) print("eMail 推送成功") except requests.exceptions.RequestException as e: - print(str(e)) + print(e) print("eMail 推送失败") except SMTPDataError as e1: - print(str(e1)) + print(e1) print("eMail 推送失败") @@ -486,15 +489,15 @@ def main_handler(event, context): log("[EUserv] 第 %d 个账号登陆失败,请检查登录信息" % (i + 1)) continue SERVERS = get_servers(sessid, s) - log("[EUserv] 检测到第 {} 个账号有 {} 台 VPS,正在尝试续期".format(i + 1, len(SERVERS))) + log(f"[EUserv] 检测到第 {i + 1} 个账号有 {len(SERVERS)} 台 VPS,正在尝试续期") for k, v in SERVERS.items(): if v: if not renew(sessid, s, passwd_list[i], k): - log("[EUserv] ServerID: %s Renew Error!" % k) + log(f"[EUserv] ServerID: {k} Renew Error!") else: - log("[EUserv] ServerID: %s has been successfully renewed!" % k) + log(f"[EUserv] ServerID: {k} has been successfully renewed!") else: - log("[EUserv] ServerID: %s does not need to be renewed" % k) + log(f"[EUserv] ServerID: {k} does not need to be renewed") time.sleep(15) check(sessid, s) time.sleep(5) diff --git a/Scripts/py/FNplus.py b/Scripts/py/FNplus.py index 3e6a0d6..8773596 100644 --- a/Scripts/py/FNplus.py +++ b/Scripts/py/FNplus.py @@ -41,7 +41,7 @@ password = args.password def qlnotify(desp): cur_path = os.path.abspath(os.path.dirname(__file__)) - if os.path.exists(cur_path + "/notify.py"): + if os.path.exists(f"{cur_path}/notify.py"): try: from notify import send except Exception: @@ -97,7 +97,7 @@ class FreeNom: msg = "get page token failed" print(msg) return - token = match.group(1) + token = match[1] # domains domains = re.findall(domain_info_ptn, r.text) diff --git a/Scripts/py/Wechat-Timed-Message.py b/Scripts/py/Wechat-Timed-Message.py index d9b3116..8df6947 100644 --- a/Scripts/py/Wechat-Timed-Message.py +++ b/Scripts/py/Wechat-Timed-Message.py @@ -49,9 +49,8 @@ def exwechat_get_ShortTimeMedia(img_url): def exwechat_send(title, digest, content): - url = ( - "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=" + access_token - ) + url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}" + data = { "touser": touser, "agentid": agentid, @@ -96,7 +95,7 @@ if sckey: if result["data"]["errno"] == 0: print("Server酱推送成功!") else: - errorNotify += "Server酱推送错误: " + result + "\n" + errorNotify += f"Server酱推送错误: {result}" + "\n" except Exception as e: print(e) errorNotify += "Server酱推送错误!\n" @@ -116,21 +115,14 @@ if pptoken: title = urllib.parse.quote_plus(title.replace("\n", "
")) message = urllib.parse.quote_plus(message.replace("\n", "
")) res = requests.get( - host - + "send?token=" - + pptoken - + "&title=" - + title - + "&content=" - + message - + "&template=html&topic=" - + pptopic + f"{host}send?token={pptoken}&title={title}&content={message}&template=html&topic={pptopic}" ) + result = res.json() if result["code"] == 200: print("成功通过PushPlus将结果通知给相关用户!") else: - errorNotify += "PushPlus推送错误: " + result + "\n" + errorNotify += f"PushPlus推送错误: {result}" + "\n" except Exception as e: print(e) errorNotify += "PushPlus推送错误!\n" @@ -141,11 +133,11 @@ title = os.getenv("TITLE") message = os.getenv("MSG") qywx_am_ay = re.split(",", qywx_am) -corpid = qywx_am_ay[0] corpsecret = qywx_am_ay[1] touser = qywx_am_ay[2] agentid = qywx_am_ay[3] +corpid = qywx_am_ay[0] if corpid: info = "" if corpsecret: @@ -158,7 +150,7 @@ if corpid: if result["errcode"] == 0: print("成功通过企业微信将结果通知给用户!") else: - errorNotify += "企业微信推送错误: " + res.text + "\n" + errorNotify += f"企业微信推送错误: {res.text}" + "\n" except Exception as e: print(e) errorNotify += "企业微信推送错误!\n" diff --git a/Scripts/py/airport.py b/Scripts/py/airport.py index 0c7d946..0046b44 100644 --- a/Scripts/py/airport.py +++ b/Scripts/py/airport.py @@ -20,7 +20,7 @@ requests.packages.urllib3.disable_warnings() def qlnotify(desp): cur_path = os.path.abspath(os.path.dirname(__file__)) - if os.path.exists(cur_path + "/notify.py"): + if os.path.exists(f"{cur_path}/notify.py"): try: from notify import send except Exception: @@ -44,7 +44,7 @@ class SspanelQd(object): msgall = "" for i in range(len(self.base_url)): email = self.email[i].split("@") - email = email[0] + "%40" + email[1] + email = f"{email[0]}%40{email[1]}" password = self.password[i] session = requests.session() @@ -70,30 +70,32 @@ class SspanelQd(object): print(msg) continue - login_url = self.base_url[i] + "/auth/login" + login_url = f"{self.base_url[i]}/auth/login" headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36", "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", } - post_data = "email=" + email + "&passwd=" + password + "&code=" + post_data = f"email={email}&passwd={password}&code=" post_data = post_data.encode() response = session.post(login_url, post_data, headers=headers, verify=False) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36", - "Referer": self.base_url[i] + "/user", + "Referer": f"{self.base_url[i]}/user", } + response = session.post( - self.base_url[i] + "/user/checkin", headers=headers, verify=False + f"{self.base_url[i]}/user/checkin", headers=headers, verify=False ) + msg = (response.json()).get("msg") msgall = msgall + self.base_url[i] + "\n\n" + msg + "\n\n" print(msg) - info_url = self.base_url[i] + "/user" + info_url = f"{self.base_url[i]}/user" response = session.get(info_url, verify=False) return msgall diff --git a/Scripts/py/deleteDuplicateTasksImplement.py b/Scripts/py/deleteDuplicateTasksImplement.py index e68fe58..e3837e5 100644 --- a/Scripts/py/deleteDuplicateTasksImplement.py +++ b/Scripts/py/deleteDuplicateTasksImplement.py @@ -19,7 +19,7 @@ def loadSend(): global send cur_path = os.path.abspath(os.path.dirname(__file__)) sys.path.append(cur_path) - if os.path.exists(cur_path + "/deleteDuplicateTasksNotify.py"): + if os.path.exists(f"{cur_path}/deleteDuplicateTasksNotify.py"): try: from deleteDuplicateTasksNotify import send except Exception: @@ -38,19 +38,14 @@ def getTaskList(): url = "http://%s:5700/api/crons?searchValue=&t=%d" % (ip, t) response = requests.get(url=url, headers=headers) responseContent = json.loads(response.content.decode("utf-8")) - if responseContent["code"] == 200: - taskList = responseContent["data"] - return taskList - else: - # 没有获取到taskList,返回空 - return [] + return responseContent["data"] if responseContent["code"] == 200 else [] def getDuplicate(taskList): wholeNames = {} duplicateID = [] for task in taskList: - if task["name"] in wholeNames.keys(): + if task["name"] in wholeNames: duplicateID.append(task["_id"]) else: wholeNames[task["name"]] = 1 @@ -59,12 +54,10 @@ def getDuplicate(taskList): def getData(duplicateID): rawData = "[" - count = 0 - for id in duplicateID: + for count, id in enumerate(duplicateID): rawData += '"%s"' % id if count < len(duplicateID) - 1: rawData += ", " - count += 1 rawData += "]" return rawData @@ -77,7 +70,7 @@ def deleteDuplicateTasks(duplicateID): response = requests.delete(url=url, headers=headers, data=data) msg = json.loads(response.content.decode("utf-8")) if msg["code"] != 200: - print("出错!,错误信息为:%s" % msg) + print(f"出错!,错误信息为:{msg}") else: print("成功删除重复任务") @@ -100,7 +93,7 @@ if __name__ == "__main__": # 直接从 /ql/config/auth.json中读取当前token token = loadToken() # send("成功获取token!","") - headers["Authorization"] = "Bearer %s" % token + headers["Authorization"] = f"Bearer {token}" taskList = getTaskList() # 如果仍旧是空的,则报警 if len(taskList) == 0: diff --git a/Scripts/py/deleteDuplicateTasksNotify.py b/Scripts/py/deleteDuplicateTasksNotify.py index 62c8db9..f41d333 100644 --- a/Scripts/py/deleteDuplicateTasksNotify.py +++ b/Scripts/py/deleteDuplicateTasksNotify.py @@ -53,16 +53,11 @@ if "QQ_SKEY" in os.environ and os.environ["QQ_SKEY"] and "QQ_MODE" in os.environ QQ_SKEY = os.environ["QQ_SKEY"] QQ_MODE = os.environ["QQ_MODE"] # 获取pushplus+ PUSH_PLUS_TOKEN -if "PUSH_PLUS_TOKEN" in os.environ: - if len(os.environ["PUSH_PLUS_TOKEN"]) > 1: - PUSH_PLUS_TOKEN = os.environ["PUSH_PLUS_TOKEN"] - # print("已获取并使用Env环境 PUSH_PLUS_TOKEN") +if "PUSH_PLUS_TOKEN" in os.environ and len(os.environ["PUSH_PLUS_TOKEN"]) > 1: + PUSH_PLUS_TOKEN = os.environ["PUSH_PLUS_TOKEN"] # 获取企业微信应用推送 QYWX_AM -if "QYWX_AM" in os.environ: - if len(os.environ["QYWX_AM"]) > 1: - QYWX_AM = os.environ["QYWX_AM"] - # print("已获取并使用Env环境 QYWX_AM") - +if "QYWX_AM" in os.environ and len(os.environ["QYWX_AM"]) > 1: + QYWX_AM = os.environ["QYWX_AM"] if BARK: notify_mode.append('bark') # print("BARK 推送打开") @@ -90,7 +85,7 @@ if QYWX_AM: def message(str_msg): global message_info print(str_msg) - message_info = "{}\n{}".format(message_info, str_msg) + message_info = f"{message_info}\n{str_msg}" sys.stdout.flush() @@ -150,7 +145,7 @@ def telegram_bot(title, content): payload = {'chat_id': str(TG_USER_ID), 'text': f'{title}\n\n{content}', 'disable_web_page_preview': 'true'} proxies = None if TG_PROXY_IP and TG_PROXY_PORT: - proxyStr = "http://{}:{}".format(TG_PROXY_IP, TG_PROXY_PORT) + proxyStr = f"http://{TG_PROXY_IP}:{TG_PROXY_PORT}" proxies = {"http": proxyStr, "https": proxyStr} try: response = requests.post(url=url, headers=headers, params=payload, proxies=proxies).json() @@ -167,7 +162,7 @@ def telegram_bot(title, content): def dingding_bot(title, content): timestamp = str(round(time.time() * 1000)) # 时间戳 secret_enc = DD_BOT_SECRET.encode('utf-8') - string_to_sign = '{}\n{}'.format(timestamp, DD_BOT_SECRET) + string_to_sign = f'{timestamp}\n{DD_BOT_SECRET}' string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) # 签名 @@ -274,7 +269,8 @@ class WeCom: return data["access_token"] def send_text(self, message, touser="@all"): - send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + self.get_access_token() + send_url = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={self.get_access_token()}' + send_values = { "touser": touser, "msgtype": "text", @@ -290,7 +286,8 @@ class WeCom: return respone["errmsg"] def send_mpnews(self, title, message, media_id, touser="@all"): - send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + self.get_access_token() + send_url = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={self.get_access_token()}' + send_values = { "touser": touser, "msgtype": "mpnews", diff --git a/Scripts/py/disable.py b/Scripts/py/disable.py index 5e1a947..f173463 100644 --- a/Scripts/py/disable.py +++ b/Scripts/py/disable.py @@ -4,6 +4,7 @@ cron: 20 10 */7 * * new Env('禁用重复任务'); """ + import json import logging import os @@ -17,17 +18,13 @@ logger = logging.getLogger(name=None) # 创建一个日志对象 logging.Formatter("%(message)s") # 日志内容格式化 logger.setLevel(logging.INFO) # 设置日志等级 logger.addHandler(logging.StreamHandler()) # 添加控制台日志 -# logger.addHandler(logging.FileHandler(filename="text.log", mode="w")) # 添加文件日志 - - -ipport = os.getenv("IPPORT") -if not ipport: +if ipport := os.getenv("IPPORT"): + ipport = ipport.lstrip("http://").rstrip("/") +else: logger.info( "如果报错请在环境变量中添加你的真实 IP:端口\n名称:IPPORT\t值:127.0.0.1:5700\n或在 config.sh 中添加 export IPPORT='127.0.0.1:5700'" ) ipport = "localhost:5700" -else: - ipport = ipport.lstrip("http://").rstrip("/") sub_str = os.getenv("RES_SUB", "Aaron-lv_sync") sub_list = sub_str.split("&") res_only = os.getenv("RES_ONLY", True) @@ -43,7 +40,7 @@ def load_send() -> None: send = None cur_path = os.path.abspath(os.path.dirname(__file__)) sys.path.append(cur_path) - if os.path.exists(cur_path + "/notify.py"): + if os.path.exists(f"{cur_path}/notify.py"): try: from notify import send except Exception: @@ -52,14 +49,11 @@ def load_send() -> None: def get_tasklist() -> list: - tasklist = [] t = round(time.time() * 1000) url = f"http://{ipport}/api/crons?searchValue=&t={t}" response = requests.get(url=url, headers=headers) datas = json.loads(response.content.decode("utf-8")) - if datas.get("code") == 200: - tasklist = datas.get("data") - return tasklist + return datas.get("data") if datas.get("code") == 200 else [] def filter_res_sub(tasklist: list) -> tuple: @@ -95,7 +89,7 @@ def get_duplicate_list(tasklist: list) -> tuple: cmds.append(task.get("command")) name_list = [] - for i, name in enumerate(names): + for name in names: if name not in name_list: name_list.append(name) @@ -122,7 +116,7 @@ def get_duplicate_list(tasklist: list) -> tuple: def reserve_task_only( tem_ids: list, tem_tasks: list, dup_ids: list, res_list: list ) -> list: - if len(tem_ids) == 0: + if not tem_ids: return tem_ids logger.info("\n=== 最终筛选开始 ===") diff --git a/Scripts/py/getBean.py b/Scripts/py/getBean.py index b0d683f..c70f89d 100644 --- a/Scripts/py/getBean.py +++ b/Scripts/py/getBean.py @@ -45,10 +45,7 @@ while count: from telethon.sessions import StringSession break except Exception: - if count == 2: - pip = 'pip3' - else: - pip = 'pip' + pip = 'pip3' if count == 2 else 'pip' print(f'检测到没有 telethon 库,开始换源进行安装,将使用 {pip} 命令') os.system(f'{pip} install telethon -i https://pypi.tuna.tsinghua.edu.cn/simple') count -= 1 @@ -59,10 +56,7 @@ while count: import requests break except Exception: - if count == 2: - pip = 'pip3' - else: - pip = 'pip' + pip = 'pip3' if count == 2 else 'pip' print(f'检测到没有 requests 库,开始换源进行安装,将使用 {pip} 命令') os.system(f'{pip} install requests -i https://pypi.tuna.tsinghua.edu.cn/simple') count -= 1 @@ -137,8 +131,7 @@ def accountBean(docker_file_path): for line in f: bean_account_line = re.findall(r'获得 [0-9]{2} 京豆', line, re.DOTALL) if bean_account_line != []: - bean_account = int(re.findall(r"\d{2,}", bean_account_line[0])[0]) - return bean_account + return int(re.findall(r"\d{2,}", bean_account_line[0])[0]) if line.find("'code': '1'") != -1: return 20 diff --git a/Scripts/py/getcookie.py b/Scripts/py/getcookie.py index 63f491f..5e2fa19 100644 --- a/Scripts/py/getcookie.py +++ b/Scripts/py/getcookie.py @@ -25,20 +25,18 @@ jd_cookie = "" def getSToken(): time_stamp = int(time.time() * 1000) - get_url = ( - "https://plogin.m.jd.com/cgi-bin/mm/new_login_entrance?lang=chs&appid=300&returnurl=https://wq.jd.com/passport/LoginRedirect?state=%s&returnurl=https://home.m.jd.com/myJd/newhome.action?sceneval=2&ufc=&/myJd/home.action&source=wq_passport" - % time_stamp - ) + get_url = f"https://plogin.m.jd.com/cgi-bin/mm/new_login_entrance?lang=chs&appid=300&returnurl=https://wq.jd.com/passport/LoginRedirect?state={time_stamp}&returnurl=https://home.m.jd.com/myJd/newhome.action?sceneval=2&ufc=&/myJd/home.action&source=wq_passport" + get_header = { "Connection": "Keep-Alive", "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json, text/plain, */*", "Accept-Language": "zh-cn", - "Referer": "https://plogin.m.jd.com/login/login?appid=300&returnurl=https://wq.jd.com/passport/LoginRedirect?state=%s&returnurl=https://home.m.jd.com/myJd/newhome.action?sceneval=2&ufc=&/myJd/home.action&source=wq_passport" - % time_stamp, + "Referer": f"https://plogin.m.jd.com/login/login?appid=300&returnurl=https://wq.jd.com/passport/LoginRedirect?state={time_stamp}&returnurl=https://home.m.jd.com/myJd/newhome.action?sceneval=2&ufc=&/myJd/home.action&source=wq_passport", "User-Agent": "Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_2 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8H7 Safari/6533.18.5 UCBrowser/13.4.2.1122", "Host": "plogin.m.jd.com", } + resp = requests.get(url=get_url, headers=get_header) parseGetRespCookie(resp.headers, resp.json()) @@ -56,27 +54,25 @@ def parseGetRespCookie(headers, get_resp): def getOKLToken(): post_time_stamp = int(time.time() * 1000) - post_url = ( - "https://plogin.m.jd.com/cgi-bin/m/tmauthreflogurl?s_token=%s&v=%s&remember=true" - % (s_token, post_time_stamp) - ) + post_url = f"https://plogin.m.jd.com/cgi-bin/m/tmauthreflogurl?s_token={s_token}&v={post_time_stamp}&remember=true" + post_data = { "lang": "chs", "appid": 300, - "returnurl": "https://wqlogin2.jd.com/passport/LoginRedirect?state=%s&returnurl=//home.m.jd.com/myJd/newhome.action?sceneval=2&ufc=&/myJd/home.action" - % post_time_stamp, + "returnurl": f"https://wqlogin2.jd.com/passport/LoginRedirect?state={post_time_stamp}&returnurl=//home.m.jd.com/myJd/newhome.action?sceneval=2&ufc=&/myJd/home.action", "source": "wq_passport", } + post_header = { "Connection": "Keep-Alive", "Content-Type": "application/x-www-form-urlencoded; Charset=UTF-8", "Accept": "application/json, text/plain, */*", "Cookie": cookies, - "Referer": "https://plogin.m.jd.com/login/login?appid=300&returnurl=https://wqlogin2.jd.com/passport/LoginRedirect?state=%s&returnurl=//home.m.jd.com/myJd/newhome.action?sceneval=2&ufc=&/myJd/home.action&source=wq_passport" - % post_time_stamp, + "Referer": f"https://plogin.m.jd.com/login/login?appid=300&returnurl=https://wqlogin2.jd.com/passport/LoginRedirect?state={post_time_stamp}&returnurl=//home.m.jd.com/myJd/newhome.action?sceneval=2&ufc=&/myJd/home.action&source=wq_passport", "User-Agent": "Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_2 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8H7 Safari/6533.18.5 UCBrowser/13.4.2.1122", "Host": "plogin.m.jd.com", } + try: global okl_token resp = requests.post( @@ -146,37 +142,34 @@ async def my_cookie(event): ) convdata = await conv.wait_event(press_event(SENDER)) res = bytes.decode(convdata.data) - if res == "cancel": - login = False - await jdbot.delete_messages(chat_id, cookiemsg) - msg = await conv.send_message("对话已取消") - conv.cancel() - else: + if res != "cancel": raise exceptions.TimeoutError() + login = False + await jdbot.delete_messages(chat_id, cookiemsg) + msg = await conv.send_message("对话已取消") + conv.cancel() except exceptions.TimeoutError: expired_time = time.time() + 60 * 2 while login: check_time_stamp = int(time.time() * 1000) - check_url = ( - "https://plogin.m.jd.com/cgi-bin/m/tmauthchecktoken?&token=%s&ou_state=0&okl_token=%s" - % (token, okl_token) - ) + check_url = f"https://plogin.m.jd.com/cgi-bin/m/tmauthchecktoken?&token={token}&ou_state=0&okl_token={okl_token}" + check_data = { "lang": "chs", "appid": 300, - "returnurl": "https://wqlogin2.jd.com/passport/LoginRedirect?state=%s&returnurl=//home.m.jd.com/myJd/newhome.action?sceneval=2&ufc=&/myJd/home.action" - % check_time_stamp, + "returnurl": f"https://wqlogin2.jd.com/passport/LoginRedirect?state={check_time_stamp}&returnurl=//home.m.jd.com/myJd/newhome.action?sceneval=2&ufc=&/myJd/home.action", "source": "wq_passport", } + check_header = { - "Referer": "https://plogin.m.jd.com/login/login?appid=300&returnurl=https://wqlogin2.jd.com/passport/LoginRedirect?state=%s&returnurl=//home.m.jd.com/myJd/newhome.action?sceneval=2&ufc=&/myJd/home.action&source=wq_passport" - % check_time_stamp, + "Referer": f"https://plogin.m.jd.com/login/login?appid=300&returnurl=https://wqlogin2.jd.com/passport/LoginRedirect?state={check_time_stamp}&returnurl=//home.m.jd.com/myJd/newhome.action?sceneval=2&ufc=&/myJd/home.action&source=wq_passport", "Cookie": cookies, "Connection": "Keep-Alive", "Content-Type": "application/x-www-form-urlencoded; Charset=UTF-8", "Accept": "application/json, text/plain, */*", "User-Agent": "Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_2 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8H7 Safari/6533.18.5 UCBrowser/13.4.2.1122", } + resp = requests.post( url=check_url, headers=check_header, data=check_data, timeout=30 ) diff --git a/Scripts/py/hostloc2tg_api.py b/Scripts/py/hostloc2tg_api.py index dde50a8..72e26e5 100644 --- a/Scripts/py/hostloc2tg_api.py +++ b/Scripts/py/hostloc2tg_api.py @@ -20,12 +20,9 @@ def post_tg(url, count): r = requests.get(url) if '"ok":true,' in r.text: print("发送成功!") - pass else: count = count + 1 - if count > 5: - pass - else: + if count <= 5: time.sleep(3) print("发送失败,正在重试") post_tg(url, count) @@ -85,38 +82,31 @@ while True: with requests.get("https://hostloc.cherbim.ml/", stream=True, timeout=5) as r: print(time.strftime("%m-%d %H:%M:%S", time.localtime())) for i in r.json()["new_data"][0][15:]: - if i["主题ID"] in hostloc_list or i["主题"] in hostloc_title: - pass - else: + if ( + i["主题ID"] not in hostloc_list + and i["主题"] not in hostloc_title + ): hostloc_list = hostloc_list[1::] hostloc_list.append(i["主题ID"]) hostloc_title = hostloc_title[1::] hostloc_title.append(i["主题"]) a = "https://www.hostloc.com/thread-{0}-1-1.html".format(i["主题ID"]) time_1 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - if "论坛bug,此贴内容无法查看~" not in i["主题内容"][0:100]: - a = a - else: - a = f"{a}" + a = a if "论坛bug,此贴内容无法查看~" not in i["主题内容"][:100] else f"{a}" text = ( - "主 题:" - + "{}".format( - i["主题"] - .replace("&", "%26") - .replace("<", "%26lt%3b") - .replace(">", "%26gt%3b") - .replace("#", " ") - ) + f'主 题:{i["主题"].replace("&", "%26").replace("<", "%26lt%3b").replace(">", "%26gt%3b").replace("#", " ")}' + "\n" + "发 布 者:" - + """{1}""".format(i["发布者链接"], i["发布者"]) + + """{1}""".format( + i["发布者链接"], i["发布者"] + ) + "\n" + "时 间:" + time_1 + "\n" + "内容预览:" + """{0}""".format( - i["主题内容"][0:100] + i["主题内容"][:100] .replace("&", "%26") .replace("<", "%26lt%3b") .replace(">", "%26gt%3b") @@ -126,16 +116,13 @@ while True: + "直达链接: " + a ) + print(text) # 修改为你自己的bot api token和chat_id(可以是用户也可以是频道) chat_id = os.environ.get("HOST_GROUP_ID") bot_api_token = os.environ.get("HOST_BOT_TOKEN") - tg_url = ( - f"https://api.telegram.org/bot{bot_api_token}/sendMessage?parse_mode=HTML&chat_id=" - + chat_id - + "&text=" - + text - ) + tg_url = f"https://api.telegram.org/bot{bot_api_token}/sendMessage?parse_mode=HTML&chat_id={chat_id}&text={text}" + b = 0 post_tg(tg_url, b) time.sleep(2) diff --git a/Scripts/py/jdcar.py b/Scripts/py/jdcar.py index 4d5e8c9..a56b406 100644 --- a/Scripts/py/jdcar.py +++ b/Scripts/py/jdcar.py @@ -3,6 +3,7 @@ pip3 install telethon pysocks httpx """ + import os import time @@ -36,31 +37,31 @@ jdhealth = os.environ.get("jdhealth") for num in range(len(api_id_list)): - session_name = ["id_" + str(i) for i in api_id_list] + session_name = [f"id_{str(i)}" for i in api_id_list] client = TelegramClient(session_name[num], api_id_list[num], api_hash_list[num]) client.start() # 第一项是机器人ID,第二项是发送的文字 # 种豆得豆 if jdplantbean is not None: - client.send_message("@BotFather", "/bean " + jdplantbean) + client.send_message("@BotFather", f"/bean {jdplantbean}") # 东东农场 if jdfruit is not None: - client.send_message("@BotFather", "/farm " + jdfruit) + client.send_message("@BotFather", f"/farm {jdfruit}") # 京喜工厂 if jxfactory is not None: - client.send_message("@BotFather", "/jxfactory " + jxfactory) + client.send_message("@BotFather", f"/jxfactory {jxfactory}") # 闪购盲盒 if jdsgmh is not None: - client.send_message("@BotFather", "/sgmh " + jdsgmh) + client.send_message("@BotFather", f"/sgmh {jdsgmh}") # 东东工厂 if jdfactory is not None: - client.send_message("@BotFather", "/ddfactory " + jdfactory) + client.send_message("@BotFather", f"/ddfactory {jdfactory}") # 东东萌宠 if jdpet is not None: - client.send_message("@BotFather", "/pet " + jdpet) + client.send_message("@BotFather", f"/pet {jdpet}") # 东东健康 if jdhealth is not None: - client.send_message("@BotFather", "/health " + jdhealth) + client.send_message("@BotFather", f"/health {jdhealth}") time.sleep(5) # 延时5秒,等待机器人回应(一般是秒回应,但也有发生阻塞的可能) client.send_read_acknowledge("@BotFather") # 将机器人回应设为已读 diff --git a/Scripts/py/mimotion.py b/Scripts/py/mimotion.py index ed07826..7bc1f83 100644 --- a/Scripts/py/mimotion.py +++ b/Scripts/py/mimotion.py @@ -36,13 +36,12 @@ headers = {"User-Agent": "Dalvik/2.1.0 (Linux; U; Android 9; MI 6 MIUI/20.6.18)" # 获取登录 code def get_code(location): code_pattern = re.compile("(?<=access=).*?(?=&)") - code = code_pattern.findall(location)[0] - return code + return code_pattern.findall(location)[0] # 登录 def login(_user, password): - url1 = "https://api-user.huami.com/registrations/+86" + _user + "/tokens" + url1 = f"https://api-user.huami.com/registrations/+86{_user}/tokens" _headers = { "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8", "User-Agent": "MiFit/4.6.0 (iPhone; iOS 14.0.1; Scale/2.00)", @@ -89,11 +88,11 @@ def main(_user, _passwd, _step): _user = str(_user) password = str(_passwd) _step = str(_step) - if _user == "" or password == "": + if not _user or not password: print("用户名或密码不能为空!") return "user and passwd not empty!" - if _step == "": + if not _step: print("已设置为随机步数(18000-25000)") _step = str(random.randint(18000, 25000)) login_token, userid = login(_user, password) @@ -111,7 +110,7 @@ def main(_user, _passwd, _step): find_date = re.compile(r".*?date%22%3A%22(.*?)%22%2C%22data.*?") find_step = re.compile(r".*?ttl%5C%22%3A(.*?)%2C%5C%22dis.*?") - data_json = re.sub(find_date.findall(data_json)[0], today, str(data_json)) + data_json = re.sub(find_date.findall(data_json)[0], today, data_json) data_json = re.sub(find_step.findall(data_json)[0], _step, str(data_json)) url = f"https://api-mifit-cn.huami.com/v1/data/band_data.json?&t={t}" @@ -130,18 +129,16 @@ def main(_user, _passwd, _step): def get_time(): url = "http://api.m.taobao.com/rest/api3.do?api=mtop.common.getTimestamp" response = requests.get(url, headers=headers).json() - t = response["data"]["t"] - return t + return response["data"]["t"] # 获取app_token def get_app_token(login_token): url = f"https://account-cn.huami.com/v1/client/app_tokens?app_name=com.xiaomi.hm.health&dn=api-user.huami.com%2Capi-mifit.huami.com%2Capp-analytics.huami.com&login_token={login_token}" response = requests.get(url, headers=headers).json() - app_token = response["token_info"]["app_token"] # print("app_token获取成功!") # print(app_token) - return app_token + return response["token_info"]["app_token"] # 推送 server 酱 @@ -227,7 +224,7 @@ def wxpush(msg, usr, corpid, corpsecret, agentid=1000002): # 获取 access_token,每次的 access_token 都不一样,所以需要运行一次请求一次 def get_access_token(_base_url, _corpid, _corpsecret): - urls = _base_url + "corpid=" + _corpid + "&corpsecret=" + _corpsecret + urls = f"{_base_url}corpid={_corpid}&corpsecret={_corpsecret}" resp = requests.get(urls).json() access_token = resp["access_token"] return access_token @@ -271,7 +268,7 @@ def wxpush(msg, usr, corpid, corpsecret, agentid=1000002): if __name__ == "__main__": # Push Mode Pm = os.environ.get("PMODE") - if Pm == "wx" or Pm == "nwx": + if Pm in ["wx", "nwx"]: _sckey = os.environ.get("PKEY") if _sckey == "": print("未提供 sckey,不进行推送!") @@ -309,11 +306,11 @@ if __name__ == "__main__": if len(user_list) == len(passwd_list): push = "" - for line in range(0, len(user_list)): + for line in range(len(user_list)): if len(step_array) == 2: step = str(random.randint(int(step_array[0]), int(step_array[1]))) print(f"已设置为随机步数({step_array[0]}-{step_array[1]})") - elif str(step) == "": + elif not str(step): step = "" push += main(user_list[line], passwd_list[line], step) + "\n" if Pm == "wx": @@ -329,7 +326,5 @@ if __name__ == "__main__": wxpush(push, sl[0], sl[1], sl[2]) elif Pm == "pp": push_pushplus(token, push) - elif Pm == "off": - pass else: print("用户名和密码数量不对") diff --git a/Scripts/py/mimotion2.py b/Scripts/py/mimotion2.py index eb72d8a..c2c7a93 100644 --- a/Scripts/py/mimotion2.py +++ b/Scripts/py/mimotion2.py @@ -14,6 +14,7 @@ STEP: 步数 空或不填则为 18000-25000 之间随机,自定义示 MI_API: api 接口 """ + import os import random @@ -32,7 +33,7 @@ else: if len(step_array) == 2: step = str(random.randint(int(step_array[0]), int(step_array[1]))) print(f"已设置为随机步数({step_array[0]}-{step_array[1]})") -elif str(step) == "": +elif not str(step): step = int(random.uniform(18000, 25000)) diff --git a/Scripts/py/netease.py b/Scripts/py/netease.py index 4ee858e..b9f14bf 100644 --- a/Scripts/py/netease.py +++ b/Scripts/py/netease.py @@ -88,7 +88,7 @@ def get_args(): def get_envs(): - infos = { + return { "phone": os.getenv("NETEASE_USER"), "password": os.getenv("NETEASE_PWD"), "sc_key": os.getenv("PUSH_KEY"), @@ -103,14 +103,12 @@ def get_envs(): "qmsg_key": os.getenv("QMSG_KEY"), "ding_token": os.getenv("DD_BOT_TOKEN"), } - return infos # Calculate the MD5 value of text # 计算字符串的32位小写MD5值 def calc_md5(text): - md5_text = hashlib.md5(text.encode(encoding="utf-8")).hexdigest() - return md5_text + return hashlib.md5(text.encode(encoding="utf-8")).hexdigest() # AES Encrypt @@ -185,7 +183,7 @@ class Push: headers = {"Content-type": "application/x-www-form-urlencoded"} content = {"title": "网易云打卡", "desp": self.text} ret = requests.post(url, headers=headers, data=content) - print("ServerChan: " + ret.text) + print(f"ServerChan: {ret.text}") # Telegram Bot Push def telegram_push(self): @@ -198,7 +196,7 @@ class Push: "text": self.text, } ret = requests.post(url, data=data) - print("Telegram: " + ret.text) + print(f"Telegram: {ret.text}") # Bark Push def bark_push(self): @@ -209,7 +207,7 @@ class Push: headers = {"Content-Type": "application/json;charset=utf-8"} url = "https://api.day.app/{0}/?isArchive={1}".format(arg[0], arg[1]) ret = requests.post(url, json=data, headers=headers) - print("Bark: " + ret.text) + print(f"Bark: {ret.text}") # PushPlus Push def push_plus_push(self): @@ -220,7 +218,7 @@ class Push: arg[0], "网易云打卡", self.text, "html" ) ret = requests.get(url) - print("pushplus: " + ret.text) + print(f"pushplus: {ret.text}") # Wecom Push def wecom_id_push(self): @@ -252,7 +250,7 @@ class Push: if ret["errcode"] != 0: print("微信推送配置错误") else: - print("Wecom: " + ret) + print(f"Wecom: {ret}") # Qmsg Push def qmsg_push(self): @@ -261,7 +259,7 @@ class Push: arg = self.info["qmsg_key"] url = "https://qmsg.zendee.cn/send/{0}?msg={1}".format(arg[0], self.text) ret = requests.post(url) - print("Qmsg: " + ret.text) + print(f"Qmsg: {ret.text}") # Ding Talk Push def ding_talk_push(self): @@ -274,7 +272,7 @@ class Push: {"msgtype": "text", "text": {"content": "【CMLU】\n\n" + self.text}} ) ret = requests.post(url, headers=header, data=data) - print("Ding: " + ret.text) + print(f"Ding: {ret.text}") # 加密类,实现网易云音乐前端加密流程 @@ -398,8 +396,9 @@ class CloudMusic: # 获取用户的收藏歌单 def get_subscribe_playlists(self): private_url = ( - "https://music.163.com/weapi/user/playlist?csrf_token=" + self.csrf + f"https://music.163.com/weapi/user/playlist?csrf_token={self.csrf}" ) + res = self.session.post( url=private_url, data=self.enc.encrypt( @@ -417,18 +416,15 @@ class CloudMusic: ret = json.loads(res.text) subscribed_lists = [] if ret["code"] == 200: - for li in ret["playlist"]: - if li["subscribed"]: - subscribed_lists.append(li["id"]) + subscribed_lists.extend(li["id"] for li in ret["playlist"] if li["subscribed"]) else: print("个人订阅歌单获取失败 " + str(ret["code"]) + ":" + ret["message"]) return subscribed_lists # 获取某一歌单内的所有音乐ID def get_list_musics(self, mlist): - detail_url = ( - "https://music.163.com/weapi/v6/playlist/detail?csrf_token=" + self.csrf - ) + detail_url = f"https://music.163.com/weapi/v6/playlist/detail?csrf_token={self.csrf}" + musics = [] for m in mlist: res = self.session.post( diff --git a/Scripts/py/sendNotify.py b/Scripts/py/sendNotify.py index 0846900..4b643d7 100644 --- a/Scripts/py/sendNotify.py +++ b/Scripts/py/sendNotify.py @@ -70,16 +70,11 @@ if ( QQ_SKEY = os.environ["QQ_SKEY"] QQ_MODE = os.environ["QQ_MODE"] # 获取pushplus+ PUSH_PLUS_TOKEN -if "PUSH_PLUS_TOKEN" in os.environ: - if len(os.environ["PUSH_PLUS_TOKEN"]) > 1: - PUSH_PLUS_TOKEN = os.environ["PUSH_PLUS_TOKEN"] - # print("已获取并使用Env环境 PUSH_PLUS_TOKEN") +if "PUSH_PLUS_TOKEN" in os.environ and len(os.environ["PUSH_PLUS_TOKEN"]) > 1: + PUSH_PLUS_TOKEN = os.environ["PUSH_PLUS_TOKEN"] # 获取企业微信应用推送 QYWX_AM -if "QYWX_AM" in os.environ: - if len(os.environ["QYWX_AM"]) > 1: - QYWX_AM = os.environ["QYWX_AM"] - # print("已获取并使用Env环境 QYWX_AM") - +if "QYWX_AM" in os.environ and len(os.environ["QYWX_AM"]) > 1: + QYWX_AM = os.environ["QYWX_AM"] if BARK: notify_mode.append("bark") # print("BARK 推送打开") @@ -107,7 +102,7 @@ if QYWX_AM: def message(str_msg): global message_info print(str_msg) - message_info = "{}\n{}".format(message_info, str_msg) + message_info = f"{message_info}\n{str_msg}" sys.stdout.flush() @@ -171,7 +166,7 @@ def telegram_bot(title, content): } proxies = None if TG_PROXY_IP and TG_PROXY_PORT: - proxyStr = "http://{}:{}".format(TG_PROXY_IP, TG_PROXY_PORT) + proxyStr = f"http://{TG_PROXY_IP}:{TG_PROXY_PORT}" proxies = {"http": proxyStr, "https": proxyStr} try: response = requests.post( @@ -190,7 +185,7 @@ def telegram_bot(title, content): def dingding_bot(title, content): timestamp = str(round(time.time() * 1000)) # 时间戳 secret_enc = DD_BOT_SECRET.encode("utf-8") - string_to_sign = "{}\n{}".format(timestamp, DD_BOT_SECRET) + string_to_sign = f"{timestamp}\n{DD_BOT_SECRET}" string_to_sign_enc = string_to_sign.encode("utf-8") hmac_code = hmac.new( secret_enc, string_to_sign_enc, digestmod=hashlib.sha256 diff --git a/Scripts/py/tg_channel_downloader.py b/Scripts/py/tg_channel_downloader.py index ee85347..4b4fa10 100644 --- a/Scripts/py/tg_channel_downloader.py +++ b/Scripts/py/tg_channel_downloader.py @@ -41,8 +41,7 @@ queue = asyncio.Queue() # 文件夹/文件名称处理 def validate_title(title): r_str = r"[\/\\\:\*\?\"\<\>\|\n]" # '/ \ : * ? " < > |' - new_title = re.sub(r_str, "_", title) # 替换为下划线 - return new_title + return re.sub(r_str, "_", title) # 获取相册标题 @@ -52,10 +51,9 @@ async def get_group_caption(message): async for msg in client.iter_messages( entity=entity, reverse=True, offset_id=message.id - 9, limit=10 ): - if msg.grouped_id == message.grouped_id: - if msg.text != "": - group_caption = msg.text - return group_caption + if msg.grouped_id == message.grouped_id and msg.text != "": + group_caption = msg.text + return group_caption return group_caption @@ -140,15 +138,15 @@ async def worker(name): @events.register(events.NewMessage(pattern="/start", from_users=admin_id)) async def handler(update): text = update.message.text.split(" ") - msg = ( - "参数错误,请按照参考格式输入:\n\n" - "1.普通群组\n" - "/start https://t.me/fkdhlg 0 \n\n" - "2.私密群组(频道) 链接为随便复制一条群组消息链接\n" - "/start https://t.me/12000000/1 0 \n\n" - "Tips:如果不输入offset_id,默认从第一条开始下载" - ) - if len(text) == 1: + if len(text) == 1 or len(text) not in [2, 3]: + msg = ( + "参数错误,请按照参考格式输入:\n\n" + "1.普通群组\n" + "/start https://t.me/fkdhlg 0 \n\n" + "2.私密群组(频道) 链接为随便复制一条群组消息链接\n" + "/start https://t.me/12000000/1 0 \n\n" + "Tips:如果不输入offset_id,默认从第一条开始下载" + ) await bot.send_message(admin_id, msg, parse_mode="HTML") return elif len(text) == 2: @@ -168,7 +166,7 @@ async def handler(update): "chat输入错误,请输入频道或群组的链接\n\n" f"错误类型:{e.__class__}" f"异常消息:{e}" ) return - elif len(text) == 3: + else: chat_id = text[1] offset_id = int(text[2]) try: @@ -185,9 +183,6 @@ async def handler(update): "chat输入错误,请输入频道或群组的链接\n\n" f"错误类型:{type(e).__class__}" f"异常消息:{e}" ) return - else: - await bot.send_message(admin_id, msg, parse_mode="HTML") - return if chat_title: print(f"{get_local_time()} - 开始下载:{chat_title}({entity.id}) - {offset_id}") last_msg_id = 0 @@ -266,10 +261,13 @@ async def all_chat_download(update): try: if type(message.media) == MessageMediaWebPage: return - if message.media.document.mime_type == "image/webp": - file_name = f"{message.media.document.id}.webp" - if message.media.document.mime_type == "application/x-tgsticker": + if ( + message.media.document.mime_type + == "application/x-tgsticker" + ): file_name = f"{message.media.document.id}.tgs" + elif message.media.document.mime_type == "image/webp": + file_name = f"{message.media.document.id}.webp" for i in message.document.attributes: try: file_name = i.file_name diff --git a/Scripts/py/win10-PyAuto-deploy.py b/Scripts/py/win10-PyAuto-deploy.py index 6c145b1..4471d60 100644 --- a/Scripts/py/win10-PyAuto-deploy.py +++ b/Scripts/py/win10-PyAuto-deploy.py @@ -123,7 +123,7 @@ def run_M_N(): os.chdir(node_p) # print(node_d) # print(os.path.dirname(node_p)) - command = "%s app.js -p %s -f %s" % (node_d, userport, ip) + command = f"{node_d} app.js -p {userport} -f {ip}" subprocess.Popen(command, shell=False, creationflags=CREATE_NO_WINDOW) print("成功启动 NODE 代理,命令行:", command) @@ -136,12 +136,7 @@ def procressexist(processname="cloudmusic.exe"): # 检测进程是否健在(默 time.sleep(sleep) pl = psutil.pids() try: - for pid in pl: - if psutil.Process(pid).name() == processname: - return True - break - else: - return False + return any(psutil.Process(pid).name() == processname for pid in pl) except: return False @@ -166,5 +161,4 @@ run_M_N() while procressexist() and procressexist("node.exe"): print("网易云音乐进程和 NODE 进程健在!") - pass killprocess()