From 653119dbad950c2143f41d2928c4820771cf2720 Mon Sep 17 00:00:00 2001 From: 100gle Date: Mon, 3 Jul 2023 20:51:05 +0800 Subject: [PATCH] chore: fix CRLF issue --- backend/src/module/conf/__init__.py | 20 +-- backend/src/module/conf/config.py | 182 ++++++++++---------- backend/src/module/conf/log.py | 62 +++---- backend/src/module/core/status.py | 108 ++++++------ backend/src/module/update/data_migration.py | 44 ++--- backend/src/test/test_torrent_parser.py | 176 +++++++++---------- 6 files changed, 296 insertions(+), 296 deletions(-) diff --git a/backend/src/module/conf/__init__.py b/backend/src/module/conf/__init__.py index cc64c8a3..00dbc6fe 100644 --- a/backend/src/module/conf/__init__.py +++ b/backend/src/module/conf/__init__.py @@ -1,10 +1,10 @@ -from pathlib import Path - -from .config import VERSION, settings -from .log import LOG_PATH, setup_logger - -TMDB_API = "32b19d6a05b512190a056fa4e747cbbc" -DATA_PATH = Path("data/data.db") -LEGACY_DATA_PATH = Path("data/data.json") - -PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix" +from pathlib import Path + +from .config import VERSION, settings +from .log import LOG_PATH, setup_logger + +TMDB_API = "32b19d6a05b512190a056fa4e747cbbc" +DATA_PATH = Path("data/data.db") +LEGACY_DATA_PATH = Path("data/data.json") + +PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix" diff --git a/backend/src/module/conf/config.py b/backend/src/module/conf/config.py index 6e466dbc..00debcce 100644 --- a/backend/src/module/conf/config.py +++ b/backend/src/module/conf/config.py @@ -1,91 +1,91 @@ -import json -import logging -import os -from pathlib import Path - -from dotenv import load_dotenv - -from module.models.config import Config - -from .const import ENV_TO_ATTR - -logger = logging.getLogger(__name__) -CONFIG_ROOT = Path("config") - - -try: - from module.__version__ import VERSION -except ImportError: - logger.info("Can't find version info, use DEV_VERSION instead") - VERSION = "DEV_VERSION" - -CONFIG_PATH = ( - CONFIG_ROOT / "config_dev.json" - if VERSION == "DEV_VERSION" - else CONFIG_ROOT / "config.json" -).resolve() - - -class Settings(Config): - def __init__(self): - super().__init__() - if CONFIG_PATH.exists(): - self.load() - self.save() - else: - self.init() - - def load(self): - with open(CONFIG_PATH, "r", encoding="utf-8") as f: - config = json.load(f) - config_obj = Config.parse_obj(config) - self.__dict__.update(config_obj.__dict__) - logger.info("Config loaded") - - def save(self, config_dict: dict | None = None): - if not config_dict: - config_dict = self.dict() - with open(CONFIG_PATH, "w", encoding="utf-8") as f: - json.dump(config_dict, f, indent=4, ensure_ascii=False) - - def init(self): - load_dotenv(".env") - self.__load_from_env() - self.save() - - @property - def rss_link(self) -> str: - if "://" not in self.rss_parser.custom_url: - return f"https://{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" - return ( - f"{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" - ) - - def __load_from_env(self): - config_dict = self.dict() - for key, section in ENV_TO_ATTR.items(): - for env, attr in section.items(): - if env in os.environ: - if isinstance(attr, list): - for _attr in attr: - attr_name = _attr[0] if isinstance(_attr, tuple) else _attr - config_dict[key][attr_name] = self.__val_from_env( - env, _attr - ) - else: - attr_name = attr[0] if isinstance(attr, tuple) else attr - config_dict[key][attr_name] = self.__val_from_env(env, attr) - config_obj = Config.parse_obj(config_dict) - self.__dict__.update(config_obj.__dict__) - logger.info("Config loaded from env") - - @staticmethod - def __val_from_env(env: str, attr: tuple): - if isinstance(attr, tuple): - conv_func = attr[1] - return conv_func(os.environ[env]) - else: - return os.environ[env] - - -settings = Settings() +import json +import logging +import os +from pathlib import Path + +from dotenv import load_dotenv + +from module.models.config import Config + +from .const import ENV_TO_ATTR + +logger = logging.getLogger(__name__) +CONFIG_ROOT = Path("config") + + +try: + from module.__version__ import VERSION +except ImportError: + logger.info("Can't find version info, use DEV_VERSION instead") + VERSION = "DEV_VERSION" + +CONFIG_PATH = ( + CONFIG_ROOT / "config_dev.json" + if VERSION == "DEV_VERSION" + else CONFIG_ROOT / "config.json" +).resolve() + + +class Settings(Config): + def __init__(self): + super().__init__() + if CONFIG_PATH.exists(): + self.load() + self.save() + else: + self.init() + + def load(self): + with open(CONFIG_PATH, "r", encoding="utf-8") as f: + config = json.load(f) + config_obj = Config.parse_obj(config) + self.__dict__.update(config_obj.__dict__) + logger.info("Config loaded") + + def save(self, config_dict: dict | None = None): + if not config_dict: + config_dict = self.dict() + with open(CONFIG_PATH, "w", encoding="utf-8") as f: + json.dump(config_dict, f, indent=4, ensure_ascii=False) + + def init(self): + load_dotenv(".env") + self.__load_from_env() + self.save() + + @property + def rss_link(self) -> str: + if "://" not in self.rss_parser.custom_url: + return f"https://{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" + return ( + f"{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}" + ) + + def __load_from_env(self): + config_dict = self.dict() + for key, section in ENV_TO_ATTR.items(): + for env, attr in section.items(): + if env in os.environ: + if isinstance(attr, list): + for _attr in attr: + attr_name = _attr[0] if isinstance(_attr, tuple) else _attr + config_dict[key][attr_name] = self.__val_from_env( + env, _attr + ) + else: + attr_name = attr[0] if isinstance(attr, tuple) else attr + config_dict[key][attr_name] = self.__val_from_env(env, attr) + config_obj = Config.parse_obj(config_dict) + self.__dict__.update(config_obj.__dict__) + logger.info("Config loaded from env") + + @staticmethod + def __val_from_env(env: str, attr: tuple): + if isinstance(attr, tuple): + conv_func = attr[1] + return conv_func(os.environ[env]) + else: + return os.environ[env] + + +settings = Settings() diff --git a/backend/src/module/conf/log.py b/backend/src/module/conf/log.py index 37a0b905..2d567424 100644 --- a/backend/src/module/conf/log.py +++ b/backend/src/module/conf/log.py @@ -1,31 +1,31 @@ -import logging -from pathlib import Path - -from .config import settings - -LOG_ROOT = Path("data") -LOG_PATH = LOG_ROOT / "log.txt" - - -def setup_logger(level: int = logging.INFO, reset: bool = False): - level = logging.DEBUG if settings.log.debug_enable else level - LOG_ROOT.mkdir(exist_ok=True) - - if reset and LOG_PATH.exists(): - LOG_PATH.unlink(missing_ok=True) - - logging.addLevelName(logging.DEBUG, "DEBUG:") - logging.addLevelName(logging.INFO, "INFO:") - logging.addLevelName(logging.WARNING, "WARNING:") - LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s" - TIME_FORMAT = "%Y-%m-%d %H:%M:%S" - logging.basicConfig( - level=level, - format=LOGGING_FORMAT, - datefmt=TIME_FORMAT, - encoding="utf-8", - handlers=[ - logging.FileHandler(LOG_PATH, encoding="utf-8"), - logging.StreamHandler(), - ], - ) +import logging +from pathlib import Path + +from .config import settings + +LOG_ROOT = Path("data") +LOG_PATH = LOG_ROOT / "log.txt" + + +def setup_logger(level: int = logging.INFO, reset: bool = False): + level = logging.DEBUG if settings.log.debug_enable else level + LOG_ROOT.mkdir(exist_ok=True) + + if reset and LOG_PATH.exists(): + LOG_PATH.unlink(missing_ok=True) + + logging.addLevelName(logging.DEBUG, "DEBUG:") + logging.addLevelName(logging.INFO, "INFO:") + logging.addLevelName(logging.WARNING, "WARNING:") + LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s" + TIME_FORMAT = "%Y-%m-%d %H:%M:%S" + logging.basicConfig( + level=level, + format=LOGGING_FORMAT, + datefmt=TIME_FORMAT, + encoding="utf-8", + handlers=[ + logging.FileHandler(LOG_PATH, encoding="utf-8"), + logging.StreamHandler(), + ], + ) diff --git a/backend/src/module/core/status.py b/backend/src/module/core/status.py index 674e568b..9d98daaf 100644 --- a/backend/src/module/core/status.py +++ b/backend/src/module/core/status.py @@ -1,54 +1,54 @@ -import asyncio -import threading - -from module.checker import Checker -from module.conf import LEGACY_DATA_PATH - - -class ProgramStatus(Checker): - def __init__(self): - super().__init__() - self.stop_event = threading.Event() - self.lock = threading.Lock() - self._downloader_status = False - self._torrents_status = False - self.event = asyncio.Event() - - @property - def is_running(self): - if self.stop_event.is_set() or self.check_first_run(): - return False - else: - return True - - @property - def is_stopped(self): - return self.stop_event.is_set() - - @property - def downloader_status(self): - if not self._downloader_status: - self._downloader_status = self.check_downloader() - return self._downloader_status - - @property - def torrents_status(self): - if not self._torrents_status: - self._torrents_status = self.check_torrents() - return self._torrents_status - - @property - def enable_rss(self): - return self.check_analyser() - - @property - def enable_renamer(self): - return self.check_renamer() - - @property - def first_run(self): - return self.check_first_run() - - @property - def legacy_data(self): - return LEGACY_DATA_PATH.exists() +import asyncio +import threading + +from module.checker import Checker +from module.conf import LEGACY_DATA_PATH + + +class ProgramStatus(Checker): + def __init__(self): + super().__init__() + self.stop_event = threading.Event() + self.lock = threading.Lock() + self._downloader_status = False + self._torrents_status = False + self.event = asyncio.Event() + + @property + def is_running(self): + if self.stop_event.is_set() or self.check_first_run(): + return False + else: + return True + + @property + def is_stopped(self): + return self.stop_event.is_set() + + @property + def downloader_status(self): + if not self._downloader_status: + self._downloader_status = self.check_downloader() + return self._downloader_status + + @property + def torrents_status(self): + if not self._torrents_status: + self._torrents_status = self.check_torrents() + return self._torrents_status + + @property + def enable_rss(self): + return self.check_analyser() + + @property + def enable_renamer(self): + return self.check_renamer() + + @property + def first_run(self): + return self.check_first_run() + + @property + def legacy_data(self): + return LEGACY_DATA_PATH.exists() diff --git a/backend/src/module/update/data_migration.py b/backend/src/module/update/data_migration.py index 4bdd5b73..b9ddf8db 100644 --- a/backend/src/module/update/data_migration.py +++ b/backend/src/module/update/data_migration.py @@ -1,22 +1,22 @@ -import os - -from module.conf import LEGACY_DATA_PATH -from module.database import BangumiDatabase -from module.models import BangumiData -from module.utils import json_config - - -def data_migration(): - if not LEGACY_DATA_PATH.exists(): - return False - old_data = json_config.load(LEGACY_DATA_PATH) - infos = old_data["bangumi_info"] - rss_link = old_data["rss_link"] - new_data = [] - for info in infos: - new_data.append(BangumiData(**info, rss_link=[rss_link])) - with BangumiDatabase() as database: - database.update_table() - database.insert_list(new_data) - - LEGACY_DATA_PATH.unlink(missing_ok=True) +import os + +from module.conf import LEGACY_DATA_PATH +from module.database import BangumiDatabase +from module.models import BangumiData +from module.utils import json_config + + +def data_migration(): + if not LEGACY_DATA_PATH.exists(): + return False + old_data = json_config.load(LEGACY_DATA_PATH) + infos = old_data["bangumi_info"] + rss_link = old_data["rss_link"] + new_data = [] + for info in infos: + new_data.append(BangumiData(**info, rss_link=[rss_link])) + with BangumiDatabase() as database: + database.update_table() + database.insert_list(new_data) + + LEGACY_DATA_PATH.unlink(missing_ok=True) diff --git a/backend/src/test/test_torrent_parser.py b/backend/src/test/test_torrent_parser.py index cf0a3cd0..10e6a546 100644 --- a/backend/src/test/test_torrent_parser.py +++ b/backend/src/test/test_torrent_parser.py @@ -1,88 +1,88 @@ -import sys - -import pytest -from module.parser.analyser import torrent_parser -from module.parser.analyser.torrent_parser import get_path_basename - - -def test_torrent_parser(): - file_name = "[Lilith-Raws] Boku no Kokoro no Yabai Yatsu - 01 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4].mp4" - bf = torrent_parser(file_name) - assert bf.title == "Boku no Kokoro no Yabai Yatsu" - assert bf.group == "Lilith-Raws" - assert bf.episode == 1 - assert bf.season == 1 - - file_name = "[Sakurato] Tonikaku Kawaii S2 [01][AVC-8bit 1080p AAC][CHS].mp4" - bf = torrent_parser(file_name) - assert bf.title == "Tonikaku Kawaii" - assert bf.group == "Sakurato" - assert bf.episode == 1 - assert bf.season == 2 - - file_name = "[SweetSub&LoliHouse] Heavenly Delusion - 01 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv" - bf = torrent_parser(file_name) - assert bf.title == "Heavenly Delusion" - assert bf.group == "SweetSub&LoliHouse" - assert bf.episode == 1 - assert bf.season == 1 - - file_name = "[SBSUB][CONAN][1082][V2][1080P][AVC_AAC][CHS_JP](C1E4E331).mp4" - bf = torrent_parser(file_name) - assert bf.title == "CONAN" - assert bf.group == "SBSUB" - assert bf.episode == 1082 - assert bf.season == 1 - - file_name = "海盗战记 (2019) S01E01.mp4" - bf = torrent_parser(file_name) - assert bf.title == "海盗战记 (2019)" - assert bf.episode == 1 - assert bf.season == 1 - - file_name = "海盗战记/海盗战记 S01E01.mp4" - bf = torrent_parser(file_name) - assert bf.title == "海盗战记" - assert bf.episode == 1 - assert bf.season == 1 - - file_name = "海盗战记 S01E01.zh-tw.ass" - sf = torrent_parser(file_name, file_type="subtitle") - assert sf.title == "海盗战记" - assert sf.episode == 1 - assert sf.season == 1 - assert sf.language == "zh-tw" - - file_name = "海盗战记 S01E01.SC.ass" - sf = torrent_parser(file_name, file_type="subtitle") - assert sf.title == "海盗战记" - assert sf.season == 1 - assert sf.episode == 1 - assert sf.language == "zh" - - file_name = "水星的魔女(2022) S00E19.mp4" - bf = torrent_parser(file_name, season=0) - assert bf.title == "水星的魔女(2022)" - assert bf.season == 0 - assert bf.episode == 19 - - file_name = "【失眠搬运组】放学后失眠的你-Kimi wa Houkago Insomnia - 06 [bilibili - 1080p AVC1 CHS-JP].mp4" - bf = torrent_parser(file_name, season=1) - assert bf.title == "放学后失眠的你-Kimi wa Houkago Insomnia" - assert bf.season == 1 - assert bf.episode == 6 - - -class TestGetPathBasename: - def test_regular_path(self): - assert get_path_basename('/path/to/file.txt') == 'file.txt' - - def test_empty_path(self): - assert get_path_basename('') == '' - - def test_path_with_trailing_slash(self): - assert get_path_basename('/path/to/folder/') == 'folder' - - @pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows specific") - def test_windows_path(self): - assert get_path_basename('C:\\path\\to\\file.txt') == 'file.txt' +import sys + +import pytest +from module.parser.analyser import torrent_parser +from module.parser.analyser.torrent_parser import get_path_basename + + +def test_torrent_parser(): + file_name = "[Lilith-Raws] Boku no Kokoro no Yabai Yatsu - 01 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4].mp4" + bf = torrent_parser(file_name) + assert bf.title == "Boku no Kokoro no Yabai Yatsu" + assert bf.group == "Lilith-Raws" + assert bf.episode == 1 + assert bf.season == 1 + + file_name = "[Sakurato] Tonikaku Kawaii S2 [01][AVC-8bit 1080p AAC][CHS].mp4" + bf = torrent_parser(file_name) + assert bf.title == "Tonikaku Kawaii" + assert bf.group == "Sakurato" + assert bf.episode == 1 + assert bf.season == 2 + + file_name = "[SweetSub&LoliHouse] Heavenly Delusion - 01 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv" + bf = torrent_parser(file_name) + assert bf.title == "Heavenly Delusion" + assert bf.group == "SweetSub&LoliHouse" + assert bf.episode == 1 + assert bf.season == 1 + + file_name = "[SBSUB][CONAN][1082][V2][1080P][AVC_AAC][CHS_JP](C1E4E331).mp4" + bf = torrent_parser(file_name) + assert bf.title == "CONAN" + assert bf.group == "SBSUB" + assert bf.episode == 1082 + assert bf.season == 1 + + file_name = "海盗战记 (2019) S01E01.mp4" + bf = torrent_parser(file_name) + assert bf.title == "海盗战记 (2019)" + assert bf.episode == 1 + assert bf.season == 1 + + file_name = "海盗战记/海盗战记 S01E01.mp4" + bf = torrent_parser(file_name) + assert bf.title == "海盗战记" + assert bf.episode == 1 + assert bf.season == 1 + + file_name = "海盗战记 S01E01.zh-tw.ass" + sf = torrent_parser(file_name, file_type="subtitle") + assert sf.title == "海盗战记" + assert sf.episode == 1 + assert sf.season == 1 + assert sf.language == "zh-tw" + + file_name = "海盗战记 S01E01.SC.ass" + sf = torrent_parser(file_name, file_type="subtitle") + assert sf.title == "海盗战记" + assert sf.season == 1 + assert sf.episode == 1 + assert sf.language == "zh" + + file_name = "水星的魔女(2022) S00E19.mp4" + bf = torrent_parser(file_name, season=0) + assert bf.title == "水星的魔女(2022)" + assert bf.season == 0 + assert bf.episode == 19 + + file_name = "【失眠搬运组】放学后失眠的你-Kimi wa Houkago Insomnia - 06 [bilibili - 1080p AVC1 CHS-JP].mp4" + bf = torrent_parser(file_name, season=1) + assert bf.title == "放学后失眠的你-Kimi wa Houkago Insomnia" + assert bf.season == 1 + assert bf.episode == 6 + + +class TestGetPathBasename: + def test_regular_path(self): + assert get_path_basename("/path/to/file.txt") == "file.txt" + + def test_empty_path(self): + assert get_path_basename("") == "" + + def test_path_with_trailing_slash(self): + assert get_path_basename("/path/to/folder/") == "folder" + + @pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows specific") + def test_windows_path(self): + assert get_path_basename("C:\\path\\to\\file.txt") == "file.txt"