1
0
mirror of https://github.com/Oreomeow/VIP.git synced 2026-02-03 10:33:15 +08:00
Files
vip/Scripts/py/EUserv_extend.py
sourcery-ai[bot] 6f437ddef5 ♻️ 'Refactored by Sourcery' (#14)
Co-authored-by: Sourcery AI <>
2022-10-08 14:43:43 +08:00

523 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf8 -*-
# SPDX-FileCopyrightText: (c) 2020-2021 CokeMine & Its repository contributors
# SPDX-FileCopyrightText: (c) 2021 A beam of light
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""
euserv auto-renew script
v2021.09.30
* Captcha automatic recognition using TrueCaptcha API
* Email notification
* Add login failure retry mechanism
* reformat log info
Author: cokemine
Modifier: o0oo0ooo0 & Oreo
Date: Tue Oct 14 06:47:57 UTC 2021
cron: 0 10 */7 * *
new Env('EUserv 续期');
------------
环境变量说明
EUserv_ID: 账号 用户名邮箱也可多账号用空格分隔ql 环境变量填写需用回车隔开
EUserv_PW: 密码 密码,同上且与账号一一对应
其他变量看下方注释
------------
依赖模块说明
pip install requests beautifulsoup4 / pip3 install requests beautifulsoup4
"""
import base64
import json
import os
import re
import time
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from smtplib import SMTP_SSL, SMTPDataError
import requests
from bs4 import BeautifulSoup
# 强烈建议部署在非大陆区域例如HK、SG等
# 常量命名使用全部大写的方式,可以使用下划线。
EUserv_ID = os.environ.get("EUserv_ID") # 用户名,邮箱也可
EUserv_PW = os.environ.get("EUserv_PW") # 密码
# options: True or False
CHECK_CAPTCHA_SOLVER_USAGE = os.environ.get("CHECK_CAPTCHA_SOLVER_USAGE", True)
# default value is TrueCaptcha demo credential,
# you can use your own credential via set environment variables:
# USERID and APIKEY
# demo: https://apitruecaptcha.org/demo
# demo2: https://apitruecaptcha.org/demo2
# demo apikey also has a limit of 100 times per day
# {
# 'error': '101.0 above free usage limit 100 per day and no balance',
# 'requestId': '7690c065-70e0-4757-839b-5fd8381e65c7'
# }
USERID = os.environ.get("USERID", "arun56")
APIKEY = os.environ.get("APIKEY", "wMjXmBIcHcdYqO2RrsVN")
# Maximum number of login retry
LOGIN_MAX_RETRY_COUNT = os.environ.get("LOGIN_MAX_RETRY_COUNT", 5)
# 酷推 https://cp.xuthus.cc
COOL_PUSH_SKEY = os.environ.get("COOL_PUSH_SKEY")
# 通知类型 CoolPush_MODE的可选项有默认sendsend[QQ私聊]、group[QQ群聊]、wx[个微]、ww[企微]
COOL_PUSH_MODE = os.environ.get("COOL_PUSH_MODE")
# PushPlus https://pushplus.hxtrip.com/message
PUSH_PLUS_TOKEN = os.environ.get("PUSH_PLUS_TOKEN")
# Server酱 http://sc.ftqq.com/?c=code
SCKEY = os.environ.get("PUSH_KEY") # Server酱的key无需推送可不填 示例: SCU646xxxxxxxxdacd6a5dc3f6
# Telegram Bot Push https://core.telegram.org/bots/api#authorizing-your-bot
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN") # 通过 @BotFather 申请获得示例1077xxx4424:AAFjv0FcqxxxxxxgEMGfi22B4yh15R5uw
TG_USER_ID = os.environ.get("TG_USER_ID") # 用户、群组或频道 ID示例129xxx206
TG_API_HOST = os.environ.get("TG_API_HOST", "api.telegram.org") # 自建 API 反代地址,供网络环境无法访问时使用,网络正常则保持默认
# wecomchan https://github.com/easychen/wecomchan
WECOMCHAN_DOMAIN = os.environ.get("WECOMCHAN_DOMAIN") # http(s)://example.com/
WECOMCHAN_SEND_KEY = os.environ.get("WECOMCHAN_SEND_KEY")
WECOMCHAN_TO_USER = os.environ.get("WECOMCHAN_TO_USER", "@all") # 默认全部推送, 对个别人推送可用 User1|User2
# 变量命名使用全部小写的方式,可以使用下划线。
desp = "" # 不用动
# Email notification
RECEIVER_EMAIL = os.environ.get("RECEIVER_EMAIL", "")
YD_EMAIL = os.environ.get("YD_EMAIL", "")
YD_APP_PWD = os.environ.get("YD_APP_PWD", "") # yandex mail 使用第三方 APP 时的授权码
# Magic internet access
PROXIES = {"http": "http://127.0.0.1:10808", "https": "http://127.0.0.1:10808"}
user_agent = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/94.0.4606.61 Safari/537.36 "
)
desp = "" # 空值
def log(info: str):
print(info)
global desp
desp = desp + info + "\n\n"
def login_retry(*args, **kwargs):
def wrapper(func):
def inner(username, password):
ret, ret_session = func(username, password)
max_retry = kwargs.get("max_retry")
# default retry 3 times
if not max_retry:
max_retry = 3
number = 0
if ret == "-1":
while number < max_retry:
number += 1
if number > 1:
log("[EUserv] Login tried the {}th time".format(number))
sess_id, session = func(username, password)
if sess_id != "-1":
return sess_id, session
if number == max_retry:
return sess_id, session
else:
return ret, ret_session
return inner
return wrapper
def captcha_solver(captcha_image_url: str, session: requests.session) -> dict:
"""
TrueCaptcha API doc: https://apitruecaptcha.org/api
Free to use 100 requests per day.
"""
response = session.get(captcha_image_url)
encoded_string = base64.b64encode(response.content)
url = "https://api.apitruecaptcha.org/one/gettext"
data = {
"userid": USERID,
"apikey": APIKEY,
"case": "mixed",
"mode": "human",
"data": str(encoded_string)[2:-1],
}
r = requests.post(url=url, json=data)
return json.loads(r.text)
def handle_captcha_solved_result(solved: dict) -> str:
"""Since CAPTCHA sometimes appears as a very simple binary arithmetic expression.
But since recognition sometimes doesn't show the result of the calculation directly,
that's what this function is for.
"""
if "result" in solved:
solved_text = solved["result"]
if "RESULT IS" in solved_text:
log("[Captcha Solver] You are using the demo apikey.")
print("There is no guarantee that demo apikey will work in the future!")
# because using demo apikey
text = re.findall(r"RESULT IS . (.*) .", solved_text)[0]
else:
# using your own apikey
log("[Captcha Solver] You are using your own apikey.")
text = solved_text
operators = ["X", "x", "+", "-"]
if all(x not in text for x in operators):
return text
for operator in operators:
operator_pos = text.find(operator)
if operator in ["x", "X"]:
operator = "*"
if operator_pos != -1:
left_part = text[:operator_pos]
right_part = text[operator_pos + 1:]
return (
eval(
"{left} {operator} {right}".format(
left=left_part, operator=operator, right=right_part
)
)
if left_part.isdigit() and right_part.isdigit()
else text
)
else:
print(solved)
raise KeyError("Failed to find parsed results.")
def get_captcha_solver_usage() -> dict:
url = "https://api.apitruecaptcha.org/one/getusage"
params = {
"username": USERID,
"apikey": APIKEY,
}
r = requests.get(url=url, params=params)
return json.loads(r.text)
@login_retry(max_retry=LOGIN_MAX_RETRY_COUNT)
def login(username: str, password: str) -> (str, requests.session):
headers = {"user-agent": user_agent, "origin": "https://www.euserv.com"}
url = "https://support.euserv.com/index.iphp"
session = requests.Session()
sess = session.get(url, headers=headers)
sess_id = re.findall("PHPSESSID=(\\w{10,100});", str(sess.headers))[0]
# visit png
logo_png_url = "https://support.euserv.com/pic/logo_small.png"
session.get(logo_png_url, headers=headers)
login_data = {
"email": username,
"password": password,
"form_selected_language": "en",
"Submit": "Login",
"subaction": "login",
"sess_id": sess_id,
}
f = session.post(url, headers=headers, data=login_data)
f.raise_for_status()
if (
f.text.find("Hello") != -1
or f.text.find("Confirm or change your customer data here") != -1
):
return sess_id, session
if (
f.text.find(
"To finish the login process please solve the following captcha."
)
== -1
):
return "-1", session
log("[Captcha Solver] 进行验证码识别...")
captcha_image_url = "https://support.euserv.com/securimage_show.php"
solved_result = captcha_solver(captcha_image_url, session)
captcha_code = handle_captcha_solved_result(solved_result)
log(f"[Captcha Solver] 识别的验证码是: {captcha_code}")
if CHECK_CAPTCHA_SOLVER_USAGE:
usage = get_captcha_solver_usage()
log(
"[Captcha Solver] current date {0} api usage count: {1}".format(
usage[0]["date"], usage[0]["count"]
)
)
f2 = session.post(
url,
headers=headers,
data={
"subaction": "login",
"sess_id": sess_id,
"captcha_code": captcha_code,
},
)
if (
f2.text.find(
"To finish the login process please solve the following captcha."
)
== -1
):
log("[Captcha Solver] 验证通过")
return sess_id, session
else:
log("[Captcha Solver] 验证失败")
return "-1", session
def get_servers(sess_id: str, session: requests.session) -> {}:
d = {}
url = f"https://support.euserv.com/index.iphp?sess_id={sess_id}"
headers = {"user-agent": user_agent, "origin": "https://www.euserv.com"}
f = session.get(url=url, headers=headers)
f.raise_for_status()
soup = BeautifulSoup(f.text, "html.parser")
for tr in soup.select(
"#kc2_order_customer_orders_tab_content_1 .kc2_order_table.kc2_content_table tr"
):
server_id = tr.select(".td-z1-sp1-kc")
if len(server_id) != 1:
continue
flag = (
tr.select(".td-z1-sp2-kc .kc2_order_action_container")[0]
.get_text()
.find("Contract extension possible from")
== -1
)
d[server_id[0].get_text()] = flag
return d
def renew(
sess_id: str, session: requests.session, password: str, order_id: str
) -> bool:
url = "https://support.euserv.com/index.iphp"
headers = {
"user-agent": user_agent,
"Host": "support.euserv.com",
"origin": "https://support.euserv.com",
"Referer": "https://support.euserv.com/index.iphp",
}
data = {
"Submit": "Extend contract",
"sess_id": sess_id,
"ord_no": order_id,
"subaction": "choose_order",
"choose_order_subaction": "show_contract_details",
}
session.post(url, headers=headers, data=data)
data = {
"sess_id": sess_id,
"subaction": "kc2_security_password_get_token",
"prefix": "kc2_customer_contract_details_extend_contract_",
"password": password,
}
f = session.post(url, headers=headers, data=data)
f.raise_for_status()
if json.loads(f.text)["rs"] != "success":
return False
token = json.loads(f.text)["token"]["value"]
data = {
"sess_id": sess_id,
"ord_id": order_id,
"subaction": "kc2_customer_contract_details_extend_contract_term",
"token": token,
}
session.post(url, headers=headers, data=data)
time.sleep(5)
return True
def check(sess_id: str, session: requests.session):
print("Checking.......")
d = get_servers(sess_id, session)
flag = True
for key, val in d.items():
if val:
flag = False
log(f"[EUserv] ServerID: {key} Renew Failed!")
if flag:
log("[EUserv] ALL Work Done! Enjoy~")
# 酷推 https://cp.xuthus.cc/
def coolpush():
c = "EUserv续费日志\n\n" + desp
data = json.dumps({"c": c})
url = f"https://push.xuthus.cc/{COOL_PUSH_MODE}/{COOL_PUSH_SKEY}"
response = requests.post(url, data=data)
if response.status_code != 200:
print("酷推 推送失败")
else:
print("酷推 推送成功")
# PushPlus https://pushplus.hxtrip.com/message
def push_plus():
data = (
("token", PUSH_PLUS_TOKEN),
("title", "EUserv续费日志"),
("content", desp)
)
url = "https://pushplus.hxtrip.com/send"
response = requests.post(url, data=data)
if response.status_code != 200:
print("PushPlus 推送失败")
else:
print("PushPlus 推送成功")
# Server酱 http://sc.ftqq.com/?c=code
def server_chan():
data = (
("text", "EUserv续费日志"),
("desp", desp)
)
response = requests.post(f"https://sc.ftqq.com/{SCKEY}.send", data=data)
if response.status_code != 200:
print("Server酱 推送失败")
else:
print("Server酱 推送成功")
# Telegram Bot Push https://core.telegram.org/bots/api#authorizing-your-bot
def telegram():
data = (
("chat_id", TG_USER_ID),
("text", "EUserv续费日志\n\n" + desp)
)
response = requests.post(
f"https://{TG_API_HOST}/bot{TG_BOT_TOKEN}/sendMessage", data=data
)
if response.status_code != 200:
print("Telegram Bot 推送失败")
else:
print("Telegram Bot 推送成功")
# wecomchan https://github.com/easychen/wecomchan
def wecomchan():
response = requests.get(
(
(
f"{WECOMCHAN_DOMAIN}wecomchan?sendkey={WECOMCHAN_SEND_KEY}&msg_type=text&to_user={WECOMCHAN_TO_USER}&msg="
+ "EUserv续费日志\n\n"
)
+ desp
)
)
if response.status_code != 200:
print("wecomchan 推送失败")
else:
print("wecomchan 推送成功")
def send_mail_by_yandex(
to_email, from_email, subject, text, files, sender_email, sender_password
):
msg = MIMEMultipart()
msg["Subject"] = subject
msg["From"] = from_email
msg["To"] = to_email
msg.attach(MIMEText(text, _charset="utf-8"))
if files is not None:
for file in files:
file_name, file_content = file
# print(file_name)
part = MIMEApplication(file_content)
part.add_header(
"Content-Disposition", "attachment", filename=("gb18030", "", file_name)
)
msg.attach(part)
s = SMTP_SSL("smtp.yandex.ru", 465)
s.login(sender_email, sender_password)
try:
s.sendmail(msg["From"], msg["To"], msg.as_string())
except SMTPDataError as e:
raise e
finally:
s.close()
def email():
msg = "EUserv 续费日志\n\n" + desp
try:
send_mail_by_yandex(
RECEIVER_EMAIL, YD_EMAIL, "EUserv 续费日志", msg, None, YD_EMAIL, YD_APP_PWD
)
print("eMail 推送成功")
except requests.exceptions.RequestException as e:
print(e)
print("eMail 推送失败")
except SMTPDataError as e1:
print(e1)
print("eMail 推送失败")
def main_handler(event, context):
if not EUserv_ID or not EUserv_PW:
log("[EUserv] 你没有添加任何账户")
exit(1)
user_list = EUserv_ID.strip().split()
passwd_list = EUserv_PW.strip().split()
if len(user_list) != len(passwd_list):
log("[EUserv] The number of usernames and passwords do not match!")
exit(1)
for i in range(len(user_list)):
print("*" * 30)
log("[EUserv] 正在续费第 %d 个账号" % (i + 1))
sessid, s = login(user_list[i], passwd_list[i])
if sessid == "-1":
log("[EUserv] 第 %d 个账号登陆失败,请检查登录信息" % (i + 1))
continue
SERVERS = get_servers(sessid, s)
log(f"[EUserv] 检测到第 {i + 1} 个账号有 {len(SERVERS)} 台 VPS正在尝试续期")
for k, v in SERVERS.items():
if v:
if not renew(sessid, s, passwd_list[i], k):
log(f"[EUserv] ServerID: {k} Renew Error!")
else:
log(f"[EUserv] ServerID: {k} has been successfully renewed!")
else:
log(f"[EUserv] ServerID: {k} does not need to be renewed")
time.sleep(15)
check(sessid, s)
time.sleep(5)
# 防止 config.sh export TG_API_HOST="" 的情况
global TG_API_HOST
if TG_API_HOST == "":
TG_API_HOST = "api.telegram.org"
# 六个通知渠道至少选取一个
COOL_PUSH_MODE and COOL_PUSH_SKEY and coolpush()
PUSH_PLUS_TOKEN and push_plus()
SCKEY and server_chan()
TG_BOT_TOKEN and TG_USER_ID and TG_API_HOST and telegram()
WECOMCHAN_DOMAIN and WECOMCHAN_SEND_KEY and WECOMCHAN_TO_USER and wecomchan()
RECEIVER_EMAIL and YD_EMAIL and YD_APP_PWD and email()
print("*" * 30)
if __name__ == "__main__": # 方便我本地调试
main_handler(None, None)