- Rewrite config class

- Change multiprocessing to thread
- black all code
This commit is contained in:
EstrellaXD
2023-05-06 23:47:44 +08:00
parent d7a41d7404
commit 5c71b9d35d
38 changed files with 400 additions and 313 deletions

View File

@@ -89,7 +89,9 @@ if VERSION != "DEV_VERSION":
def index(request: Request):
context = {"request": request}
return templates.TemplateResponse("index.html", context)
else:
@router.get("/", status_code=302, tags=["html"])
def index():
return RedirectResponse("/docs")
@@ -97,6 +99,9 @@ else:
if __name__ == "__main__":
log_config = uvicorn.config.LOGGING_CONFIG
log_config["formatters"]["default"]["fmt"] = "[%(asctime)s] %(levelname)-8s %(message)s"
uvicorn.run(router, host="0.0.0.0", port=settings.program.webui_port, log_config=log_config)
log_config["formatters"]["default"][
"fmt"
] = "[%(asctime)s] %(levelname)-8s %(message)s"
uvicorn.run(
router, host="0.0.0.0", port=settings.program.webui_port, log_config=log_config
)

View File

@@ -16,6 +16,7 @@ def qb_connect_failed_wait(func):
logger.warning("Cannot connect to qBittorrent. Wait 5 min and retry...")
time.sleep(300)
times += 1
return wrapper
@@ -27,4 +28,5 @@ def api_failed(func):
logger.debug(f"URL: {args[0]}")
logger.warning("Wrong API response.")
logger.debug(e)
return wrapper

View File

@@ -111,4 +111,3 @@ async def get_rss(full_path: str):
async def download(full_path: str):
torrent = api_func.get_torrent(full_path)
return Response(torrent, media_type="application/x-bittorrent")

View File

@@ -1,6 +1,7 @@
import os
import time
import logging
import asyncio
from module.conf import setup_logger, LOG_PATH, RSSLink, VERSION
@@ -18,45 +19,33 @@ def reset_log():
os.remove(LOG_PATH)
def main_process(rss_link: str, download_client: DownloadClient, _settings: Config):
rename = Renamer(download_client, _settings)
rss_analyser = RSSAnalyser(_settings)
while True:
times = 0
if _settings.rss_parser.enable:
extra_data = rss_analyser.run(rss_link=rss_link)
download_client.add_rules(extra_data, rss_link)
if _settings.bangumi_manage.eps_complete:
FullSeasonGet(settings=_settings).eps_complete(download_client)
logger.info("Running....")
while times < _settings.program.rename_times:
if _settings.bangumi_manage.enable:
rename.rename()
times += 1
time.sleep(_settings.program.sleep_time / _settings.program.rename_times)
async def rss_loop(
rss_link: str,
rss_analyser: RSSAnalyser,
download_client: DownloadClient,
season_get: FullSeasonGet,
eps_complete: bool = False,
wait_time: int = 7200,
):
datas = rss_analyser.run(rss_link)
if datas:
download_client.add_rules(datas, rss_link)
if eps_complete:
season_get.eps_complete(datas, download_client)
await asyncio.sleep(wait_time)
async def rename_loop(renamer: Renamer, wait_time: int = 360):
renamer.rename()
await asyncio.sleep(wait_time)
def show_info():
with open("icon", "r") as f:
for line in f.readlines():
logger.info(line.strip("\n"))
logger.info(f"Version {VERSION} Author: EstrellaXD Twitter: https://twitter.com/Estrella_Pan")
logger.info(
f"Version {VERSION} Author: EstrellaXD Twitter: https://twitter.com/Estrella_Pan"
)
logger.info("GitHub: https://github.com/EstrellaXD/Auto_Bangumi/")
logger.info("Starting AutoBangumi...")
def run(settings: Config):
# 初始化
rss_link = RSSLink()
reset_log()
setup_logger()
show_info()
if settings.rss_parser.token in ["", "token", None]:
logger.error("Please set your RSS token in config file.")
exit(1)
download_client = DownloadClient(settings)
download_client.auth()
download_client.init_downloader()
download_client.rss_feed(rss_link)
# 主程序循环
main_process(rss_link, download_client, settings)

View File

@@ -1,17 +1,8 @@
from .log import setup_logger, LOG_PATH
from .config import settings, VERSION
from .config import VERSION, settings
TMDB_API = "32b19d6a05b512190a056fa4e747cbbc"
DATA_PATH = "data/data.db"
class RSSLink(str):
def __new__(cls):
if "://" not in settings.rss_parser.custom_url:
return f"https://{settings.rss_parser.custom_url}/RSS/MyBangumi?token={settings.rss_parser.token}"
return f"{settings.rss_parser.custom_url}/RSS/MyBangumi?token={settings.rss_parser.token}"
PLATFORM = "Windows" if "\\" in settings.downloader.path else "Unix"
MIKANANI_URL = "mikanani.me"

View File

@@ -3,80 +3,82 @@ import os
import logging
from dotenv import load_dotenv
from module.conf.const import ENV_TO_ATTR
from .const import ENV_TO_ATTR
from module.models.config import Config
logger = logging.getLogger(__name__)
try:
from ..__version__ import VERSION
from module.__version__ import VERSION
if VERSION == "DEV_VERSION":
logger.info("Can't find version info, use DEV_VERSION instead")
CONFIG_PATH = "config/config_dev.json"
else:
CONFIG_PATH = f"config/config.json"
except ImportError:
logger.info("Can't find version info, use DEV_VERSION instead")
VERSION = "DEV_VERSION"
CONFIG_PATH = "config/config_dev.json"
class Setting(Config):
@staticmethod
def reload():
load_config_from_file(CONFIG_PATH)
class Settings(Config):
def __init__(self):
super().__init__()
if os.path.exists(CONFIG_PATH):
self.load()
self.save()
else:
# load from env
load_dotenv(".env")
self.__load_from_env()
self.save()
def load(self):
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
config = json.load(f)
config_obj = Config.parse_obj(config)
self.__dict__.update(config_obj.__dict__)
logger.info(f"Config loaded")
def save(self):
save_config_to_file(self, CONFIG_PATH)
def save_config_to_file(config: Config, path: str):
config_dict = config.dict()
with open(path, "w", encoding="utf-8") as f:
json.dump(config_dict, f, indent=4)
logger.info(f"Config saved")
def load_config_from_file(path: str) -> Config:
with open(path, "r", encoding="utf-8") as f:
config = json.load(f)
return Setting(**config)
def _val_from_env(env: str, attr: tuple):
if isinstance(attr, tuple):
conv_func = attr[1]
return conv_func(os.environ[env])
else:
return os.environ[env]
def env_to_config() -> Setting:
_settings = Setting().dict()
for key, section in ENV_TO_ATTR.items():
for env, attr in section.items():
if env in os.environ:
if isinstance(attr, list):
for _attr in attr:
attr_name = _attr[0] if isinstance(_attr, tuple) else _attr
_settings[key][attr_name] = _val_from_env(env, _attr)
else:
attr_name = attr[0] if isinstance(attr, tuple) else attr
_settings[key][attr_name] = _val_from_env(env, attr)
return Setting(**_settings)
if os.path.isdir("config") and VERSION == "DEV_VERSION":
CONFIG_PATH = "config/config_dev.json"
if os.path.isfile(CONFIG_PATH):
settings = load_config_from_file(CONFIG_PATH)
else:
load_dotenv(".env")
settings = env_to_config()
save_config_to_file(settings, CONFIG_PATH)
elif os.path.isdir("config") and VERSION != "DEV_VERSION":
CONFIG_PATH = "config/config.json"
if os.path.isfile(CONFIG_PATH):
settings = load_config_from_file(CONFIG_PATH)
else:
settings = env_to_config()
save_config_to_file(settings, CONFIG_PATH)
else:
settings = Setting()
config_dict = self.dict()
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(config_dict, f, indent=4)
logger.info(f"Config saved")
def rss_link(self):
if "://" not in self.rss_parser.custom_url:
return f"https://{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}"
return (
f"{self.rss_parser.custom_url}/RSS/MyBangumi?token={self.rss_parser.token}"
)
def __load_from_env(self):
config_dict = self.dict()
for key, section in ENV_TO_ATTR.items():
for env, attr in section.items():
if env in os.environ:
if isinstance(attr, list):
for _attr in attr:
attr_name = _attr[0] if isinstance(_attr, tuple) else _attr
config_dict[key][attr_name] = self.__val_from_env(
env, _attr
)
else:
attr_name = attr[0] if isinstance(attr, tuple) else attr
config_dict[key][attr_name] = self.__val_from_env(env, attr)
config_obj = Config.parse_obj(config_dict)
self.__dict__.update(config_obj.__dict__)
logger.info(f"Config loaded from env")
@staticmethod
def __val_from_env(env: str, attr: tuple):
if isinstance(attr, tuple):
conv_func = attr[1]
return conv_func(os.environ[env])
else:
return os.environ[env]
settings = Settings()

View File

@@ -7,7 +7,7 @@ DEFAULT_SETTINGS = {
"sleep_time": 7200,
"times": 20,
"webui_port": 7892,
"data_version": 4.0
"data_version": 4.0,
},
"downloader": {
"type": "qbittorrent",
@@ -15,7 +15,7 @@ DEFAULT_SETTINGS = {
"username": "admin",
"password": "adminadmin",
"path": "/downloads/Bangumi",
"ssl": False
"ssl": False,
},
"rss_parser": {
"enable": True,
@@ -24,14 +24,14 @@ DEFAULT_SETTINGS = {
"token": "",
"enable_tmdb": False,
"filter": ["720", "\\d+-\\d+"],
"language": "zh"
"language": "zh",
},
"bangumi_manage": {
"enable": True,
"eps_complete": False,
"rename_method": "pn",
"group_tag": False,
"remove_bad_torrent": False
"remove_bad_torrent": False,
},
"log": {
"debug_enable": False,
@@ -42,14 +42,9 @@ DEFAULT_SETTINGS = {
"host": "",
"port": 1080,
"username": "",
"password": ""
"password": "",
},
"notification": {
"enable": False,
"type": "telegram",
"token": "",
"chat_id": ""
}
"notification": {"enable": False, "type": "telegram", "token": "", "chat_id": ""},
}
@@ -80,7 +75,10 @@ ENV_TO_ATTR = {
"AB_METHOD": ("rename_method", lambda e: e.lower()),
"AB_GROUP_TAG": ("group_tag", lambda e: e.lower() in ("true", "1", "t")),
"AB_EP_COMPLETE": ("eps_complete", lambda e: e.lower() in ("true", "1", "t")),
"AB_REMOVE_BAD_BT": ("remove_bad_torrent", lambda e: e.lower() in ("true", "1", "t")),
"AB_REMOVE_BAD_BT": (
"remove_bad_torrent",
lambda e: e.lower() in ("true", "1", "t"),
),
},
"log": {
"AB_DEBUG_MODE": ("debug_enable", lambda e: e.lower() in ("true", "1", "t")),

View File

@@ -1,15 +1,15 @@
import logging
from .config import settings
from module.models import Config
LOG_PATH = "data/log.txt"
def setup_logger():
def setup_logger(settings: Config):
level = logging.DEBUG if settings.log.debug_enable else logging.INFO
logging.addLevelName(logging.DEBUG, 'DEBUG:')
logging.addLevelName(logging.INFO, 'INFO:')
logging.addLevelName(logging.WARNING, 'WARNING:')
logging.addLevelName(logging.DEBUG, "DEBUG:")
logging.addLevelName(logging.INFO, "INFO:")
logging.addLevelName(logging.WARNING, "WARNING:")
LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s"
logging.basicConfig(
level=level,
@@ -18,5 +18,5 @@ def setup_logger():
handlers=[
logging.FileHandler(LOG_PATH, encoding="utf-8"),
logging.StreamHandler(),
]
],
)

View File

@@ -12,4 +12,4 @@ def parse():
)
parser.add_argument("-d", "--debug", action="store_true", help="debug mode")
return parser.parse_args()
return parser.parse_args()

View File

@@ -38,7 +38,12 @@ class DownloadClient:
self.download_path = os.path.join(prefs["save_path"], "Bangumi")
def set_rule(self, info: BangumiData, rss_link):
official_name, raw_name, season, group = info.official_title, info.title_raw, info.season, info.group
official_name, raw_name, season, group = (
info.official_title,
info.title_raw,
info.season,
info.group,
)
rule = {
"enable": True,
"mustContain": raw_name,
@@ -81,9 +86,7 @@ class DownloadClient:
logger.debug("Finished.")
def get_torrent_info(self, category="Bangumi"):
return self.client.torrents_info(
status_filter="completed", category=category
)
return self.client.torrents_info(status_filter="completed", category=category)
def rename_torrent_file(self, _hash, old_path, new_path):
self.client.torrents_rename_file(
@@ -92,23 +95,16 @@ class DownloadClient:
logger.info(f"{old_path} >> {new_path}")
def delete_torrent(self, hashes):
self.client.torrents_delete(
hashes
)
self.client.torrents_delete(hashes)
logger.info(f"Remove bad torrents.")
def add_torrent(self, torrent: dict):
self.client.torrents_add(
urls=torrent["url"],
save_path=torrent["save_path"],
category="Bangumi"
urls=torrent["url"], save_path=torrent["save_path"], category="Bangumi"
)
def move_torrent(self, hashes, location):
self.client.move_torrent(
hashes=hashes,
new_location=location
)
self.client.move_torrent(hashes=hashes, new_location=location)
def add_rss_feed(self, rss_link, item_path):
self.client.rss_add_feed(url=rss_link, item_path=item_path)
@@ -122,4 +118,3 @@ class DownloadClient:
def set_category(self, hashes, category):
self.client.set_category(hashes, category)

View File

@@ -14,8 +14,8 @@ class RSSFilter:
self.filter_rule = json_config.load(settings.filter_rule)
def filter(self, item: xml.etree.ElementTree.Element) -> Tuple[bool, str]:
title = item.find('title').text
torrent = item.find("enclosure").attrib['url']
title = item.find("title").text
torrent = item.find("enclosure").attrib["url"]
download = False
for rule in self.filter_rule:
if re.search(rule["include"], title):

View File

@@ -1 +1 @@
from .operator import DataOperator
from .operator import DataOperator

View File

@@ -37,4 +37,3 @@ class DataConnector:
def __exit__(self, exc_type, exc_val, exc_tb):
self._conn.close()

View File

@@ -27,7 +27,8 @@ class DataOperator(DataConnector):
def insert(self, data: BangumiData):
db_data = self.data_to_db(data)
self._cursor.execute('''
self._cursor.execute(
"""
INSERT INTO bangumi (
id,
official_title,
@@ -57,12 +58,15 @@ class DataOperator(DataConnector):
:filter,
:rss
)
''', db_data)
""",
db_data,
)
self._conn.commit()
def insert_list(self, data: list[BangumiData]):
db_data = [self.data_to_db(x) for x in data]
self._cursor.executemany('''
self._cursor.executemany(
"""
INSERT INTO bangumi (
id,
official_title,
@@ -92,12 +96,15 @@ class DataOperator(DataConnector):
:filter,
:rss
)
''', db_data)
""",
db_data,
)
self._conn.commit()
def update(self, data: BangumiData) -> bool:
db_data = self.data_to_db(data)
self._cursor.execute('''
self._cursor.execute(
"""
UPDATE bangumi SET
official_title = :official_title,
title_raw = :title_raw,
@@ -111,14 +118,19 @@ class DataOperator(DataConnector):
offset = :offset,
filter = :filter
WHERE id = :id
''', db_data)
""",
db_data,
)
self._conn.commit()
return self._cursor.rowcount == 1
def search_id(self, _id: int) -> BangumiData | None:
self._cursor.execute('''
self._cursor.execute(
"""
SELECT * FROM bangumi WHERE id = :id
''', {"id": _id})
""",
{"id": _id},
)
values = self._cursor.fetchone()
if values is None:
return None
@@ -127,9 +139,12 @@ class DataOperator(DataConnector):
return self.db_to_data(dict_data)
def search_official_title(self, official_title: str) -> BangumiData | None:
self._cursor.execute('''
self._cursor.execute(
"""
SELECT * FROM bangumi WHERE official_title = :official_title
''', {"official_title": official_title})
""",
{"official_title": official_title},
)
values = self._cursor.fetchone()
if values is None:
return None
@@ -139,9 +154,11 @@ class DataOperator(DataConnector):
def match_title(self, title: str) -> bool:
# Select all title_raw
self._cursor.execute('''
self._cursor.execute(
"""
SELECT title_raw FROM bangumi
''')
"""
)
title_raws = [x[0] for x in self._cursor.fetchall()]
# Match title
for title_raw in title_raws:
@@ -151,9 +168,11 @@ class DataOperator(DataConnector):
def not_exist_titles(self, titles: list[str]) -> list[str]:
# Select all title_raw
self._cursor.execute('''
self._cursor.execute(
"""
SELECT title_raw FROM bangumi
''')
"""
)
title_raws = [x[0] for x in self._cursor.fetchall()]
# Match title
for title_raw in title_raws:
@@ -163,13 +182,14 @@ class DataOperator(DataConnector):
return titles
def gen_id(self) -> int:
self._cursor.execute('''
self._cursor.execute(
"""
SELECT id FROM bangumi ORDER BY id DESC LIMIT 1
''')
"""
)
return self._cursor.fetchone()[0] + 1
if __name__ == '__main__':
if __name__ == "__main__":
with DataOperator() as op:
pass

View File

@@ -10,6 +10,7 @@ def getClient(settings: Config):
ssl = settings.downloader.ssl
if type == "qbittorrent":
from module.downloader.client.qb_downloader import QbDownloader
return QbDownloader(host, username, password, ssl)
else:
raise Exception(f"Unsupported downloader type: {type}")

View File

@@ -15,13 +15,7 @@ class QbDownloader:
def __init__(self, host, username, password):
while True:
try:
self._client = API(
Client(
host=host,
port=6800,
secret=password
)
)
self._client = API(Client(host=host, port=6800, secret=password))
break
except ClientException:
logger.warning(
@@ -35,4 +29,4 @@ class QbDownloader:
torrent_file_path=urls,
save_path=save_path,
category=category,
)
)

View File

@@ -16,7 +16,7 @@ class QbDownloader:
host=host,
username=username,
password=password,
VERIFY_WEBUI_CERTIFICATE=ssl
VERIFY_WEBUI_CERTIFICATE=ssl,
)
self.host = host
self.username = username
@@ -57,13 +57,12 @@ class QbDownloader:
)
def torrents_delete(self, hash):
return self._client.torrents_delete(
delete_files=True,
torrent_hashes=hash
)
return self._client.torrents_delete(delete_files=True, torrent_hashes=hash)
def torrents_rename_file(self, torrent_hash, old_path, new_path):
self._client.torrents_rename_file(torrent_hash=torrent_hash, old_path=old_path, new_path=new_path)
self._client.torrents_rename_file(
torrent_hash=torrent_hash, old_path=old_path, new_path=new_path
)
def check_rss(self, url, item_path) -> tuple[str | None, bool]:
items = self._client.rss_items()

View File

@@ -12,8 +12,19 @@ logger = logging.getLogger(__name__)
class FullSeasonGet:
def __init__(self, settings: Config):
self.SEARCH_KEY = ["group", "title_raw", "season_raw", "subtitle", "source", "dpi"]
self.CUSTOM_URL = "https://mikanani.me" if settings.rss_parser.custom_url == "" else settings.rss_parser.custom_url
self.SEARCH_KEY = [
"group",
"title_raw",
"season_raw",
"subtitle",
"source",
"dpi",
]
self.CUSTOM_URL = (
"https://mikanani.me"
if settings.rss_parser.custom_url == ""
else settings.rss_parser.custom_url
)
if "://" not in self.CUSTOM_URL:
if re.match(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", self.CUSTOM_URL):
self.CUSTOM_URL = f"http://{self.CUSTOM_URL}"
@@ -33,7 +44,9 @@ class FullSeasonGet:
def get_season_torrents(self, data: BangumiData):
keyword = self.init_eps_complete_search_str(data)
with RequestContent() as req:
torrents = req.get_torrents(f"{self.CUSTOM_URL}/RSS/Search?searchstr={keyword}")
torrents = req.get_torrents(
f"{self.CUSTOM_URL}/RSS/Search?searchstr={keyword}"
)
return [torrent for torrent in torrents if data.title_raw in torrent.name]
def collect_season_torrents(self, data: BangumiData, torrents):
@@ -42,14 +55,13 @@ class FullSeasonGet:
download_info = {
"url": torrent.torrent_link,
"save_path": os.path.join(
self.save_path,
data.official_title,
f"Season {data.season}")
self.save_path, data.official_title, f"Season {data.season}"
),
}
downloads.append(download_info)
return downloads
def download_eps(self, data: BangumiData, download_client: DownloadClient):
def download_season(self, data: BangumiData, download_client: DownloadClient):
logger.info(f"Start collecting {data.official_title} Season {data.season}...")
torrents = self.get_season_torrents(data)
downloads = self.collect_season_torrents(data, torrents)
@@ -58,12 +70,14 @@ class FullSeasonGet:
logger.info("Completed!")
data.eps_collect = False
def eps_complete(self, bangumi_info: list[BangumiData], download_client: DownloadClient):
for data in bangumi_info:
def eps_complete(self, datas: list[BangumiData], download_client: DownloadClient):
for data in datas:
if data.eps_collect:
self.download_eps(data, download_client)
self.download_season(data, download_client)
def download_collection(self, data: BangumiData, link, download_client: DownloadClient):
def download_collection(
self, data: BangumiData, link, download_client: DownloadClient
):
with RequestContent() as req:
torrents = req.get_torrents(link)
downloads = self.collect_season_torrents(data, torrents)
@@ -71,5 +85,3 @@ class FullSeasonGet:
for download in downloads:
download_client.add_torrent(download)
logger.info("Completed!")

View File

@@ -22,7 +22,9 @@ class Renamer:
@staticmethod
def print_result(torrent_count, rename_count):
if rename_count != 0:
logger.info(f"Finished checking {torrent_count} files' name, renamed {rename_count} files.")
logger.info(
f"Finished checking {torrent_count} files' name, renamed {rename_count} files."
)
logger.debug(f"Checked {torrent_count} files")
def get_torrent_info(self, category="Bangumi"):
@@ -43,7 +45,15 @@ class Renamer:
subtitle_list.append(file_name)
return media_list, subtitle_list
def rename_file(self, info, media_path: str, method: str, bangumi_name: str, season: int, remove_bad_torrents: bool):
def rename_file(
self,
info,
media_path: str,
method: str,
bangumi_name: str,
season: int,
remove_bad_torrents: bool,
):
torrent_name = info.name
suffix = os.path.splitext(media_path)[-1]
compare_name = self.get_file_name(media_path)
@@ -52,20 +62,32 @@ class Renamer:
bangumi_name=bangumi_name,
season=season,
suffix=suffix,
method=method
method=method,
)
if compare_name != new_path:
try:
self._client.rename_torrent_file(_hash=info.hash, old_path=media_path, new_path=new_path)
self._client.rename_torrent_file(
_hash=info.hash, old_path=media_path, new_path=new_path
)
self._notification.send_msg(bangumi_name, f"{new_path}已经更新,已自动重命名。")
except Exception as e:
logger.warning(f"{torrent_name} rename failed")
logger.warning(f"Season name: {bangumi_name}, Season: {season}, Suffix: {suffix}")
logger.warning(
f"Season name: {bangumi_name}, Season: {season}, Suffix: {suffix}"
)
logger.debug(e)
# Delete bad torrent
self.delete_bad_torrent(info, remove_bad_torrents)
def rename_collection(self, info, media_list: list[str], bangumi_name: str, season: int, remove_bad_torrents: bool, method: str):
def rename_collection(
self,
info,
media_list: list[str],
bangumi_name: str,
season: int,
remove_bad_torrents: bool,
method: str,
):
_hash = info.hash
for media_path in media_list:
path_len = len(media_path.split(os.path.sep))
@@ -77,26 +99,30 @@ class Renamer:
bangumi_name=bangumi_name,
season=season,
suffix=suffix,
method=method
method=method,
)
if torrent_name != new_name:
try:
self._client.rename_torrent_file(_hash=_hash, old_path=media_path, new_path=new_name)
self._client.rename_torrent_file(
_hash=_hash, old_path=media_path, new_path=new_name
)
except Exception as e:
logger.warning(f"{torrent_name} rename failed")
logger.warning(f"Bangumi name: {bangumi_name}, Season: {season}, Suffix: {suffix}")
logger.warning(
f"Bangumi name: {bangumi_name}, Season: {season}, Suffix: {suffix}"
)
logger.debug(e)
# Delete bad torrent.
self.delete_bad_torrent(info, remove_bad_torrents)
self._client.set_category(category="BangumiCollection", hashes=_hash)
def rename_subtitles(
self,
subtitle_list: list[str],
bangumi_name: str,
season: int,
method: str,
_hash
self,
subtitle_list: list[str],
bangumi_name: str,
season: int,
method: str,
_hash,
):
method = "subtitle_" + method
for subtitle_path in subtitle_list:
@@ -107,11 +133,13 @@ class Renamer:
torrent_name=old_name,
bangumi_name=bangumi_name,
season=season,
suffix=suffix
suffix=suffix,
)
if old_name != new_name:
try:
self._client.rename_torrent_file(_hash=_hash, old_path=subtitle_path, new_path=new_name)
self._client.rename_torrent_file(
_hash=_hash, old_path=subtitle_path, new_path=new_name
)
except Exception as e:
logger.warning(f"{old_name} rename failed")
logger.warning(f"Suffix: {suffix}")
@@ -127,11 +155,17 @@ class Renamer:
# Remove default save path
save_path = save_path.replace(download_path, "")
# Check windows or linux path
path_parts = PurePath(save_path).parts \
if PurePath(save_path).name != save_path \
path_parts = (
PurePath(save_path).parts
if PurePath(save_path).name != save_path
else PureWindowsPath(save_path).parts
)
# Get folder name
folder_name = path_parts[1] if path_parts[0] == "/" or path_parts[0] == "\\" else path_parts[0]
folder_name = (
path_parts[1]
if path_parts[0] == "/" or path_parts[0] == "\\"
else path_parts[0]
)
# Get season
try:
if re.search(r"S\d{1,2}|[Ss]eason", path_parts[-1]) is not None:
@@ -147,9 +181,11 @@ class Renamer:
@staticmethod
def get_file_name(file_path: str):
# Check windows or linux path
path_parts = PurePath(file_path).parts \
if PurePath(file_path).name != file_path \
path_parts = (
PurePath(file_path).parts
if PurePath(file_path).name != file_path
else PureWindowsPath(file_path).parts
)
# Get file name
file_name = path_parts[-1]
return file_name
@@ -170,7 +206,7 @@ class Renamer:
method=rename_method,
bangumi_name=bangumi_name,
season=season,
remove_bad_torrents=remove_bad_torrents
remove_bad_torrents=remove_bad_torrents,
)
if len(subtitle_list) > 0:
self.rename_subtitles(
@@ -178,7 +214,7 @@ class Renamer:
bangumi_name=bangumi_name,
season=season,
method=rename_method,
_hash=info.hash
_hash=info.hash,
)
elif len(media_list) > 1:
logger.info("Start rename collection")
@@ -188,7 +224,7 @@ class Renamer:
bangumi_name=bangumi_name,
season=season,
remove_bad_torrents=remove_bad_torrents,
method=rename_method
method=rename_method,
)
if len(subtitle_list) > 0:
self.rename_subtitles(
@@ -196,8 +232,7 @@ class Renamer:
bangumi_name=bangumi_name,
season=season,
method=rename_method,
_hash=info.hash
_hash=info.hash,
)
else:
logger.warning(f"{info.name} has no media file")

View File

@@ -45,7 +45,9 @@ class RepathTorrents:
path = rules.get(rule).savePath
must_contain = rules.get(rule).mustContain
season, folder_name = self.analyse_path(path)
new_path = PurePath(settings.downloader.path, folder_name, f"Season {season}").__str__()
new_path = PurePath(
settings.downloader.path, folder_name, f"Season {season}"
).__str__()
all_rule.append(RuleInfo(rule, must_contain, season, folder_name, new_path))
return all_rule
@@ -62,7 +64,9 @@ class RepathTorrents:
break
return different_data
def get_matched_torrents_list(self, repath_rules: list[RuleInfo]) -> list[RepathInfo]:
def get_matched_torrents_list(
self, repath_rules: list[RuleInfo]
) -> list[RepathInfo]:
infos = self._client.get_torrent_info()
repath_list = []
for rule in repath_rules:

View File

@@ -16,4 +16,3 @@ class ChangeConfig(BaseModel):
class ChangeRule(BaseModel):
rule: dict

View File

@@ -67,6 +67,3 @@ class SeasonInfo(dict):
subtitle: str
added: bool
eps_collect: bool

View File

@@ -57,7 +57,6 @@ class Notification(BaseModel):
class Config(BaseModel):
data_version: float = Field(5.0, description="Data version")
program: Program = Program()
downloader: Downloader = Downloader()
rss_parser: RSSParser = RSSParser()

View File

@@ -9,4 +9,4 @@ class TorrentInfo(BaseModel):
class FileSet(BaseModel):
media_path: str = Field(...)
sc_subtitle: str | None = Field(None)
tc_subtitle: str | None = Field(None)
tc_subtitle: str | None = Field(None)

View File

@@ -50,6 +50,7 @@ class TelegramNotification:
class ServerChanNotification:
"""Server酱推送"""
def __init__(self):
self.token = settings.notification.token
self.notification_url = f"https://sctapi.ftqq.com/{self.token}.send"
@@ -71,11 +72,7 @@ class BarkNotification:
self.notification_url = "https://api.day.app/push"
def send_msg(self, title: str, desp: str):
data = {
"title": title,
"body": desp,
"device_key": self.token
}
data = {"title": title, "body": desp, "device_key": self.token}
with RequestContent() as req:
resp = req.post_data(self.notification_url, data)
logger.debug(f"Bark notification: {resp.status_code}")

View File

@@ -25,7 +25,7 @@ class RequestContent(RequestURL):
for item in soup.findall("./channel/item"):
torrent_titles.append(item.find("title").text)
torrent_urls.append(item.find("enclosure").attrib['url'])
torrent_urls.append(item.find("enclosure").attrib["url"])
torrent_homepage.append(item.find("link").text)
torrents = []
@@ -39,11 +39,11 @@ class RequestContent(RequestURL):
def get_poster(self, _url):
content = self.get_html(_url).text
soup = BeautifulSoup(content, 'html.parser')
div = soup.find('div', {'class': 'bangumi-poster'})
style = div.get('style')
soup = BeautifulSoup(content, "html.parser")
div = soup.find("div", {"class": "bangumi-poster"})
style = div.get("style")
if style:
return style.split('url(\'')[1].split('\')')[0]
return style.split("url('")[1].split("')")[0]
return None
def get_xml(self, _url) -> xml.etree.ElementTree.Element:
@@ -64,4 +64,3 @@ class RequestContent(RequestURL):
def get_content(self, _url):
return self.get_url(_url).content

View File

@@ -12,10 +12,7 @@ logger = logging.getLogger(__name__)
class RequestURL:
def __init__(self):
self.header = {
"user-agent": "Mozilla/5.0",
"Accept": "application/xml"
}
self.header = {"user-agent": "Mozilla/5.0", "Accept": "application/xml"}
def get_url(self, url):
try_time = 0
@@ -64,12 +61,16 @@ class RequestURL:
"http": url,
}
elif settings.proxy.type == "socks5":
socks.set_default_proxy(socks.SOCKS5, addr=settings.proxy.host, port=settings.proxy.port, rdns=True,
username=settings.proxy.username, password=settings.proxy.password)
socks.set_default_proxy(
socks.SOCKS5,
addr=settings.proxy.host,
port=settings.proxy.port,
rdns=True,
username=settings.proxy.username,
password=settings.proxy.password,
)
socket.socket = socks.socksocket
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.session.close()

View File

@@ -1,4 +1,3 @@
from .raw_parser import raw_parser
from .torrent_parser import torrent_parser
from .tmdb_parser import TMDBMatcher

View File

@@ -3,10 +3,8 @@ from module.network import RequestContent
class BgmAPI:
def __init__(self):
self.search_url = lambda e: \
f"https://api.bgm.tv/search/subject/{e}?type=2"
self.info_url = lambda e: \
f"https://api.bgm.tv/subject/{e}"
self.search_url = lambda e: f"https://api.bgm.tv/search/subject/{e}?type=2"
self.info_url = lambda e: f"https://api.bgm.tv/subject/{e}"
def search(self, title):
url = self.search_url(title)

View File

@@ -108,6 +108,7 @@ def name_process(name: str):
name_en = item.strip()
return name_en, name_zh, name_jp
def find_tags(other):
elements = re.sub(r"[\[\]()]", " ", other).split(" ")
# find CHT
@@ -136,9 +137,9 @@ def process(raw_title: str):
# 翻译组的名字
match_obj = TITLE_RE.match(content_title)
# 处理标题
season_info, episode_info, other = list(map(
lambda x: x.strip(), match_obj.groups()
))
season_info, episode_info, other = list(
map(lambda x: x.strip(), match_obj.groups())
)
process_raw = prefix_process(season_info, group)
# 处理 前缀
raw_name, season_raw, season = season_process(process_raw)
@@ -155,7 +156,18 @@ def process(raw_title: str):
if raw_episode is not None:
episode = int(raw_episode.group())
sub, dpi, source = find_tags(other) # 剩余信息处理
return name_en, name_zh, name_jp, season, season_raw, episode, sub, dpi, source, group
return (
name_en,
name_zh,
name_jp,
season,
season_raw,
episode,
sub,
dpi,
source,
group,
)
def raw_parser(raw: str) -> Episode | None:
@@ -163,14 +175,13 @@ def raw_parser(raw: str) -> Episode | None:
if ret is None:
logger.error(f"Parser cannot analyse {raw}")
return None
name_en, name_zh, name_jp, season, sr, episode, \
sub, dpi, source, group = ret
return Episode(name_en, name_zh, name_jp, season, sr, episode, sub, group, dpi, source)
name_en, name_zh, name_jp, season, sr, episode, sub, dpi, source, group = ret
return Episode(
name_en, name_zh, name_jp, season, sr, episode, sub, group, dpi, source
)
if __name__ == '__main__':
if __name__ == "__main__":
title = "【幻樱字幕组】【4月新番】【古见同学有交流障碍症 第二季 Komi-san wa, Komyushou Desu. S02】【22】【GB_MP4】【1920X1080】"
ep = raw_parser(title)
print(ep)

View File

@@ -154,19 +154,19 @@ METHODS = {
def torrent_parser(
file_name: str,
folder_name: str,
season: int,
suffix: str,
method: str = "pn",
file_name: str,
folder_name: str,
season: int,
suffix: str,
method: str = "pn",
):
info = rename_init(file_name, folder_name, season, suffix)
return METHODS[method.lower()](info)
if __name__ == '__main__':
if __name__ == "__main__":
title = "海盗战记 S02E17.zh.ass"
folder_name = "海盗战记"
season = 2
suffix = ".ass"
print(torrent_parser(title, folder_name, season, suffix, method="advance"))
print(torrent_parser(title, folder_name, season, suffix, method="advance"))

View File

@@ -1,2 +1 @@

View File

@@ -13,11 +13,11 @@ class TitleParser:
@staticmethod
def torrent_parser(
method: str,
torrent_name: str,
bangumi_name: str | None = None,
season: int | None = None,
suffix: str | None = None,
method: str,
torrent_name: str,
bangumi_name: str | None = None,
season: int | None = None,
suffix: str | None = None,
):
return torrent_parser(torrent_name, bangumi_name, season, suffix, method)
@@ -39,27 +39,20 @@ class TitleParser:
official_title = official_title if official_title else title
return official_title, tmdb_season
def raw_parser(
self,
raw: str,
settings: Config,
_id: int = 0
) -> BangumiData:
def raw_parser(self, raw: str, settings: Config, _id: int = 0) -> BangumiData:
language = settings.rss_parser.language
try:
episode = raw_parser(raw)
titles = {
"zh": episode.title_zh,
"en": episode.title_en,
"jp": episode.title_jp
"jp": episode.title_jp,
}
title_search = episode.title_zh if episode.title_zh else episode.title_en
title_raw = episode.title_en if episode.title_en else episode.title_zh
if settings.rss_parser.enable_tmdb:
official_title, _season = self.tmdb_parser(
title_search,
episode.season,
language
title_search, episode.season, language
)
else:
official_title = titles[language] if titles[language] else titles["zh"]

View File

@@ -24,9 +24,8 @@ class RSSAnalyser:
_id = op.gen_id()
for raw_title in add_title_list:
data = self._title_analyser.raw_parser(
raw=raw_title,
_id=_id,
settings=self.settings)
raw=raw_title, _id=_id, settings=self.settings
)
if data is not None and op.match_title(data.official_title) is None:
data_list.append(data)
_id += 1
@@ -39,8 +38,7 @@ class RSSAnalyser:
for torrent in rss_torrents:
try:
data = self._title_analyser.raw_parser(
torrent.name,
settings=self.settings
torrent.name, settings=self.settings
)
return data
except Exception as e:

View File

@@ -1 +1 @@
from .bangumi_data import load_program_data, save_program_data
from .bangumi_data import load_program_data, save_program_data

View File

@@ -12,12 +12,14 @@ def load_program_data(path: str) -> ProgramData:
data = ProgramData(**data)
logger.info("Data file loaded")
except Exception as e:
logger.warning("Data file is not compatible with the current version, rebuilding...")
logger.warning(
"Data file is not compatible with the current version, rebuilding..."
)
logger.debug(e)
data = ProgramData(
rss_link=data["rss_link"],
data_version=data["data_version"],
bangumi_info=[]
bangumi_info=[],
)
return data

View File

@@ -15,4 +15,4 @@ def save(filename, obj):
def get(url):
req = requests.get(url)
return req.json()
return req.json()

View File

@@ -6,56 +6,106 @@ def test_torrent_parser():
folder_name = "我内心的糟糕念头(2023)"
season = 1
suffix = ".mp4"
assert torrent_parser(file_name, folder_name, season, suffix, "pn") == "Boku no Kokoro no Yabai Yatsu S01E01.mp4"
assert torrent_parser(file_name, folder_name, season, suffix, "advance") == "我内心的糟糕念头(2023) S01E01.mp4"
assert torrent_parser(file_name, folder_name, season, suffix, "none") == "[Lilith-Raws] Boku no Kokoro no Yabai Yatsu - 01 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4].mp4"
assert (
torrent_parser(file_name, folder_name, season, suffix, "pn")
== "Boku no Kokoro no Yabai Yatsu S01E01.mp4"
)
assert (
torrent_parser(file_name, folder_name, season, suffix, "advance")
== "我内心的糟糕念头(2023) S01E01.mp4"
)
assert (
torrent_parser(file_name, folder_name, season, suffix, "none")
== "[Lilith-Raws] Boku no Kokoro no Yabai Yatsu - 01 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4].mp4"
)
file_name = "[Sakurato] Tonikaku Kawaii S2 [01][AVC-8bit 1080p AAC][CHS].mp4"
folder_name = "总之就是非常可爱(2021)"
season = 2
suffix = ".mp4"
assert torrent_parser(file_name, folder_name, season, suffix, "pn") == "Tonikaku Kawaii S02E01.mp4"
assert torrent_parser(file_name, folder_name, season, suffix, "advance") == "总之就是非常可爱(2021) S02E01.mp4"
assert (
torrent_parser(file_name, folder_name, season, suffix, "pn")
== "Tonikaku Kawaii S02E01.mp4"
)
assert (
torrent_parser(file_name, folder_name, season, suffix, "advance")
== "总之就是非常可爱(2021) S02E01.mp4"
)
file_name = "[SweetSub&LoliHouse] Heavenly Delusion - 01 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv"
folder_name = "天国大魔境(2023)"
season = 1
suffix = ".mkv"
assert torrent_parser(file_name, folder_name, season, suffix, "pn") == "Heavenly Delusion S01E01.mkv"
assert torrent_parser(file_name, folder_name, season, suffix, "advance") == "天国大魔境(2023) S01E01.mkv"
assert (
torrent_parser(file_name, folder_name, season, suffix, "pn")
== "Heavenly Delusion S01E01.mkv"
)
assert (
torrent_parser(file_name, folder_name, season, suffix, "advance")
== "天国大魔境(2023) S01E01.mkv"
)
file_name = "[SBSUB][Kanojo mo Kanojo][01][GB][1080P](456E234).mp4"
folder_name = "女友也要有"
season = 1
suffix = ".mp4"
assert torrent_parser(file_name, folder_name, season, suffix, "pn") == "Kanojo mo Kanojo S01E01.mp4"
assert torrent_parser(file_name, folder_name, season, suffix, "advance") == "女友也要有 S01E01.mp4"
assert (
torrent_parser(file_name, folder_name, season, suffix, "pn")
== "Kanojo mo Kanojo S01E01.mp4"
)
assert (
torrent_parser(file_name, folder_name, season, suffix, "advance")
== "女友也要有 S01E01.mp4"
)
file_name = "[SBSUB][CONAN][1082][V2][1080P][AVC_AAC][CHS_JP](C1E4E331).mp4"
folder_name = "名侦探柯南(1996)"
season = 1
suffix = ".mp4"
assert torrent_parser(file_name, folder_name, season, suffix, "pn") == "CONAN S01E1082.mp4"
assert torrent_parser(file_name, folder_name, season, suffix, "advance") == "名侦探柯南(1996) S01E1082.mp4"
assert (
torrent_parser(file_name, folder_name, season, suffix, "pn")
== "CONAN S01E1082.mp4"
)
assert (
torrent_parser(file_name, folder_name, season, suffix, "advance")
== "名侦探柯南(1996) S01E1082.mp4"
)
file_name = "海盗战记 S01E01.mp4"
folder_name = "海盗战记(2021)"
season = 1
suffix = ".mp4"
assert torrent_parser(file_name, folder_name, season, suffix, "pn") == "海盗战记 S01E01.mp4"
assert torrent_parser(file_name, folder_name, season, suffix, "advance") == "海盗战记(2021) S01E01.mp4"
assert (
torrent_parser(file_name, folder_name, season, suffix, "pn")
== "海盗战记 S01E01.mp4"
)
assert (
torrent_parser(file_name, folder_name, season, suffix, "advance")
== "海盗战记(2021) S01E01.mp4"
)
file_name = "海盗战记 S01E01.zh-tw.ass"
folder_name = "海盗战记(2021)"
season = 1
suffix = ".ass"
assert torrent_parser(file_name, folder_name, season, suffix, "subtitle_pn") == "海盗战记 S01E01.zh-tw.ass"
assert torrent_parser(file_name, folder_name, season, suffix, "subtitle_advance") == "海盗战记(2021) S01E01.zh-tw.ass"
assert (
torrent_parser(file_name, folder_name, season, suffix, "subtitle_pn")
== "海盗战记 S01E01.zh-tw.ass"
)
assert (
torrent_parser(file_name, folder_name, season, suffix, "subtitle_advance")
== "海盗战记(2021) S01E01.zh-tw.ass"
)
file_name = "海盗战记 S01E01.SC.ass"
folder_name = "海盗战记(2021)"
season = 1
suffix = ".ass"
assert torrent_parser(file_name, folder_name, season, suffix, "subtitle_pn") == "海盗战记 S01E01.zh.ass"
assert torrent_parser(file_name, folder_name, season, suffix, "subtitle_advance") == "海盗战记(2021) S01E01.zh.ass"
assert (
torrent_parser(file_name, folder_name, season, suffix, "subtitle_pn")
== "海盗战记 S01E01.zh.ass"
)
assert (
torrent_parser(file_name, folder_name, season, suffix, "subtitle_advance")
== "海盗战记(2021) S01E01.zh.ass"
)