feat: use pathlib to replace os.path for handling file path

This commit is contained in:
100gle
2023-06-13 23:44:38 +08:00
committed by MacBook Pro
parent 6acf69af34
commit ebb5e0995c
14 changed files with 264 additions and 238 deletions

View File

@@ -14,7 +14,7 @@ async def get_log(current_user=Depends(get_current_user)):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token"
)
if os.path.isfile(LOG_PATH):
if LOG_PATH.exists():
with open(LOG_PATH, "rb") as f:
return Response(f.read(), media_type="text/plain")
else:
@@ -27,9 +27,8 @@ async def clear_log(current_user=Depends(get_current_user)):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token"
)
if os.path.isfile(LOG_PATH):
with open(LOG_PATH, "w") as f:
f.write("")
if LOG_PATH.exists():
LOG_PATH.write_text("")
return {"status": "ok"}
else:
return Response("Log file not found", status_code=404)

View File

@@ -1,7 +1,10 @@
from .config import VERSION, settings
from .log import LOG_PATH, setup_logger
TMDB_API = "32b19d6a05b512190a056fa4e747cbbc"
DATA_PATH = "data/data.db"
PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix"
import pathlib
from .config import VERSION, settings
from .log import LOG_PATH, setup_logger
TMDB_API = "32b19d6a05b512190a056fa4e747cbbc"
DATA_PATH = pathlib.Path("data/data.db")
LEGACY_DATA_PATH = pathlib.Path("data/data.json")
PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix"

View File

@@ -1,89 +1,91 @@
import json
import logging
import os
from dotenv import load_dotenv
from module.models.config import Config
from .const import ENV_TO_ATTR
logger = logging.getLogger(__name__)
try:
from module.__version__ import VERSION
if VERSION == "DEV_VERSION":
logger.info("Can't find version info, use DEV_VERSION instead")
CONFIG_PATH = "config/config_dev.json"
else:
CONFIG_PATH = "config/config.json"
except ImportError:
logger.info("Can't find version info, use DEV_VERSION instead")
VERSION = "DEV_VERSION"
CONFIG_PATH = "config/config_dev.json"
class Settings(Config):
def __init__(self):
super().__init__()
if os.path.exists(CONFIG_PATH):
self.load()
self.save()
else:
self.init()
def load(self):
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
config = json.load(f)
config_obj = Config.parse_obj(config)
self.__dict__.update(config_obj.__dict__)
logger.info("Config loaded")
def save(self, config_dict: dict | None = None):
if not config_dict:
config_dict = self.dict()
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(config_dict, f, indent=4, ensure_ascii=False)
def init(self):
load_dotenv(".env")
self.__load_from_env()
self.save()
@property
def rss_link(self) -> str:
if "://" not in self.rss_parser.custom_url:
return f"https://{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}"
return (
f"{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}"
)
def __load_from_env(self):
config_dict = self.dict()
for key, section in ENV_TO_ATTR.items():
for env, attr in section.items():
if env in os.environ:
if isinstance(attr, list):
for _attr in attr:
attr_name = _attr[0] if isinstance(_attr, tuple) else _attr
config_dict[key][attr_name] = self.__val_from_env(
env, _attr
)
else:
attr_name = attr[0] if isinstance(attr, tuple) else attr
config_dict[key][attr_name] = self.__val_from_env(env, attr)
config_obj = Config.parse_obj(config_dict)
self.__dict__.update(config_obj.__dict__)
logger.info("Config loaded from env")
@staticmethod
def __val_from_env(env: str, attr: tuple):
if isinstance(attr, tuple):
conv_func = attr[1]
return conv_func(os.environ[env])
else:
return os.environ[env]
settings = Settings()
import json
import logging
import os
import pathlib
from dotenv import load_dotenv
from module.models.config import Config
from .const import ENV_TO_ATTR
logger = logging.getLogger(__name__)
CONFIG_ROOT = pathlib.Path("src/config")
try:
from module.__version__ import VERSION
except ImportError:
logger.info("Can't find version info, use DEV_VERSION instead")
VERSION = "DEV_VERSION"
CONFIG_PATH = (
CONFIG_ROOT / "config_dev.json"
if VERSION == "DEV_VERSION"
else CONFIG_ROOT / "config.json"
).resolve()
class Settings(Config):
def __init__(self):
super().__init__()
if CONFIG_PATH.exists():
self.load()
self.save()
else:
self.init()
def load(self):
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
config = json.load(f)
config_obj = Config.parse_obj(config)
self.__dict__.update(config_obj.__dict__)
logger.info("Config loaded")
def save(self, config_dict: dict | None = None):
if not config_dict:
config_dict = self.dict()
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(config_dict, f, indent=4, ensure_ascii=False)
def init(self):
load_dotenv(".env")
self.__load_from_env()
self.save()
@property
def rss_link(self) -> str:
if "://" not in self.rss_parser.custom_url:
return f"https://{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}"
return (
f"{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}"
)
def __load_from_env(self):
config_dict = self.dict()
for key, section in ENV_TO_ATTR.items():
for env, attr in section.items():
if env in os.environ:
if isinstance(attr, list):
for _attr in attr:
attr_name = _attr[0] if isinstance(_attr, tuple) else _attr
config_dict[key][attr_name] = self.__val_from_env(
env, _attr
)
else:
attr_name = attr[0] if isinstance(attr, tuple) else attr
config_dict[key][attr_name] = self.__val_from_env(env, attr)
config_obj = Config.parse_obj(config_dict)
self.__dict__.update(config_obj.__dict__)
logger.info("Config loaded from env")
@staticmethod
def __val_from_env(env: str, attr: tuple):
if isinstance(attr, tuple):
conv_func = attr[1]
return conv_func(os.environ[env])
else:
return os.environ[env]
settings = Settings()

View File

@@ -1,29 +1,31 @@
import logging
import os
from .config import settings
LOG_PATH = "data/log.txt"
def setup_logger(level: int = logging.INFO, reset: bool = False):
level = logging.DEBUG if settings.log.debug_enable else level
if not os.path.isdir("data"):
os.mkdir("data")
if reset and os.path.isfile(LOG_PATH):
os.remove(LOG_PATH)
logging.addLevelName(logging.DEBUG, "DEBUG:")
logging.addLevelName(logging.INFO, "INFO:")
logging.addLevelName(logging.WARNING, "WARNING:")
LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s"
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(
level=level,
format=LOGGING_FORMAT,
datefmt=TIME_FORMAT,
encoding="utf-8",
handlers=[
logging.FileHandler(LOG_PATH, encoding="utf-8"),
logging.StreamHandler(),
],
)
import logging
import pathlib
from .config import settings
LOG_ROOT = pathlib.Path("src/data")
LOG_PATH = LOG_ROOT / "log.txt"
def setup_logger(level: int = logging.INFO, reset: bool = False):
level = logging.DEBUG if settings.log.debug_enable else level
LOG_ROOT.mkdir(exist_ok=True)
if reset and LOG_PATH.exists():
LOG_PATH.unlink(missing_ok=True)
logging.addLevelName(logging.DEBUG, "DEBUG:")
logging.addLevelName(logging.INFO, "INFO:")
logging.addLevelName(logging.WARNING, "WARNING:")
LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s"
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(
level=level,
format=LOGGING_FORMAT,
datefmt=TIME_FORMAT,
encoding="utf-8",
handlers=[
logging.FileHandler(LOG_PATH, encoding="utf-8"),
logging.StreamHandler(),
],
)

View File

@@ -1,54 +1,54 @@
import asyncio
import os.path
import threading
from module.checker import Checker
class ProgramStatus(Checker):
def __init__(self):
super().__init__()
self.stop_event = threading.Event()
self.lock = threading.Lock()
self._downloader_status = False
self._torrents_status = False
self.event = asyncio.Event()
@property
def is_running(self):
if self.stop_event.is_set() or self.check_first_run():
return False
else:
return True
@property
def is_stopped(self):
return self.stop_event.is_set()
@property
def downloader_status(self):
if not self._downloader_status:
self._downloader_status = self.check_downloader()
return self._downloader_status
@property
def torrents_status(self):
if not self._torrents_status:
self._torrents_status = self.check_torrents()
return self._torrents_status
@property
def enable_rss(self):
return self.check_analyser()
@property
def enable_renamer(self):
return self.check_renamer()
@property
def first_run(self):
return self.check_first_run()
@property
def legacy_data(self):
return os.path.exists("data/data.json")
import asyncio
import threading
from module.checker import Checker
from module.conf import LEGACY_DATA_PATH
class ProgramStatus(Checker):
def __init__(self):
super().__init__()
self.stop_event = threading.Event()
self.lock = threading.Lock()
self._downloader_status = False
self._torrents_status = False
self.event = asyncio.Event()
@property
def is_running(self):
if self.stop_event.is_set() or self.check_first_run():
return False
else:
return True
@property
def is_stopped(self):
return self.stop_event.is_set()
@property
def downloader_status(self):
if not self._downloader_status:
self._downloader_status = self.check_downloader()
return self._downloader_status
@property
def torrents_status(self):
if not self._torrents_status:
self._torrents_status = self.check_torrents()
return self._torrents_status
@property
def enable_rss(self):
return self.check_analyser()
@property
def enable_renamer(self):
return self.check_renamer()
@property
def first_run(self):
return self.check_first_run()
@property
def legacy_data(self):
return LEGACY_DATA_PATH.exists()

View File

@@ -10,8 +10,8 @@ logger = logging.getLogger(__name__)
class DataConnector:
def __init__(self):
# Create folder if not exists
if not os.path.exists(os.path.dirname(DATA_PATH)):
os.makedirs(os.path.dirname(DATA_PATH))
DATA_PATH.parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(DATA_PATH)
self._cursor = self._conn.cursor()
@@ -99,10 +99,14 @@ class DataConnector:
def _delete(self, table_name: str, condition: dict):
condition_sql = " AND ".join([f"{key} = :{key}" for key in condition.keys()])
self._cursor.execute(f"DELETE FROM {table_name} WHERE {condition_sql}", condition)
self._cursor.execute(
f"DELETE FROM {table_name} WHERE {condition_sql}", condition
)
self._conn.commit()
def _search(self, table_name: str, keys: list[str] | None = None, condition: dict = None):
def _search(
self, table_name: str, keys: list[str] | None = None, condition: dict = None
):
if keys is None:
select_sql = "*"
else:
@@ -111,20 +115,25 @@ class DataConnector:
self._cursor.execute(f"SELECT {select_sql} FROM {table_name}")
else:
custom_condition = condition.pop("_custom_condition", None)
condition_sql = " AND ".join([f"{key} = :{key}" for key in condition.keys()]) + (
f" AND {custom_condition}" if custom_condition else ""
)
condition_sql = " AND ".join(
[f"{key} = :{key}" for key in condition.keys()]
) + (f" AND {custom_condition}" if custom_condition else "")
self._cursor.execute(
f"SELECT {select_sql} FROM {table_name} WHERE {condition_sql}", condition
f"SELECT {select_sql} FROM {table_name} WHERE {condition_sql}",
condition,
)
def _search_data(self, table_name: str, keys: list[str] | None = None, condition: dict = None) -> dict:
def _search_data(
self, table_name: str, keys: list[str] | None = None, condition: dict = None
) -> dict:
if keys is None:
keys = self.__get_table_columns(table_name)
self._search(table_name, keys, condition)
return dict(zip(keys, self._cursor.fetchone()))
def _search_datas(self, table_name: str, keys: list[str] | None = None, condition: dict = None) -> list[dict]:
def _search_datas(
self, table_name: str, keys: list[str] | None = None, condition: dict = None
) -> list[dict]:
if keys is None:
keys = self.__get_table_columns(table_name)
self._search(table_name, keys, condition)

View File

@@ -59,5 +59,3 @@ def eps_complete():
sc.collect_season(data)
data.eps_collect = True
bd.update_list(datas)

View File

@@ -31,8 +31,7 @@ class PostNotification:
def __init__(self):
Notifier = getClient(settings.notification.type)
self.notifier = Notifier(
token=settings.notification.token,
chat_id=settings.notification.chat_id
token=settings.notification.token, chat_id=settings.notification.chat_id
)
@staticmethod

View File

@@ -27,15 +27,15 @@ class WecomNotification(RequestContent):
title = "【番剧更新】" + notify.official_title
msg = self.gen_message(notify)
picurl = notify.poster_path
#Default pic to avoid blank in message. Resolution:1068*455
# Default pic to avoid blank in message. Resolution:1068*455
if picurl == "https://mikanani.me":
picurl = "https://article.biliimg.com/bfs/article/d8bcd0408bf32594fd82f27de7d2c685829d1b2e.png"
data = {
"key":self.token,
"key": self.token,
"type": "news",
"title": title,
"msg": msg,
"picurl":picurl
"picurl": picurl,
}
resp = self.post_data(self.notification_url, data)
logger.debug(f"Wecom notification: {resp.status_code}")

View File

@@ -181,6 +181,6 @@ def raw_parser(raw: str) -> Episode | None:
)
if __name__ == '__main__':
if __name__ == "__main__":
title = "[动漫国字幕组&LoliHouse] THE MARGINAL SERVICE - 08 [WebRip 1080p HEVC-10bit AAC][简繁内封字幕]"
print(raw_parser(title))

View File

@@ -16,14 +16,13 @@ class TMDBInfo:
year: str
LANGUAGE = {
"zh": "zh-CN",
"jp": "ja-JP",
"en": "en-US"
}
LANGUAGE = {"zh": "zh-CN", "jp": "ja-JP", "en": "en-US"}
def search_url(e):
return f"https://api.themoviedb.org/3/search/tv?api_key={TMDB_API}&page=1&query={e}&include_adult=false"
def info_url(e, key):
return f"https://api.themoviedb.org/3/tv/{e}?api_key={TMDB_API}&language={LANGUAGE[key]}"
@@ -68,8 +67,9 @@ def tmdb_parser(title, language) -> TMDBInfo | None:
{
"season": s.get("name"),
"air_date": s.get("air_date"),
"poster_path": s.get("poster_path")
} for s in info_content.get("seasons")
"poster_path": s.get("poster_path"),
}
for s in info_content.get("seasons")
]
last_season = get_season(season)
original_title = info_content.get("original_name")
@@ -81,7 +81,7 @@ def tmdb_parser(title, language) -> TMDBInfo | None:
original_title,
season,
last_season,
str(year_number)
str(year_number),
)
else:
return None

View File

@@ -1,6 +1,5 @@
import logging
import ntpath as win_path
import os.path as unix_path
import pathlib
import re
from module.models import EpisodeFile, SubtitleFile
@@ -23,11 +22,16 @@ SUBTITLE_LANG = {
}
def split_path(torrent_path: str) -> str:
if PLATFORM == "Windows":
return win_path.split(torrent_path)[-1]
else:
return unix_path.split(torrent_path)[-1]
def get_path_basename(torrent_path: str) -> str:
"""
Returns the basename of a path string.
:param torrent_path: A string representing a path to a file.
:type torrent_path: str
:return: A string representing the basename of the given path.
:rtype: str
"""
return pathlib.Path(torrent_path).name
def get_group(group_and_title) -> tuple[str | None, str]:
@@ -64,7 +68,7 @@ def torrent_parser(
season: int | None = None,
file_type: str = "media",
) -> EpisodeFile | SubtitleFile:
media_path = split_path(torrent_path)
media_path = get_path_basename(torrent_path)
for rule in RULES:
if torrent_name:
match_obj = re.match(rule, torrent_name, re.I)
@@ -77,7 +81,7 @@ def torrent_parser(
else:
title, _ = get_season_and_title(title)
episode = int(match_obj.group(2))
suffix = unix_path.splitext(torrent_path)[-1]
suffix = pathlib.Path(torrent_path).suffix
if file_type == "media":
return EpisodeFile(
media_path=torrent_path,

View File

@@ -1,20 +1,22 @@
import os
from module.database import BangumiDatabase
from module.models import BangumiData
from module.utils import json_config
def data_migration():
if not os.path.isfile("data/data.json"):
return False
old_data = json_config.load("data/data.json")
infos = old_data["bangumi_info"]
rss_link = old_data["rss_link"]
new_data = []
for info in infos:
new_data.append(BangumiData(**info, rss_link=[rss_link]))
with BangumiDatabase() as database:
database.update_table()
database.insert_list(new_data)
os.remove("data/data.json")
import os
from module.conf import LEGACY_DATA_PATH
from module.database import BangumiDatabase
from module.models import BangumiData
from module.utils import json_config
def data_migration():
if not LEGACY_DATA_PATH.exists():
return False
old_data = json_config.load(LEGACY_DATA_PATH)
infos = old_data["bangumi_info"]
rss_link = old_data["rss_link"]
new_data = []
for info in infos:
new_data.append(BangumiData(**info, rss_link=[rss_link]))
with BangumiDatabase() as database:
database.update_table()
database.insert_list(new_data)
LEGACY_DATA_PATH.unlink(missing_ok=True)