chore: fix CRLF issue

This commit is contained in:
100gle
2023-07-03 20:51:05 +08:00
parent a20de4f0ad
commit 653119dbad
6 changed files with 296 additions and 296 deletions

View File

@@ -1,10 +1,10 @@
from pathlib import Path
from .config import VERSION, settings
from .log import LOG_PATH, setup_logger
TMDB_API = "32b19d6a05b512190a056fa4e747cbbc"
DATA_PATH = Path("data/data.db")
LEGACY_DATA_PATH = Path("data/data.json")
PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix"
from pathlib import Path
from .config import VERSION, settings
from .log import LOG_PATH, setup_logger
TMDB_API = "32b19d6a05b512190a056fa4e747cbbc"
DATA_PATH = Path("data/data.db")
LEGACY_DATA_PATH = Path("data/data.json")
PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix"

View File

@@ -1,91 +1,91 @@
import json
import logging
import os
from pathlib import Path
from dotenv import load_dotenv
from module.models.config import Config
from .const import ENV_TO_ATTR
logger = logging.getLogger(__name__)
CONFIG_ROOT = Path("config")
try:
from module.__version__ import VERSION
except ImportError:
logger.info("Can't find version info, use DEV_VERSION instead")
VERSION = "DEV_VERSION"
CONFIG_PATH = (
CONFIG_ROOT / "config_dev.json"
if VERSION == "DEV_VERSION"
else CONFIG_ROOT / "config.json"
).resolve()
class Settings(Config):
def __init__(self):
super().__init__()
if CONFIG_PATH.exists():
self.load()
self.save()
else:
self.init()
def load(self):
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
config = json.load(f)
config_obj = Config.parse_obj(config)
self.__dict__.update(config_obj.__dict__)
logger.info("Config loaded")
def save(self, config_dict: dict | None = None):
if not config_dict:
config_dict = self.dict()
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(config_dict, f, indent=4, ensure_ascii=False)
def init(self):
load_dotenv(".env")
self.__load_from_env()
self.save()
@property
def rss_link(self) -> str:
if "://" not in self.rss_parser.custom_url:
return f"https://{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}"
return (
f"{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}"
)
def __load_from_env(self):
config_dict = self.dict()
for key, section in ENV_TO_ATTR.items():
for env, attr in section.items():
if env in os.environ:
if isinstance(attr, list):
for _attr in attr:
attr_name = _attr[0] if isinstance(_attr, tuple) else _attr
config_dict[key][attr_name] = self.__val_from_env(
env, _attr
)
else:
attr_name = attr[0] if isinstance(attr, tuple) else attr
config_dict[key][attr_name] = self.__val_from_env(env, attr)
config_obj = Config.parse_obj(config_dict)
self.__dict__.update(config_obj.__dict__)
logger.info("Config loaded from env")
@staticmethod
def __val_from_env(env: str, attr: tuple):
if isinstance(attr, tuple):
conv_func = attr[1]
return conv_func(os.environ[env])
else:
return os.environ[env]
settings = Settings()
import json
import logging
import os
from pathlib import Path
from dotenv import load_dotenv
from module.models.config import Config
from .const import ENV_TO_ATTR
logger = logging.getLogger(__name__)
CONFIG_ROOT = Path("config")
try:
from module.__version__ import VERSION
except ImportError:
logger.info("Can't find version info, use DEV_VERSION instead")
VERSION = "DEV_VERSION"
CONFIG_PATH = (
CONFIG_ROOT / "config_dev.json"
if VERSION == "DEV_VERSION"
else CONFIG_ROOT / "config.json"
).resolve()
class Settings(Config):
def __init__(self):
super().__init__()
if CONFIG_PATH.exists():
self.load()
self.save()
else:
self.init()
def load(self):
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
config = json.load(f)
config_obj = Config.parse_obj(config)
self.__dict__.update(config_obj.__dict__)
logger.info("Config loaded")
def save(self, config_dict: dict | None = None):
if not config_dict:
config_dict = self.dict()
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(config_dict, f, indent=4, ensure_ascii=False)
def init(self):
load_dotenv(".env")
self.__load_from_env()
self.save()
@property
def rss_link(self) -> str:
if "://" not in self.rss_parser.custom_url:
return f"https://{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}"
return (
f"{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}"
)
def __load_from_env(self):
config_dict = self.dict()
for key, section in ENV_TO_ATTR.items():
for env, attr in section.items():
if env in os.environ:
if isinstance(attr, list):
for _attr in attr:
attr_name = _attr[0] if isinstance(_attr, tuple) else _attr
config_dict[key][attr_name] = self.__val_from_env(
env, _attr
)
else:
attr_name = attr[0] if isinstance(attr, tuple) else attr
config_dict[key][attr_name] = self.__val_from_env(env, attr)
config_obj = Config.parse_obj(config_dict)
self.__dict__.update(config_obj.__dict__)
logger.info("Config loaded from env")
@staticmethod
def __val_from_env(env: str, attr: tuple):
if isinstance(attr, tuple):
conv_func = attr[1]
return conv_func(os.environ[env])
else:
return os.environ[env]
settings = Settings()

View File

@@ -1,31 +1,31 @@
import logging
from pathlib import Path
from .config import settings
LOG_ROOT = Path("data")
LOG_PATH = LOG_ROOT / "log.txt"
def setup_logger(level: int = logging.INFO, reset: bool = False):
level = logging.DEBUG if settings.log.debug_enable else level
LOG_ROOT.mkdir(exist_ok=True)
if reset and LOG_PATH.exists():
LOG_PATH.unlink(missing_ok=True)
logging.addLevelName(logging.DEBUG, "DEBUG:")
logging.addLevelName(logging.INFO, "INFO:")
logging.addLevelName(logging.WARNING, "WARNING:")
LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s"
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(
level=level,
format=LOGGING_FORMAT,
datefmt=TIME_FORMAT,
encoding="utf-8",
handlers=[
logging.FileHandler(LOG_PATH, encoding="utf-8"),
logging.StreamHandler(),
],
)
import logging
from pathlib import Path
from .config import settings
LOG_ROOT = Path("data")
LOG_PATH = LOG_ROOT / "log.txt"
def setup_logger(level: int = logging.INFO, reset: bool = False):
level = logging.DEBUG if settings.log.debug_enable else level
LOG_ROOT.mkdir(exist_ok=True)
if reset and LOG_PATH.exists():
LOG_PATH.unlink(missing_ok=True)
logging.addLevelName(logging.DEBUG, "DEBUG:")
logging.addLevelName(logging.INFO, "INFO:")
logging.addLevelName(logging.WARNING, "WARNING:")
LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s"
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(
level=level,
format=LOGGING_FORMAT,
datefmt=TIME_FORMAT,
encoding="utf-8",
handlers=[
logging.FileHandler(LOG_PATH, encoding="utf-8"),
logging.StreamHandler(),
],
)

View File

@@ -1,54 +1,54 @@
import asyncio
import threading
from module.checker import Checker
from module.conf import LEGACY_DATA_PATH
class ProgramStatus(Checker):
def __init__(self):
super().__init__()
self.stop_event = threading.Event()
self.lock = threading.Lock()
self._downloader_status = False
self._torrents_status = False
self.event = asyncio.Event()
@property
def is_running(self):
if self.stop_event.is_set() or self.check_first_run():
return False
else:
return True
@property
def is_stopped(self):
return self.stop_event.is_set()
@property
def downloader_status(self):
if not self._downloader_status:
self._downloader_status = self.check_downloader()
return self._downloader_status
@property
def torrents_status(self):
if not self._torrents_status:
self._torrents_status = self.check_torrents()
return self._torrents_status
@property
def enable_rss(self):
return self.check_analyser()
@property
def enable_renamer(self):
return self.check_renamer()
@property
def first_run(self):
return self.check_first_run()
@property
def legacy_data(self):
return LEGACY_DATA_PATH.exists()
import asyncio
import threading
from module.checker import Checker
from module.conf import LEGACY_DATA_PATH
class ProgramStatus(Checker):
def __init__(self):
super().__init__()
self.stop_event = threading.Event()
self.lock = threading.Lock()
self._downloader_status = False
self._torrents_status = False
self.event = asyncio.Event()
@property
def is_running(self):
if self.stop_event.is_set() or self.check_first_run():
return False
else:
return True
@property
def is_stopped(self):
return self.stop_event.is_set()
@property
def downloader_status(self):
if not self._downloader_status:
self._downloader_status = self.check_downloader()
return self._downloader_status
@property
def torrents_status(self):
if not self._torrents_status:
self._torrents_status = self.check_torrents()
return self._torrents_status
@property
def enable_rss(self):
return self.check_analyser()
@property
def enable_renamer(self):
return self.check_renamer()
@property
def first_run(self):
return self.check_first_run()
@property
def legacy_data(self):
return LEGACY_DATA_PATH.exists()

View File

@@ -1,22 +1,22 @@
import os
from module.conf import LEGACY_DATA_PATH
from module.database import BangumiDatabase
from module.models import BangumiData
from module.utils import json_config
def data_migration():
if not LEGACY_DATA_PATH.exists():
return False
old_data = json_config.load(LEGACY_DATA_PATH)
infos = old_data["bangumi_info"]
rss_link = old_data["rss_link"]
new_data = []
for info in infos:
new_data.append(BangumiData(**info, rss_link=[rss_link]))
with BangumiDatabase() as database:
database.update_table()
database.insert_list(new_data)
LEGACY_DATA_PATH.unlink(missing_ok=True)
import os
from module.conf import LEGACY_DATA_PATH
from module.database import BangumiDatabase
from module.models import BangumiData
from module.utils import json_config
def data_migration():
if not LEGACY_DATA_PATH.exists():
return False
old_data = json_config.load(LEGACY_DATA_PATH)
infos = old_data["bangumi_info"]
rss_link = old_data["rss_link"]
new_data = []
for info in infos:
new_data.append(BangumiData(**info, rss_link=[rss_link]))
with BangumiDatabase() as database:
database.update_table()
database.insert_list(new_data)
LEGACY_DATA_PATH.unlink(missing_ok=True)

View File

@@ -1,88 +1,88 @@
import sys
import pytest
from module.parser.analyser import torrent_parser
from module.parser.analyser.torrent_parser import get_path_basename
def test_torrent_parser():
file_name = "[Lilith-Raws] Boku no Kokoro no Yabai Yatsu - 01 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4].mp4"
bf = torrent_parser(file_name)
assert bf.title == "Boku no Kokoro no Yabai Yatsu"
assert bf.group == "Lilith-Raws"
assert bf.episode == 1
assert bf.season == 1
file_name = "[Sakurato] Tonikaku Kawaii S2 [01][AVC-8bit 1080p AAC][CHS].mp4"
bf = torrent_parser(file_name)
assert bf.title == "Tonikaku Kawaii"
assert bf.group == "Sakurato"
assert bf.episode == 1
assert bf.season == 2
file_name = "[SweetSub&LoliHouse] Heavenly Delusion - 01 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv"
bf = torrent_parser(file_name)
assert bf.title == "Heavenly Delusion"
assert bf.group == "SweetSub&LoliHouse"
assert bf.episode == 1
assert bf.season == 1
file_name = "[SBSUB][CONAN][1082][V2][1080P][AVC_AAC][CHS_JP](C1E4E331).mp4"
bf = torrent_parser(file_name)
assert bf.title == "CONAN"
assert bf.group == "SBSUB"
assert bf.episode == 1082
assert bf.season == 1
file_name = "海盗战记 (2019) S01E01.mp4"
bf = torrent_parser(file_name)
assert bf.title == "海盗战记 (2019)"
assert bf.episode == 1
assert bf.season == 1
file_name = "海盗战记/海盗战记 S01E01.mp4"
bf = torrent_parser(file_name)
assert bf.title == "海盗战记"
assert bf.episode == 1
assert bf.season == 1
file_name = "海盗战记 S01E01.zh-tw.ass"
sf = torrent_parser(file_name, file_type="subtitle")
assert sf.title == "海盗战记"
assert sf.episode == 1
assert sf.season == 1
assert sf.language == "zh-tw"
file_name = "海盗战记 S01E01.SC.ass"
sf = torrent_parser(file_name, file_type="subtitle")
assert sf.title == "海盗战记"
assert sf.season == 1
assert sf.episode == 1
assert sf.language == "zh"
file_name = "水星的魔女(2022) S00E19.mp4"
bf = torrent_parser(file_name, season=0)
assert bf.title == "水星的魔女(2022)"
assert bf.season == 0
assert bf.episode == 19
file_name = "【失眠搬运组】放学后失眠的你-Kimi wa Houkago Insomnia - 06 [bilibili - 1080p AVC1 CHS-JP].mp4"
bf = torrent_parser(file_name, season=1)
assert bf.title == "放学后失眠的你-Kimi wa Houkago Insomnia"
assert bf.season == 1
assert bf.episode == 6
class TestGetPathBasename:
def test_regular_path(self):
assert get_path_basename('/path/to/file.txt') == 'file.txt'
def test_empty_path(self):
assert get_path_basename('') == ''
def test_path_with_trailing_slash(self):
assert get_path_basename('/path/to/folder/') == 'folder'
@pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows specific")
def test_windows_path(self):
assert get_path_basename('C:\\path\\to\\file.txt') == 'file.txt'
import sys
import pytest
from module.parser.analyser import torrent_parser
from module.parser.analyser.torrent_parser import get_path_basename
def test_torrent_parser():
file_name = "[Lilith-Raws] Boku no Kokoro no Yabai Yatsu - 01 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4].mp4"
bf = torrent_parser(file_name)
assert bf.title == "Boku no Kokoro no Yabai Yatsu"
assert bf.group == "Lilith-Raws"
assert bf.episode == 1
assert bf.season == 1
file_name = "[Sakurato] Tonikaku Kawaii S2 [01][AVC-8bit 1080p AAC][CHS].mp4"
bf = torrent_parser(file_name)
assert bf.title == "Tonikaku Kawaii"
assert bf.group == "Sakurato"
assert bf.episode == 1
assert bf.season == 2
file_name = "[SweetSub&LoliHouse] Heavenly Delusion - 01 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv"
bf = torrent_parser(file_name)
assert bf.title == "Heavenly Delusion"
assert bf.group == "SweetSub&LoliHouse"
assert bf.episode == 1
assert bf.season == 1
file_name = "[SBSUB][CONAN][1082][V2][1080P][AVC_AAC][CHS_JP](C1E4E331).mp4"
bf = torrent_parser(file_name)
assert bf.title == "CONAN"
assert bf.group == "SBSUB"
assert bf.episode == 1082
assert bf.season == 1
file_name = "海盗战记 (2019) S01E01.mp4"
bf = torrent_parser(file_name)
assert bf.title == "海盗战记 (2019)"
assert bf.episode == 1
assert bf.season == 1
file_name = "海盗战记/海盗战记 S01E01.mp4"
bf = torrent_parser(file_name)
assert bf.title == "海盗战记"
assert bf.episode == 1
assert bf.season == 1
file_name = "海盗战记 S01E01.zh-tw.ass"
sf = torrent_parser(file_name, file_type="subtitle")
assert sf.title == "海盗战记"
assert sf.episode == 1
assert sf.season == 1
assert sf.language == "zh-tw"
file_name = "海盗战记 S01E01.SC.ass"
sf = torrent_parser(file_name, file_type="subtitle")
assert sf.title == "海盗战记"
assert sf.season == 1
assert sf.episode == 1
assert sf.language == "zh"
file_name = "水星的魔女(2022) S00E19.mp4"
bf = torrent_parser(file_name, season=0)
assert bf.title == "水星的魔女(2022)"
assert bf.season == 0
assert bf.episode == 19
file_name = "【失眠搬运组】放学后失眠的你-Kimi wa Houkago Insomnia - 06 [bilibili - 1080p AVC1 CHS-JP].mp4"
bf = torrent_parser(file_name, season=1)
assert bf.title == "放学后失眠的你-Kimi wa Houkago Insomnia"
assert bf.season == 1
assert bf.episode == 6
class TestGetPathBasename:
def test_regular_path(self):
assert get_path_basename("/path/to/file.txt") == "file.txt"
def test_empty_path(self):
assert get_path_basename("") == ""
def test_path_with_trailing_slash(self):
assert get_path_basename("/path/to/folder/") == "folder"
@pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows specific")
def test_windows_path(self):
assert get_path_basename("C:\\path\\to\\file.txt") == "file.txt"