diff --git a/backend/.vscode/settings.json b/backend/.vscode/settings.json new file mode 100644 index 00000000..24dec46e --- /dev/null +++ b/backend/.vscode/settings.json @@ -0,0 +1,8 @@ +{ + "python.formatting.provider": "none", + "python.formatting.blackPath": "black", + "editor.formatOnSave": true, + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter" + } +} diff --git a/backend/src/module/api/log.py b/backend/src/module/api/log.py index 9956dffe..88ec2c57 100644 --- a/backend/src/module/api/log.py +++ b/backend/src/module/api/log.py @@ -14,7 +14,7 @@ async def get_log(current_user=Depends(get_current_user)): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token" ) - if os.path.isfile(LOG_PATH): + if LOG_PATH.exists(): with open(LOG_PATH, "rb") as f: return Response(f.read(), media_type="text/plain") else: @@ -27,9 +27,8 @@ async def clear_log(current_user=Depends(get_current_user)): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token" ) - if os.path.isfile(LOG_PATH): - with open(LOG_PATH, "w") as f: - f.write("") + if LOG_PATH.exists(): + LOG_PATH.write_text("") return {"status": "ok"} else: return Response("Log file not found", status_code=404) diff --git a/backend/src/module/conf/__init__.py b/backend/src/module/conf/__init__.py index afb47703..8af4f5ea 100644 --- a/backend/src/module/conf/__init__.py +++ b/backend/src/module/conf/__init__.py @@ -1,7 +1,10 @@ -from .config import VERSION, settings -from .log import LOG_PATH, setup_logger - -TMDB_API = "32b19d6a05b512190a056fa4e747cbbc" -DATA_PATH = "data/data.db" - -PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix" +import pathlib + +from .config import VERSION, settings +from .log import LOG_PATH, setup_logger + +TMDB_API = "32b19d6a05b512190a056fa4e747cbbc" +DATA_PATH = pathlib.Path("data/data.db") +LEGACY_DATA_PATH = pathlib.Path("data/data.json") + +PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix" diff --git a/backend/src/module/conf/config.py b/backend/src/module/conf/config.py index 051feeca..5b3cf67f 100644 --- a/backend/src/module/conf/config.py +++ b/backend/src/module/conf/config.py @@ -1,89 +1,91 @@ -import json -import logging -import os - -from dotenv import load_dotenv - -from module.models.config import Config - -from .const import ENV_TO_ATTR - -logger = logging.getLogger(__name__) - -try: - from module.__version__ import VERSION - - if VERSION == "DEV_VERSION": - logger.info("Can't find version info, use DEV_VERSION instead") - CONFIG_PATH = "config/config_dev.json" - else: - CONFIG_PATH = "config/config.json" -except ImportError: - logger.info("Can't find version info, use DEV_VERSION instead") - VERSION = "DEV_VERSION" - CONFIG_PATH = "config/config_dev.json" - - -class Settings(Config): - def __init__(self): - super().__init__() - if os.path.exists(CONFIG_PATH): - self.load() - self.save() - else: - self.init() - - def load(self): - with open(CONFIG_PATH, "r", encoding="utf-8") as f: - config = json.load(f) - config_obj = Config.parse_obj(config) - self.__dict__.update(config_obj.__dict__) - logger.info("Config loaded") - - def save(self, config_dict: dict | None = None): - if not config_dict: - config_dict = self.dict() - with open(CONFIG_PATH, "w", encoding="utf-8") as f: - json.dump(config_dict, f, indent=4, ensure_ascii=False) - - def init(self): - load_dotenv(".env") - self.__load_from_env() - self.save() - - @property - def rss_link(self) -> str: - if "://" not in self.rss_parser.custom_url: - return f"https://{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" - return ( - f"{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" - ) - - def __load_from_env(self): - config_dict = self.dict() - for key, section in ENV_TO_ATTR.items(): - for env, attr in section.items(): - if env in os.environ: - if isinstance(attr, list): - for _attr in attr: - attr_name = _attr[0] if isinstance(_attr, tuple) else _attr - config_dict[key][attr_name] = self.__val_from_env( - env, _attr - ) - else: - attr_name = attr[0] if isinstance(attr, tuple) else attr - config_dict[key][attr_name] = self.__val_from_env(env, attr) - config_obj = Config.parse_obj(config_dict) - self.__dict__.update(config_obj.__dict__) - logger.info("Config loaded from env") - - @staticmethod - def __val_from_env(env: str, attr: tuple): - if isinstance(attr, tuple): - conv_func = attr[1] - return conv_func(os.environ[env]) - else: - return os.environ[env] - - -settings = Settings() +import json +import logging +import os +import pathlib + +from dotenv import load_dotenv + +from module.models.config import Config + +from .const import ENV_TO_ATTR + +logger = logging.getLogger(__name__) +CONFIG_ROOT = pathlib.Path("src/config") + + +try: + from module.__version__ import VERSION +except ImportError: + logger.info("Can't find version info, use DEV_VERSION instead") + VERSION = "DEV_VERSION" + +CONFIG_PATH = ( + CONFIG_ROOT / "config_dev.json" + if VERSION == "DEV_VERSION" + else CONFIG_ROOT / "config.json" +).resolve() + + +class Settings(Config): + def __init__(self): + super().__init__() + if CONFIG_PATH.exists(): + self.load() + self.save() + else: + self.init() + + def load(self): + with open(CONFIG_PATH, "r", encoding="utf-8") as f: + config = json.load(f) + config_obj = Config.parse_obj(config) + self.__dict__.update(config_obj.__dict__) + logger.info("Config loaded") + + def save(self, config_dict: dict | None = None): + if not config_dict: + config_dict = self.dict() + with open(CONFIG_PATH, "w", encoding="utf-8") as f: + json.dump(config_dict, f, indent=4, ensure_ascii=False) + + def init(self): + load_dotenv(".env") + self.__load_from_env() + self.save() + + @property + def rss_link(self) -> str: + if "://" not in self.rss_parser.custom_url: + return f"https://{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" + return ( + f"{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" + ) + + def __load_from_env(self): + config_dict = self.dict() + for key, section in ENV_TO_ATTR.items(): + for env, attr in section.items(): + if env in os.environ: + if isinstance(attr, list): + for _attr in attr: + attr_name = _attr[0] if isinstance(_attr, tuple) else _attr + config_dict[key][attr_name] = self.__val_from_env( + env, _attr + ) + else: + attr_name = attr[0] if isinstance(attr, tuple) else attr + config_dict[key][attr_name] = self.__val_from_env(env, attr) + config_obj = Config.parse_obj(config_dict) + self.__dict__.update(config_obj.__dict__) + logger.info("Config loaded from env") + + @staticmethod + def __val_from_env(env: str, attr: tuple): + if isinstance(attr, tuple): + conv_func = attr[1] + return conv_func(os.environ[env]) + else: + return os.environ[env] + + +settings = Settings() diff --git a/backend/src/module/conf/log.py b/backend/src/module/conf/log.py index 2d503e74..9d28fe83 100644 --- a/backend/src/module/conf/log.py +++ b/backend/src/module/conf/log.py @@ -1,29 +1,31 @@ -import logging -import os - -from .config import settings - -LOG_PATH = "data/log.txt" - - -def setup_logger(level: int = logging.INFO, reset: bool = False): - level = logging.DEBUG if settings.log.debug_enable else level - if not os.path.isdir("data"): - os.mkdir("data") - if reset and os.path.isfile(LOG_PATH): - os.remove(LOG_PATH) - logging.addLevelName(logging.DEBUG, "DEBUG:") - logging.addLevelName(logging.INFO, "INFO:") - logging.addLevelName(logging.WARNING, "WARNING:") - LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s" - TIME_FORMAT = "%Y-%m-%d %H:%M:%S" - logging.basicConfig( - level=level, - format=LOGGING_FORMAT, - datefmt=TIME_FORMAT, - encoding="utf-8", - handlers=[ - logging.FileHandler(LOG_PATH, encoding="utf-8"), - logging.StreamHandler(), - ], - ) +import logging +import pathlib + +from .config import settings + +LOG_ROOT = pathlib.Path("src/data") +LOG_PATH = LOG_ROOT / "log.txt" + + +def setup_logger(level: int = logging.INFO, reset: bool = False): + level = logging.DEBUG if settings.log.debug_enable else level + LOG_ROOT.mkdir(exist_ok=True) + + if reset and LOG_PATH.exists(): + LOG_PATH.unlink(missing_ok=True) + + logging.addLevelName(logging.DEBUG, "DEBUG:") + logging.addLevelName(logging.INFO, "INFO:") + logging.addLevelName(logging.WARNING, "WARNING:") + LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s" + TIME_FORMAT = "%Y-%m-%d %H:%M:%S" + logging.basicConfig( + level=level, + format=LOGGING_FORMAT, + datefmt=TIME_FORMAT, + encoding="utf-8", + handlers=[ + logging.FileHandler(LOG_PATH, encoding="utf-8"), + logging.StreamHandler(), + ], + ) diff --git a/backend/src/module/core/status.py b/backend/src/module/core/status.py index 6f056463..674e568b 100644 --- a/backend/src/module/core/status.py +++ b/backend/src/module/core/status.py @@ -1,54 +1,54 @@ -import asyncio -import os.path -import threading - -from module.checker import Checker - - -class ProgramStatus(Checker): - def __init__(self): - super().__init__() - self.stop_event = threading.Event() - self.lock = threading.Lock() - self._downloader_status = False - self._torrents_status = False - self.event = asyncio.Event() - - @property - def is_running(self): - if self.stop_event.is_set() or self.check_first_run(): - return False - else: - return True - - @property - def is_stopped(self): - return self.stop_event.is_set() - - @property - def downloader_status(self): - if not self._downloader_status: - self._downloader_status = self.check_downloader() - return self._downloader_status - - @property - def torrents_status(self): - if not self._torrents_status: - self._torrents_status = self.check_torrents() - return self._torrents_status - - @property - def enable_rss(self): - return self.check_analyser() - - @property - def enable_renamer(self): - return self.check_renamer() - - @property - def first_run(self): - return self.check_first_run() - - @property - def legacy_data(self): - return os.path.exists("data/data.json") +import asyncio +import threading + +from module.checker import Checker +from module.conf import LEGACY_DATA_PATH + + +class ProgramStatus(Checker): + def __init__(self): + super().__init__() + self.stop_event = threading.Event() + self.lock = threading.Lock() + self._downloader_status = False + self._torrents_status = False + self.event = asyncio.Event() + + @property + def is_running(self): + if self.stop_event.is_set() or self.check_first_run(): + return False + else: + return True + + @property + def is_stopped(self): + return self.stop_event.is_set() + + @property + def downloader_status(self): + if not self._downloader_status: + self._downloader_status = self.check_downloader() + return self._downloader_status + + @property + def torrents_status(self): + if not self._torrents_status: + self._torrents_status = self.check_torrents() + return self._torrents_status + + @property + def enable_rss(self): + return self.check_analyser() + + @property + def enable_renamer(self): + return self.check_renamer() + + @property + def first_run(self): + return self.check_first_run() + + @property + def legacy_data(self): + return LEGACY_DATA_PATH.exists() diff --git a/backend/src/module/database/connector.py b/backend/src/module/database/connector.py index 36d492cb..506bb7b1 100644 --- a/backend/src/module/database/connector.py +++ b/backend/src/module/database/connector.py @@ -10,8 +10,8 @@ logger = logging.getLogger(__name__) class DataConnector: def __init__(self): # Create folder if not exists - if not os.path.exists(os.path.dirname(DATA_PATH)): - os.makedirs(os.path.dirname(DATA_PATH)) + DATA_PATH.parent.mkdir(parents=True, exist_ok=True) + self._conn = sqlite3.connect(DATA_PATH) self._cursor = self._conn.cursor() @@ -99,10 +99,14 @@ class DataConnector: def _delete(self, table_name: str, condition: dict): condition_sql = " AND ".join([f"{key} = :{key}" for key in condition.keys()]) - self._cursor.execute(f"DELETE FROM {table_name} WHERE {condition_sql}", condition) + self._cursor.execute( + f"DELETE FROM {table_name} WHERE {condition_sql}", condition + ) self._conn.commit() - def _search(self, table_name: str, keys: list[str] | None = None, condition: dict = None): + def _search( + self, table_name: str, keys: list[str] | None = None, condition: dict = None + ): if keys is None: select_sql = "*" else: @@ -111,20 +115,25 @@ class DataConnector: self._cursor.execute(f"SELECT {select_sql} FROM {table_name}") else: custom_condition = condition.pop("_custom_condition", None) - condition_sql = " AND ".join([f"{key} = :{key}" for key in condition.keys()]) + ( - f" AND {custom_condition}" if custom_condition else "" - ) + condition_sql = " AND ".join( + [f"{key} = :{key}" for key in condition.keys()] + ) + (f" AND {custom_condition}" if custom_condition else "") self._cursor.execute( - f"SELECT {select_sql} FROM {table_name} WHERE {condition_sql}", condition + f"SELECT {select_sql} FROM {table_name} WHERE {condition_sql}", + condition, ) - def _search_data(self, table_name: str, keys: list[str] | None = None, condition: dict = None) -> dict: + def _search_data( + self, table_name: str, keys: list[str] | None = None, condition: dict = None + ) -> dict: if keys is None: keys = self.__get_table_columns(table_name) self._search(table_name, keys, condition) return dict(zip(keys, self._cursor.fetchone())) - def _search_datas(self, table_name: str, keys: list[str] | None = None, condition: dict = None) -> list[dict]: + def _search_datas( + self, table_name: str, keys: list[str] | None = None, condition: dict = None + ) -> list[dict]: if keys is None: keys = self.__get_table_columns(table_name) self._search(table_name, keys, condition) diff --git a/backend/src/module/manager/collector.py b/backend/src/module/manager/collector.py index c9e12304..979aa5cb 100644 --- a/backend/src/module/manager/collector.py +++ b/backend/src/module/manager/collector.py @@ -59,5 +59,3 @@ def eps_complete(): sc.collect_season(data) data.eps_collect = True bd.update_list(datas) - - diff --git a/backend/src/module/notification/notification.py b/backend/src/module/notification/notification.py index 000ef171..29ac9d95 100644 --- a/backend/src/module/notification/notification.py +++ b/backend/src/module/notification/notification.py @@ -31,8 +31,7 @@ class PostNotification: def __init__(self): Notifier = getClient(settings.notification.type) self.notifier = Notifier( - token=settings.notification.token, - chat_id=settings.notification.chat_id + token=settings.notification.token, chat_id=settings.notification.chat_id ) @staticmethod diff --git a/backend/src/module/notification/plugin/wecom.py b/backend/src/module/notification/plugin/wecom.py index dc7c811b..01c4635f 100644 --- a/backend/src/module/notification/plugin/wecom.py +++ b/backend/src/module/notification/plugin/wecom.py @@ -27,15 +27,15 @@ class WecomNotification(RequestContent): title = "【番剧更新】" + notify.official_title msg = self.gen_message(notify) picurl = notify.poster_path - #Default pic to avoid blank in message. Resolution:1068*455 + # Default pic to avoid blank in message. Resolution:1068*455 if picurl == "https://mikanani.me": picurl = "https://article.biliimg.com/bfs/article/d8bcd0408bf32594fd82f27de7d2c685829d1b2e.png" data = { - "key":self.token, + "key": self.token, "type": "news", "title": title, "msg": msg, - "picurl":picurl + "picurl": picurl, } resp = self.post_data(self.notification_url, data) logger.debug(f"Wecom notification: {resp.status_code}") diff --git a/backend/src/module/parser/analyser/raw_parser.py b/backend/src/module/parser/analyser/raw_parser.py index 22ab8a38..5de06089 100644 --- a/backend/src/module/parser/analyser/raw_parser.py +++ b/backend/src/module/parser/analyser/raw_parser.py @@ -181,6 +181,6 @@ def raw_parser(raw: str) -> Episode | None: ) -if __name__ == '__main__': +if __name__ == "__main__": title = "[动漫国字幕组&LoliHouse] THE MARGINAL SERVICE - 08 [WebRip 1080p HEVC-10bit AAC][简繁内封字幕]" print(raw_parser(title)) diff --git a/backend/src/module/parser/analyser/tmdb_parser.py b/backend/src/module/parser/analyser/tmdb_parser.py index 449d79fd..393eec4e 100644 --- a/backend/src/module/parser/analyser/tmdb_parser.py +++ b/backend/src/module/parser/analyser/tmdb_parser.py @@ -16,14 +16,13 @@ class TMDBInfo: year: str -LANGUAGE = { - "zh": "zh-CN", - "jp": "ja-JP", - "en": "en-US" -} +LANGUAGE = {"zh": "zh-CN", "jp": "ja-JP", "en": "en-US"} + def search_url(e): return f"https://api.themoviedb.org/3/search/tv?api_key={TMDB_API}&page=1&query={e}&include_adult=false" + + def info_url(e, key): return f"https://api.themoviedb.org/3/tv/{e}?api_key={TMDB_API}&language={LANGUAGE[key]}" @@ -68,8 +67,9 @@ def tmdb_parser(title, language) -> TMDBInfo | None: { "season": s.get("name"), "air_date": s.get("air_date"), - "poster_path": s.get("poster_path") - } for s in info_content.get("seasons") + "poster_path": s.get("poster_path"), + } + for s in info_content.get("seasons") ] last_season = get_season(season) original_title = info_content.get("original_name") @@ -81,7 +81,7 @@ def tmdb_parser(title, language) -> TMDBInfo | None: original_title, season, last_season, - str(year_number) + str(year_number), ) else: return None diff --git a/backend/src/module/parser/analyser/torrent_parser.py b/backend/src/module/parser/analyser/torrent_parser.py index a7eecdb5..366d91ff 100644 --- a/backend/src/module/parser/analyser/torrent_parser.py +++ b/backend/src/module/parser/analyser/torrent_parser.py @@ -1,6 +1,5 @@ import logging -import ntpath as win_path -import os.path as unix_path +import pathlib import re from module.models import EpisodeFile, SubtitleFile @@ -23,11 +22,16 @@ SUBTITLE_LANG = { } -def split_path(torrent_path: str) -> str: - if PLATFORM == "Windows": - return win_path.split(torrent_path)[-1] - else: - return unix_path.split(torrent_path)[-1] +def get_path_basename(torrent_path: str) -> str: + """ + Returns the basename of a path string. + + :param torrent_path: A string representing a path to a file. + :type torrent_path: str + :return: A string representing the basename of the given path. + :rtype: str + """ + return pathlib.Path(torrent_path).name def get_group(group_and_title) -> tuple[str | None, str]: @@ -64,7 +68,7 @@ def torrent_parser( season: int | None = None, file_type: str = "media", ) -> EpisodeFile | SubtitleFile: - media_path = split_path(torrent_path) + media_path = get_path_basename(torrent_path) for rule in RULES: if torrent_name: match_obj = re.match(rule, torrent_name, re.I) @@ -77,7 +81,7 @@ def torrent_parser( else: title, _ = get_season_and_title(title) episode = int(match_obj.group(2)) - suffix = unix_path.splitext(torrent_path)[-1] + suffix = pathlib.Path(torrent_path).suffix if file_type == "media": return EpisodeFile( media_path=torrent_path, diff --git a/backend/src/module/update/data_migration.py b/backend/src/module/update/data_migration.py index 0540ea64..4bdd5b73 100644 --- a/backend/src/module/update/data_migration.py +++ b/backend/src/module/update/data_migration.py @@ -1,20 +1,22 @@ -import os - -from module.database import BangumiDatabase -from module.models import BangumiData -from module.utils import json_config - - -def data_migration(): - if not os.path.isfile("data/data.json"): - return False - old_data = json_config.load("data/data.json") - infos = old_data["bangumi_info"] - rss_link = old_data["rss_link"] - new_data = [] - for info in infos: - new_data.append(BangumiData(**info, rss_link=[rss_link])) - with BangumiDatabase() as database: - database.update_table() - database.insert_list(new_data) - os.remove("data/data.json") +import os + +from module.conf import LEGACY_DATA_PATH +from module.database import BangumiDatabase +from module.models import BangumiData +from module.utils import json_config + + +def data_migration(): + if not LEGACY_DATA_PATH.exists(): + return False + old_data = json_config.load(LEGACY_DATA_PATH) + infos = old_data["bangumi_info"] + rss_link = old_data["rss_link"] + new_data = [] + for info in infos: + new_data.append(BangumiData(**info, rss_link=[rss_link])) + with BangumiDatabase() as database: + database.update_table() + database.insert_list(new_data) + + LEGACY_DATA_PATH.unlink(missing_ok=True)