From ebb5e0995c04d738419ab81158ea2ac63f811e2d Mon Sep 17 00:00:00 2001 From: 100gle Date: Tue, 13 Jun 2023 23:44:38 +0800 Subject: [PATCH 01/11] feat: use `pathlib` to replace `os.path` for handling file path --- backend/.vscode/settings.json | 8 + backend/src/module/api/log.py | 7 +- backend/src/module/conf/__init__.py | 17 +- backend/src/module/conf/config.py | 180 +++++++++--------- backend/src/module/conf/log.py | 60 +++--- backend/src/module/core/status.py | 108 +++++------ backend/src/module/database/connector.py | 29 ++- backend/src/module/manager/collector.py | 2 - .../src/module/notification/notification.py | 3 +- .../src/module/notification/plugin/wecom.py | 6 +- .../src/module/parser/analyser/raw_parser.py | 2 +- .../src/module/parser/analyser/tmdb_parser.py | 16 +- .../module/parser/analyser/torrent_parser.py | 22 ++- backend/src/module/update/data_migration.py | 42 ++-- 14 files changed, 264 insertions(+), 238 deletions(-) create mode 100644 backend/.vscode/settings.json diff --git a/backend/.vscode/settings.json b/backend/.vscode/settings.json new file mode 100644 index 00000000..24dec46e --- /dev/null +++ b/backend/.vscode/settings.json @@ -0,0 +1,8 @@ +{ + "python.formatting.provider": "none", + "python.formatting.blackPath": "black", + "editor.formatOnSave": true, + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter" + } +} diff --git a/backend/src/module/api/log.py b/backend/src/module/api/log.py index 9956dffe..88ec2c57 100644 --- a/backend/src/module/api/log.py +++ b/backend/src/module/api/log.py @@ -14,7 +14,7 @@ async def get_log(current_user=Depends(get_current_user)): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token" ) - if os.path.isfile(LOG_PATH): + if LOG_PATH.exists(): with open(LOG_PATH, "rb") as f: return Response(f.read(), media_type="text/plain") else: @@ -27,9 +27,8 @@ async def clear_log(current_user=Depends(get_current_user)): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token" ) - if os.path.isfile(LOG_PATH): - with open(LOG_PATH, "w") as f: - f.write("") + if LOG_PATH.exists(): + LOG_PATH.write_text("") return {"status": "ok"} else: return Response("Log file not found", status_code=404) diff --git a/backend/src/module/conf/__init__.py b/backend/src/module/conf/__init__.py index afb47703..8af4f5ea 100644 --- a/backend/src/module/conf/__init__.py +++ b/backend/src/module/conf/__init__.py @@ -1,7 +1,10 @@ -from .config import VERSION, settings -from .log import LOG_PATH, setup_logger - -TMDB_API = "32b19d6a05b512190a056fa4e747cbbc" -DATA_PATH = "data/data.db" - -PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix" +import pathlib + +from .config import VERSION, settings +from .log import LOG_PATH, setup_logger + +TMDB_API = "32b19d6a05b512190a056fa4e747cbbc" +DATA_PATH = pathlib.Path("data/data.db") +LEGACY_DATA_PATH = pathlib.Path("data/data.json") + +PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix" diff --git a/backend/src/module/conf/config.py b/backend/src/module/conf/config.py index 051feeca..5b3cf67f 100644 --- a/backend/src/module/conf/config.py +++ b/backend/src/module/conf/config.py @@ -1,89 +1,91 @@ -import json -import logging -import os - -from dotenv import load_dotenv - -from module.models.config import Config - -from .const import ENV_TO_ATTR - -logger = logging.getLogger(__name__) - -try: - from module.__version__ import VERSION - - if VERSION == "DEV_VERSION": - logger.info("Can't find version info, use DEV_VERSION instead") - CONFIG_PATH = "config/config_dev.json" - else: - CONFIG_PATH = "config/config.json" -except ImportError: - logger.info("Can't find version info, use DEV_VERSION instead") - VERSION = "DEV_VERSION" - CONFIG_PATH = "config/config_dev.json" - - -class Settings(Config): - def __init__(self): - super().__init__() - if os.path.exists(CONFIG_PATH): - self.load() - self.save() - else: - self.init() - - def load(self): - with open(CONFIG_PATH, "r", encoding="utf-8") as f: - config = json.load(f) - config_obj = Config.parse_obj(config) - self.__dict__.update(config_obj.__dict__) - logger.info("Config loaded") - - def save(self, config_dict: dict | None = None): - if not config_dict: - config_dict = self.dict() - with open(CONFIG_PATH, "w", encoding="utf-8") as f: - json.dump(config_dict, f, indent=4, ensure_ascii=False) - - def init(self): - load_dotenv(".env") - self.__load_from_env() - self.save() - - @property - def rss_link(self) -> str: - if "://" not in self.rss_parser.custom_url: - return f"https://{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" - return ( - f"{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" - ) - - def __load_from_env(self): - config_dict = self.dict() - for key, section in ENV_TO_ATTR.items(): - for env, attr in section.items(): - if env in os.environ: - if isinstance(attr, list): - for _attr in attr: - attr_name = _attr[0] if isinstance(_attr, tuple) else _attr - config_dict[key][attr_name] = self.__val_from_env( - env, _attr - ) - else: - attr_name = attr[0] if isinstance(attr, tuple) else attr - config_dict[key][attr_name] = self.__val_from_env(env, attr) - config_obj = Config.parse_obj(config_dict) - self.__dict__.update(config_obj.__dict__) - logger.info("Config loaded from env") - - @staticmethod - def __val_from_env(env: str, attr: tuple): - if isinstance(attr, tuple): - conv_func = attr[1] - return conv_func(os.environ[env]) - else: - return os.environ[env] - - -settings = Settings() +import json +import logging +import os +import pathlib + +from dotenv import load_dotenv + +from module.models.config import Config + +from .const import ENV_TO_ATTR + +logger = logging.getLogger(__name__) +CONFIG_ROOT = pathlib.Path("src/config") + + +try: + from module.__version__ import VERSION +except ImportError: + logger.info("Can't find version info, use DEV_VERSION instead") + VERSION = "DEV_VERSION" + +CONFIG_PATH = ( + CONFIG_ROOT / "config_dev.json" + if VERSION == "DEV_VERSION" + else CONFIG_ROOT / "config.json" +).resolve() + + +class Settings(Config): + def __init__(self): + super().__init__() + if CONFIG_PATH.exists(): + self.load() + self.save() + else: + self.init() + + def load(self): + with open(CONFIG_PATH, "r", encoding="utf-8") as f: + config = json.load(f) + config_obj = Config.parse_obj(config) + self.__dict__.update(config_obj.__dict__) + logger.info("Config loaded") + + def save(self, config_dict: dict | None = None): + if not config_dict: + config_dict = self.dict() + with open(CONFIG_PATH, "w", encoding="utf-8") as f: + json.dump(config_dict, f, indent=4, ensure_ascii=False) + + def init(self): + load_dotenv(".env") + self.__load_from_env() + self.save() + + @property + def rss_link(self) -> str: + if "://" not in self.rss_parser.custom_url: + return f"https://{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" + return ( + f"{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" + ) + + def __load_from_env(self): + config_dict = self.dict() + for key, section in ENV_TO_ATTR.items(): + for env, attr in section.items(): + if env in os.environ: + if isinstance(attr, list): + for _attr in attr: + attr_name = _attr[0] if isinstance(_attr, tuple) else _attr + config_dict[key][attr_name] = self.__val_from_env( + env, _attr + ) + else: + attr_name = attr[0] if isinstance(attr, tuple) else attr + config_dict[key][attr_name] = self.__val_from_env(env, attr) + config_obj = Config.parse_obj(config_dict) + self.__dict__.update(config_obj.__dict__) + logger.info("Config loaded from env") + + @staticmethod + def __val_from_env(env: str, attr: tuple): + if isinstance(attr, tuple): + conv_func = attr[1] + return conv_func(os.environ[env]) + else: + return os.environ[env] + + +settings = Settings() diff --git a/backend/src/module/conf/log.py b/backend/src/module/conf/log.py index 2d503e74..9d28fe83 100644 --- a/backend/src/module/conf/log.py +++ b/backend/src/module/conf/log.py @@ -1,29 +1,31 @@ -import logging -import os - -from .config import settings - -LOG_PATH = "data/log.txt" - - -def setup_logger(level: int = logging.INFO, reset: bool = False): - level = logging.DEBUG if settings.log.debug_enable else level - if not os.path.isdir("data"): - os.mkdir("data") - if reset and os.path.isfile(LOG_PATH): - os.remove(LOG_PATH) - logging.addLevelName(logging.DEBUG, "DEBUG:") - logging.addLevelName(logging.INFO, "INFO:") - logging.addLevelName(logging.WARNING, "WARNING:") - LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s" - TIME_FORMAT = "%Y-%m-%d %H:%M:%S" - logging.basicConfig( - level=level, - format=LOGGING_FORMAT, - datefmt=TIME_FORMAT, - encoding="utf-8", - handlers=[ - logging.FileHandler(LOG_PATH, encoding="utf-8"), - logging.StreamHandler(), - ], - ) +import logging +import pathlib + +from .config import settings + +LOG_ROOT = pathlib.Path("src/data") +LOG_PATH = LOG_ROOT / "log.txt" + + +def setup_logger(level: int = logging.INFO, reset: bool = False): + level = logging.DEBUG if settings.log.debug_enable else level + LOG_ROOT.mkdir(exist_ok=True) + + if reset and LOG_PATH.exists(): + LOG_PATH.unlink(missing_ok=True) + + logging.addLevelName(logging.DEBUG, "DEBUG:") + logging.addLevelName(logging.INFO, "INFO:") + logging.addLevelName(logging.WARNING, "WARNING:") + LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s" + TIME_FORMAT = "%Y-%m-%d %H:%M:%S" + logging.basicConfig( + level=level, + format=LOGGING_FORMAT, + datefmt=TIME_FORMAT, + encoding="utf-8", + handlers=[ + logging.FileHandler(LOG_PATH, encoding="utf-8"), + logging.StreamHandler(), + ], + ) diff --git a/backend/src/module/core/status.py b/backend/src/module/core/status.py index 6f056463..674e568b 100644 --- a/backend/src/module/core/status.py +++ b/backend/src/module/core/status.py @@ -1,54 +1,54 @@ -import asyncio -import os.path -import threading - -from module.checker import Checker - - -class ProgramStatus(Checker): - def __init__(self): - super().__init__() - self.stop_event = threading.Event() - self.lock = threading.Lock() - self._downloader_status = False - self._torrents_status = False - self.event = asyncio.Event() - - @property - def is_running(self): - if self.stop_event.is_set() or self.check_first_run(): - return False - else: - return True - - @property - def is_stopped(self): - return self.stop_event.is_set() - - @property - def downloader_status(self): - if not self._downloader_status: - self._downloader_status = self.check_downloader() - return self._downloader_status - - @property - def torrents_status(self): - if not self._torrents_status: - self._torrents_status = self.check_torrents() - return self._torrents_status - - @property - def enable_rss(self): - return self.check_analyser() - - @property - def enable_renamer(self): - return self.check_renamer() - - @property - def first_run(self): - return self.check_first_run() - - @property - def legacy_data(self): - return os.path.exists("data/data.json") +import asyncio +import threading + +from module.checker import Checker +from module.conf import LEGACY_DATA_PATH + + +class ProgramStatus(Checker): + def __init__(self): + super().__init__() + self.stop_event = threading.Event() + self.lock = threading.Lock() + self._downloader_status = False + self._torrents_status = False + self.event = asyncio.Event() + + @property + def is_running(self): + if self.stop_event.is_set() or self.check_first_run(): + return False + else: + return True + + @property + def is_stopped(self): + return self.stop_event.is_set() + + @property + def downloader_status(self): + if not self._downloader_status: + self._downloader_status = self.check_downloader() + return self._downloader_status + + @property + def torrents_status(self): + if not self._torrents_status: + self._torrents_status = self.check_torrents() + return self._torrents_status + + @property + def enable_rss(self): + return self.check_analyser() + + @property + def enable_renamer(self): + return self.check_renamer() + + @property + def first_run(self): + return self.check_first_run() + + @property + def legacy_data(self): + return LEGACY_DATA_PATH.exists() diff --git a/backend/src/module/database/connector.py b/backend/src/module/database/connector.py index 36d492cb..506bb7b1 100644 --- a/backend/src/module/database/connector.py +++ b/backend/src/module/database/connector.py @@ -10,8 +10,8 @@ logger = logging.getLogger(__name__) class DataConnector: def __init__(self): # Create folder if not exists - if not os.path.exists(os.path.dirname(DATA_PATH)): - os.makedirs(os.path.dirname(DATA_PATH)) + DATA_PATH.parent.mkdir(parents=True, exist_ok=True) + self._conn = sqlite3.connect(DATA_PATH) self._cursor = self._conn.cursor() @@ -99,10 +99,14 @@ class DataConnector: def _delete(self, table_name: str, condition: dict): condition_sql = " AND ".join([f"{key} = :{key}" for key in condition.keys()]) - self._cursor.execute(f"DELETE FROM {table_name} WHERE {condition_sql}", condition) + self._cursor.execute( + f"DELETE FROM {table_name} WHERE {condition_sql}", condition + ) self._conn.commit() - def _search(self, table_name: str, keys: list[str] | None = None, condition: dict = None): + def _search( + self, table_name: str, keys: list[str] | None = None, condition: dict = None + ): if keys is None: select_sql = "*" else: @@ -111,20 +115,25 @@ class DataConnector: self._cursor.execute(f"SELECT {select_sql} FROM {table_name}") else: custom_condition = condition.pop("_custom_condition", None) - condition_sql = " AND ".join([f"{key} = :{key}" for key in condition.keys()]) + ( - f" AND {custom_condition}" if custom_condition else "" - ) + condition_sql = " AND ".join( + [f"{key} = :{key}" for key in condition.keys()] + ) + (f" AND {custom_condition}" if custom_condition else "") self._cursor.execute( - f"SELECT {select_sql} FROM {table_name} WHERE {condition_sql}", condition + f"SELECT {select_sql} FROM {table_name} WHERE {condition_sql}", + condition, ) - def _search_data(self, table_name: str, keys: list[str] | None = None, condition: dict = None) -> dict: + def _search_data( + self, table_name: str, keys: list[str] | None = None, condition: dict = None + ) -> dict: if keys is None: keys = self.__get_table_columns(table_name) self._search(table_name, keys, condition) return dict(zip(keys, self._cursor.fetchone())) - def _search_datas(self, table_name: str, keys: list[str] | None = None, condition: dict = None) -> list[dict]: + def _search_datas( + self, table_name: str, keys: list[str] | None = None, condition: dict = None + ) -> list[dict]: if keys is None: keys = self.__get_table_columns(table_name) self._search(table_name, keys, condition) diff --git a/backend/src/module/manager/collector.py b/backend/src/module/manager/collector.py index c9e12304..979aa5cb 100644 --- a/backend/src/module/manager/collector.py +++ b/backend/src/module/manager/collector.py @@ -59,5 +59,3 @@ def eps_complete(): sc.collect_season(data) data.eps_collect = True bd.update_list(datas) - - diff --git a/backend/src/module/notification/notification.py b/backend/src/module/notification/notification.py index 000ef171..29ac9d95 100644 --- a/backend/src/module/notification/notification.py +++ b/backend/src/module/notification/notification.py @@ -31,8 +31,7 @@ class PostNotification: def __init__(self): Notifier = getClient(settings.notification.type) self.notifier = Notifier( - token=settings.notification.token, - chat_id=settings.notification.chat_id + token=settings.notification.token, chat_id=settings.notification.chat_id ) @staticmethod diff --git a/backend/src/module/notification/plugin/wecom.py b/backend/src/module/notification/plugin/wecom.py index dc7c811b..01c4635f 100644 --- a/backend/src/module/notification/plugin/wecom.py +++ b/backend/src/module/notification/plugin/wecom.py @@ -27,15 +27,15 @@ class WecomNotification(RequestContent): title = "【番剧更新】" + notify.official_title msg = self.gen_message(notify) picurl = notify.poster_path - #Default pic to avoid blank in message. Resolution:1068*455 + # Default pic to avoid blank in message. Resolution:1068*455 if picurl == "https://mikanani.me": picurl = "https://article.biliimg.com/bfs/article/d8bcd0408bf32594fd82f27de7d2c685829d1b2e.png" data = { - "key":self.token, + "key": self.token, "type": "news", "title": title, "msg": msg, - "picurl":picurl + "picurl": picurl, } resp = self.post_data(self.notification_url, data) logger.debug(f"Wecom notification: {resp.status_code}") diff --git a/backend/src/module/parser/analyser/raw_parser.py b/backend/src/module/parser/analyser/raw_parser.py index 22ab8a38..5de06089 100644 --- a/backend/src/module/parser/analyser/raw_parser.py +++ b/backend/src/module/parser/analyser/raw_parser.py @@ -181,6 +181,6 @@ def raw_parser(raw: str) -> Episode | None: ) -if __name__ == '__main__': +if __name__ == "__main__": title = "[动漫国字幕组&LoliHouse] THE MARGINAL SERVICE - 08 [WebRip 1080p HEVC-10bit AAC][简繁内封字幕]" print(raw_parser(title)) diff --git a/backend/src/module/parser/analyser/tmdb_parser.py b/backend/src/module/parser/analyser/tmdb_parser.py index 449d79fd..393eec4e 100644 --- a/backend/src/module/parser/analyser/tmdb_parser.py +++ b/backend/src/module/parser/analyser/tmdb_parser.py @@ -16,14 +16,13 @@ class TMDBInfo: year: str -LANGUAGE = { - "zh": "zh-CN", - "jp": "ja-JP", - "en": "en-US" -} +LANGUAGE = {"zh": "zh-CN", "jp": "ja-JP", "en": "en-US"} + def search_url(e): return f"https://api.themoviedb.org/3/search/tv?api_key={TMDB_API}&page=1&query={e}&include_adult=false" + + def info_url(e, key): return f"https://api.themoviedb.org/3/tv/{e}?api_key={TMDB_API}&language={LANGUAGE[key]}" @@ -68,8 +67,9 @@ def tmdb_parser(title, language) -> TMDBInfo | None: { "season": s.get("name"), "air_date": s.get("air_date"), - "poster_path": s.get("poster_path") - } for s in info_content.get("seasons") + "poster_path": s.get("poster_path"), + } + for s in info_content.get("seasons") ] last_season = get_season(season) original_title = info_content.get("original_name") @@ -81,7 +81,7 @@ def tmdb_parser(title, language) -> TMDBInfo | None: original_title, season, last_season, - str(year_number) + str(year_number), ) else: return None diff --git a/backend/src/module/parser/analyser/torrent_parser.py b/backend/src/module/parser/analyser/torrent_parser.py index a7eecdb5..366d91ff 100644 --- a/backend/src/module/parser/analyser/torrent_parser.py +++ b/backend/src/module/parser/analyser/torrent_parser.py @@ -1,6 +1,5 @@ import logging -import ntpath as win_path -import os.path as unix_path +import pathlib import re from module.models import EpisodeFile, SubtitleFile @@ -23,11 +22,16 @@ SUBTITLE_LANG = { } -def split_path(torrent_path: str) -> str: - if PLATFORM == "Windows": - return win_path.split(torrent_path)[-1] - else: - return unix_path.split(torrent_path)[-1] +def get_path_basename(torrent_path: str) -> str: + """ + Returns the basename of a path string. + + :param torrent_path: A string representing a path to a file. + :type torrent_path: str + :return: A string representing the basename of the given path. + :rtype: str + """ + return pathlib.Path(torrent_path).name def get_group(group_and_title) -> tuple[str | None, str]: @@ -64,7 +68,7 @@ def torrent_parser( season: int | None = None, file_type: str = "media", ) -> EpisodeFile | SubtitleFile: - media_path = split_path(torrent_path) + media_path = get_path_basename(torrent_path) for rule in RULES: if torrent_name: match_obj = re.match(rule, torrent_name, re.I) @@ -77,7 +81,7 @@ def torrent_parser( else: title, _ = get_season_and_title(title) episode = int(match_obj.group(2)) - suffix = unix_path.splitext(torrent_path)[-1] + suffix = pathlib.Path(torrent_path).suffix if file_type == "media": return EpisodeFile( media_path=torrent_path, diff --git a/backend/src/module/update/data_migration.py b/backend/src/module/update/data_migration.py index 0540ea64..4bdd5b73 100644 --- a/backend/src/module/update/data_migration.py +++ b/backend/src/module/update/data_migration.py @@ -1,20 +1,22 @@ -import os - -from module.database import BangumiDatabase -from module.models import BangumiData -from module.utils import json_config - - -def data_migration(): - if not os.path.isfile("data/data.json"): - return False - old_data = json_config.load("data/data.json") - infos = old_data["bangumi_info"] - rss_link = old_data["rss_link"] - new_data = [] - for info in infos: - new_data.append(BangumiData(**info, rss_link=[rss_link])) - with BangumiDatabase() as database: - database.update_table() - database.insert_list(new_data) - os.remove("data/data.json") +import os + +from module.conf import LEGACY_DATA_PATH +from module.database import BangumiDatabase +from module.models import BangumiData +from module.utils import json_config + + +def data_migration(): + if not LEGACY_DATA_PATH.exists(): + return False + old_data = json_config.load(LEGACY_DATA_PATH) + infos = old_data["bangumi_info"] + rss_link = old_data["rss_link"] + new_data = [] + for info in infos: + new_data.append(BangumiData(**info, rss_link=[rss_link])) + with BangumiDatabase() as database: + database.update_table() + database.insert_list(new_data) + + LEGACY_DATA_PATH.unlink(missing_ok=True) From c9e7088950aa216caa184828d31e690c485a7c86 Mon Sep 17 00:00:00 2001 From: 100gle Date: Tue, 13 Jun 2023 23:46:41 +0800 Subject: [PATCH 02/11] test: testing for `get_path_basename` function --- backend/src/test/test_torrent_parser.py | 153 +++++++++++++----------- 1 file changed, 84 insertions(+), 69 deletions(-) diff --git a/backend/src/test/test_torrent_parser.py b/backend/src/test/test_torrent_parser.py index 54832c7f..5d4e6cb7 100644 --- a/backend/src/test/test_torrent_parser.py +++ b/backend/src/test/test_torrent_parser.py @@ -1,69 +1,84 @@ -from module.parser.analyser import torrent_parser - - -def test_torrent_parser(): - file_name = "[Lilith-Raws] Boku no Kokoro no Yabai Yatsu - 01 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4].mp4" - bf = torrent_parser(file_name) - assert bf.title == "Boku no Kokoro no Yabai Yatsu" - assert bf.group == "Lilith-Raws" - assert bf.episode == 1 - assert bf.season == 1 - - file_name = "[Sakurato] Tonikaku Kawaii S2 [01][AVC-8bit 1080p AAC][CHS].mp4" - bf = torrent_parser(file_name) - assert bf.title == "Tonikaku Kawaii" - assert bf.group == "Sakurato" - assert bf.episode == 1 - assert bf.season == 2 - - file_name = "[SweetSub&LoliHouse] Heavenly Delusion - 01 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv" - bf = torrent_parser(file_name) - assert bf.title == "Heavenly Delusion" - assert bf.group == "SweetSub&LoliHouse" - assert bf.episode == 1 - assert bf.season == 1 - - file_name = "[SBSUB][CONAN][1082][V2][1080P][AVC_AAC][CHS_JP](C1E4E331).mp4" - bf = torrent_parser(file_name) - assert bf.title == "CONAN" - assert bf.group == "SBSUB" - assert bf.episode == 1082 - assert bf.season == 1 - - file_name = "海盗战记 (2019) S01E01.mp4" - bf = torrent_parser(file_name) - assert bf.title == "海盗战记 (2019)" - assert bf.episode == 1 - assert bf.season == 1 - - file_name = "海盗战记/海盗战记 S01E01.mp4" - bf = torrent_parser(file_name) - assert bf.title == "海盗战记" - assert bf.episode == 1 - assert bf.season == 1 - - file_name = "海盗战记 S01E01.zh-tw.ass" - sf = torrent_parser(file_name, file_type="subtitle") - assert sf.title == "海盗战记" - assert sf.episode == 1 - assert sf.season == 1 - assert sf.language == "zh-tw" - - file_name = "海盗战记 S01E01.SC.ass" - sf = torrent_parser(file_name, file_type="subtitle") - assert sf.title == "海盗战记" - assert sf.season == 1 - assert sf.episode == 1 - assert sf.language == "zh" - - file_name = "水星的魔女(2022) S00E19.mp4" - bf = torrent_parser(file_name, season=0) - assert bf.title == "水星的魔女(2022)" - assert bf.season == 0 - assert bf.episode == 19 - - file_name = "【失眠搬运组】放学后失眠的你-Kimi wa Houkago Insomnia - 06 [bilibili - 1080p AVC1 CHS-JP].mp4" - bf = torrent_parser(file_name, season=1) - assert bf.title == "放学后失眠的你-Kimi wa Houkago Insomnia" - assert bf.season == 1 - assert bf.episode == 6 +from module.parser.analyser import torrent_parser +from module.parser.analyser.torrent_parser import get_path_basename + + +def test_torrent_parser(): + file_name = "[Lilith-Raws] Boku no Kokoro no Yabai Yatsu - 01 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4].mp4" + bf = torrent_parser(file_name) + assert bf.title == "Boku no Kokoro no Yabai Yatsu" + assert bf.group == "Lilith-Raws" + assert bf.episode == 1 + assert bf.season == 1 + + file_name = "[Sakurato] Tonikaku Kawaii S2 [01][AVC-8bit 1080p AAC][CHS].mp4" + bf = torrent_parser(file_name) + assert bf.title == "Tonikaku Kawaii" + assert bf.group == "Sakurato" + assert bf.episode == 1 + assert bf.season == 2 + + file_name = "[SweetSub&LoliHouse] Heavenly Delusion - 01 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv" + bf = torrent_parser(file_name) + assert bf.title == "Heavenly Delusion" + assert bf.group == "SweetSub&LoliHouse" + assert bf.episode == 1 + assert bf.season == 1 + + file_name = "[SBSUB][CONAN][1082][V2][1080P][AVC_AAC][CHS_JP](C1E4E331).mp4" + bf = torrent_parser(file_name) + assert bf.title == "CONAN" + assert bf.group == "SBSUB" + assert bf.episode == 1082 + assert bf.season == 1 + + file_name = "海盗战记 (2019) S01E01.mp4" + bf = torrent_parser(file_name) + assert bf.title == "海盗战记 (2019)" + assert bf.episode == 1 + assert bf.season == 1 + + file_name = "海盗战记/海盗战记 S01E01.mp4" + bf = torrent_parser(file_name) + assert bf.title == "海盗战记" + assert bf.episode == 1 + assert bf.season == 1 + + file_name = "海盗战记 S01E01.zh-tw.ass" + sf = torrent_parser(file_name, file_type="subtitle") + assert sf.title == "海盗战记" + assert sf.episode == 1 + assert sf.season == 1 + assert sf.language == "zh-tw" + + file_name = "海盗战记 S01E01.SC.ass" + sf = torrent_parser(file_name, file_type="subtitle") + assert sf.title == "海盗战记" + assert sf.season == 1 + assert sf.episode == 1 + assert sf.language == "zh" + + file_name = "水星的魔女(2022) S00E19.mp4" + bf = torrent_parser(file_name, season=0) + assert bf.title == "水星的魔女(2022)" + assert bf.season == 0 + assert bf.episode == 19 + + file_name = "【失眠搬运组】放学后失眠的你-Kimi wa Houkago Insomnia - 06 [bilibili - 1080p AVC1 CHS-JP].mp4" + bf = torrent_parser(file_name, season=1) + assert bf.title == "放学后失眠的你-Kimi wa Houkago Insomnia" + assert bf.season == 1 + assert bf.episode == 6 + + +class TestGetPathBasename: + def test_regular_path(self): + assert get_path_basename('/path/to/file.txt') == 'file.txt' + + def test_empty_path(self): + assert get_path_basename('') == '' + + def test_path_with_trailing_slash(self): + assert get_path_basename('/path/to/folder/') == 'folder' + + def test_windows_path(self): + assert get_path_basename('C:\\path\\to\\file.txt') == 'file.txt' From 0faabb0389fb856c1385176a37c6bf678c9b4731 Mon Sep 17 00:00:00 2001 From: 100gle Date: Tue, 13 Jun 2023 23:48:34 +0800 Subject: [PATCH 03/11] bugfix: fix duplicated `src` prefix --- backend/src/module/conf/config.py | 2 +- backend/src/module/conf/log.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/src/module/conf/config.py b/backend/src/module/conf/config.py index 5b3cf67f..402d9f9f 100644 --- a/backend/src/module/conf/config.py +++ b/backend/src/module/conf/config.py @@ -10,7 +10,7 @@ from module.models.config import Config from .const import ENV_TO_ATTR logger = logging.getLogger(__name__) -CONFIG_ROOT = pathlib.Path("src/config") +CONFIG_ROOT = pathlib.Path("config") try: diff --git a/backend/src/module/conf/log.py b/backend/src/module/conf/log.py index 9d28fe83..76ce5383 100644 --- a/backend/src/module/conf/log.py +++ b/backend/src/module/conf/log.py @@ -3,7 +3,7 @@ import pathlib from .config import settings -LOG_ROOT = pathlib.Path("src/data") +LOG_ROOT = pathlib.Path("data") LOG_PATH = LOG_ROOT / "log.txt" From 7fdf81368325fd04f2e30f5c7adc30a6290e21e9 Mon Sep 17 00:00:00 2001 From: 100gle Date: Tue, 13 Jun 2023 23:52:11 +0800 Subject: [PATCH 04/11] ci: add skip mark for test case --- backend/src/test/test_torrent_parser.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/backend/src/test/test_torrent_parser.py b/backend/src/test/test_torrent_parser.py index 5d4e6cb7..cf0a3cd0 100644 --- a/backend/src/test/test_torrent_parser.py +++ b/backend/src/test/test_torrent_parser.py @@ -1,3 +1,6 @@ +import sys + +import pytest from module.parser.analyser import torrent_parser from module.parser.analyser.torrent_parser import get_path_basename @@ -80,5 +83,6 @@ class TestGetPathBasename: def test_path_with_trailing_slash(self): assert get_path_basename('/path/to/folder/') == 'folder' + @pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows specific") def test_windows_path(self): assert get_path_basename('C:\\path\\to\\file.txt') == 'file.txt' From a20de4f0ad0ca2c3e7c53d9cc442cd0fa16762f4 Mon Sep 17 00:00:00 2001 From: 100gle Date: Mon, 3 Jul 2023 20:43:52 +0800 Subject: [PATCH 05/11] refactor: shorten import content --- backend/src/module/conf/__init__.py | 6 +++--- backend/src/module/conf/config.py | 4 ++-- backend/src/module/conf/log.py | 4 ++-- backend/src/module/parser/analyser/torrent_parser.py | 6 +++--- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/backend/src/module/conf/__init__.py b/backend/src/module/conf/__init__.py index 8af4f5ea..cc64c8a3 100644 --- a/backend/src/module/conf/__init__.py +++ b/backend/src/module/conf/__init__.py @@ -1,10 +1,10 @@ -import pathlib +from pathlib import Path from .config import VERSION, settings from .log import LOG_PATH, setup_logger TMDB_API = "32b19d6a05b512190a056fa4e747cbbc" -DATA_PATH = pathlib.Path("data/data.db") -LEGACY_DATA_PATH = pathlib.Path("data/data.json") +DATA_PATH = Path("data/data.db") +LEGACY_DATA_PATH = Path("data/data.json") PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix" diff --git a/backend/src/module/conf/config.py b/backend/src/module/conf/config.py index 402d9f9f..6e466dbc 100644 --- a/backend/src/module/conf/config.py +++ b/backend/src/module/conf/config.py @@ -1,7 +1,7 @@ import json import logging import os -import pathlib +from pathlib import Path from dotenv import load_dotenv @@ -10,7 +10,7 @@ from module.models.config import Config from .const import ENV_TO_ATTR logger = logging.getLogger(__name__) -CONFIG_ROOT = pathlib.Path("config") +CONFIG_ROOT = Path("config") try: diff --git a/backend/src/module/conf/log.py b/backend/src/module/conf/log.py index 76ce5383..37a0b905 100644 --- a/backend/src/module/conf/log.py +++ b/backend/src/module/conf/log.py @@ -1,9 +1,9 @@ import logging -import pathlib +from pathlib import Path from .config import settings -LOG_ROOT = pathlib.Path("data") +LOG_ROOT = Path("data") LOG_PATH = LOG_ROOT / "log.txt" diff --git a/backend/src/module/parser/analyser/torrent_parser.py b/backend/src/module/parser/analyser/torrent_parser.py index 366d91ff..d4063526 100644 --- a/backend/src/module/parser/analyser/torrent_parser.py +++ b/backend/src/module/parser/analyser/torrent_parser.py @@ -1,5 +1,5 @@ import logging -import pathlib +from pathlib import Path import re from module.models import EpisodeFile, SubtitleFile @@ -31,7 +31,7 @@ def get_path_basename(torrent_path: str) -> str: :return: A string representing the basename of the given path. :rtype: str """ - return pathlib.Path(torrent_path).name + return Path(torrent_path).name def get_group(group_and_title) -> tuple[str | None, str]: @@ -81,7 +81,7 @@ def torrent_parser( else: title, _ = get_season_and_title(title) episode = int(match_obj.group(2)) - suffix = pathlib.Path(torrent_path).suffix + suffix = Path(torrent_path).suffix if file_type == "media": return EpisodeFile( media_path=torrent_path, From 653119dbad950c2143f41d2928c4820771cf2720 Mon Sep 17 00:00:00 2001 From: 100gle Date: Mon, 3 Jul 2023 20:51:05 +0800 Subject: [PATCH 06/11] chore: fix CRLF issue --- backend/src/module/conf/__init__.py | 20 +-- backend/src/module/conf/config.py | 182 ++++++++++---------- backend/src/module/conf/log.py | 62 +++---- backend/src/module/core/status.py | 108 ++++++------ backend/src/module/update/data_migration.py | 44 ++--- backend/src/test/test_torrent_parser.py | 176 +++++++++---------- 6 files changed, 296 insertions(+), 296 deletions(-) diff --git a/backend/src/module/conf/__init__.py b/backend/src/module/conf/__init__.py index cc64c8a3..00dbc6fe 100644 --- a/backend/src/module/conf/__init__.py +++ b/backend/src/module/conf/__init__.py @@ -1,10 +1,10 @@ -from pathlib import Path - -from .config import VERSION, settings -from .log import LOG_PATH, setup_logger - -TMDB_API = "32b19d6a05b512190a056fa4e747cbbc" -DATA_PATH = Path("data/data.db") -LEGACY_DATA_PATH = Path("data/data.json") - -PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix" +from pathlib import Path + +from .config import VERSION, settings +from .log import LOG_PATH, setup_logger + +TMDB_API = "32b19d6a05b512190a056fa4e747cbbc" +DATA_PATH = Path("data/data.db") +LEGACY_DATA_PATH = Path("data/data.json") + +PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix" diff --git a/backend/src/module/conf/config.py b/backend/src/module/conf/config.py index 6e466dbc..00debcce 100644 --- a/backend/src/module/conf/config.py +++ b/backend/src/module/conf/config.py @@ -1,91 +1,91 @@ -import json -import logging -import os -from pathlib import Path - -from dotenv import load_dotenv - -from module.models.config import Config - -from .const import ENV_TO_ATTR - -logger = logging.getLogger(__name__) -CONFIG_ROOT = Path("config") - - -try: - from module.__version__ import VERSION -except ImportError: - logger.info("Can't find version info, use DEV_VERSION instead") - VERSION = "DEV_VERSION" - -CONFIG_PATH = ( - CONFIG_ROOT / "config_dev.json" - if VERSION == "DEV_VERSION" - else CONFIG_ROOT / "config.json" -).resolve() - - -class Settings(Config): - def __init__(self): - super().__init__() - if CONFIG_PATH.exists(): - self.load() - self.save() - else: - self.init() - - def load(self): - with open(CONFIG_PATH, "r", encoding="utf-8") as f: - config = json.load(f) - config_obj = Config.parse_obj(config) - self.__dict__.update(config_obj.__dict__) - logger.info("Config loaded") - - def save(self, config_dict: dict | None = None): - if not config_dict: - config_dict = self.dict() - with open(CONFIG_PATH, "w", encoding="utf-8") as f: - json.dump(config_dict, f, indent=4, ensure_ascii=False) - - def init(self): - load_dotenv(".env") - self.__load_from_env() - self.save() - - @property - def rss_link(self) -> str: - if "://" not in self.rss_parser.custom_url: - return f"https://{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" - return ( - f"{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" - ) - - def __load_from_env(self): - config_dict = self.dict() - for key, section in ENV_TO_ATTR.items(): - for env, attr in section.items(): - if env in os.environ: - if isinstance(attr, list): - for _attr in attr: - attr_name = _attr[0] if isinstance(_attr, tuple) else _attr - config_dict[key][attr_name] = self.__val_from_env( - env, _attr - ) - else: - attr_name = attr[0] if isinstance(attr, tuple) else attr - config_dict[key][attr_name] = self.__val_from_env(env, attr) - config_obj = Config.parse_obj(config_dict) - self.__dict__.update(config_obj.__dict__) - logger.info("Config loaded from env") - - @staticmethod - def __val_from_env(env: str, attr: tuple): - if isinstance(attr, tuple): - conv_func = attr[1] - return conv_func(os.environ[env]) - else: - return os.environ[env] - - -settings = Settings() +import json +import logging +import os +from pathlib import Path + +from dotenv import load_dotenv + +from module.models.config import Config + +from .const import ENV_TO_ATTR + +logger = logging.getLogger(__name__) +CONFIG_ROOT = Path("config") + + +try: + from module.__version__ import VERSION +except ImportError: + logger.info("Can't find version info, use DEV_VERSION instead") + VERSION = "DEV_VERSION" + +CONFIG_PATH = ( + CONFIG_ROOT / "config_dev.json" + if VERSION == "DEV_VERSION" + else CONFIG_ROOT / "config.json" +).resolve() + + +class Settings(Config): + def __init__(self): + super().__init__() + if CONFIG_PATH.exists(): + self.load() + self.save() + else: + self.init() + + def load(self): + with open(CONFIG_PATH, "r", encoding="utf-8") as f: + config = json.load(f) + config_obj = Config.parse_obj(config) + self.__dict__.update(config_obj.__dict__) + logger.info("Config loaded") + + def save(self, config_dict: dict | None = None): + if not config_dict: + config_dict = self.dict() + with open(CONFIG_PATH, "w", encoding="utf-8") as f: + json.dump(config_dict, f, indent=4, ensure_ascii=False) + + def init(self): + load_dotenv(".env") + self.__load_from_env() + self.save() + + @property + def rss_link(self) -> str: + if "://" not in self.rss_parser.custom_url: + return f"https://{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" + return ( + f"{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" + ) + + def __load_from_env(self): + config_dict = self.dict() + for key, section in ENV_TO_ATTR.items(): + for env, attr in section.items(): + if env in os.environ: + if isinstance(attr, list): + for _attr in attr: + attr_name = _attr[0] if isinstance(_attr, tuple) else _attr + config_dict[key][attr_name] = self.__val_from_env( + env, _attr + ) + else: + attr_name = attr[0] if isinstance(attr, tuple) else attr + config_dict[key][attr_name] = self.__val_from_env(env, attr) + config_obj = Config.parse_obj(config_dict) + self.__dict__.update(config_obj.__dict__) + logger.info("Config loaded from env") + + @staticmethod + def __val_from_env(env: str, attr: tuple): + if isinstance(attr, tuple): + conv_func = attr[1] + return conv_func(os.environ[env]) + else: + return os.environ[env] + + +settings = Settings() diff --git a/backend/src/module/conf/log.py b/backend/src/module/conf/log.py index 37a0b905..2d567424 100644 --- a/backend/src/module/conf/log.py +++ b/backend/src/module/conf/log.py @@ -1,31 +1,31 @@ -import logging -from pathlib import Path - -from .config import settings - -LOG_ROOT = Path("data") -LOG_PATH = LOG_ROOT / "log.txt" - - -def setup_logger(level: int = logging.INFO, reset: bool = False): - level = logging.DEBUG if settings.log.debug_enable else level - LOG_ROOT.mkdir(exist_ok=True) - - if reset and LOG_PATH.exists(): - LOG_PATH.unlink(missing_ok=True) - - logging.addLevelName(logging.DEBUG, "DEBUG:") - logging.addLevelName(logging.INFO, "INFO:") - logging.addLevelName(logging.WARNING, "WARNING:") - LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s" - TIME_FORMAT = "%Y-%m-%d %H:%M:%S" - logging.basicConfig( - level=level, - format=LOGGING_FORMAT, - datefmt=TIME_FORMAT, - encoding="utf-8", - handlers=[ - logging.FileHandler(LOG_PATH, encoding="utf-8"), - logging.StreamHandler(), - ], - ) +import logging +from pathlib import Path + +from .config import settings + +LOG_ROOT = Path("data") +LOG_PATH = LOG_ROOT / "log.txt" + + +def setup_logger(level: int = logging.INFO, reset: bool = False): + level = logging.DEBUG if settings.log.debug_enable else level + LOG_ROOT.mkdir(exist_ok=True) + + if reset and LOG_PATH.exists(): + LOG_PATH.unlink(missing_ok=True) + + logging.addLevelName(logging.DEBUG, "DEBUG:") + logging.addLevelName(logging.INFO, "INFO:") + logging.addLevelName(logging.WARNING, "WARNING:") + LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s" + TIME_FORMAT = "%Y-%m-%d %H:%M:%S" + logging.basicConfig( + level=level, + format=LOGGING_FORMAT, + datefmt=TIME_FORMAT, + encoding="utf-8", + handlers=[ + logging.FileHandler(LOG_PATH, encoding="utf-8"), + logging.StreamHandler(), + ], + ) diff --git a/backend/src/module/core/status.py b/backend/src/module/core/status.py index 674e568b..9d98daaf 100644 --- a/backend/src/module/core/status.py +++ b/backend/src/module/core/status.py @@ -1,54 +1,54 @@ -import asyncio -import threading - -from module.checker import Checker -from module.conf import LEGACY_DATA_PATH - - -class ProgramStatus(Checker): - def __init__(self): - super().__init__() - self.stop_event = threading.Event() - self.lock = threading.Lock() - self._downloader_status = False - self._torrents_status = False - self.event = asyncio.Event() - - @property - def is_running(self): - if self.stop_event.is_set() or self.check_first_run(): - return False - else: - return True - - @property - def is_stopped(self): - return self.stop_event.is_set() - - @property - def downloader_status(self): - if not self._downloader_status: - self._downloader_status = self.check_downloader() - return self._downloader_status - - @property - def torrents_status(self): - if not self._torrents_status: - self._torrents_status = self.check_torrents() - return self._torrents_status - - @property - def enable_rss(self): - return self.check_analyser() - - @property - def enable_renamer(self): - return self.check_renamer() - - @property - def first_run(self): - return self.check_first_run() - - @property - def legacy_data(self): - return LEGACY_DATA_PATH.exists() +import asyncio +import threading + +from module.checker import Checker +from module.conf import LEGACY_DATA_PATH + + +class ProgramStatus(Checker): + def __init__(self): + super().__init__() + self.stop_event = threading.Event() + self.lock = threading.Lock() + self._downloader_status = False + self._torrents_status = False + self.event = asyncio.Event() + + @property + def is_running(self): + if self.stop_event.is_set() or self.check_first_run(): + return False + else: + return True + + @property + def is_stopped(self): + return self.stop_event.is_set() + + @property + def downloader_status(self): + if not self._downloader_status: + self._downloader_status = self.check_downloader() + return self._downloader_status + + @property + def torrents_status(self): + if not self._torrents_status: + self._torrents_status = self.check_torrents() + return self._torrents_status + + @property + def enable_rss(self): + return self.check_analyser() + + @property + def enable_renamer(self): + return self.check_renamer() + + @property + def first_run(self): + return self.check_first_run() + + @property + def legacy_data(self): + return LEGACY_DATA_PATH.exists() diff --git a/backend/src/module/update/data_migration.py b/backend/src/module/update/data_migration.py index 4bdd5b73..b9ddf8db 100644 --- a/backend/src/module/update/data_migration.py +++ b/backend/src/module/update/data_migration.py @@ -1,22 +1,22 @@ -import os - -from module.conf import LEGACY_DATA_PATH -from module.database import BangumiDatabase -from module.models import BangumiData -from module.utils import json_config - - -def data_migration(): - if not LEGACY_DATA_PATH.exists(): - return False - old_data = json_config.load(LEGACY_DATA_PATH) - infos = old_data["bangumi_info"] - rss_link = old_data["rss_link"] - new_data = [] - for info in infos: - new_data.append(BangumiData(**info, rss_link=[rss_link])) - with BangumiDatabase() as database: - database.update_table() - database.insert_list(new_data) - - LEGACY_DATA_PATH.unlink(missing_ok=True) +import os + +from module.conf import LEGACY_DATA_PATH +from module.database import BangumiDatabase +from module.models import BangumiData +from module.utils import json_config + + +def data_migration(): + if not LEGACY_DATA_PATH.exists(): + return False + old_data = json_config.load(LEGACY_DATA_PATH) + infos = old_data["bangumi_info"] + rss_link = old_data["rss_link"] + new_data = [] + for info in infos: + new_data.append(BangumiData(**info, rss_link=[rss_link])) + with BangumiDatabase() as database: + database.update_table() + database.insert_list(new_data) + + LEGACY_DATA_PATH.unlink(missing_ok=True) diff --git a/backend/src/test/test_torrent_parser.py b/backend/src/test/test_torrent_parser.py index cf0a3cd0..10e6a546 100644 --- a/backend/src/test/test_torrent_parser.py +++ b/backend/src/test/test_torrent_parser.py @@ -1,88 +1,88 @@ -import sys - -import pytest -from module.parser.analyser import torrent_parser -from module.parser.analyser.torrent_parser import get_path_basename - - -def test_torrent_parser(): - file_name = "[Lilith-Raws] Boku no Kokoro no Yabai Yatsu - 01 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4].mp4" - bf = torrent_parser(file_name) - assert bf.title == "Boku no Kokoro no Yabai Yatsu" - assert bf.group == "Lilith-Raws" - assert bf.episode == 1 - assert bf.season == 1 - - file_name = "[Sakurato] Tonikaku Kawaii S2 [01][AVC-8bit 1080p AAC][CHS].mp4" - bf = torrent_parser(file_name) - assert bf.title == "Tonikaku Kawaii" - assert bf.group == "Sakurato" - assert bf.episode == 1 - assert bf.season == 2 - - file_name = "[SweetSub&LoliHouse] Heavenly Delusion - 01 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv" - bf = torrent_parser(file_name) - assert bf.title == "Heavenly Delusion" - assert bf.group == "SweetSub&LoliHouse" - assert bf.episode == 1 - assert bf.season == 1 - - file_name = "[SBSUB][CONAN][1082][V2][1080P][AVC_AAC][CHS_JP](C1E4E331).mp4" - bf = torrent_parser(file_name) - assert bf.title == "CONAN" - assert bf.group == "SBSUB" - assert bf.episode == 1082 - assert bf.season == 1 - - file_name = "海盗战记 (2019) S01E01.mp4" - bf = torrent_parser(file_name) - assert bf.title == "海盗战记 (2019)" - assert bf.episode == 1 - assert bf.season == 1 - - file_name = "海盗战记/海盗战记 S01E01.mp4" - bf = torrent_parser(file_name) - assert bf.title == "海盗战记" - assert bf.episode == 1 - assert bf.season == 1 - - file_name = "海盗战记 S01E01.zh-tw.ass" - sf = torrent_parser(file_name, file_type="subtitle") - assert sf.title == "海盗战记" - assert sf.episode == 1 - assert sf.season == 1 - assert sf.language == "zh-tw" - - file_name = "海盗战记 S01E01.SC.ass" - sf = torrent_parser(file_name, file_type="subtitle") - assert sf.title == "海盗战记" - assert sf.season == 1 - assert sf.episode == 1 - assert sf.language == "zh" - - file_name = "水星的魔女(2022) S00E19.mp4" - bf = torrent_parser(file_name, season=0) - assert bf.title == "水星的魔女(2022)" - assert bf.season == 0 - assert bf.episode == 19 - - file_name = "【失眠搬运组】放学后失眠的你-Kimi wa Houkago Insomnia - 06 [bilibili - 1080p AVC1 CHS-JP].mp4" - bf = torrent_parser(file_name, season=1) - assert bf.title == "放学后失眠的你-Kimi wa Houkago Insomnia" - assert bf.season == 1 - assert bf.episode == 6 - - -class TestGetPathBasename: - def test_regular_path(self): - assert get_path_basename('/path/to/file.txt') == 'file.txt' - - def test_empty_path(self): - assert get_path_basename('') == '' - - def test_path_with_trailing_slash(self): - assert get_path_basename('/path/to/folder/') == 'folder' - - @pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows specific") - def test_windows_path(self): - assert get_path_basename('C:\\path\\to\\file.txt') == 'file.txt' +import sys + +import pytest +from module.parser.analyser import torrent_parser +from module.parser.analyser.torrent_parser import get_path_basename + + +def test_torrent_parser(): + file_name = "[Lilith-Raws] Boku no Kokoro no Yabai Yatsu - 01 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4].mp4" + bf = torrent_parser(file_name) + assert bf.title == "Boku no Kokoro no Yabai Yatsu" + assert bf.group == "Lilith-Raws" + assert bf.episode == 1 + assert bf.season == 1 + + file_name = "[Sakurato] Tonikaku Kawaii S2 [01][AVC-8bit 1080p AAC][CHS].mp4" + bf = torrent_parser(file_name) + assert bf.title == "Tonikaku Kawaii" + assert bf.group == "Sakurato" + assert bf.episode == 1 + assert bf.season == 2 + + file_name = "[SweetSub&LoliHouse] Heavenly Delusion - 01 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv" + bf = torrent_parser(file_name) + assert bf.title == "Heavenly Delusion" + assert bf.group == "SweetSub&LoliHouse" + assert bf.episode == 1 + assert bf.season == 1 + + file_name = "[SBSUB][CONAN][1082][V2][1080P][AVC_AAC][CHS_JP](C1E4E331).mp4" + bf = torrent_parser(file_name) + assert bf.title == "CONAN" + assert bf.group == "SBSUB" + assert bf.episode == 1082 + assert bf.season == 1 + + file_name = "海盗战记 (2019) S01E01.mp4" + bf = torrent_parser(file_name) + assert bf.title == "海盗战记 (2019)" + assert bf.episode == 1 + assert bf.season == 1 + + file_name = "海盗战记/海盗战记 S01E01.mp4" + bf = torrent_parser(file_name) + assert bf.title == "海盗战记" + assert bf.episode == 1 + assert bf.season == 1 + + file_name = "海盗战记 S01E01.zh-tw.ass" + sf = torrent_parser(file_name, file_type="subtitle") + assert sf.title == "海盗战记" + assert sf.episode == 1 + assert sf.season == 1 + assert sf.language == "zh-tw" + + file_name = "海盗战记 S01E01.SC.ass" + sf = torrent_parser(file_name, file_type="subtitle") + assert sf.title == "海盗战记" + assert sf.season == 1 + assert sf.episode == 1 + assert sf.language == "zh" + + file_name = "水星的魔女(2022) S00E19.mp4" + bf = torrent_parser(file_name, season=0) + assert bf.title == "水星的魔女(2022)" + assert bf.season == 0 + assert bf.episode == 19 + + file_name = "【失眠搬运组】放学后失眠的你-Kimi wa Houkago Insomnia - 06 [bilibili - 1080p AVC1 CHS-JP].mp4" + bf = torrent_parser(file_name, season=1) + assert bf.title == "放学后失眠的你-Kimi wa Houkago Insomnia" + assert bf.season == 1 + assert bf.episode == 6 + + +class TestGetPathBasename: + def test_regular_path(self): + assert get_path_basename("/path/to/file.txt") == "file.txt" + + def test_empty_path(self): + assert get_path_basename("") == "" + + def test_path_with_trailing_slash(self): + assert get_path_basename("/path/to/folder/") == "folder" + + @pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows specific") + def test_windows_path(self): + assert get_path_basename("C:\\path\\to\\file.txt") == "file.txt" From 840ffd8a4ae12c8f8ab964aa1997bf8e15b22bfb Mon Sep 17 00:00:00 2001 From: 100gle Date: Mon, 3 Jul 2023 21:41:58 +0800 Subject: [PATCH 07/11] refactor: clean left `os.path` code --- backend/src/module/database/orm/connector.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/backend/src/module/database/orm/connector.py b/backend/src/module/database/orm/connector.py index 05be0df4..ef6e972a 100644 --- a/backend/src/module/database/orm/connector.py +++ b/backend/src/module/database/orm/connector.py @@ -1,4 +1,5 @@ -import os +from os import PathLike +from pathlib import Path import sqlite3 from .delete import Delete @@ -10,10 +11,13 @@ from module.conf import DATA_PATH class Connector: - def __init__(self, table_name: str, data: dict, database: str = DATA_PATH): + def __init__( + self, table_name: str, data: dict, database: PathLike[str] | Path = DATA_PATH + ): # Create folder if not exists - if not os.path.exists(os.path.dirname(DATA_PATH)): - os.makedirs(os.path.dirname(DATA_PATH)) + if isinstance(database, PathLike): + database = Path(database) + database.parent.mkdir(parents=True, exist_ok=True) self._conn = sqlite3.connect(database) self._cursor = self._conn.cursor() From fda4df67ee2b72c437396d29a7cc72dba0a5f1c0 Mon Sep 17 00:00:00 2001 From: 100gle Date: Mon, 3 Jul 2023 21:44:48 +0800 Subject: [PATCH 08/11] bugfix: add missing type --- backend/src/module/database/orm/connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/module/database/orm/connector.py b/backend/src/module/database/orm/connector.py index ef6e972a..d250bbdc 100644 --- a/backend/src/module/database/orm/connector.py +++ b/backend/src/module/database/orm/connector.py @@ -15,7 +15,7 @@ class Connector: self, table_name: str, data: dict, database: PathLike[str] | Path = DATA_PATH ): # Create folder if not exists - if isinstance(database, PathLike): + if isinstance(database, PathLike, str): database = Path(database) database.parent.mkdir(parents=True, exist_ok=True) From c29f89c96817776a5c3276e3f4b1fa35b091db93 Mon Sep 17 00:00:00 2001 From: 100gle Date: Mon, 3 Jul 2023 21:46:12 +0800 Subject: [PATCH 09/11] bugfix: fix wrong usage for `isinstance` --- backend/src/module/database/orm/connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/module/database/orm/connector.py b/backend/src/module/database/orm/connector.py index d250bbdc..7f10106a 100644 --- a/backend/src/module/database/orm/connector.py +++ b/backend/src/module/database/orm/connector.py @@ -15,7 +15,7 @@ class Connector: self, table_name: str, data: dict, database: PathLike[str] | Path = DATA_PATH ): # Create folder if not exists - if isinstance(database, PathLike, str): + if isinstance(database, (PathLike, str)): database = Path(database) database.parent.mkdir(parents=True, exist_ok=True) From fc70f7f1a13bab5590edae885872cf7ac96a2db4 Mon Sep 17 00:00:00 2001 From: 100gle Date: Mon, 3 Jul 2023 21:48:38 +0800 Subject: [PATCH 10/11] chore: add TODO mark for legacy `os.path` and `ntpath` code --- backend/src/module/downloader/path.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/src/module/downloader/path.py b/backend/src/module/downloader/path.py index 30aa0400..594a2173 100644 --- a/backend/src/module/downloader/path.py +++ b/backend/src/module/downloader/path.py @@ -4,6 +4,7 @@ import re from module.conf import settings from module.models import BangumiData +# TODO: replace this logic with pathlib if ":\\" in settings.downloader.path: import ntpath as path else: From 3d7c64829c7bb064a996e9bfdbc7d6d0a7f6a9c5 Mon Sep 17 00:00:00 2001 From: 100gle Date: Tue, 4 Jul 2023 08:48:00 +0800 Subject: [PATCH 11/11] feat: clean legacy os.path logic --- backend/src/module/downloader/path.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/backend/src/module/downloader/path.py b/backend/src/module/downloader/path.py index 594a2173..f1099191 100644 --- a/backend/src/module/downloader/path.py +++ b/backend/src/module/downloader/path.py @@ -1,14 +1,11 @@ import logging +from os import PathLike import re +from pathlib import Path from module.conf import settings from module.models import BangumiData -# TODO: replace this logic with pathlib -if ":\\" in settings.downloader.path: - import ntpath as path -else: - import os.path as path logger = logging.getLogger(__name__) @@ -23,7 +20,7 @@ class TorrentPath: subtitle_list = [] for f in info.files: file_name = f.name - suffix = path.splitext(file_name)[-1] + suffix = Path(file_name).suffix if suffix.lower() in [".mp4", ".mkv"]: media_list.append(file_name) elif suffix.lower() in [".ass", ".srt"]: @@ -31,10 +28,10 @@ class TorrentPath: return media_list, subtitle_list @staticmethod - def _path_to_bangumi(save_path): + def _path_to_bangumi(save_path: PathLike[str] | str): # Split save path and download path - save_parts = save_path.split(path.sep) - download_parts = settings.downloader.path.split(path.sep) + save_parts = Path(save_path).parts + download_parts = Path(settings.downloader.path).parts # Get bangumi name and season bangumi_name = "" season = 1 @@ -46,10 +43,10 @@ class TorrentPath: return bangumi_name, season @staticmethod - def _file_depth(file_path): - return len(file_path.split(path.sep)) + def _file_depth(file_path: PathLike[str] | str): + return len(Path(file_path).parts) - def is_ep(self, file_path): + def is_ep(self, file_path: PathLike[str] | str): return self._file_depth(file_path) <= 2 @staticmethod @@ -57,8 +54,8 @@ class TorrentPath: folder = ( f"{data.official_title} ({data.year})" if data.year else data.official_title ) - save_path = path.join(settings.downloader.path, folder, f"Season {data.season}") - return save_path + save_path = Path(settings.downloader.path) / folder / f"Season {data.season}" + return str(save_path) @staticmethod def _rule_name(data: BangumiData): @@ -71,4 +68,4 @@ class TorrentPath: @staticmethod def _join_path(*args): - return path.join(*args) + return str(Path(*args))