Files
smzdm_script/notify/notify.py
2022-12-18 13:32:18 +08:00

88 lines
3.0 KiB
Python

import json
from typing import Dict
from urllib.parse import urljoin
import requests
from loguru import logger
class NotifyBot(object):
    """Send one message through every notification channel that is configured.

    Constructing an instance immediately attempts delivery via PushPlus,
    ServerChain and Telegram, in that order. A channel whose credentials are
    missing from ``kwargs`` is skipped with an info log. Any exception raised
    while notifying a channel is logged and swallowed, so one failing channel
    never prevents the others from being tried.

    Recognised ``kwargs`` keys:
        PUSH_PLUS_TOKEN: token for the PushPlus channel.
        SC_KEY: key for the ServerChain channel.
        TG_BOT_TOKEN, TG_USER_ID: both required for the Telegram channel.
        TG_BOT_API: optional Telegram API base URL; falls back to
            ``https://api.telegram.org`` when missing or empty.
    """

    # Seconds to wait on each HTTP request. Without a timeout requests waits
    # forever, and because delivery runs inside __init__ a single stalled
    # endpoint would hang the caller.
    _REQUEST_TIMEOUT = 15

    def __init__(self, content, title="什么值得买签到", **kwargs: object) -> None:
        """Store the message and push it to all configured channels.

        Args:
            content: Message body.
            title: Message title.
            **kwargs: Channel credentials/options (see the class docstring).
        """
        self.content = content
        self.title = title
        self.kwargs = kwargs
        self.push_plus()
        self.server_chain()
        self.tg_bot()

    def _handle_response(self, resp, channel):
        """Log the outcome of a notification request for ``channel``.

        Returns:
            The decoded JSON body when the status code is 200, else ``None``.
        """
        if resp.status_code == 200:
            logger.info(f"✅ {channel} notified")
            return resp.json()
        # Surface failed deliveries instead of dropping them silently.
        logger.warning(f"❌ {channel} notification failed: HTTP {resp.status_code}")
        return None

    def push_plus(self, template="html"):
        """Send the message through PushPlus.

        Args:
            template: Value sent as the ``template`` field of the request.

        Returns:
            The decoded JSON response on HTTP 200; ``None`` when the channel
            is not configured, the status is not 200, or an error occurred.
        """
        try:
            token = self.kwargs.get("PUSH_PLUS_TOKEN")
            if not token:
                logger.info("⚠️ PUSH_PLUS_TOKEN not set, skip PushPlus notification")
                return None
            url = "https://www.pushplus.plus/send"
            body = {
                "token": token,
                "title": self.title,
                "content": self.content,
                "template": template,
            }
            data = json.dumps(body).encode(encoding="utf-8")
            headers = {"Content-Type": "application/json"}
            resp = requests.post(
                url, data=data, headers=headers, timeout=self._REQUEST_TIMEOUT
            )
            return self._handle_response(resp, "Push Plus")
        except Exception as e:
            # Best-effort boundary: a failing channel must not abort the rest.
            logger.error(f"Push Plus notification error: {e}")
            return None

    def server_chain(self):
        """Send the message through ServerChain.

        Returns:
            The decoded JSON response on HTTP 200; ``None`` when the channel
            is not configured, the status is not 200, or an error occurred.
        """
        try:
            sc_key = self.kwargs.get("SC_KEY")
            if not sc_key:
                logger.info("⚠️ SC_KEY not set, skip ServerChain notification")
                return None
            # NOTE(review): the key travels in a plain-http URL; confirm
            # whether this endpoint can be reached over https instead.
            url = f"http://sc.ftqq.com/{sc_key}.send"
            data = {"text": self.title, "desp": self.content}
            resp = requests.post(url, data=data, timeout=self._REQUEST_TIMEOUT)
            return self._handle_response(resp, "Server Chain")
        except Exception as e:
            # Best-effort boundary: a failing channel must not abort the rest.
            logger.error(f"Server Chain notification error: {e}")
            return None

    def tg_bot(self):
        """Send the message through a Telegram bot.

        Returns:
            The decoded JSON response on HTTP 200; ``None`` when the channel
            is not configured, the status is not 200, or an error occurred.
        """
        try:
            bot_token = self.kwargs.get("TG_BOT_TOKEN")
            user_id = self.kwargs.get("TG_USER_ID")
            if not (bot_token and user_id):
                logger.info(
                    "⚠️ TG_BOT_TOKEN & TG_USER_ID not set, skip TelegramBot notification"
                )
                return None
            # `or` (rather than a .get default) also covers a key that is
            # present but None/empty, which would otherwise yield a relative
            # URL that requests rejects.
            api_base = self.kwargs.get("TG_BOT_API") or "https://api.telegram.org"
            # The leading "/" makes urljoin replace any path on api_base.
            url = urljoin(api_base, f"/bot{bot_token}/sendMessage")
            headers = {"Content-Type": "application/x-www-form-urlencoded"}
            payload = {
                "chat_id": str(user_id),
                "text": f"{self.title}\n\n{self.content}",
                "disable_web_page_preview": "true",
            }
            # Send the fields as a form body, matching the declared
            # Content-Type; putting them in the query string risks hitting
            # URL length limits with long message text.
            resp = requests.post(
                url=url,
                headers=headers,
                data=payload,
                timeout=self._REQUEST_TIMEOUT,
            )
            return self._handle_response(resp, "Telegram Bot")
        except Exception as e:
            # Best-effort boundary: a failing channel must not abort the rest.
            logger.error(f"Telegram Bot notification error: {e}")
            return None