#!/usr/bin/env python
# -*-coding:utf-8-*-
# by 'hollowman6' from Lanzhou University(兰州大学)
# modified by Oreo
import json
import os
import re
import urllib.parse
import requests
sckey = os.getenv("PUSH_KEY")
pptoken = os.getenv("PUSH_PLUS_TOKEN")
pptopic = os.getenv("PUSH_PLUS_TOPIC")
title = os.getenv("TITLE")
message = os.getenv("MSG")
content = os.getenv("CONTENT")
qywx_am = os.getenv("QYWX_AM")
image = os.getenv("IMAGE")
access_token = ""
errorNotify = ""
if not title:
raise Exception("未设置 `TITLE[name]` Actions Secret!")
def exwechat_get_access_token():
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
params = {"corpid": corpid, "corpsecret": corpsecret}
resp = requests.get(url, params=params)
resp.raise_for_status()
resp_json = resp.json()
if "access_token" in resp_json.keys():
return resp_json["access_token"]
else:
raise Exception("请检查CORPID和CORPSECRET是否正确!\n" + resp.text)
def exwechat_get_ShortTimeMedia(img_url):
if img_url:
media_url = f"https://qyapi.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type=file"
f = requests.get(img_url).content
r = requests.post(media_url, files={"file": f}, json=True)
return r.json()["media_id"]
else:
return ""
def exwechat_send(title, digest, content):
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
data = {
"touser": touser,
"agentid": agentid,
"safe": 0,
"enable_id_trans": 0,
"enable_duplicate_check": 0,
"duplicate_check_interval": 1800,
}
if content:
data["msgtype"] = "mpnews"
data["mpnews"] = {
"articles": [
{
"title": title,
"thumb_media_id": exwechat_get_ShortTimeMedia(image),
"author": "Hollow Man",
"content_source_url": "https://github.com/HollowMan6/Wechat-Timed-Message/actions",
"content": content,
"digest": digest,
}
]
}
else:
data["msgtype"] = "textcard"
data["textcard"] = {
"title": title,
"description": digest,
"url": "https://github.com/HollowMan6/Wechat-Timed-Message/actions",
}
resp = requests.post(url, data=json.dumps(data))
resp.raise_for_status()
return resp
if sckey:
try:
host = "https://sctapi.ftqq.com/"
title = urllib.parse.quote_plus(title.replace("\n", "\n\n"))
message = urllib.parse.quote_plus(message.replace("\n", "\n\n"))
res = requests.get(host + sckey + ".send?title=" + title + "&desp=" + message)
result = json.loads(res.text)
if result["data"]["errno"] == 0:
print("Server酱推送成功!")
else:
errorNotify += f"Server酱推送错误: {result}" + "\n"
except Exception as e:
print(e)
errorNotify += "Server酱推送错误!\n"
else:
print("未设置SERVERCHANSCKEY,尝试使用PushPlus...")
title = os.getenv("TITLE")
message = os.getenv("MSG")
if pptoken:
try:
host = "http://www.pushplus.plus/"
user = ""
if not message:
message = title
title = ""
title = urllib.parse.quote_plus(title.replace("\n", "
"))
message = urllib.parse.quote_plus(message.replace("\n", "
"))
res = requests.get(
f"{host}send?token={pptoken}&title={title}&content={message}&template=html&topic={pptopic}"
)
result = res.json()
if result["code"] == 200:
print("成功通过PushPlus将结果通知给相关用户!")
else:
errorNotify += f"PushPlus推送错误: {result}" + "\n"
except Exception as e:
print(e)
errorNotify += "PushPlus推送错误!\n"
else:
print("未设置PPTOKEN!")
title = os.getenv("TITLE")
message = os.getenv("MSG")
qywx_am_ay = re.split(",", qywx_am)
corpsecret = qywx_am_ay[1]
touser = qywx_am_ay[2]
agentid = qywx_am_ay[3]
corpid = qywx_am_ay[0]
if corpid:
info = ""
if corpsecret:
if agentid:
try:
access_token = exwechat_get_access_token()
content = content.replace("\n", "
")
res = exwechat_send(title, message, content)
result = res.json()
if result["errcode"] == 0:
print("成功通过企业微信将结果通知给用户!")
else:
errorNotify += f"企业微信推送错误: {res.text}" + "\n"
except Exception as e:
print(e)
errorNotify += "企业微信推送错误!\n"
else:
print("未设置AGENTID,无法推送到企业微信!")
else:
print("未设置CORPSECRET,无法推送到企业微信!")
else:
print("未设置CORPID!")
if errorNotify:
raise Exception(errorNotify)