Refactor eps complete.

Fix api bugs.

Add new module to parse path problem
This commit is contained in:
EstrellaXD
2023-05-19 17:10:37 +08:00
parent c26b6698b4
commit 8f548d8cf8
16 changed files with 256 additions and 295 deletions

View File

@@ -1,3 +1,3 @@
from .eps_complete import FullSeasonGet
from .collector import SeasonCollector, eps_complete
from .renamer import Renamer
from .torrent import TorrentManager

View File

@@ -0,0 +1,55 @@
import logging
from module.downloader import DownloadClient
from module.models import BangumiData
from module.database import BangumiDatabase
from module.searcher import SearchTorrent
logger = logging.getLogger(__name__)
class SeasonCollector(DownloadClient):
def add_season_torrents(self, data: BangumiData, torrents):
for torrent in torrents:
download_info = {
"url": torrent.torrent_link,
"save_path": self._gen_save_path(data),
}
self.add_torrent(download_info)
def collect_season(self, data: BangumiData, link: str = None):
logger.info(f"Start collecting {data.official_title} Season {data.season}...")
with SearchTorrent() as st:
if not link:
torrents = st.search_season(data)
else:
torrents = st.get_torrents(link)
self.add_season_torrents(data, torrents)
logger.info("Completed!")
def subscribe_season(self, data: BangumiData):
data.added = True
data.eps_collect = True
with BangumiDatabase() as db:
db.insert(data)
self.add_rss_feed(data.rss_link[0], item_path=data.official_title)
self.set_rule(data)
def eps_complete():
with BangumiDatabase() as bd:
datas = bd.not_complete()
if datas:
logger.info("Start collecting full season...")
for data in datas:
if not data.eps_collect:
with SeasonCollector() as sc:
sc.collect_season(data)
data.eps_collect = True
bd.update_list(datas)
if __name__ == '__main__':
from module.conf import setup_logger
setup_logger()
eps_complete()

View File

@@ -1,84 +0,0 @@
import os.path
import re
import logging
from module.network import RequestContent
from module.downloader import DownloadClient
from module.models import BangumiData
from module.database import BangumiDatabase
from module.searcher import SearchTorrent
from module.conf import settings
logger = logging.getLogger(__name__)
SEARCH_KEY = [
"group_name",
"title_raw",
"season_raw",
"subtitle",
"source",
"dpi",
]
class FullSeasonGet(DownloadClient):
def init_search_str(self, data: BangumiData):
str_list = []
for key in SEARCH_KEY:
data_dict = data.dict()
if data_dict[key] is not None:
str_list.append(data_dict[key])
return str_list
def get_season_torrents(self, data: BangumiData):
keywords = self.init_search_str(data)
with SearchTorrent() as st:
torrents = st.search_torrents(keywords)
return [torrent for torrent in torrents if data.title_raw in torrent.name]
def collect_season(self, data: BangumiData, torrents):
official_title = f"{data.official_title}({data.year})" if data.year else data.official_title
for torrent in torrents:
download_info = {
"url": torrent.torrent_link,
"save_path": os.path.join(
settings.downloader.path, official_title, f"Season {data.season}"
),
}
self.add_torrent(download_info)
def download_season(self, data: BangumiData):
logger.info(f"Start collecting {data.official_title} Season {data.season}...")
torrents = self.get_season_torrents(data)
self.collect_season(data, torrents)
logger.info("Completed!")
data.eps_collect = True
def eps_complete(self):
with BangumiDatabase() as bd:
datas = bd.not_complete()
if datas:
logger.info("Start collecting full season...")
for data in datas:
if not data.eps_collect:
self.download_season(data)
bd.update_list(datas)
def download_collection(
self, data: BangumiData, link
):
with RequestContent() as req:
torrents = req.get_torrents(link)
logger.info(f"Starting download {data.official_title} Season {data.season}...")
self.collect_season(data, torrents)
logger.info("Completed!")
def add_subscribe(self, data: BangumiData):
self.add_rss_feed(data.rss_link, item_path=data.official_title)
self.set_rule(data)
if __name__ == '__main__':
from module.conf import setup_logger
setup_logger()
with FullSeasonGet() as full_season_get:
full_season_get.eps_complete()

View File

@@ -1,20 +1,13 @@
import re
import logging
from pathlib import PurePath, PureWindowsPath
from module.downloader import DownloadClient
from module.parser import TitleParser
from module.network import PostNotification
from module.models import SubtitleFile, EpisodeFile, Notification
from module.conf import settings, PLATFORM
from module.conf import settings
from module.database import BangumiDatabase
if PLATFORM == "Windows":
import ntpath as path
else:
import os.path as path
logger = logging.getLogger(__name__)
@@ -22,7 +15,7 @@ logger = logging.getLogger(__name__)
class Renamer(DownloadClient):
def __init__(self):
super().__init__()
self._renamer = TitleParser()
self._parser = TitleParser()
@staticmethod
def print_result(torrent_count, rename_count):
@@ -32,24 +25,6 @@ class Renamer(DownloadClient):
)
logger.debug(f"Checked {torrent_count} files")
def rename_info(self, category="Bangumi"):
recent_info = self.get_torrent_info(category=category)
torrent_count = len(recent_info)
return recent_info, torrent_count
@staticmethod
def check_files(info):
media_list = []
subtitle_list = []
for f in info.files:
file_name = f.name
suffix = path.splitext(file_name)[-1]
if suffix.lower() in [".mp4", ".mkv"]:
media_list.append(file_name)
elif suffix.lower() in [".ass", ".srt"]:
subtitle_list.append(file_name)
return media_list, subtitle_list
@staticmethod
def gen_path(file_info: EpisodeFile | SubtitleFile, bangumi_name: str, method: str) -> str:
season = f"0{file_info.season}" if file_info.season < 10 else file_info.season
@@ -91,8 +66,9 @@ class Renamer(DownloadClient):
method: str,
season: int,
_hash: str,
**kwargs
):
ep = self._renamer.torrent_parser(
ep = self._parser.torrent_parser(
torrent_name=torrent_name,
torrent_path=media_path,
season=season,
@@ -111,17 +87,16 @@ class Renamer(DownloadClient):
def rename_collection(
self,
info,
media_list: list[str],
bangumi_name: str,
season: int,
method: str,
_hash: str,
**kwargs
):
_hash = info.hash
for media_path in media_list:
path_len = len(media_path.split(path.sep))
if path_len <= 2:
ep = self._renamer.torrent_parser(
if self.is_ep(media_path):
ep = self._parser.torrent_parser(
torrent_path=media_path,
season=season,
)
@@ -146,10 +121,11 @@ class Renamer(DownloadClient):
season: int,
method: str,
_hash,
**kwargs
):
method = "subtitle_" + method
for subtitle_path in subtitle_list:
sub = self._renamer.torrent_parser(
sub = self._parser.torrent_parser(
torrent_path=subtitle_path,
torrent_name=torrent_name,
season=season,
@@ -164,78 +140,42 @@ class Renamer(DownloadClient):
if not renamed:
logger.warning(f"{subtitle_path} rename failed")
@staticmethod
def get_season_info(save_path: str, download_path: str):
# Split save path and download path
save_parts = save_path.split(path.sep)
download_parts = download_path.split(path.sep)
# Get bangumi name and season
bangumi_name = ""
season = 1
for part in save_parts:
if re.match(r"S\d+|[Ss]eason \d+", part):
season = int(re.findall(r"\d+", part)[0])
elif part not in download_parts:
bangumi_name = part
return bangumi_name, season
@staticmethod
def get_file_name(file_path: str):
# Check windows or linux path
path_parts = (
PurePath(file_path).parts
if PurePath(file_path).name != file_path
else PureWindowsPath(file_path).parts
)
# Get file name
file_name = path_parts[-1]
return file_name
def rename(self):
# Get torrent info
logger.debug("Start rename process.")
download_path = settings.downloader.path
rename_method = settings.bangumi_manage.rename_method
recent_info, torrent_count = self.rename_info()
for info in recent_info:
torrents_info = self.get_torrent_info()
for info in torrents_info:
media_list, subtitle_list = self.check_files(info)
bangumi_name, season = self.get_season_info(info.save_path, download_path)
bangumi_name, season = self._path_to_bangumi(info.save_path)
kwargs = {
"torrent_name": info.name,
"bangumi_name": bangumi_name,
"method": rename_method,
"season": season,
"_hash": info.hash,
}
# Rename single media file
if len(media_list) == 1:
self.rename_file(
media_path=media_list[0],
torrent_name=info.name,
method=rename_method,
bangumi_name=bangumi_name,
season=season,
_hash=info.hash,
)
self.rename_file(media_path=media_list[0], **kwargs)
# Rename subtitle file
if len(subtitle_list) > 0:
self.rename_subtitles(
subtitle_list=subtitle_list,
torrent_name=info.name,
bangumi_name=bangumi_name,
season=season,
method=rename_method,
_hash=info.hash,
)
self.rename_subtitles(subtitle_list=subtitle_list, **kwargs)
# Rename collection
elif len(media_list) > 1:
logger.info("Start rename collection")
self.rename_collection(
info=info,
media_list=media_list,
bangumi_name=bangumi_name,
season=season,
method=rename_method,
)
self.rename_collection(media_list=media_list, **kwargs)
if len(subtitle_list) > 0:
self.rename_subtitles(
subtitle_list=subtitle_list,
torrent_name=info.name,
bangumi_name=bangumi_name,
season=season,
method=rename_method,
_hash=info.hash,
)
self.rename_subtitles(subtitle_list=subtitle_list,**kwargs)
self.set_category(info.hash, "BangumiCollection")
else:
logger.warning(f"{info.name} has no media file")
logger.debug("Rename process finished.")
if __name__ == '__main__':
from module.conf import setup_logger
settings.log.debug_enable = True
setup_logger()
with Renamer() as renamer:
renamer.rename()

View File

@@ -1,35 +0,0 @@
import logging
from module.downloader import DownloadClient
from module.conf import settings
from module.models import BangumiData
logger = logging.getLogger(__name__)
def __gen_path(data: BangumiData):
download_path = settings.downloader.path
if ":\\" in download_path:
import ntpath as path
else:
import os.path as path
folder = f"{data.official_title}({data.year})" if data.year else data.official_title
path = path.join(download_path, folder, f"Season {data.season}")
return path
def match_torrents_list(title_raw: str) -> list:
with DownloadClient() as client:
torrents = client.get_torrent_info()
return [torrent.hash for torrent in torrents if title_raw in torrent.name]
def set_new_path(data: BangumiData):
with DownloadClient() as client:
# set download rule
client.set_rule(data)
# set torrent path
match_list = match_torrents_list(data.title_raw)
path = __gen_path(data)
client.move_torrent(match_list, path)

View File

@@ -4,41 +4,35 @@ from module.downloader import DownloadClient
from module.conf import settings
from module.models import BangumiData
logger = logging.getLogger(__name__)
class TorrentManager(DownloadClient):
@staticmethod
def __gen_path(data: BangumiData):
download_path = settings.downloader.path
if ":\\" in download_path:
import ntpath as path
else:
import os.path as path
folder = f"{data.official_title}({data.year})" if data.year else data.official_title
path = path.join(download_path, folder, f"Season {data.season}")
return path
@staticmethod
def __match_torrents_list(title_raw: str) -> list:
with DownloadClient() as client:
torrents = client.get_torrent_info()
return [torrent.hash for torrent in torrents if title_raw in torrent.name]
def __match_torrents_list(self, data: BangumiData) -> list:
torrents = self.get_torrent_info()
matched_list = []
for torrent in torrents:
save_path = torrent.save_path
bangumi_name, season = self._path_to_bangumi(save_path)
if data.official_title in bangumi_name and data.season == season:
matched_list.append(torrent.hash)
return matched_list
def delete_bangumi(self, data: BangumiData):
hash_list = self.__match_torrents_list(data.title_raw)
hash_list = self.__match_torrents_list(data)
self.delete_torrent(hash_list)
def delete_rule(self, data: BangumiData):
rule_name = f"{data.official_title}({data.year})" if data.year else data.title_raw
if settings.bangumi_manage.group_tag:
rule_name = f"[{data.group_name}] {rule_name}" if settings.bangumi_manage.group_tag else rule_name
rule_name = f"[{data.group_name}] {rule_name}"
self.remove_rule(rule_name)
def set_new_path(self, data: BangumiData):
# set download rule
self.set_rule(data)
# set torrent path
match_list = self.__match_torrents_list(data.title_raw)
path = self.__gen_path(data)
match_list = self.__match_torrents_list(data)
path = self._gen_save_path(data)
self.move_torrent(match_list, path)