# File: Auto_Bangumi/src/module/manager/renamer.py
# 239 lines, 8.5 KiB, Python

import logging
import os.path
import re
from pathlib import PurePath, PureWindowsPath
from module.core.download_client import DownloadClient
from module.parser import TitleParser
from module.network import PostNotification
from module.models import Config
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Renamer(DownloadClient):
    """Rename the files of downloaded torrents to parser-generated names.

    Torrent operations (listing, renaming, deleting, categorising) come from
    ``DownloadClient``; target names come from ``TitleParser.torrent_parser``.
    """

    def __init__(self, settings: Config):
        """Create the title parser and notifier and keep *settings* for later use."""
        super().__init__(settings)
        self._renamer = TitleParser()
        self._notification = PostNotification()
        self.settings = settings

    @staticmethod
    def print_result(torrent_count, rename_count):
        """Log a summary of a rename pass.

        The info-level line is written only when something was renamed; the
        debug-level line is always written.
        """
        if rename_count != 0:
            logger.info(
                f"Finished checking {torrent_count} files' name, renamed {rename_count} files."
            )
        logger.debug(f"Checked {torrent_count} files")

    def get_torrent_info(self, category="Bangumi"):
        """Return the torrents in *category* and how many there are.

        Returns:
            A ``(torrents, count)`` tuple.
        """
        # Delegate to the parent class. Calling ``self.get_torrent_info`` here
        # would re-enter this override and recurse until RecursionError.
        # NOTE(review): assumes DownloadClient defines
        # get_torrent_info(category=...) -- confirm against the base class.
        recent_info = super().get_torrent_info(category=category)
        return recent_info, len(recent_info)

    @staticmethod
    def check_files(info):
        """Split the file names of a torrent into media and subtitle lists.

        Args:
            info: Torrent object exposing ``files``, each item having ``name``.

        Returns:
            ``(media_list, subtitle_list)``: names ending in ``.mp4``/``.mkv``
            and names ending in ``.ass``/``.srt`` (case-insensitive). Files
            with any other extension are ignored.
        """
        media_list = []
        subtitle_list = []
        for f in info.files:
            file_name = f.name
            suffix = os.path.splitext(file_name)[-1].lower()
            if suffix in (".mp4", ".mkv"):
                media_list.append(file_name)
            elif suffix in (".ass", ".srt"):
                subtitle_list.append(file_name)
        return media_list, subtitle_list

    def rename_file(
        self,
        info,
        media_path: str,
        method: str,
        bangumi_name: str,
        season: int,
        remove_bad_torrents: bool,
    ):
        """Rename the single media file of a torrent and send a notification.

        The target name is parsed from the torrent's name (``info.name``) and
        compared with the current file name; nothing happens when they match.

        Args:
            info: Torrent object; ``name`` and ``hash`` are read.
            media_path: Path of the media file inside the torrent.
            method: Rename method passed through to the parser.
            bangumi_name: Series name passed through to the parser.
            season: Season number passed through to the parser.
            remove_bad_torrents: Delete the torrent when renaming fails.
        """
        torrent_name = info.name
        suffix = os.path.splitext(media_path)[-1]
        compare_name = self.get_file_name(media_path)
        new_path = self._renamer.torrent_parser(
            torrent_name=torrent_name,
            bangumi_name=bangumi_name,
            season=season,
            suffix=suffix,
            method=method,
        )
        if compare_name == new_path:
            return
        try:
            self.rename_torrent_file(
                _hash=info.hash, old_path=media_path, new_path=new_path
            )
        except Exception as e:
            logger.warning(f"{torrent_name} rename failed")
            logger.warning(
                f"Season name: {bangumi_name}, Season: {season}, Suffix: {suffix}"
            )
            logger.debug(e)
            # Delete bad torrent
            self.delete_bad_torrent(info, remove_bad_torrents)
            return
        # The notification is guarded separately: a notifier error after a
        # successful rename must not be reported as a rename failure, and must
        # not lead to the torrent being deleted.
        try:
            self._notification.send_msg(bangumi_name, f"{new_path}已经更新,已自动重命名。")
        except Exception as e:
            logger.warning(f"{torrent_name} renamed, but sending notification failed")
            logger.debug(e)

    def rename_collection(
        self,
        info,
        media_list: list[str],
        bangumi_name: str,
        season: int,
        remove_bad_torrents: bool,
        method: str,
    ):
        """Rename every shallow media file of a multi-file torrent.

        Only paths with at most two components (split on the host's
        ``os.path.sep``) are processed. Each target name is parsed from the
        file's own name. Afterwards the torrent is moved to the
        ``BangumiCollection`` category.

        Args:
            info: Torrent object; ``hash`` is read.
            media_list: Media file paths inside the torrent.
            bangumi_name: Series name passed through to the parser.
            season: Season number passed through to the parser.
            remove_bad_torrents: Delete the torrent when a rename fails.
            method: Rename method passed through to the parser.
        """
        _hash = info.hash
        for media_path in media_list:
            if len(media_path.split(os.path.sep)) > 2:
                # Nested deeper than one folder: leave untouched.
                continue
            suffix = os.path.splitext(media_path)[-1]
            torrent_name = self.get_file_name(media_path)
            new_name = self._renamer.torrent_parser(
                torrent_name=torrent_name,
                bangumi_name=bangumi_name,
                season=season,
                suffix=suffix,
                method=method,
            )
            if torrent_name == new_name:
                continue
            try:
                self.rename_torrent_file(
                    _hash=_hash, old_path=media_path, new_path=new_name
                )
            except Exception as e:
                logger.warning(f"{torrent_name} rename failed")
                logger.warning(
                    f"Bangumi name: {bangumi_name}, Season: {season}, Suffix: {suffix}"
                )
                logger.debug(e)
                # Delete bad torrent.
                self.delete_bad_torrent(info, remove_bad_torrents)
        self.set_category(category="BangumiCollection", hashes=_hash)

    def rename_subtitles(
        self,
        subtitle_list: list[str],
        bangumi_name: str,
        season: int,
        method: str,
        _hash,
    ):
        """Rename subtitle files using the ``subtitle_``-prefixed rename method.

        Failures are logged and skipped; the torrent is never deleted here.

        Args:
            subtitle_list: Subtitle file paths inside the torrent.
            bangumi_name: Series name passed through to the parser.
            season: Season number passed through to the parser.
            method: Base rename method; ``"subtitle_"`` is prepended.
            _hash: Hash of the torrent owning the files.
        """
        method = "subtitle_" + method
        for subtitle_path in subtitle_list:
            suffix = os.path.splitext(subtitle_path)[-1]
            old_name = self.get_file_name(subtitle_path)
            new_name = self._renamer.torrent_parser(
                method=method,
                torrent_name=old_name,
                bangumi_name=bangumi_name,
                season=season,
                suffix=suffix,
            )
            if old_name == new_name:
                continue
            try:
                self.rename_torrent_file(
                    _hash=_hash, old_path=subtitle_path, new_path=new_name
                )
            except Exception as e:
                logger.warning(f"{old_name} rename failed")
                logger.warning(f"Suffix: {suffix}")
                logger.debug(e)

    def delete_bad_torrent(self, info, remove_bad_torrent: bool):
        """Delete the torrent and log it, but only when *remove_bad_torrent* is set."""
        if remove_bad_torrent:
            self.delete_torrent(info.hash)
            logger.info(f"{info.name} have been deleted.")

    @staticmethod
    def _split_path(path: str):
        """Return the components of *path*, accepting POSIX or Windows separators.

        The string is parsed with the host's ``PurePath`` first. If that
        yields a single component equal to the whole string, it is re-parsed
        as a Windows path so that backslash separators are honoured.
        """
        native = PurePath(path)
        if native.name != path:
            return native.parts
        return PureWindowsPath(path).parts

    @staticmethod
    def get_season_info(save_path: str, download_path: str):
        """Derive the series folder name and season number from a save path.

        Args:
            save_path: Directory the torrent is saved to.
            download_path: Base download directory; every occurrence of it is
                removed from *save_path* before parsing.

        Returns:
            ``(folder_name, season)``. ``folder_name`` is the first path
            component after a leading root separator, if any. ``season`` is
            the first run of one or two digits in the last component when that
            component matches ``S<digits>`` or ``Season``/``season``;
            otherwise ``1``.

        Raises:
            IndexError: If no path component is left after removing
                *download_path*.
        """
        # Remove default save path
        save_path = save_path.replace(download_path, "")
        # Check windows or linux path
        path_parts = Renamer._split_path(save_path)
        # Get folder name
        folder_name = path_parts[1] if path_parts[0] in ("/", "\\") else path_parts[0]
        # Get season
        season = 1
        last_part = path_parts[-1]
        if re.search(r"S\d{1,2}|[Ss]eason", last_part) is not None:
            digits = re.search(r"\d{1,2}", last_part)
            if digits is not None:
                season = int(digits.group())
            else:
                # "Season" without a number: keep the default.
                logger.debug("No Season info")
        return folder_name, season

    @staticmethod
    def get_file_name(file_path: str):
        """Return the last component of *file_path* (POSIX or Windows separators)."""
        return Renamer._split_path(file_path)[-1]

    def rename(self):
        """Run one rename pass over the torrents returned by ``get_torrent_info``.

        For each torrent: a single media file goes through ``rename_file``,
        several media files go through ``rename_collection``, and in both
        cases any subtitles are renamed afterwards. A torrent without media
        files only produces a warning.
        """
        # Get torrent info
        download_path = self.settings.downloader.path
        rename_method = self.settings.bangumi_manage.rename_method
        remove_bad_torrents = self.settings.bangumi_manage.remove_bad_torrent
        recent_info, _ = self.get_torrent_info()
        for info in recent_info:
            media_list, subtitle_list = self.check_files(info)
            bangumi_name, season = self.get_season_info(info.save_path, download_path)
            if not media_list:
                logger.warning(f"{info.name} has no media file")
                continue
            if len(media_list) == 1:
                self.rename_file(
                    info=info,
                    media_path=media_list[0],
                    method=rename_method,
                    bangumi_name=bangumi_name,
                    season=season,
                    remove_bad_torrents=remove_bad_torrents,
                )
            else:
                logger.info("Start rename collection")
                self.rename_collection(
                    info=info,
                    media_list=media_list,
                    bangumi_name=bangumi_name,
                    season=season,
                    remove_bad_torrents=remove_bad_torrents,
                    method=rename_method,
                )
            # Subtitles are handled the same way for single files and collections.
            if subtitle_list:
                self.rename_subtitles(
                    subtitle_list=subtitle_list,
                    bangumi_name=bangumi_name,
                    season=season,
                    method=rename_method,
                    _hash=info.hash,
                )