diff --git a/backend/.vscode/settings.json b/backend/.vscode/settings.json new file mode 100644 index 00000000..24dec46e --- /dev/null +++ b/backend/.vscode/settings.json @@ -0,0 +1,8 @@ +{ + "python.formatting.provider": "none", + "python.formatting.blackPath": "black", + "editor.formatOnSave": true, + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter" + } +} diff --git a/backend/src/module/api/log.py b/backend/src/module/api/log.py index 9956dffe..88ec2c57 100644 --- a/backend/src/module/api/log.py +++ b/backend/src/module/api/log.py @@ -14,7 +14,7 @@ async def get_log(current_user=Depends(get_current_user)): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token" ) - if os.path.isfile(LOG_PATH): + if LOG_PATH.exists(): with open(LOG_PATH, "rb") as f: return Response(f.read(), media_type="text/plain") else: @@ -27,9 +27,8 @@ async def clear_log(current_user=Depends(get_current_user)): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token" ) - if os.path.isfile(LOG_PATH): - with open(LOG_PATH, "w") as f: - f.write("") + if LOG_PATH.exists(): + LOG_PATH.write_text("") return {"status": "ok"} else: return Response("Log file not found", status_code=404) diff --git a/backend/src/module/conf/__init__.py b/backend/src/module/conf/__init__.py index afb47703..00dbc6fe 100644 --- a/backend/src/module/conf/__init__.py +++ b/backend/src/module/conf/__init__.py @@ -1,7 +1,10 @@ +from pathlib import Path + from .config import VERSION, settings from .log import LOG_PATH, setup_logger TMDB_API = "32b19d6a05b512190a056fa4e747cbbc" -DATA_PATH = "data/data.db" +DATA_PATH = Path("data/data.db") +LEGACY_DATA_PATH = Path("data/data.json") PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix" diff --git a/backend/src/module/conf/config.py b/backend/src/module/conf/config.py index 051feeca..00debcce 100644 --- a/backend/src/module/conf/config.py +++ b/backend/src/module/conf/config.py @@ -1,6 +1,7 @@ import json import logging import os +from pathlib import Path from dotenv import load_dotenv @@ -9,25 +10,26 @@ from module.models.config import Config from .const import ENV_TO_ATTR logger = logging.getLogger(__name__) +CONFIG_ROOT = Path("config") + try: from module.__version__ import VERSION - - if VERSION == "DEV_VERSION": - logger.info("Can't find version info, use DEV_VERSION instead") - CONFIG_PATH = "config/config_dev.json" - else: - CONFIG_PATH = "config/config.json" except ImportError: logger.info("Can't find version info, use DEV_VERSION instead") VERSION = "DEV_VERSION" - CONFIG_PATH = "config/config_dev.json" + +CONFIG_PATH = ( + CONFIG_ROOT / "config_dev.json" + if VERSION == "DEV_VERSION" + else CONFIG_ROOT / "config.json" +).resolve() class Settings(Config): def __init__(self): super().__init__() - if os.path.exists(CONFIG_PATH): + if CONFIG_PATH.exists(): self.load() self.save() else: diff --git a/backend/src/module/conf/log.py b/backend/src/module/conf/log.py index 2d503e74..2d567424 100644 --- a/backend/src/module/conf/log.py +++ b/backend/src/module/conf/log.py @@ -1,17 +1,19 @@ import logging -import os +from pathlib import Path from .config import settings -LOG_PATH = "data/log.txt" +LOG_ROOT = Path("data") +LOG_PATH = LOG_ROOT / "log.txt" def setup_logger(level: int = logging.INFO, reset: bool = False): level = logging.DEBUG if settings.log.debug_enable else level - if not os.path.isdir("data"): - os.mkdir("data") - if reset and os.path.isfile(LOG_PATH): - os.remove(LOG_PATH) + LOG_ROOT.mkdir(exist_ok=True) + + if reset and LOG_PATH.exists(): + LOG_PATH.unlink(missing_ok=True) + logging.addLevelName(logging.DEBUG, "DEBUG:") logging.addLevelName(logging.INFO, "INFO:") logging.addLevelName(logging.WARNING, "WARNING:") diff --git a/backend/src/module/core/status.py b/backend/src/module/core/status.py index 6f056463..9d98daaf 100644 --- a/backend/src/module/core/status.py +++ b/backend/src/module/core/status.py @@ -1,8 +1,8 @@ import asyncio -import os.path import threading from module.checker import Checker +from module.conf import LEGACY_DATA_PATH class ProgramStatus(Checker): @@ -51,4 +51,4 @@ class ProgramStatus(Checker): @property def legacy_data(self): - return os.path.exists("data/data.json") + return LEGACY_DATA_PATH.exists() diff --git a/backend/src/module/database/connector.py b/backend/src/module/database/connector.py index 36d492cb..506bb7b1 100644 --- a/backend/src/module/database/connector.py +++ b/backend/src/module/database/connector.py @@ -10,8 +10,8 @@ logger = logging.getLogger(__name__) class DataConnector: def __init__(self): # Create folder if not exists - if not os.path.exists(os.path.dirname(DATA_PATH)): - os.makedirs(os.path.dirname(DATA_PATH)) + DATA_PATH.parent.mkdir(parents=True, exist_ok=True) + self._conn = sqlite3.connect(DATA_PATH) self._cursor = self._conn.cursor() @@ -99,10 +99,14 @@ class DataConnector: def _delete(self, table_name: str, condition: dict): condition_sql = " AND ".join([f"{key} = :{key}" for key in condition.keys()]) - self._cursor.execute(f"DELETE FROM {table_name} WHERE {condition_sql}", condition) + self._cursor.execute( + f"DELETE FROM {table_name} WHERE {condition_sql}", condition + ) self._conn.commit() - def _search(self, table_name: str, keys: list[str] | None = None, condition: dict = None): + def _search( + self, table_name: str, keys: list[str] | None = None, condition: dict = None + ): if keys is None: select_sql = "*" else: @@ -111,20 +115,25 @@ class DataConnector: self._cursor.execute(f"SELECT {select_sql} FROM {table_name}") else: custom_condition = condition.pop("_custom_condition", None) - condition_sql = " AND ".join([f"{key} = :{key}" for key in condition.keys()]) + ( - f" AND {custom_condition}" if custom_condition else "" - ) + condition_sql = " AND ".join( + [f"{key} = :{key}" for key in condition.keys()] + ) + (f" AND {custom_condition}" if custom_condition else "") self._cursor.execute( - f"SELECT {select_sql} FROM {table_name} WHERE {condition_sql}", condition + f"SELECT {select_sql} FROM {table_name} WHERE {condition_sql}", + condition, ) - def _search_data(self, table_name: str, keys: list[str] | None = None, condition: dict = None) -> dict: + def _search_data( + self, table_name: str, keys: list[str] | None = None, condition: dict = None + ) -> dict: if keys is None: keys = self.__get_table_columns(table_name) self._search(table_name, keys, condition) return dict(zip(keys, self._cursor.fetchone())) - def _search_datas(self, table_name: str, keys: list[str] | None = None, condition: dict = None) -> list[dict]: + def _search_datas( + self, table_name: str, keys: list[str] | None = None, condition: dict = None + ) -> list[dict]: if keys is None: keys = self.__get_table_columns(table_name) self._search(table_name, keys, condition) diff --git a/backend/src/module/database/orm/connector.py b/backend/src/module/database/orm/connector.py index 05be0df4..7f10106a 100644 --- a/backend/src/module/database/orm/connector.py +++ b/backend/src/module/database/orm/connector.py @@ -1,4 +1,5 @@ -import os +from os import PathLike +from pathlib import Path import sqlite3 from .delete import Delete @@ -10,10 +11,13 @@ from module.conf import DATA_PATH class Connector: - def __init__(self, table_name: str, data: dict, database: str = DATA_PATH): + def __init__( + self, table_name: str, data: dict, database: PathLike[str] | Path = DATA_PATH + ): # Create folder if not exists - if not os.path.exists(os.path.dirname(DATA_PATH)): - os.makedirs(os.path.dirname(DATA_PATH)) + if isinstance(database, (PathLike, str)): + database = Path(database) + database.parent.mkdir(parents=True, exist_ok=True) self._conn = sqlite3.connect(database) self._cursor = self._conn.cursor() diff --git a/backend/src/module/downloader/path.py b/backend/src/module/downloader/path.py index 30aa0400..f1099191 100644 --- a/backend/src/module/downloader/path.py +++ b/backend/src/module/downloader/path.py @@ -1,13 +1,11 @@ import logging +from os import PathLike import re +from pathlib import Path from module.conf import settings from module.models import BangumiData -if ":\\" in settings.downloader.path: - import ntpath as path -else: - import os.path as path logger = logging.getLogger(__name__) @@ -22,7 +20,7 @@ class TorrentPath: subtitle_list = [] for f in info.files: file_name = f.name - suffix = path.splitext(file_name)[-1] + suffix = Path(file_name).suffix if suffix.lower() in [".mp4", ".mkv"]: media_list.append(file_name) elif suffix.lower() in [".ass", ".srt"]: @@ -30,10 +28,10 @@ class TorrentPath: return media_list, subtitle_list @staticmethod - def _path_to_bangumi(save_path): + def _path_to_bangumi(save_path: PathLike[str] | str): # Split save path and download path - save_parts = save_path.split(path.sep) - download_parts = settings.downloader.path.split(path.sep) + save_parts = Path(save_path).parts + download_parts = Path(settings.downloader.path).parts # Get bangumi name and season bangumi_name = "" season = 1 @@ -45,10 +43,10 @@ class TorrentPath: return bangumi_name, season @staticmethod - def _file_depth(file_path): - return len(file_path.split(path.sep)) + def _file_depth(file_path: PathLike[str] | str): + return len(Path(file_path).parts) - def is_ep(self, file_path): + def is_ep(self, file_path: PathLike[str] | str): return self._file_depth(file_path) <= 2 @staticmethod @@ -56,8 +54,8 @@ class TorrentPath: folder = ( f"{data.official_title} ({data.year})" if data.year else data.official_title ) - save_path = path.join(settings.downloader.path, folder, f"Season {data.season}") - return save_path + save_path = Path(settings.downloader.path) / folder / f"Season {data.season}" + return str(save_path) @staticmethod def _rule_name(data: BangumiData): @@ -70,4 +68,4 @@ class TorrentPath: @staticmethod def _join_path(*args): - return path.join(*args) + return str(Path(*args)) diff --git a/backend/src/module/manager/collector.py b/backend/src/module/manager/collector.py index c9e12304..979aa5cb 100644 --- a/backend/src/module/manager/collector.py +++ b/backend/src/module/manager/collector.py @@ -59,5 +59,3 @@ def eps_complete(): sc.collect_season(data) data.eps_collect = True bd.update_list(datas) - - diff --git a/backend/src/module/notification/notification.py b/backend/src/module/notification/notification.py index 000ef171..29ac9d95 100644 --- a/backend/src/module/notification/notification.py +++ b/backend/src/module/notification/notification.py @@ -31,8 +31,7 @@ class PostNotification: def __init__(self): Notifier = getClient(settings.notification.type) self.notifier = Notifier( - token=settings.notification.token, - chat_id=settings.notification.chat_id + token=settings.notification.token, chat_id=settings.notification.chat_id ) @staticmethod diff --git a/backend/src/module/notification/plugin/wecom.py b/backend/src/module/notification/plugin/wecom.py index dc7c811b..01c4635f 100644 --- a/backend/src/module/notification/plugin/wecom.py +++ b/backend/src/module/notification/plugin/wecom.py @@ -27,15 +27,15 @@ class WecomNotification(RequestContent): title = "【番剧更新】" + notify.official_title msg = self.gen_message(notify) picurl = notify.poster_path - #Default pic to avoid blank in message. Resolution:1068*455 + # Default pic to avoid blank in message. Resolution:1068*455 if picurl == "https://mikanani.me": picurl = "https://article.biliimg.com/bfs/article/d8bcd0408bf32594fd82f27de7d2c685829d1b2e.png" data = { - "key":self.token, + "key": self.token, "type": "news", "title": title, "msg": msg, - "picurl":picurl + "picurl": picurl, } resp = self.post_data(self.notification_url, data) logger.debug(f"Wecom notification: {resp.status_code}") diff --git a/backend/src/module/parser/analyser/raw_parser.py b/backend/src/module/parser/analyser/raw_parser.py index 22ab8a38..5de06089 100644 --- a/backend/src/module/parser/analyser/raw_parser.py +++ b/backend/src/module/parser/analyser/raw_parser.py @@ -181,6 +181,6 @@ def raw_parser(raw: str) -> Episode | None: ) -if __name__ == '__main__': +if __name__ == "__main__": title = "[动漫国字幕组&LoliHouse] THE MARGINAL SERVICE - 08 [WebRip 1080p HEVC-10bit AAC][简繁内封字幕]" print(raw_parser(title)) diff --git a/backend/src/module/parser/analyser/tmdb_parser.py b/backend/src/module/parser/analyser/tmdb_parser.py index 449d79fd..393eec4e 100644 --- a/backend/src/module/parser/analyser/tmdb_parser.py +++ b/backend/src/module/parser/analyser/tmdb_parser.py @@ -16,14 +16,13 @@ class TMDBInfo: year: str -LANGUAGE = { - "zh": "zh-CN", - "jp": "ja-JP", - "en": "en-US" -} +LANGUAGE = {"zh": "zh-CN", "jp": "ja-JP", "en": "en-US"} + def search_url(e): return f"https://api.themoviedb.org/3/search/tv?api_key={TMDB_API}&page=1&query={e}&include_adult=false" + + def info_url(e, key): return f"https://api.themoviedb.org/3/tv/{e}?api_key={TMDB_API}&language={LANGUAGE[key]}" @@ -68,8 +67,9 @@ def tmdb_parser(title, language) -> TMDBInfo | None: { "season": s.get("name"), "air_date": s.get("air_date"), - "poster_path": s.get("poster_path") - } for s in info_content.get("seasons") + "poster_path": s.get("poster_path"), + } + for s in info_content.get("seasons") ] last_season = get_season(season) original_title = info_content.get("original_name") @@ -81,7 +81,7 @@ def tmdb_parser(title, language) -> TMDBInfo | None: original_title, season, last_season, - str(year_number) + str(year_number), ) else: return None diff --git a/backend/src/module/parser/analyser/torrent_parser.py b/backend/src/module/parser/analyser/torrent_parser.py index a7eecdb5..d4063526 100644 --- a/backend/src/module/parser/analyser/torrent_parser.py +++ b/backend/src/module/parser/analyser/torrent_parser.py @@ -1,6 +1,5 @@ import logging -import ntpath as win_path -import os.path as unix_path +from pathlib import Path import re from module.models import EpisodeFile, SubtitleFile @@ -23,11 +22,16 @@ SUBTITLE_LANG = { } -def split_path(torrent_path: str) -> str: - if PLATFORM == "Windows": - return win_path.split(torrent_path)[-1] - else: - return unix_path.split(torrent_path)[-1] +def get_path_basename(torrent_path: str) -> str: + """ + Returns the basename of a path string. + + :param torrent_path: A string representing a path to a file. + :type torrent_path: str + :return: A string representing the basename of the given path. + :rtype: str + """ + return Path(torrent_path).name def get_group(group_and_title) -> tuple[str | None, str]: @@ -64,7 +68,7 @@ def torrent_parser( season: int | None = None, file_type: str = "media", ) -> EpisodeFile | SubtitleFile: - media_path = split_path(torrent_path) + media_path = get_path_basename(torrent_path) for rule in RULES: if torrent_name: match_obj = re.match(rule, torrent_name, re.I) @@ -77,7 +81,7 @@ def torrent_parser( else: title, _ = get_season_and_title(title) episode = int(match_obj.group(2)) - suffix = unix_path.splitext(torrent_path)[-1] + suffix = Path(torrent_path).suffix if file_type == "media": return EpisodeFile( media_path=torrent_path, diff --git a/backend/src/module/update/data_migration.py b/backend/src/module/update/data_migration.py index 0540ea64..b9ddf8db 100644 --- a/backend/src/module/update/data_migration.py +++ b/backend/src/module/update/data_migration.py @@ -1,14 +1,15 @@ import os +from module.conf import LEGACY_DATA_PATH from module.database import BangumiDatabase from module.models import BangumiData from module.utils import json_config def data_migration(): - if not os.path.isfile("data/data.json"): + if not LEGACY_DATA_PATH.exists(): return False - old_data = json_config.load("data/data.json") + old_data = json_config.load(LEGACY_DATA_PATH) infos = old_data["bangumi_info"] rss_link = old_data["rss_link"] new_data = [] @@ -17,4 +18,5 @@ def data_migration(): with BangumiDatabase() as database: database.update_table() database.insert_list(new_data) - os.remove("data/data.json") + + LEGACY_DATA_PATH.unlink(missing_ok=True) diff --git a/backend/src/test/test_torrent_parser.py b/backend/src/test/test_torrent_parser.py index 54832c7f..10e6a546 100644 --- a/backend/src/test/test_torrent_parser.py +++ b/backend/src/test/test_torrent_parser.py @@ -1,4 +1,8 @@ +import sys + +import pytest from module.parser.analyser import torrent_parser +from module.parser.analyser.torrent_parser import get_path_basename def test_torrent_parser(): @@ -67,3 +71,18 @@ def test_torrent_parser(): assert bf.title == "放学后失眠的你-Kimi wa Houkago Insomnia" assert bf.season == 1 assert bf.episode == 6 + + +class TestGetPathBasename: + def test_regular_path(self): + assert get_path_basename("/path/to/file.txt") == "file.txt" + + def test_empty_path(self): + assert get_path_basename("") == "" + + def test_path_with_trailing_slash(self): + assert get_path_basename("/path/to/folder/") == "folder" + + @pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows specific") + def test_windows_path(self): + assert get_path_basename("C:\\path\\to\\file.txt") == "file.txt"