Merge pull request #331 from 100gle/refactor/use-pathlib-to-replace-os-path

feat: use `pathlib` to replace `os.path` for handling file paths
This commit is contained in:
zthxxx
2023-07-04 13:02:18 +08:00
committed by GitHub
17 changed files with 124 additions and 77 deletions

8
backend/.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,8 @@
{
"python.formatting.provider": "none",
"python.formatting.blackPath": "black",
"editor.formatOnSave": true,
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
}
}

View File

@@ -14,7 +14,7 @@ async def get_log(current_user=Depends(get_current_user)):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token"
)
if os.path.isfile(LOG_PATH):
if LOG_PATH.exists():
with open(LOG_PATH, "rb") as f:
return Response(f.read(), media_type="text/plain")
else:
@@ -27,9 +27,8 @@ async def clear_log(current_user=Depends(get_current_user)):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token"
)
if os.path.isfile(LOG_PATH):
with open(LOG_PATH, "w") as f:
f.write("")
if LOG_PATH.exists():
LOG_PATH.write_text("")
return {"status": "ok"}
else:
return Response("Log file not found", status_code=404)

View File

@@ -1,7 +1,10 @@
from pathlib import Path
from .config import VERSION, settings
from .log import LOG_PATH, setup_logger
TMDB_API = "32b19d6a05b512190a056fa4e747cbbc"
DATA_PATH = "data/data.db"
DATA_PATH = Path("data/data.db")
LEGACY_DATA_PATH = Path("data/data.json")
PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix"

View File

@@ -1,6 +1,7 @@
import json
import logging
import os
from pathlib import Path
from dotenv import load_dotenv
@@ -9,25 +10,26 @@ from module.models.config import Config
from .const import ENV_TO_ATTR
logger = logging.getLogger(__name__)
CONFIG_ROOT = Path("config")
try:
from module.__version__ import VERSION
if VERSION == "DEV_VERSION":
logger.info("Can't find version info, use DEV_VERSION instead")
CONFIG_PATH = "config/config_dev.json"
else:
CONFIG_PATH = "config/config.json"
except ImportError:
logger.info("Can't find version info, use DEV_VERSION instead")
VERSION = "DEV_VERSION"
CONFIG_PATH = "config/config_dev.json"
CONFIG_PATH = (
CONFIG_ROOT / "config_dev.json"
if VERSION == "DEV_VERSION"
else CONFIG_ROOT / "config.json"
).resolve()
class Settings(Config):
def __init__(self):
super().__init__()
if os.path.exists(CONFIG_PATH):
if CONFIG_PATH.exists():
self.load()
self.save()
else:

View File

@@ -1,17 +1,19 @@
import logging
import os
from pathlib import Path
from .config import settings
LOG_PATH = "data/log.txt"
LOG_ROOT = Path("data")
LOG_PATH = LOG_ROOT / "log.txt"
def setup_logger(level: int = logging.INFO, reset: bool = False):
level = logging.DEBUG if settings.log.debug_enable else level
if not os.path.isdir("data"):
os.mkdir("data")
if reset and os.path.isfile(LOG_PATH):
os.remove(LOG_PATH)
LOG_ROOT.mkdir(exist_ok=True)
if reset and LOG_PATH.exists():
LOG_PATH.unlink(missing_ok=True)
logging.addLevelName(logging.DEBUG, "DEBUG:")
logging.addLevelName(logging.INFO, "INFO:")
logging.addLevelName(logging.WARNING, "WARNING:")

View File

@@ -1,8 +1,8 @@
import asyncio
import os.path
import threading
from module.checker import Checker
from module.conf import LEGACY_DATA_PATH
class ProgramStatus(Checker):
@@ -51,4 +51,4 @@ class ProgramStatus(Checker):
@property
def legacy_data(self):
return os.path.exists("data/data.json")
return LEGACY_DATA_PATH.exists()

View File

@@ -10,8 +10,8 @@ logger = logging.getLogger(__name__)
class DataConnector:
def __init__(self):
# Create folder if not exists
if not os.path.exists(os.path.dirname(DATA_PATH)):
os.makedirs(os.path.dirname(DATA_PATH))
DATA_PATH.parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(DATA_PATH)
self._cursor = self._conn.cursor()
@@ -99,10 +99,14 @@ class DataConnector:
def _delete(self, table_name: str, condition: dict):
condition_sql = " AND ".join([f"{key} = :{key}" for key in condition.keys()])
self._cursor.execute(f"DELETE FROM {table_name} WHERE {condition_sql}", condition)
self._cursor.execute(
f"DELETE FROM {table_name} WHERE {condition_sql}", condition
)
self._conn.commit()
def _search(self, table_name: str, keys: list[str] | None = None, condition: dict = None):
def _search(
self, table_name: str, keys: list[str] | None = None, condition: dict = None
):
if keys is None:
select_sql = "*"
else:
@@ -111,20 +115,25 @@ class DataConnector:
self._cursor.execute(f"SELECT {select_sql} FROM {table_name}")
else:
custom_condition = condition.pop("_custom_condition", None)
condition_sql = " AND ".join([f"{key} = :{key}" for key in condition.keys()]) + (
f" AND {custom_condition}" if custom_condition else ""
)
condition_sql = " AND ".join(
[f"{key} = :{key}" for key in condition.keys()]
) + (f" AND {custom_condition}" if custom_condition else "")
self._cursor.execute(
f"SELECT {select_sql} FROM {table_name} WHERE {condition_sql}", condition
f"SELECT {select_sql} FROM {table_name} WHERE {condition_sql}",
condition,
)
def _search_data(self, table_name: str, keys: list[str] | None = None, condition: dict = None) -> dict:
def _search_data(
self, table_name: str, keys: list[str] | None = None, condition: dict = None
) -> dict:
if keys is None:
keys = self.__get_table_columns(table_name)
self._search(table_name, keys, condition)
return dict(zip(keys, self._cursor.fetchone()))
def _search_datas(self, table_name: str, keys: list[str] | None = None, condition: dict = None) -> list[dict]:
def _search_datas(
self, table_name: str, keys: list[str] | None = None, condition: dict = None
) -> list[dict]:
if keys is None:
keys = self.__get_table_columns(table_name)
self._search(table_name, keys, condition)

View File

@@ -1,4 +1,5 @@
import os
from os import PathLike
from pathlib import Path
import sqlite3
from .delete import Delete
@@ -10,10 +11,13 @@ from module.conf import DATA_PATH
class Connector:
def __init__(self, table_name: str, data: dict, database: str = DATA_PATH):
def __init__(
self, table_name: str, data: dict, database: PathLike[str] | Path = DATA_PATH
):
# Create folder if not exists
if not os.path.exists(os.path.dirname(DATA_PATH)):
os.makedirs(os.path.dirname(DATA_PATH))
if isinstance(database, (PathLike, str)):
database = Path(database)
database.parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(database)
self._cursor = self._conn.cursor()

View File

@@ -1,13 +1,11 @@
import logging
from os import PathLike
import re
from pathlib import Path
from module.conf import settings
from module.models import BangumiData
if ":\\" in settings.downloader.path:
import ntpath as path
else:
import os.path as path
logger = logging.getLogger(__name__)
@@ -22,7 +20,7 @@ class TorrentPath:
subtitle_list = []
for f in info.files:
file_name = f.name
suffix = path.splitext(file_name)[-1]
suffix = Path(file_name).suffix
if suffix.lower() in [".mp4", ".mkv"]:
media_list.append(file_name)
elif suffix.lower() in [".ass", ".srt"]:
@@ -30,10 +28,10 @@ class TorrentPath:
return media_list, subtitle_list
@staticmethod
def _path_to_bangumi(save_path):
def _path_to_bangumi(save_path: PathLike[str] | str):
# Split save path and download path
save_parts = save_path.split(path.sep)
download_parts = settings.downloader.path.split(path.sep)
save_parts = Path(save_path).parts
download_parts = Path(settings.downloader.path).parts
# Get bangumi name and season
bangumi_name = ""
season = 1
@@ -45,10 +43,10 @@ class TorrentPath:
return bangumi_name, season
@staticmethod
def _file_depth(file_path):
return len(file_path.split(path.sep))
def _file_depth(file_path: PathLike[str] | str):
return len(Path(file_path).parts)
def is_ep(self, file_path):
def is_ep(self, file_path: PathLike[str] | str):
return self._file_depth(file_path) <= 2
@staticmethod
@@ -56,8 +54,8 @@ class TorrentPath:
folder = (
f"{data.official_title} ({data.year})" if data.year else data.official_title
)
save_path = path.join(settings.downloader.path, folder, f"Season {data.season}")
return save_path
save_path = Path(settings.downloader.path) / folder / f"Season {data.season}"
return str(save_path)
@staticmethod
def _rule_name(data: BangumiData):
@@ -70,4 +68,4 @@ class TorrentPath:
@staticmethod
def _join_path(*args):
return path.join(*args)
return str(Path(*args))

View File

@@ -59,5 +59,3 @@ def eps_complete():
sc.collect_season(data)
data.eps_collect = True
bd.update_list(datas)

View File

@@ -31,8 +31,7 @@ class PostNotification:
def __init__(self):
Notifier = getClient(settings.notification.type)
self.notifier = Notifier(
token=settings.notification.token,
chat_id=settings.notification.chat_id
token=settings.notification.token, chat_id=settings.notification.chat_id
)
@staticmethod

View File

@@ -27,15 +27,15 @@ class WecomNotification(RequestContent):
title = "【番剧更新】" + notify.official_title
msg = self.gen_message(notify)
picurl = notify.poster_path
#Default pic to avoid blank in message. Resolution:1068*455
# Default pic to avoid blank in message. Resolution:1068*455
if picurl == "https://mikanani.me":
picurl = "https://article.biliimg.com/bfs/article/d8bcd0408bf32594fd82f27de7d2c685829d1b2e.png"
data = {
"key":self.token,
"key": self.token,
"type": "news",
"title": title,
"msg": msg,
"picurl":picurl
"picurl": picurl,
}
resp = self.post_data(self.notification_url, data)
logger.debug(f"Wecom notification: {resp.status_code}")

View File

@@ -181,6 +181,6 @@ def raw_parser(raw: str) -> Episode | None:
)
if __name__ == '__main__':
if __name__ == "__main__":
title = "[动漫国字幕组&LoliHouse] THE MARGINAL SERVICE - 08 [WebRip 1080p HEVC-10bit AAC][简繁内封字幕]"
print(raw_parser(title))

View File

@@ -16,14 +16,13 @@ class TMDBInfo:
year: str
LANGUAGE = {
"zh": "zh-CN",
"jp": "ja-JP",
"en": "en-US"
}
LANGUAGE = {"zh": "zh-CN", "jp": "ja-JP", "en": "en-US"}
def search_url(e):
    """Build the TMDB TV-search API URL for the query string *e* (page 1, adult results excluded)."""
    return f"https://api.themoviedb.org/3/search/tv?api_key={TMDB_API}&page=1&query={e}&include_adult=false"
def info_url(e, key):
    """Build the TMDB TV-details API URL for show id *e*, localized via LANGUAGE[key] (e.g. "zh" -> "zh-CN")."""
    return f"https://api.themoviedb.org/3/tv/{e}?api_key={TMDB_API}&language={LANGUAGE[key]}"
@@ -68,8 +67,9 @@ def tmdb_parser(title, language) -> TMDBInfo | None:
{
"season": s.get("name"),
"air_date": s.get("air_date"),
"poster_path": s.get("poster_path")
} for s in info_content.get("seasons")
"poster_path": s.get("poster_path"),
}
for s in info_content.get("seasons")
]
last_season = get_season(season)
original_title = info_content.get("original_name")
@@ -81,7 +81,7 @@ def tmdb_parser(title, language) -> TMDBInfo | None:
original_title,
season,
last_season,
str(year_number)
str(year_number),
)
else:
return None

View File

@@ -1,6 +1,5 @@
import logging
import ntpath as win_path
import os.path as unix_path
from pathlib import Path
import re
from module.models import EpisodeFile, SubtitleFile
@@ -23,11 +22,16 @@ SUBTITLE_LANG = {
}
def split_path(torrent_path: str) -> str:
    """Return the last path component of *torrent_path*.

    Chooses the path module matching the configured downloader platform,
    so Windows-style paths split on ``\\`` and Unix-style paths on ``/``.

    :param torrent_path: path string to split.
    :return: the final component (basename) of the path.
    """
    # PLATFORM comes from module.conf and is either "Windows" or "Unix".
    path_mod = win_path if PLATFORM == "Windows" else unix_path
    return path_mod.split(torrent_path)[-1]
def get_path_basename(torrent_path: str) -> str:
    """
    Returns the basename of a path string.

    :param torrent_path: A string representing a path to a file.
    :type torrent_path: str
    :return: A string representing the basename of the given path.
    :rtype: str
    """
    # pathlib normalizes separators and ignores a trailing slash,
    # so "/a/b/" and "/a/b" both yield "b".
    path_obj = Path(torrent_path)
    return path_obj.name
def get_group(group_and_title) -> tuple[str | None, str]:
@@ -64,7 +68,7 @@ def torrent_parser(
season: int | None = None,
file_type: str = "media",
) -> EpisodeFile | SubtitleFile:
media_path = split_path(torrent_path)
media_path = get_path_basename(torrent_path)
for rule in RULES:
if torrent_name:
match_obj = re.match(rule, torrent_name, re.I)
@@ -77,7 +81,7 @@ def torrent_parser(
else:
title, _ = get_season_and_title(title)
episode = int(match_obj.group(2))
suffix = unix_path.splitext(torrent_path)[-1]
suffix = Path(torrent_path).suffix
if file_type == "media":
return EpisodeFile(
media_path=torrent_path,

View File

@@ -1,14 +1,15 @@
import os
from module.conf import LEGACY_DATA_PATH
from module.database import BangumiDatabase
from module.models import BangumiData
from module.utils import json_config
def data_migration():
if not os.path.isfile("data/data.json"):
if not LEGACY_DATA_PATH.exists():
return False
old_data = json_config.load("data/data.json")
old_data = json_config.load(LEGACY_DATA_PATH)
infos = old_data["bangumi_info"]
rss_link = old_data["rss_link"]
new_data = []
@@ -17,4 +18,5 @@ def data_migration():
with BangumiDatabase() as database:
database.update_table()
database.insert_list(new_data)
os.remove("data/data.json")
LEGACY_DATA_PATH.unlink(missing_ok=True)

View File

@@ -1,4 +1,8 @@
import sys
import pytest
from module.parser.analyser import torrent_parser
from module.parser.analyser.torrent_parser import get_path_basename
def test_torrent_parser():
@@ -67,3 +71,18 @@ def test_torrent_parser():
assert bf.title == "放学后失眠的你-Kimi wa Houkago Insomnia"
assert bf.season == 1
assert bf.episode == 6
class TestGetPathBasename:
    """Unit tests for get_path_basename()."""

    def test_regular_path(self):
        # Ordinary absolute path: expect the final file-name component.
        result = get_path_basename("/path/to/file.txt")
        assert result == "file.txt"

    def test_empty_path(self):
        # An empty path has no components; pathlib yields "".
        result = get_path_basename("")
        assert result == ""

    def test_path_with_trailing_slash(self):
        # A trailing separator is ignored; the folder name is returned.
        result = get_path_basename("/path/to/folder/")
        assert result == "folder"

    @pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows specific")
    def test_windows_path(self):
        # Backslash separators only split correctly on Windows.
        result = get_path_basename("C:\\path\\to\\file.txt")
        assert result == "file.txt"