mirror of
https://github.com/Oreomeow/VIP.git
synced 2026-02-03 10:33:15 +08:00
523 lines
17 KiB
Python
523 lines
17 KiB
Python
# -*- coding: utf8 -*-
|
||
|
||
# SPDX-FileCopyrightText: (c) 2020-2021 CokeMine & Its repository contributors
|
||
# SPDX-FileCopyrightText: (c) 2021 A beam of light
|
||
#
|
||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||
|
||
"""
|
||
euserv auto-renew script
|
||
v2021.09.30
|
||
* Captcha automatic recognition using TrueCaptcha API
|
||
* Email notification
|
||
* Add login failure retry mechanism
|
||
* reformat log info
|
||
|
||
Author: cokemine
|
||
Modifier: o0oo0ooo0 & Oreo
|
||
Date: Tue Oct 14 06:47:57 UTC 2021
|
||
cron: 0 10 */7 * *
|
||
new Env('EUserv 续期');
|
||
------------
|
||
环境变量说明
|
||
EUserv_ID: 账号 用户名,邮箱也可,多账号用空格分隔,ql 环境变量填写需用回车隔开
|
||
EUserv_PW: 密码 密码,同上且与账号一一对应
|
||
其他变量看下方注释
|
||
------------
|
||
依赖模块说明
|
||
pip install requests beautifulsoup4 / pip3 install requests beautifulsoup4
|
||
"""
|
||
|
||
import base64
|
||
import json
|
||
import os
|
||
import re
|
||
import time
|
||
from email.mime.application import MIMEApplication
|
||
from email.mime.multipart import MIMEMultipart
|
||
from email.mime.text import MIMEText
|
||
from smtplib import SMTP_SSL, SMTPDataError
|
||
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
|
||
# Deploying outside mainland China (e.g. HK, SG) is strongly recommended.
# Constants are named in UPPER_CASE; underscores are allowed.


def _env_flag(name: str, default: bool) -> bool:
    """Read a boolean switch from the environment.

    Environment values are always strings, so "False" would be truthy if
    used directly. "", "0", "false", "no" and "off" (any case) mean False;
    any other value means True. An unset variable yields *default*.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() not in {"", "0", "false", "no", "off"}


def _env_int(name: str, default: int) -> int:
    """Read an integer from the environment, falling back to *default*.

    The fallback is used when the variable is unset or is not a valid
    integer literal.
    """
    try:
        return int(os.environ.get(name))
    except (TypeError, ValueError):
        return default


EUserv_ID = os.environ.get("EUserv_ID")  # user name(s); an e-mail address also works
EUserv_PW = os.environ.get("EUserv_PW")  # password(s), matched one-to-one with EUserv_ID

# options: True or False
CHECK_CAPTCHA_SOLVER_USAGE = _env_flag("CHECK_CAPTCHA_SOLVER_USAGE", True)

# default value is TrueCaptcha demo credential,
# you can use your own credential via set environment variables:
# USERID and APIKEY
# demo: https://apitruecaptcha.org/demo
# demo2: https://apitruecaptcha.org/demo2
# demo apikey also has a limit of 100 times per day
# {
# 'error': '101.0 above free usage limit 100 per day and no balance',
# 'requestId': '7690c065-70e0-4757-839b-5fd8381e65c7'
# }
USERID = os.environ.get("USERID", "arun56")
APIKEY = os.environ.get("APIKEY", "wMjXmBIcHcdYqO2RrsVN")

# Maximum number of login retry (always an int, even when set via the environment)
LOGIN_MAX_RETRY_COUNT = _env_int("LOGIN_MAX_RETRY_COUNT", 5)

# CoolPush https://cp.xuthus.cc
COOL_PUSH_SKEY = os.environ.get("COOL_PUSH_SKEY")
# Notification type. Options: send [QQ private chat], group [QQ group chat],
# wx [personal WeChat], ww [WeCom]. The upstream note calls "send" the
# default, but no default is applied here: leaving this unset disables CoolPush.
COOL_PUSH_MODE = os.environ.get("COOL_PUSH_MODE")

# PushPlus https://pushplus.hxtrip.com/message
PUSH_PLUS_TOKEN = os.environ.get("PUSH_PLUS_TOKEN")

# ServerChan http://sc.ftqq.com/?c=code
SCKEY = os.environ.get("PUSH_KEY")  # ServerChan key; leave unset to skip. Example: SCU646xxxxxxxxdacd6a5dc3f6

# Telegram Bot Push https://core.telegram.org/bots/api#authorizing-your-bot
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN")  # obtained from @BotFather, e.g. 1077xxx4424:AAFjv0FcqxxxxxxgEMGfi22B4yh15R5uw
TG_USER_ID = os.environ.get("TG_USER_ID")  # user, group or channel ID, e.g. 129xxx206
# Self-hosted API reverse-proxy host for networks that cannot reach Telegram;
# keep the default when the network is fine.
TG_API_HOST = os.environ.get("TG_API_HOST", "api.telegram.org")

# wecomchan https://github.com/easychen/wecomchan
WECOMCHAN_DOMAIN = os.environ.get("WECOMCHAN_DOMAIN")  # http(s)://example.com/
WECOMCHAN_SEND_KEY = os.environ.get("WECOMCHAN_SEND_KEY")
WECOMCHAN_TO_USER = os.environ.get("WECOMCHAN_TO_USER", "@all")  # everyone by default; for specific users use User1|User2

# Email notification
RECEIVER_EMAIL = os.environ.get("RECEIVER_EMAIL", "")
YD_EMAIL = os.environ.get("YD_EMAIL", "")
YD_APP_PWD = os.environ.get("YD_APP_PWD", "")  # Yandex mail app password for third-party apps

# Magic internet access
# NOTE(review): PROXIES is not referenced by any code visible in this file.
PROXIES = {"http": "http://127.0.0.1:10808", "https": "http://127.0.0.1:10808"}
user_agent = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/94.0.4606.61 Safari/537.36 "
)

# Variables are named in lower case; underscores are allowed.
# Accumulated log text that is sent through the notification channels.
# Starts empty; do not edit.
desp = ""
|
||
|
||
|
||
def log(info: str):
    """Echo *info* to stdout and append it to the notification buffer.

    The module-level ``desp`` string collects every logged line, each
    followed by a blank line, so it can be pushed to the notification
    channels later.
    """
    global desp
    print(info)
    desp += info + "\n\n"
|
||
|
||
|
||
def login_retry(*args, **kwargs):
    """Build a decorator that retries a login function when it fails.

    Keyword Args:
        max_retry: Number of extra attempts made after the first failed
            call. Accepts an int or a numeric string (the value is often
            read from an environment variable). Missing, zero, negative or
            non-numeric values fall back to 3.

    The decorated function must take ``(username, password)`` and return a
    ``(sess_id, session)`` pair, where a ``sess_id`` of ``"-1"`` signals
    failure. The wrapper returns the first successful pair, or the pair of
    the last attempt once the retries are used up.
    """
    from functools import wraps  # local import keeps this block self-contained

    try:
        max_retry = int(kwargs.get("max_retry") or 0)
    except (TypeError, ValueError):
        max_retry = 0
    if max_retry <= 0:
        # default retry 3 times
        max_retry = 3

    def wrapper(func):
        @wraps(func)
        def inner(username, password):
            sess_id, session = func(username, password)
            attempt = 0
            while sess_id == "-1" and attempt < max_retry:
                attempt += 1
                # The first retry is silent; later ones are logged.
                if attempt > 1:
                    log("[EUserv] Login tried the {}th time".format(attempt))
                sess_id, session = func(username, password)
            return sess_id, session

        return inner

    return wrapper
|
||
|
||
|
||
def captcha_solver(captcha_image_url: str, session: requests.session) -> dict:
    """Fetch the captcha image and submit it to the TrueCaptcha API.

    The image is downloaded through *session*, base64-encoded and posted
    as JSON together with the configured USERID / APIKEY. The decoded JSON
    reply is returned as-is.

    TrueCaptcha API doc: https://apitruecaptcha.org/api
    Free to use 100 requests per day.
    """
    image = session.get(captcha_image_url)
    payload = {
        "userid": USERID,
        "apikey": APIKEY,
        "case": "mixed",
        "mode": "human",
        # Base64 output is pure ASCII, so decoding gives the bare text.
        "data": base64.b64encode(image.content).decode("ascii"),
    }
    reply = requests.post(
        url="https://api.apitruecaptcha.org/one/gettext", json=payload
    )
    return json.loads(reply.text)
|
||
|
||
|
||
def handle_captcha_solved_result(solved: dict) -> str:
    """Turn a captcha-solver reply into the text to submit.

    The captcha is sometimes a simple two-operand arithmetic expression
    (``x``/``X`` for multiplication, ``+`` or ``-``) and the recognition
    service may return the expression rather than its value. When both
    operands are plain decimal numbers the expression is computed here;
    otherwise the recognized text is returned unchanged.

    Args:
        solved: Decoded JSON reply of the solver; must contain "result".

    Returns:
        The captcha answer, always as a string.

    Raises:
        KeyError: If *solved* has no "result" entry.
    """
    if "result" not in solved:
        print(solved)
        raise KeyError("Failed to find parsed results.")

    solved_text = solved["result"]
    if "RESULT IS" in solved_text:
        log("[Captcha Solver] You are using the demo apikey.")
        print("There is no guarantee that demo apikey will work in the future!")
        # The demo apikey wraps the answer in a sentence; extract it, and
        # fall back to the raw text if the sentence has an unexpected shape.
        matches = re.findall(r"RESULT IS . (.*) .", solved_text)
        text = matches[0] if matches else solved_text
    else:
        # using your own apikey
        log("[Captcha Solver] You are using your own apikey.")
        text = solved_text

    # Explicit arithmetic replaces eval(): nothing from the remote reply is
    # executed, and operands such as "08" are handled.
    arithmetic = (
        ("X", lambda a, b: a * b),
        ("x", lambda a, b: a * b),
        ("+", lambda a, b: a + b),
        ("-", lambda a, b: a - b),
    )
    for symbol, compute in arithmetic:
        left_part, found, right_part = text.partition(symbol)
        if not found:
            continue
        left_part, right_part = left_part.strip(), right_part.strip()
        if left_part.isdecimal() and right_part.isdecimal():
            return str(compute(int(left_part), int(right_part)))
        # The symbol is part of an ordinary captcha string, not arithmetic.
        return text
    return text
|
||
|
||
|
||
def get_captcha_solver_usage() -> dict:
    """Query the TrueCaptcha usage endpoint for the configured credentials.

    Returns the decoded JSON reply without further processing.
    """
    credentials = {"username": USERID, "apikey": APIKEY}
    reply = requests.get(
        url="https://api.apitruecaptcha.org/one/getusage", params=credentials
    )
    return json.loads(reply.text)
|
||
|
||
|
||
@login_retry(max_retry=LOGIN_MAX_RETRY_COUNT)
def login(username: str, password: str) -> (str, requests.session):
    """Log in to the EUserv support panel, solving a captcha if one is shown.

    Args:
        username: Account user name or e-mail address.
        password: Account password.

    Returns:
        ``(sess_id, session)``. ``sess_id`` is the PHP session id on
        success, or ``"-1"`` on failure (which the ``login_retry``
        decorator uses to decide whether to try again).

    Raises:
        requests.HTTPError: If the login POST returns an error status.
    """
    captcha_prompt = "To finish the login process please solve the following captcha."
    headers = {"user-agent": user_agent, "origin": "https://www.euserv.com"}
    url = "https://support.euserv.com/index.iphp"
    session = requests.Session()

    sess = session.get(url, headers=headers)
    sess_ids = re.findall("PHPSESSID=(\\w{10,100});", str(sess.headers))
    if not sess_ids:
        # No session cookie: report a normal login failure so the retry
        # decorator can try again, rather than raising IndexError.
        log("[EUserv] PHPSESSID not found in the login page response")
        return "-1", session
    sess_id = sess_ids[0]

    # visit png
    logo_png_url = "https://support.euserv.com/pic/logo_small.png"
    session.get(logo_png_url, headers=headers)

    login_data = {
        "email": username,
        "password": password,
        "form_selected_language": "en",
        "Submit": "Login",
        "subaction": "login",
        "sess_id": sess_id,
    }
    f = session.post(url, headers=headers, data=login_data)
    f.raise_for_status()

    # Logged in directly, no captcha requested.
    if "Hello" in f.text or "Confirm or change your customer data here" in f.text:
        return sess_id, session

    # Neither logged in nor asked for a captcha: treat as a failed login.
    if captcha_prompt not in f.text:
        return "-1", session

    log("[Captcha Solver] 进行验证码识别...")
    captcha_image_url = "https://support.euserv.com/securimage_show.php"
    solved_result = captcha_solver(captcha_image_url, session)
    captcha_code = handle_captcha_solved_result(solved_result)
    log(f"[Captcha Solver] 识别的验证码是: {captcha_code}")

    if CHECK_CAPTCHA_SOLVER_USAGE:
        # The usage figure is informational only; a failed lookup or an
        # unexpected reply must not abort a login whose captcha is solved.
        try:
            usage = get_captcha_solver_usage()
            log(
                "[Captcha Solver] current date {0} api usage count: {1}".format(
                    usage[0]["date"], usage[0]["count"]
                )
            )
        except (
            KeyError,
            IndexError,
            TypeError,
            ValueError,
            requests.RequestException,
        ) as err:
            log(f"[Captcha Solver] unable to read api usage: {err!r}")

    f2 = session.post(
        url,
        headers=headers,
        data={
            "subaction": "login",
            "sess_id": sess_id,
            "captcha_code": captcha_code,
        },
    )
    # If the captcha prompt is gone, the code was accepted.
    if captcha_prompt not in f2.text:
        log("[Captcha Solver] 验证通过")
        return sess_id, session

    log("[Captcha Solver] 验证失败")
    return "-1", session
|
||
|
||
|
||
def get_servers(sess_id: str, session: requests.session) -> {}:
    """Scrape the customer order table and report which servers can be renewed.

    Returns a dict mapping each server id (text of the id cell) to a flag
    that is True when the row's action text does NOT contain
    "Contract extension possible from", i.e. the contract can be extended
    now.

    Raises requests.HTTPError if the page request fails.
    """
    servers = {}
    page_url = f"https://support.euserv.com/index.iphp?sess_id={sess_id}"
    request_headers = {"user-agent": user_agent, "origin": "https://www.euserv.com"}
    page = session.get(url=page_url, headers=request_headers)
    page.raise_for_status()

    rows = BeautifulSoup(page.text, "html.parser").select(
        "#kc2_order_customer_orders_tab_content_1 .kc2_order_table.kc2_content_table tr"
    )
    for row in rows:
        id_cells = row.select(".td-z1-sp1-kc")
        # Rows without exactly one id cell are not server rows; skip them.
        if len(id_cells) != 1:
            continue
        action_text = row.select(".td-z1-sp2-kc .kc2_order_action_container")[
            0
        ].get_text()
        servers[id_cells[0].get_text()] = (
            "Contract extension possible from" not in action_text
        )
    return servers
|
||
|
||
|
||
def renew(
    sess_id: str, session: requests.session, password: str, order_id: str
) -> bool:
    """Send the three-step contract-extension request for one order.

    1. Open the contract details of *order_id*.
    2. Exchange the account password for a security token.
    3. Submit the extension using that token.

    Returns False when the token request does not report "success",
    otherwise True once the final request has been sent and a 5-second
    pause has elapsed. The reply to the final request is not inspected.

    Raises requests.HTTPError if the token request returns an error status.
    """
    endpoint = "https://support.euserv.com/index.iphp"
    request_headers = {
        "user-agent": user_agent,
        "Host": "support.euserv.com",
        "origin": "https://support.euserv.com",
        "Referer": "https://support.euserv.com/index.iphp",
    }

    # Step 1: select the order and show its contract details.
    session.post(
        endpoint,
        headers=request_headers,
        data={
            "Submit": "Extend contract",
            "sess_id": sess_id,
            "ord_no": order_id,
            "subaction": "choose_order",
            "choose_order_subaction": "show_contract_details",
        },
    )

    # Step 2: obtain the security token by confirming the password.
    token_reply = session.post(
        endpoint,
        headers=request_headers,
        data={
            "sess_id": sess_id,
            "subaction": "kc2_security_password_get_token",
            "prefix": "kc2_customer_contract_details_extend_contract_",
            "password": password,
        },
    )
    token_reply.raise_for_status()
    token_info = json.loads(token_reply.text)
    if token_info["rs"] != "success":
        return False

    # Step 3: extend the contract term with the token.
    session.post(
        endpoint,
        headers=request_headers,
        data={
            "sess_id": sess_id,
            "ord_id": order_id,
            "subaction": "kc2_customer_contract_details_extend_contract_term",
            "token": token_info["token"]["value"],
        },
    )
    time.sleep(5)
    return True
|
||
|
||
|
||
def check(sess_id: str, session: requests.session):
    """Re-read the server list and log which renewals did not take effect.

    A server still flagged as renewable by ``get_servers`` is logged as a
    failed renewal; if none is flagged, a single success line is logged.
    """
    print("Checking.......")
    still_renewable = [
        server_id
        for server_id, renewable in get_servers(sess_id, session).items()
        if renewable
    ]
    for server_id in still_renewable:
        log(f"[EUserv] ServerID: {server_id} Renew Failed!")

    if not still_renewable:
        log("[EUserv] ALL Work Done! Enjoy~")
|
||
|
||
|
||
# CoolPush https://cp.xuthus.cc/
def coolpush():
    """Push the accumulated log through CoolPush and print the outcome."""
    payload = json.dumps({"c": "EUserv续费日志\n\n" + desp})
    endpoint = f"https://push.xuthus.cc/{COOL_PUSH_MODE}/{COOL_PUSH_SKEY}"
    response = requests.post(endpoint, data=payload)
    # Only the HTTP status decides success; the reply body is not read.
    delivered = response.status_code == 200
    print("酷推 推送成功" if delivered else "酷推 推送失败")
|
||
|
||
|
||
# PushPlus https://pushplus.hxtrip.com/message
def push_plus():
    """Push the accumulated log through PushPlus and print the outcome."""
    form = (
        ("token", PUSH_PLUS_TOKEN),
        ("title", "EUserv续费日志"),
        ("content", desp),
    )
    response = requests.post("https://pushplus.hxtrip.com/send", data=form)
    # Only the HTTP status decides success; the reply body is not read.
    delivered = response.status_code == 200
    print("PushPlus 推送成功" if delivered else "PushPlus 推送失败")
|
||
|
||
|
||
# ServerChan http://sc.ftqq.com/?c=code
def server_chan():
    """Push the accumulated log through ServerChan and print the outcome."""
    form = (
        ("text", "EUserv续费日志"),
        ("desp", desp),
    )
    endpoint = f"https://sc.ftqq.com/{SCKEY}.send"
    response = requests.post(endpoint, data=form)
    # Only the HTTP status decides success; the reply body is not read.
    delivered = response.status_code == 200
    print("Server酱 推送成功" if delivered else "Server酱 推送失败")
|
||
|
||
|
||
# Telegram Bot Push https://core.telegram.org/bots/api#authorizing-your-bot
def telegram():
    """Send the accumulated log as a Telegram bot message and print the outcome."""
    form = (
        ("chat_id", TG_USER_ID),
        ("text", "EUserv续费日志\n\n" + desp),
    )
    endpoint = f"https://{TG_API_HOST}/bot{TG_BOT_TOKEN}/sendMessage"
    response = requests.post(endpoint, data=form)

    # Only the HTTP status decides success; the reply body is not read.
    delivered = response.status_code == 200
    print("Telegram Bot 推送成功" if delivered else "Telegram Bot 推送失败")
|
||
|
||
|
||
# wecomchan https://github.com/easychen/wecomchan
def wecomchan():
    """Push the accumulated log through a wecomchan endpoint and print the outcome.

    The query string is passed via ``params`` so that requests URL-encodes
    every value. Concatenating the raw log into the URL would corrupt or
    truncate the message whenever it contains characters such as ``&``,
    ``#`` or ``+``.
    """
    response = requests.get(
        f"{WECOMCHAN_DOMAIN}wecomchan",
        params={
            "sendkey": WECOMCHAN_SEND_KEY,
            "msg_type": "text",
            "to_user": WECOMCHAN_TO_USER,
            "msg": "EUserv续费日志\n\n" + desp,
        },
    )

    # Only the HTTP status decides success; the reply body is not read.
    if response.status_code != 200:
        print("wecomchan 推送失败")
    else:
        print("wecomchan 推送成功")
|
||
|
||
|
||
def send_mail_by_yandex(
    to_email, from_email, subject, text, files, sender_email, sender_password
):
    """Send a UTF-8 text e-mail, with optional attachments, via Yandex SMTP over SSL.

    Args:
        to_email: Recipient address.
        from_email: Address placed in the From header.
        subject: Subject line.
        text: Message body.
        files: ``None`` or an iterable of ``(file_name, file_content)``
            pairs to attach.
        sender_email: Login name for smtp.yandex.ru.
        sender_password: Password (app password) for that login.

    Raises:
        smtplib.SMTPException / OSError: Connection, authentication or
            delivery errors are propagated to the caller.
    """
    msg = MIMEMultipart()
    msg["Subject"] = subject
    msg["From"] = from_email
    msg["To"] = to_email
    msg.attach(MIMEText(text, _charset="utf-8"))

    if files is not None:
        for file_name, file_content in files:
            part = MIMEApplication(file_content)
            part.add_header(
                "Content-Disposition", "attachment", filename=("gb18030", "", file_name)
            )
            msg.attach(part)

    # The context manager closes the connection even when login() fails;
    # previously login() ran outside the try/finally.
    with SMTP_SSL("smtp.yandex.ru", 465) as smtp:
        smtp.login(sender_email, sender_password)
        smtp.sendmail(msg["From"], msg["To"], msg.as_string())
|
||
|
||
|
||
def email():
    """E-mail the accumulated log through Yandex and print the outcome.

    Delivery is best-effort: any SMTP or network failure is printed and
    reported as a failed push instead of aborting the run.
    """
    msg = "EUserv 续费日志\n\n" + desp
    try:
        send_mail_by_yandex(
            RECEIVER_EMAIL, YD_EMAIL, "EUserv 续费日志", msg, None, YD_EMAIL, YD_APP_PWD
        )
    except OSError as err:
        # smtplib.SMTPException (including SMTPDataError and authentication
        # errors) derives from OSError, as do socket/SSL connection errors.
        print(err)
        print("eMail 推送失败")
    else:
        print("eMail 推送成功")
|
||
|
||
|
||
def main_handler(event, context):
    """Renew every configured EUserv account, then send the log notifications.

    Args:
        event: Unused; kept for the handler-style call signature.
        context: Unused; kept for the handler-style call signature.

    Raises:
        SystemExit: With status 1 when no account is configured or the
            numbers of user names and passwords differ.
    """
    global TG_API_HOST

    # SystemExit is raised directly: the bare exit() helper is injected by
    # the site module and is not guaranteed to exist in every runtime.
    if not EUserv_ID or not EUserv_PW:
        log("[EUserv] 你没有添加任何账户")
        raise SystemExit(1)
    user_list = EUserv_ID.strip().split()
    passwd_list = EUserv_PW.strip().split()
    if len(user_list) != len(passwd_list):
        log("[EUserv] The number of usernames and passwords do not match!")
        raise SystemExit(1)

    for index, (user, passwd) in enumerate(zip(user_list, passwd_list), start=1):
        print("*" * 30)
        log("[EUserv] 正在续费第 %d 个账号" % index)
        sessid, s = login(user, passwd)
        if sessid == "-1":
            log("[EUserv] 第 %d 个账号登陆失败,请检查登录信息" % index)
            continue
        servers = get_servers(sessid, s)
        log(f"[EUserv] 检测到第 {index} 个账号有 {len(servers)} 台 VPS,正在尝试续期")
        for server_id, renewable in servers.items():
            if not renewable:
                log(f"[EUserv] ServerID: {server_id} does not need to be renewed")
            elif renew(sessid, s, passwd, server_id):
                log(f"[EUserv] ServerID: {server_id} has been successfully renewed!")
            else:
                log(f"[EUserv] ServerID: {server_id} Renew Error!")
        # Pause before re-reading the server list to verify the renewals.
        time.sleep(15)
        check(sessid, s)
        time.sleep(5)

    # Guard against config.sh exporting TG_API_HOST="".
    if TG_API_HOST == "":
        TG_API_HOST = "api.telegram.org"

    # Choose at least one of the six notification channels; a channel is
    # used only when all of its settings are present.
    if COOL_PUSH_MODE and COOL_PUSH_SKEY:
        coolpush()
    if PUSH_PLUS_TOKEN:
        push_plus()
    if SCKEY:
        server_chan()
    if TG_BOT_TOKEN and TG_USER_ID and TG_API_HOST:
        telegram()
    if WECOMCHAN_DOMAIN and WECOMCHAN_SEND_KEY and WECOMCHAN_TO_USER:
        wecomchan()
    if RECEIVER_EMAIL and YD_EMAIL and YD_APP_PWD:
        email()

    print("*" * 30)
|
||
|
||
|
||
if __name__ == "__main__":
    # Direct invocation, handy for local debugging. The handler does not
    # read its two arguments, so None is passed for both.
    main_handler(event=None, context=None)
|