Change notification module.

Fix renamer.py
This commit is contained in:
EstrellaXD
2023-05-08 20:50:49 +08:00
parent c26b54281e
commit afd8e343ba
11 changed files with 99 additions and 261 deletions

View File

@@ -96,11 +96,11 @@ class DownloadClient:
def get_torrent_info(self, category="Bangumi"):
return self.client.torrents_info(status_filter="completed", category=category)
def rename_torrent_file(self, _hash, old_path, new_path):
self.client.torrents_rename_file(
def rename_torrent_file(self, _hash, old_path, new_path) -> bool:
logger.debug(f"{old_path} >> {new_path}")
return self.client.torrents_rename_file(
torrent_hash=_hash, old_path=old_path, new_path=new_path
)
logger.info(f"{old_path} >> {new_path}")
def delete_torrent(self, hashes):
self.client.torrents_delete(hashes)

View File

@@ -62,10 +62,15 @@ class QbDownloader:
def torrents_delete(self, hash):
return self._client.torrents_delete(delete_files=True, torrent_hashes=hash)
def torrents_rename_file(self, torrent_hash, old_path, new_path):
self._client.torrents_rename_file(
torrent_hash=torrent_hash, old_path=old_path, new_path=new_path
)
def torrents_rename_file(self, torrent_hash, old_path, new_path) -> bool:
try:
self._client.torrents_rename_file(
torrent_hash=torrent_hash, old_path=old_path, new_path=new_path
)
return True
except Conflict409Error:
logger.debug(f"Conflict409Error: {old_path} -> {new_path}")
return False
def check_rss(self, url, item_path) -> tuple[str | None, bool]:
items = self._client.rss_items()

View File

@@ -7,7 +7,7 @@ from module.core.download_client import DownloadClient
from module.parser import TitleParser
from module.network import PostNotification
from module.models import Config
from module.models import Config, SubtitleFile, EpisodeFile
logger = logging.getLogger(__name__)
@@ -54,30 +54,24 @@ class Renamer(DownloadClient):
season: int,
remove_bad_torrents: bool,
):
torrent_name = info.name
suffix = os.path.splitext(media_path)[-1]
compare_name = self.get_file_name(media_path)
new_path = self._renamer.torrent_parser(
torrent_name=torrent_name,
bangumi_name=bangumi_name,
ep = self._renamer.torrent_parser(
torrent_path=media_path,
season=season,
suffix=suffix,
method=method,
)
if compare_name != new_path:
try:
self.rename_torrent_file(
_hash=info.hash, old_path=media_path, new_path=new_path
)
new_path = self.gen_path(ep, method)
# TODO: rewrite rename file func
if media_path != new_path:
pass
renamed = self.rename_torrent_file(
_hash=info.hash, old_path=media_path, new_path=new_path
)
if renamed:
logger.info(f"{bangumi_name} Season {ep.season} Ep {ep.episode} renamed.")
self._notification.send_msg(bangumi_name, f"{new_path}已经更新,已自动重命名。")
except Exception as e:
logger.warning(f"{torrent_name} rename failed")
logger.warning(
f"Season name: {bangumi_name}, Season: {season}, Suffix: {suffix}"
)
logger.debug(e)
# Delete bad torrent
self.delete_bad_torrent(info, remove_bad_torrents)
else:
logger.warning(f"{bangumi_name} Season {ep.season} Ep {ep.episode} rename failed.")
if remove_bad_torrents:
self.delete_torrent(info.hash)
def rename_collection(
self,
@@ -113,7 +107,7 @@ class Renamer(DownloadClient):
)
logger.debug(e)
# Delete bad torrent.
self.delete_bad_torrent(info, remove_bad_torrents)
self.delete_torrent(_hash, remove_bad_torrents)
self.set_category(category="BangumiCollection", hashes=_hash)
def rename_subtitles(
@@ -145,11 +139,6 @@ class Renamer(DownloadClient):
logger.warning(f"Suffix: {suffix}")
logger.debug(e)
def delete_bad_torrent(self, info, remove_bad_torrent: bool):
if remove_bad_torrent:
self.delete_torrent(info.hash)
logger.info(f"{info.name} have been deleted.")
@staticmethod
def get_season_info(save_path: str, download_path: str):
# Remove default save path

View File

@@ -1,2 +1,3 @@
from .bangumi import *
from .config import Config
from .torrent import EpisodeFile, SubtitleFile

View File

@@ -20,10 +20,13 @@ class BangumiData(BaseModel):
# poster_link: str | None = Field(None, alias="poster_link", title="番剧海报链接")
class ProgramData(BaseModel):
rss_link: str = Field(..., alias="rss_link", title="RSS链接")
data_version: float = Field(..., alias="data_version", title="数据版本")
bangumi_info: list[BangumiData] = Field([], alias="bangumi_info", title="番剧信息")
class Notification(BaseModel):
official_title: str = Field(..., alias="official_title", title="番剧名")
season: int = Field(..., alias="season", title="番剧季度")
episode: int = Field(..., alias="episode", title="番剧集数")
poster_link: str | None = Field(None, alias="poster_link", title="番剧海报链接")
@dataclass

View File

@@ -1,45 +1,51 @@
import logging
from .request_contents import RequestContent
from module.conf import settings
from module.network.request_contents import RequestContent
from module.models import Notification
logger = logging.getLogger(__name__)
class PostNotification:
def __init__(self):
def __init__(self, settings):
self.type: str = settings.notification.type
self.enable: bool = settings.notification.enable
self.token = settings.notification.token
self.chat_id = settings.notification.chat_id
self.client = self.getClient()
@staticmethod
def getClient():
if settings.notification.type.lower() == "telegram":
return TelegramNotification()
elif settings.notification.type.lower() == "server-chan":
return ServerChanNotification()
elif settings.notification.type.lower() == "bark":
return BarkNotification()
def getClient(self):
if self.type.lower() == "telegram":
return TelegramNotification(self.token, self.chat_id)
elif self.type.lower() == "server-chan":
return ServerChanNotification(self.token)
elif self.type.lower() == "bark":
return BarkNotification(self.token)
else:
return None
def send_msg(self, title: str, desp: str) -> bool:
if not settings.notification.enable:
def send_msg(self, info: Notification) -> bool:
text = f"番剧名称:{info.official_title}\n" \
f"季度: 第{info.season}\n" \
f"更新集数: 第{info.episode}\n" \
f"{info.poster_link}\n"
if not self.enable:
return False
if self.client is None:
return False
return self.client.send_msg(title, desp)
return self.client.send_msg(text)
class TelegramNotification:
def __init__(self):
self.token = settings.notification.token
self.chat_id = settings.notification.chat_id
self.notification_url = f"https://api.telegram.org/bot{self.token}/sendMessage"
def __init__(self, token, chat_id):
self.notification_url = f"https://api.telegram.org/bot{token}/sendMessage"
self.chat_id = chat_id
def send_msg(self, title: str, desp: str):
def send_msg(self, text: str) -> bool:
data = {
"chat_id": settings.notification.chat_id,
"text": f"{title}\n{desp}",
"chat_id": self.chat_id,
"text": text,
"disable_notification": True,
}
with RequestContent() as req:
@@ -51,14 +57,13 @@ class TelegramNotification:
class ServerChanNotification:
"""Server酱推送"""
def __init__(self):
self.token = settings.notification.token
self.notification_url = f"https://sctapi.ftqq.com/{self.token}.send"
def __init__(self, token):
self.notification_url = f"https://sctapi.ftqq.com/{token}.send"
def send_msg(self, title: str, desp: str) -> bool:
def send_msg(self, text: str) -> bool:
data = {
"title": title,
"desp": desp,
"title": "AutoBangumi 番剧更新",
"desp": text,
}
with RequestContent() as req:
resp = req.post_data(self.notification_url, data)
@@ -67,13 +72,26 @@ class ServerChanNotification:
class BarkNotification:
def __init__(self):
self.token = settings.notification.token
def __init__(self, token):
self.token = token
self.notification_url = "https://api.day.app/push"
def send_msg(self, title: str, desp: str):
data = {"title": title, "body": desp, "device_key": self.token}
def send_msg(self, text) -> bool:
data = {"title": "AutoBangumi 番剧更新", "body": text, "device_key": self.token}
with RequestContent() as req:
resp = req.post_data(self.notification_url, data)
logger.debug(f"Bark notification: {resp.status_code}")
return resp.status_code == 200
if __name__ == '__main__':
from module.conf import settings
print(settings.notification)
notification = PostNotification(settings=settings)
info = Notification(
official_title="魔法纪录 魔法少女小圆外传",
season=2,
episode=1,
poster_link="https://mikanani.me/images/Bangumi/202107/3788b33f.jpg",
)
notification.send_msg(info)

View File

@@ -1,24 +1,14 @@
import re
import logging
from dataclasses import dataclass
import os.path as unix_path
import ntpath as win_path
from module.models.torrent import EpisodeFile, SubtitleFile
from module.models import EpisodeFile, SubtitleFile
logger = logging.getLogger(__name__)
PLATFORM = "Unix"
@dataclass
class DownloadInfo:
name: str
season: int
suffix: str
file_name: str
folder_name: str
RULES = [
r"(.*) - (\d{1,4}|\d{1,4}\.\d{1,2})(?:v\d{1,2})?(?: )?(?:END)?(.*)",
r"(.*)[\[\ E](\d{1,4}|\d{1,4}\.\d{1,2})(?:v\d{1,2})?(?: )?(?:END)?[\]\ ](.*)",
@@ -33,18 +23,6 @@ SUBTITLE_LANG = {
}
def rename_init(name, folder_name, season, suffix) -> DownloadInfo:
n = re.split(r"[\[\]()【】()]", name)
suffix = suffix if suffix else n[-1]
if len(n) > 1:
file_name = name.replace(f"[{n[1]}]", "")
else:
file_name = name
if season < 10:
season = f"0{season}"
return DownloadInfo(name, season, suffix, file_name, folder_name)
def split_path(torrent_path: str) -> str:
if PLATFORM == "Windows":
return win_path.split(torrent_path)[-1]
@@ -78,7 +56,7 @@ def get_subtitle_lang(subtitle_name: str) -> str:
return key
def parse_torrent(torrent_path: str, season: int | None = None, file_type: str = "media") -> EpisodeFile | SubtitleFile:
def torrent_parser(torrent_path: str, season: int | None = None, file_type: str = "media") -> EpisodeFile | SubtitleFile:
media_path = split_path(torrent_path)
for rule in RULES:
match_obj = re.match(rule, media_path, re.I)
@@ -108,133 +86,3 @@ def parse_torrent(torrent_path: str, season: int | None = None, file_type: str =
episode=episode,
suffix=suffix
)
def rename_normal(info: DownloadInfo):
for rule in RULES:
match_obj = re.match(rule, info.name, re.I)
if match_obj is not None:
episode = match_obj.group(2)
title = re.sub(r"([Ss]|Season )\d{1,3}", "", match_obj.group(1)).strip()
new_name = f"{title} S{info.season}E{episode}{match_obj.group(3)}"
return new_name
def rename_pn(info: DownloadInfo):
for rule in RULES:
match_obj = re.match(rule, info.file_name, re.I)
if match_obj is not None:
title = re.sub(r"([Ss]|Season )\d{1,3}", "", match_obj.group(1)).strip()
title = title if title != "" else info.folder_name
episode = match_obj.group(2)
new_name = re.sub(
r"[\[\]]",
"",
f"{title} S{info.season}E{episode}{info.suffix}",
)
return new_name
def rename_advance(info: DownloadInfo):
for rule in RULES:
match_obj = re.match(rule, info.file_name, re.I)
if match_obj is not None:
episode = match_obj.group(2)
new_name = re.sub(
r"[\[\]]",
"",
f"{info.folder_name} S{info.season}E{episode}{info.suffix}",
)
return new_name
def rename_no_season_pn(info: DownloadInfo):
for rule in RULES:
match_obj = re.match(rule, info.file_name, re.I)
if match_obj is not None:
title = match_obj.group(1).strip()
episode = match_obj.group(2)
new_name = re.sub(
r"[\[\]]",
"",
f"{title} E{episode}{info.suffix}",
)
return new_name
def rename_none(info: DownloadInfo):
return info.name
def rename_subtitle(info: DownloadInfo):
subtitle_lang = "zh"
break_flag = False
for key, value in SUBTITLE_LANG.items():
for lang in value:
if lang in info.name:
subtitle_lang = key
break_flag = True
break
if break_flag:
break
for rule in RULES:
match_obj = re.match(rule, info.file_name, re.I)
if match_obj is not None:
title = re.sub(r"([Ss]|Season )\d{1,3}", "", match_obj.group(1)).strip()
title = title if title != "" else info.folder_name
new_name = re.sub(
r"[\[\]]",
"",
f"{title} S{info.season}E{match_obj.group(2)}.{subtitle_lang}{info.suffix}",
)
return new_name
def rename_subtitle_advance(info: DownloadInfo):
subtitle_lang = "zh"
break_flag = False
for key, value in SUBTITLE_LANG.items():
for lang in value:
if lang in info.name:
subtitle_lang = key
break_flag = True
break
if break_flag:
break
for rule in RULES:
match_obj = re.match(rule, info.file_name, re.I)
if match_obj is not None:
new_name = re.sub(
r"[\[\]]",
"",
f"{info.folder_name} S{info.season}E{match_obj.group(2)}.{subtitle_lang}{info.suffix}",
)
return new_name
METHODS = {
"normal": rename_normal,
"pn": rename_pn,
"advance": rename_advance,
"no_season_pn": rename_no_season_pn,
"none": rename_none,
"subtitle_pn": rename_subtitle,
"subtitle_advance": rename_subtitle_advance,
}
def torrent_parser(
file_name: str,
folder_name: str,
season: int,
suffix: str,
method: str = "pn",
):
info = rename_init(file_name, folder_name, season, suffix)
return METHODS[method.lower()](info)
if __name__ == '__main__':
title = "[Lilith-Raws] Boku no Kokoro no Yabai Yatsu - 01 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4].mp4"
sub = parse_torrent(title, season=1)
print(sub)

View File

@@ -13,13 +13,10 @@ class TitleParser:
@staticmethod
def torrent_parser(
method: str,
torrent_name: str,
bangumi_name: str | None = None,
torrent_path: str,
season: int | None = None,
suffix: str | None = None,
):
return torrent_parser(torrent_name, bangumi_name, season, suffix, method)
return torrent_parser(torrent_path, season)
def tmdb_parser(self, title: str, season: int, language: str):
official_title, tmdb_season = None, None

View File

@@ -1 +1 @@
from .bangumi_data import load_program_data, save_program_data

View File

@@ -1,29 +1,6 @@
import logging
from .json_config import save, load
from module.models import ProgramData
logger = logging.getLogger(__name__)
def load_program_data(path: str) -> ProgramData:
data = load(path)
try:
data = ProgramData(**data)
logger.info("Data file loaded")
except Exception as e:
logger.warning(
"Data file is not compatible with the current version, rebuilding..."
)
logger.debug(e)
data = ProgramData(
rss_link=data["rss_link"],
data_version=data["data_version"],
bangumi_info=[],
)
return data
def save_program_data(path: str, data: ProgramData):
save(path, data.dict())
logger.debug("Data file saved")

View File

@@ -1,50 +1,50 @@
from module.parser.analyser.torrent_parser import parse_torrent
from module.parser.analyser import torrent_parser
def test_torrent_parser():
file_name = "[Lilith-Raws] Boku no Kokoro no Yabai Yatsu - 01 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4].mp4"
bf = parse_torrent(file_name)
bf = torrent_parser(file_name)
assert bf.title == "Boku no Kokoro no Yabai Yatsu"
assert bf.group == "Lilith-Raws"
assert bf.episode == 1
assert bf.season == 1
file_name = "[Sakurato] Tonikaku Kawaii S2 [01][AVC-8bit 1080p AAC][CHS].mp4"
bf = parse_torrent(file_name)
bf = torrent_parser(file_name)
assert bf.title == "Tonikaku Kawaii"
assert bf.group == "Sakurato"
assert bf.episode == 1
assert bf.season == 2
file_name = "[SweetSub&LoliHouse] Heavenly Delusion - 01 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv"
bf = parse_torrent(file_name)
bf = torrent_parser(file_name)
assert bf.title == "Heavenly Delusion"
assert bf.group == "SweetSub&LoliHouse"
assert bf.episode == 1
assert bf.season == 1
file_name = "[SBSUB][CONAN][1082][V2][1080P][AVC_AAC][CHS_JP](C1E4E331).mp4"
bf = parse_torrent(file_name)
bf = torrent_parser(file_name)
assert bf.title == "CONAN"
assert bf.group == "SBSUB"
assert bf.episode == 1082
assert bf.season == 1
file_name = "海盗战记/海盗战记 S01E01.mp4"
bf = parse_torrent(file_name)
bf = torrent_parser(file_name)
assert bf.title == "海盗战记"
assert bf.episode == 1
assert bf.season == 1
file_name = "海盗战记 S01E01.zh-tw.ass"
sf = parse_torrent(file_name, file_type="subtitle")
sf = torrent_parser(file_name, file_type="subtitle")
assert sf.title == "海盗战记"
assert sf.episode == 1
assert sf.season == 1
assert sf.language == "zh-tw"
file_name = "海盗战记 S01E01.SC.ass"
sf = parse_torrent(file_name, file_type="subtitle")
sf = torrent_parser(file_name, file_type="subtitle")
assert sf.title == "海盗战记"
assert sf.season == 1
assert sf.episode == 1