mirror of
https://github.com/EstrellaXD/Auto_Bangumi.git
synced 2026-04-26 11:32:16 +08:00
fix: fix decode token problem, decoupling rename and notifi, rss parser and database
This commit is contained in:
@@ -3,10 +3,10 @@ import logging
|
||||
from module.downloader import DownloadClient
|
||||
|
||||
from module.parser import TitleParser
|
||||
from module.network import PostNotification
|
||||
# from module.network import PostNotification
|
||||
from module.models import SubtitleFile, EpisodeFile, Notification
|
||||
from module.conf import settings
|
||||
from module.database import BangumiDatabase
|
||||
# from module.database import BangumiDatabase
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -47,23 +47,23 @@ class Renamer(DownloadClient):
|
||||
logger.error(f"[Renamer] Unknown rename method: {method}")
|
||||
return file_info.media_path
|
||||
|
||||
@staticmethod
|
||||
def send_notification(bangumi_name, ep: EpisodeFile):
|
||||
with BangumiDatabase() as db:
|
||||
poster_path = db.match_poster(bangumi_name)
|
||||
poster_link = "https://mikanani.me" + poster_path
|
||||
n = Notification(
|
||||
official_title=bangumi_name,
|
||||
season=ep.season,
|
||||
episode=ep.episode,
|
||||
poster_link=poster_link,
|
||||
)
|
||||
with PostNotification() as notificator:
|
||||
status = notificator.send_msg(n)
|
||||
if status:
|
||||
logger.info(f"[Renamer] Notification sent: {ep.title} S{ep.season}E{ep.episode}")
|
||||
else:
|
||||
logger.warning(f"[Renamer] Notification failed: {ep.title} S{ep.season}E{ep.episode}")
|
||||
# @staticmethod
|
||||
# def send_notification(bangumi_name, ep: EpisodeFile):
|
||||
# with BangumiDatabase() as db:
|
||||
# poster_path = db.match_poster(bangumi_name)
|
||||
# poster_link = "https://mikanani.me" + poster_path
|
||||
# n = Notification(
|
||||
# official_title=bangumi_name,
|
||||
# season=ep.season,
|
||||
# episode=ep.episode,
|
||||
# poster_link=poster_link,
|
||||
# )
|
||||
# with PostNotification() as notificator:
|
||||
# status = notificator.send_msg(n)
|
||||
# if status:
|
||||
# logger.info(f"[Renamer] Notification sent: {ep.title} S{ep.season}E{ep.episode}")
|
||||
# else:
|
||||
# logger.warning(f"[Renamer] Notification failed: {ep.title} S{ep.season}E{ep.episode}")
|
||||
|
||||
def rename_file(
|
||||
self,
|
||||
@@ -87,12 +87,14 @@ class Renamer(DownloadClient):
|
||||
_hash=_hash, old_path=media_path, new_path=new_path
|
||||
)
|
||||
if renamed:
|
||||
if settings.notification.enable:
|
||||
self.send_notification(bangumi_name, ep)
|
||||
return True
|
||||
logger.warning(f"[Renamer] {media_path} parse failed")
|
||||
if settings.bangumi_manage.remove_bad_torrent:
|
||||
self.delete_torrent(hashes=_hash)
|
||||
# if settings.notification.enable:
|
||||
# self.send_notification(bangumi_name, ep)
|
||||
return ep
|
||||
else:
|
||||
logger.warning(f"[Renamer] {media_path} parse failed")
|
||||
if settings.bangumi_manage.remove_bad_torrent:
|
||||
self.delete_torrent(hashes=_hash)
|
||||
return None
|
||||
|
||||
def rename_collection(
|
||||
self,
|
||||
@@ -154,6 +156,7 @@ class Renamer(DownloadClient):
|
||||
logger.debug("[Renamer] Start rename process.")
|
||||
rename_method = settings.bangumi_manage.rename_method
|
||||
torrents_info = self.get_torrent_info()
|
||||
renamed_info = []
|
||||
for info in torrents_info:
|
||||
media_list, subtitle_list = self.check_files(info)
|
||||
bangumi_name, season = self._path_to_bangumi(info.save_path)
|
||||
@@ -166,7 +169,9 @@ class Renamer(DownloadClient):
|
||||
}
|
||||
# Rename single media file
|
||||
if len(media_list) == 1:
|
||||
self.rename_file(media_path=media_list[0], **kwargs)
|
||||
ep_info = self.rename_file(media_path=media_list[0], **kwargs)
|
||||
if ep_info:
|
||||
renamed_info.append(ep_info)
|
||||
# Rename subtitle file
|
||||
if len(subtitle_list) > 0:
|
||||
self.rename_subtitles(subtitle_list=subtitle_list, **kwargs)
|
||||
@@ -180,11 +185,11 @@ class Renamer(DownloadClient):
|
||||
else:
|
||||
logger.warning(f"[Renamer] {info.name} has no media file")
|
||||
logger.debug("[Renamer] Rename process finished.")
|
||||
return renamed_info
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from module.conf import setup_logger
|
||||
|
||||
settings.log.debug_enable = True
|
||||
setup_logger()
|
||||
with Renamer() as renamer:
|
||||
|
||||
@@ -13,62 +13,77 @@ class TorrentManager(BangumiDatabase):
|
||||
def __match_torrents_list(data: BangumiData) -> list:
|
||||
with DownloadClient() as client:
|
||||
torrents = client.get_torrent_info()
|
||||
matched_list = []
|
||||
for torrent in torrents:
|
||||
if data.save_path == torrent.save_path:
|
||||
matched_list.append(torrent.hash)
|
||||
return matched_list
|
||||
return [torrent.hash for torrent in torrents if torrent.save_path == data.save_path]
|
||||
|
||||
def delete_torrents(self, _id: int | str):
|
||||
data = self.search_one(int(_id))
|
||||
if isinstance(data, BangumiData):
|
||||
hash_list = self.__match_torrents_list(data)
|
||||
with DownloadClient() as client:
|
||||
client.delete_torrent(hash_list)
|
||||
def delete_torrents(self, data: BangumiData, client: DownloadClient):
|
||||
hash_list = self.__match_torrents_list(data)
|
||||
if hash_list:
|
||||
client.delete_torrent(hash_list)
|
||||
logger.info(f"Delete rule and torrents for {data.official_title}")
|
||||
return {
|
||||
"status": "success",
|
||||
"msg": f"Delete torrents for {data.official_title}",
|
||||
}
|
||||
else:
|
||||
return data
|
||||
return {
|
||||
"status": "error",
|
||||
"msg": f"Can't find torrents for {data.official_title}",
|
||||
}
|
||||
|
||||
def delete_rule(self, _id: int | str, file: bool = False):
|
||||
data = self.search_id(int(_id))
|
||||
if isinstance(data, BangumiData):
|
||||
self.delete_one(int(_id))
|
||||
if file:
|
||||
self.delete_torrents(data.id)
|
||||
logger.info(f"Delete {data.official_title} and torrents.")
|
||||
with DownloadClient() as client:
|
||||
client.remove_rule(data.rule_name)
|
||||
self.delete_one(int(_id))
|
||||
if file:
|
||||
self.delete_torrents(data, client)
|
||||
return {
|
||||
"status": "success",
|
||||
"msg": f"Delete rule and torrents for {data.official_title}",
|
||||
}
|
||||
logger.info(f"Delete rule for {data.official_title}")
|
||||
return {
|
||||
"status": "success",
|
||||
"msg": f"Delete {data.official_title} and torrents.",
|
||||
"msg": f"Delete rule for {data.official_title}",
|
||||
}
|
||||
logger.info(f"Delete {data.official_title}")
|
||||
return {"status": "success", "msg": f"Delete {data.official_title}"}
|
||||
else:
|
||||
return data
|
||||
return {"status": "error", "msg": f"Can't find id {_id}"}
|
||||
# data = self.search_id(int(_id))
|
||||
# if isinstance(data, BangumiData):
|
||||
# self.delete_one(int(_id))
|
||||
# if file:
|
||||
# self.delete_torrents(data)
|
||||
# logger.info(f"Delete {data.official_title} and torrents.")
|
||||
# return {
|
||||
# "status": "success",
|
||||
# "msg": f"Delete {data.official_title} and torrents.",
|
||||
# }
|
||||
# logger.info(f"Delete {data.official_title}")
|
||||
# return {"status": "success", "msg": f"Delete {data.official_title}"}
|
||||
# else:
|
||||
# return data
|
||||
|
||||
def disable_rule(self, _id: str | int, file: bool = False):
|
||||
data = self.search_id(int(_id))
|
||||
if isinstance(data, BangumiData):
|
||||
with DownloadClient() as client:
|
||||
client.remove_rule(data.rule_name)
|
||||
data.deleted = True
|
||||
self.update_one(data)
|
||||
if file:
|
||||
self.delete_torrents(data.id)
|
||||
logger.info(f"Delete rule and torrents for {data.official_title}")
|
||||
data.deleted = True
|
||||
self.update_one(data)
|
||||
if file:
|
||||
self.delete_torrents(data, client)
|
||||
return {
|
||||
"status": "success",
|
||||
"msg": f"Disable rule and delete torrents for {data.official_title}",
|
||||
}
|
||||
logger.info(f"Disable rule for {data.official_title}")
|
||||
return {
|
||||
"status": "success",
|
||||
"msg": f"Disable rule and delete torrents for {data.official_title}",
|
||||
"msg": f"Disable rule for {data.official_title}",
|
||||
}
|
||||
logger.info(f"Disable rule for {data.official_title}")
|
||||
return {
|
||||
"status": "success",
|
||||
"msg": f"Disable rule for {data.official_title}",
|
||||
}
|
||||
else:
|
||||
return data
|
||||
return {"status": "error", "msg": f"Can't find data with id {_id}"}
|
||||
|
||||
def enable_rule(self, _id: str | int):
|
||||
data = self.search_id(int(_id))
|
||||
|
||||
Reference in New Issue
Block a user