- fix rename bug
- temp save
This commit is contained in:
EstrellaXD
2023-03-14 21:22:55 +08:00
parent 2745047924
commit 34216245f7
56 changed files with 86 additions and 101 deletions

0
src/module/__init__.py Normal file
View File

View File

@@ -0,0 +1,29 @@
import logging
import time
logger = logging.getLogger(__name__)
def qb_connect_failed_wait(func):
    """Decorator: retry *func* up to 5 times when qBittorrent is unreachable.

    Sleeps 5 minutes between attempts and returns None when every attempt
    raises.
    """
    def wrapper(*args, **kwargs):
        for _ in range(5):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                # Bug fix: args may be empty for plain functions -- guard the
                # args[0] access, and actually log the caught exception.
                if args:
                    logger.debug(f"URL: {args[0]}")
                logger.debug(e)
                logger.warning("Cannot connect to qBittorrent. Wait 5 min and retry...")
                time.sleep(300)
        return None
    return wrapper
def api_failed(func):
    """Decorator: log and swallow exceptions from API handlers, returning None."""
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Bug fix: guard args[0] -- decorated callables may take no
            # positional arguments.
            if args:
                logger.debug(f"URL: {args[0]}")
            logger.warning("Wrong API response.")
            logger.debug(e)
            return None
    return wrapper

70
src/module/api.py Normal file
View File

@@ -0,0 +1,70 @@
import uvicorn
from uvicorn.config import LOGGING_CONFIG
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
import logging
from .core import APIProcess
from .conf import settings, DATA_PATH, LOG_PATH
from .utils import json_config
from .models.api import *
logger = logging.getLogger(__name__)
app = FastAPI()
api_func = APIProcess()
app.mount("/assets", StaticFiles(directory="templates/assets"), name="assets")
templates = Jinja2Templates(directory="templates")
@app.get("/", response_class=HTMLResponse)
def index(request: Request):
context = {"request": request}
return templates.TemplateResponse("index.html", context)
@app.get("/api/v1/data")
def get_data():
data = json_config.load(DATA_PATH)
return data
@app.get("/api/v1/log")
async def get_log():
return FileResponse(LOG_PATH)
@app.get("/api/v1/resetRule")
def reset_rule():
return api_func.reset_rule()
@app.get("api/v1/removeRule/{bangumi_title}")
def remove_rule(bangumi_title: str):
return api_func.remove_rule(bangumi_title)
@app.post("/api/v1/collection")
async def collection(link: RssLink):
return api_func.download_collection(link.rss_link)
@app.post("/api/v1/subscribe")
async def subscribe(link: RssLink):
return api_func.add_subscribe(link.rss_link)
@app.post("/api/v1/addRule")
async def add_rule(info: AddRule):
return api_func.add_rule(info.title, info.season)
def run():
    """Start the FastAPI app with uvicorn on the configured WebUI port."""
    # Inject a timestamp into uvicorn's default access-log format.
    LOGGING_CONFIG["formatters"]["default"]["fmt"] = "[%(asctime)s] %(levelprefix)s %(message)s"
    uvicorn.run(app, host="0.0.0.0", port=settings.program.webui_port)

90
src/module/app.py Normal file
View File

@@ -0,0 +1,90 @@
import os
import time
import logging
from module.conf import settings, setup_logger, LOG_PATH, DATA_PATH, VERSION
from module.utils import json_config
from module.core import DownloadClient
from module.manager import Renamer, FullSeasonGet
from module.rss import RSSAnalyser
logger = logging.getLogger(__name__)
def reset_log():
    """Delete the previous run's log file, ignoring a missing file.

    try/except instead of exists()+remove() avoids the race where the file
    disappears between the check and the removal.
    """
    try:
        os.remove(LOG_PATH)
    except FileNotFoundError:
        pass
def _fresh_data():
    """Build an empty data structure bound to the current RSS link/version."""
    return {
        "rss_link": settings.rss_parser.link,
        "data_version": settings.program.data_version,
        "bangumi_info": []
    }


def load_data_file():
    """Load data/data.json, rebuilding it when missing or stale.

    The cached file is discarded whenever its data_version or rss_link no
    longer matches the active configuration.
    """
    if not os.path.exists(DATA_PATH):
        bangumi_data = _fresh_data()
        logger.info("Building data information...")
    else:
        bangumi_data = json_config.load(DATA_PATH)
        if (
            bangumi_data["data_version"] != settings.program.data_version
            or bangumi_data["rss_link"] != settings.rss_parser.link
        ):
            bangumi_data = _fresh_data()
            logger.info("Rebuilding data information...")
    return bangumi_data
def save_data_file(bangumi_data):
    """Persist the in-memory bangumi data back to data/data.json."""
    json_config.save(DATA_PATH, bangumi_data)
    logger.debug("Saved")
def show_info():
    """Log the AutoBangumi ASCII-art banner, version, and author info."""
    logger.info(r" _ ____ _ ")
    logger.info(r" /\ | | | _ \ (_)")
    logger.info(r" / \ _ _| |_ ___ | |_) | __ _ _ __ __ _ _ _ _ __ ___ _ ")
    logger.info(r" / /\ \| | | | __/ _ \| _ < / _` | '_ \ / _` | | | | '_ ` _ \| |")
    logger.info(r" / ____ \ |_| | || (_) | |_) | (_| | | | | (_| | |_| | | | | | | |")
    logger.info(r" /_/ \_\__,_|\__\___/|____/ \__,_|_| |_|\__, |\__,_|_| |_| |_|_|")
    logger.info(" __/ | ")
    logger.info(" |___/ ")
    logger.info(f"Version {VERSION} Author: EstrellaXD Twitter: https://twitter.com/Estrella_Pan")
    logger.info("GitHub: https://github.com/EstrellaXD/Auto_Bangumi/")
    logger.info("Starting AutoBangumi...")
def main_process(bangumi_data, download_client: DownloadClient):
    """Main service loop: poll RSS, complete seasons, rename downloads.

    Each outer iteration performs one RSS analysis pass, then runs
    ``settings.program.times`` rename passes spread evenly across
    ``settings.program.sleep_time`` seconds. Never returns.
    """
    rename = Renamer(download_client)
    rss_analyser = RSSAnalyser()
    while True:
        times = 0
        if settings.rss_parser.enable:
            rss_analyser.run(bangumi_data["bangumi_info"], download_client)
        if settings.bangumi_manage.eps_complete and bangumi_data["bangumi_info"] != []:
            FullSeasonGet().eps_complete(bangumi_data["bangumi_info"], download_client)
        logger.info("Running....")
        save_data_file(bangumi_data)
        # Spread the rename passes over the whole sleep window.
        while times < settings.program.times:
            if settings.bangumi_manage.enable:
                rename.run()
            times += 1
            time.sleep(settings.program.sleep_time / settings.program.times)
def run():
    """Program entry point: set up logging and the downloader, then loop forever."""
    # Initialization
    reset_log()
    setup_logger()
    show_info()
    download_client = DownloadClient()
    download_client.init_downloader()
    if settings.rss_parser.link is None:
        logger.error("Please add RIGHT RSS url.")
        quit()
    download_client.rss_feed()
    bangumi_data = load_data_file()
    # Main program loop
    main_process(bangumi_data, download_client)

View File

@@ -0,0 +1,6 @@
from .log import setup_logger, LOG_PATH
from .config import settings, VERSION
# NOTE(review): hard-coded TMDB API key committed to source -- consider
# moving it to config or an environment variable.
TMDB_API = "32b19d6a05b512190a056fa4e747cbbc"
# Path of the persisted bangumi data file, relative to the working directory.
DATA_PATH = "data/data.json"

75
src/module/conf/config.py Normal file
View File

@@ -0,0 +1,75 @@
import json
import os
from dataclasses import dataclass
from .const import DEFAULT_SETTINGS, ENV_TO_ATTR
try:
from ..__version__ import VERSION
except ImportError:
VERSION = "DEV_VERSION"
class ConfLoad(dict):
    """Dict subclass exposing keys as attributes.

    Missing keys read as ``None`` (mirroring ``dict.get``); attribute
    writes store straight into the mapping.
    """

    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
@dataclass
class Settings:
    """Typed view over the JSON configuration file.

    Each top-level config section is exposed as a ConfLoad attribute,
    e.g. ``settings.downloader.host``.
    """
    program: ConfLoad
    downloader: ConfLoad
    rss_parser: ConfLoad
    bangumi_manage: ConfLoad
    debug: ConfLoad
    proxy: ConfLoad
    notification: ConfLoad

    def __init__(self, path: str | None):
        self.load(path)

    def load(self, path: str | None):
        """Populate the sections from *path*.

        ``None`` means "use built-in defaults"; a non-existent file triggers
        creation of a new config seeded from environment variables.
        """
        if path is None:
            conf = DEFAULT_SETTINGS
        elif os.path.isfile(path):
            with open(path, "r") as f:
                conf = json.load(f)
        else:
            conf = self._create_config()
        for key, section in conf.items():
            setattr(self, key, ConfLoad(section))

    @staticmethod
    def _val_from_env(env, attr):
        """Read *env* from the environment, converting via attr's lambda if given."""
        val = os.environ[env]
        if isinstance(attr, tuple):
            conv_func = attr[1]
            val = conv_func(val)
        return val

    def _create_config(self):
        """Create the config file from defaults overridden by environment vars."""
        import copy
        # Bug fix: deep-copy so environment overrides never mutate the nested
        # dicts of DEFAULT_SETTINGS, which is shared module-level state.
        settings = copy.deepcopy(DEFAULT_SETTINGS)
        for key, section in ENV_TO_ATTR.items():
            for env, attr in section.items():
                if env in os.environ:
                    attr_name = attr[0] if isinstance(attr, tuple) else attr
                    settings[key][attr_name] = self._val_from_env(env, attr)
        with open(CONFIG_PATH, "w") as f:
            json.dump(settings, f, indent=4)
        return settings
if os.path.isdir("config") and VERSION == "DEV_VERSION":
CONFIG_PATH = "config/config_dev.json"
elif os.path.isdir("config") and VERSION != "DEV_VERSION":
CONFIG_PATH = "config/config.json"
else:
CONFIG_PATH = None
settings = Settings(CONFIG_PATH)

106
src/module/conf/const.py Normal file
View File

@@ -0,0 +1,106 @@
# -*- encoding: utf-8 -*-
# Factory defaults for every configuration section; config.json mirrors this
# structure and ENV_TO_ATTR maps environment variables onto these keys.
DEFAULT_SETTINGS = {
    "program": {
        "sleep_time": 7200,  # seconds per main-loop round
        "times": 20,  # rename passes per round
        "webui_port": 7892,
        "data_version": 4.0
    },
    "downloader": {
        "type": "qbittorrent",
        "host": "127.0.0.1:8080",
        "username": "admin",
        "password": "adminadmin",
        "path": "/downloads/Bangumi",
        "ssl": False
    },
    "rss_parser": {
        "enable": True,
        "type": "mikan",
        "link": "",
        "enable_tmdb": False,
        "filter": ["720", "\\d+-\\d+"],  # regexes excluded from downloads
        "language": "zh"
    },
    "bangumi_manage": {
        "enable": True,
        "eps_complete": False,
        "rename_method": "pn",
        "group_tag": False,
        "remove_bad_torrent": False
    },
    "debug": {
        "enable": False,
        "level": "info",
        "file": "bangumi.log",
        "dev_debug": False
    },
    "proxy": {
        "enable": False,
        "type": "http",  # "http" or "socks5"
        "host": "",
        "port": 1080,
        "username": "",
        "password": ""
    },
    "notification": {
        "enable": False,
        "type": "telegram",
        "token": "",
        "chat_id": ""
    }
}
# Maps environment variables onto DEFAULT_SETTINGS keys. A tuple value is
# (attr_name, converter); a bare string value is the attribute name (the
# environment string is stored as-is).
ENV_TO_ATTR = {
    "program": {
        "AB_INTERVAL_TIME": ("sleep_time", lambda e: float(e)),
        "AB_RENAME_FREQ": ("times", lambda e: float(e)),
        "AB_WEBUI_PORT": ("webui_port", lambda e: int(e)),
    },
    "downloader": {
        "AB_DOWNLOADER_HOST": "host",
        "AB_DOWNLOADER_USERNAME": "username",
        "AB_DOWNLOADER_PASSWORD": "password",
        "AB_DOWNLOAD_PATH": "path",
    },
    "rss_parser": {
        "AB_RSS_COLLECTOR": ("enable", lambda e: e.lower() in ("true", "1", "t")),
        "AB_RSS": "link",
        "AB_NOT_CONTAIN": ("filter", lambda e: e.split("|")),
        "AB_LANGUAGE": "language",
        "AB_ENABLE_TMDB": ("enable_tmdb", lambda e: e.lower() in ("true", "1", "t")),
    },
    "bangumi_manage": {
        "AB_RENAME": ("enable", lambda e: e.lower() in ("true", "1", "t")),
        "AB_METHOD": "method",
        "AB_GROUP_TAG": ("group_tag", lambda e: e.lower() in ("true", "1", "t")),
        "AB_EP_COMPLETE": ("eps_complete", lambda e: e.lower() in ("true", "1", "t")),
        "AB_REMOVE_BAD_BT": ("remove_bad_torrent", lambda e: e.lower() in ("true", "1", "t")),
    },
    "debug": {
        "AB_DEBUG_MODE": ("enable", lambda e: e.lower() in ("true", "1", "t")),
    },
    "proxy": {
        # NOTE(review): these map to keys "http"/"socks" that DEFAULT_SETTINGS
        # does not define -- confirm the intended proxy override keys.
        "AB_HTTP_PROXY": "http",
        "AB_SOCKS": "socks",
    },
}
class BCOLORS:
    """ANSI escape codes for colored terminal output."""

    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKCYAN = "\033[96m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"
    ENDC = "\033[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    @staticmethod
    def _(color: str, *args: str) -> str:
        """Join *args* with ", " and wrap them in *color* ... ENDC."""
        joined = ", ".join(str(part) for part in args)
        return f"{color}{joined}{BCOLORS.ENDC}"

22
src/module/conf/log.py Normal file
View File

@@ -0,0 +1,22 @@
import logging
from .config import settings
LOG_PATH = "data/log.txt"
def setup_logger():
    """Configure root logging to both the log file and the console.

    Level is DEBUG when debug mode is enabled in settings, INFO otherwise.
    """
    level = logging.DEBUG if settings.debug.enable else logging.INFO
    # Custom level names keep log lines aligned with uvicorn's "LEVEL:" style.
    logging.addLevelName(logging.DEBUG, 'DEBUG:')
    logging.addLevelName(logging.INFO, 'INFO:')
    logging.addLevelName(logging.WARNING, 'WARNING:')
    LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s %(message)s"
    logging.basicConfig(
        level=level,
        format=LOGGING_FORMAT,
        # NOTE(review): basicConfig ignores `encoding` when `handlers` is
        # supplied; the FileHandler below already sets its own encoding.
        encoding="utf-8",
        handlers=[
            logging.FileHandler(LOG_PATH, encoding="utf-8"),
            logging.StreamHandler(),
        ]
    )

View File

@@ -0,0 +1,72 @@
[loggers]
keys=root, gunicorn.error, gunicorn.access,uvicorn.error,uvicorn.access
[handlers]
keys=console, error_file, access_file, accesscustom
[formatters]
keys=generic, access, AccessFormatter
[logger_root]
level=INFO
handlers=console
propagate=1
[logger_gunicorn.error]
level=INFO
handlers=error_file
propagate=0
qualname=gunicorn.error
[logger_gunicorn.access]
level=INFO
handlers=accesscustom
propagate=0
qualname=gunicorn.access
[logger_uvicorn.error]
level=INFO
handlers=error_file
propagate=0
qualname=uvicorn.error
[logger_uvicorn.access]
level=INFO
handlers=accesscustom
propagate=0
qualname=uvicorn.access
[handler_console]
class=StreamHandler
formatter=generic
args=(sys.stdout, )
[handler_error_file]
class=StreamHandler
formatter=generic
args=(sys.stdout, )
[handler_access_file]
class=StreamHandler
formatter=access
args=(sys.stdout, )
[handler_accesscustom]
class=StreamHandler
formatter=AccessFormatter
args=(sys.stdout, )
[formatter_generic]
format=%(levelname)s: %(message)s
datefmt=%Y-%m-%dT%H:%M:%S
class=uvicorn.logging.DefaultFormatter
[formatter_access]
format=%(levelname)s: %(message)s
datefmt=%Y-%m-%dT%H:%M:%S
class=customlogger.CustomFormatter
[formatter_AccessFormatter]
format={"event":"access_log","ip":"%(h)s","status":"%(s)s","method":"%(m)s","path":"%(U)s","referer":"%(f)s","x_session_id":"%(x-session-id)s","x_google_id":"%(x-google-id)s","x_server_time":"%(x-server-time)s","agent":"%(a)s"}
datefmt=%Y-%m-%dT%H:%M:%S
class=customlogger.CustomFormatter

15
src/module/conf/parse.py Normal file
View File

@@ -0,0 +1,15 @@
import argparse
def parse():
    """Parse AutoBangumi's command-line arguments (currently only -d/--debug)."""
    parser = argparse.ArgumentParser(
        prog="Auto Bangumi",
        description="""
本项目是基于 Mikan Project、qBittorrent 的全自动追番整理下载工具。
只需要在 Mikan Project 上订阅番剧,就可以全自动追番。
并且整理完成的名称和目录可以直接被 Plex、Jellyfin 等媒体库软件识别,
无需二次刮削。""",
    )
    parser.add_argument("-d", "--debug", action="store_true", help="debug mode")
    return parser.parse_args()

View File

@@ -0,0 +1,2 @@
from .download_client import DownloadClient
from .api_func import APIProcess

View File

@@ -0,0 +1,66 @@
import re
from module.core import DownloadClient
from module.manager import FullSeasonGet
from module.rss import RSSAnalyser
from module.utils import json_config
from module.conf import DATA_PATH
from module.ab_decorator import api_failed
class APIProcess:
    """Backend operations exposed through the web API routes."""

    def __init__(self):
        self._rss_analyser = RSSAnalyser()
        self._download_client = DownloadClient()
        self._full_season_get = FullSeasonGet()

    def link_process(self, link):
        """Parse an RSS link into bangumi data."""
        return self._rss_analyser.rss_to_data(link)

    @api_failed
    def download_collection(self, link):
        """Download every torrent of the season behind *link*."""
        data = self.link_process(link)
        self._full_season_get.download_collection(data, link, self._download_client)
        return data

    @api_failed
    def add_subscribe(self, link):
        """Register *link* as a feed and set its download rule."""
        data = self.link_process(link)
        self._download_client.add_rss_feed(link, data.get("official_title"))
        self._download_client.set_rule(data, link)
        return data

    @staticmethod
    def reset_rule():
        """Drop all stored bangumi rules."""
        data = json_config.load(DATA_PATH)
        data["bangumi_info"] = []
        json_config.save(DATA_PATH, data)
        return "Success"

    @staticmethod
    def remove_rule(name):
        """Remove the first rule whose raw title matches *name* (case-insensitive regex).

        Bug fix: the old code wrote the bare bangumi_info list back to
        DATA_PATH, replacing the whole data dict (rss_link, data_version, ...)
        with a list and corrupting the data file.
        """
        data = json_config.load(DATA_PATH)
        infos = data["bangumi_info"]
        for info in infos:
            if re.search(name.lower(), info["title_raw"].lower()):
                infos.remove(info)
                json_config.save(DATA_PATH, data)
                return "Success"
        return "Not matched"

    @staticmethod
    def add_rule(title, season):
        """Append a hand-made rule entry for *title*/*season* to the data file."""
        data = json_config.load(DATA_PATH)
        extra_data = {
            "official_title": title,
            "title_raw": title,
            "season": season,
            "season_raw": "",
            "dpi": "",
            "group": "",
            "eps_complete": False,
            "added": False,
        }
        data["bangumi_info"].append(extra_data)
        json_config.save(DATA_PATH, data)
        return "Success"

View File

@@ -0,0 +1,116 @@
import re
import logging
import os
from module.downloader import getClient
from module.conf import settings
logger = logging.getLogger(__name__)
class DownloadClient:
    """High-level facade over the torrent client (qBittorrent by default)."""

    def __init__(self):
        self.client = getClient()

    def init_downloader(self):
        """Enable RSS auto-download in the client and resolve the save path."""
        prefs = {
            "rss_auto_downloading_enabled": True,
            "rss_max_articles_per_feed": 500,
            "rss_processing_enabled": True,
            "rss_refresh_interval": 30,
        }
        self.client.prefs_init(prefs=prefs)
        # Bug fix: the settings key is `path` (see DEFAULT_SETTINGS), not
        # `download_path`, so this fallback could never trigger before.
        if settings.downloader.path == "":
            prefs = self.client.get_app_prefs()
            settings.downloader.path = os.path.join(prefs["save_path"], "Bangumi")

    def set_rule(self, info: dict, rss_link):
        """Create an RSS auto-download rule in the client for one bangumi."""
        official_name, raw_name, season, group = info["official_title"], info["title_raw"], info["season"], info["group"]
        rule = {
            "enable": True,
            "mustContain": raw_name,
            "mustNotContain": "|".join(settings.rss_parser.filter),
            "useRegex": True,
            "episodeFilter": "",
            "smartFilter": False,
            "previouslyMatchedEpisodes": [],
            "affectedFeeds": [rss_link],
            "ignoreDays": 0,
            "lastMatch": "",
            "addPaused": settings.debug.dev_debug,
            "assignedCategory": "Bangumi",
            # Strip characters qBittorrent cannot use in folder names.
            "savePath": str(
                os.path.join(
                    settings.downloader.path,
                    re.sub(r"[:/.]", " ", official_name).strip(),
                    f"Season {season}",
                )
            ),
        }
        rule_name = f"[{group}] {official_name}" if settings.bangumi_manage.group_tag else official_name
        self.client.rss_set_rule(rule_name=f"{rule_name} S{season}", rule_def=rule)
        logger.info(f"Add {official_name} Season {season}")

    def rss_feed(self):
        """Ensure the configured main RSS feed exists in the client."""
        # TODO: refresh the RSS feed on a timer.
        if self.client.get_rss_info() == settings.rss_parser.link:
            logger.info("RSS Already exists.")
        else:
            logger.info("No feed exists, start adding feed.")
            self.client.rss_add_feed(url=settings.rss_parser.link, item_path="Mikan_RSS")
            logger.info("Add RSS Feed successfully.")

    def add_collection_feed(self, rss_link, item_path):
        """Register an extra feed used for collecting a whole season."""
        self.client.rss_add_feed(url=rss_link, item_path=item_path)
        logger.info("Add RSS Feed successfully.")

    def add_rules(self, bangumi_info, rss_link=None):
        """Add rules for every entry not yet marked as added.

        Bug fix: *rss_link* now defaults to the configured link at call time;
        the old default was evaluated once at import time, freezing the link.
        """
        if rss_link is None:
            rss_link = settings.rss_parser.link
        logger.debug("Start adding rules.")
        for info in bangumi_info:
            if not info["added"]:
                self.set_rule(info, rss_link)
                info["added"] = True
        logger.debug("Finished.")

    def get_torrent_info(self):
        """Return completed torrents in the Bangumi category."""
        return self.client.torrents_info(
            status_filter="completed", category="Bangumi"
        )

    def rename_torrent_file(self, hash, new_file_name, old_path, new_path):
        """Rename one file inside a torrent.

        (Parameter name `hash` kept for backward compatibility with callers.)
        """
        self.client.torrents_rename_file(
            torrent_hash=hash, new_file_name=new_file_name, old_path=old_path, new_path=new_path
        )
        logger.info(f"{old_path} >> {new_path}, new name {new_file_name}")

    def delete_torrent(self, hashes):
        """Remove the given torrents from the client (data kept)."""
        self.client.torrents_delete(hashes)
        logger.info("Remove bad torrents.")

    def add_torrent(self, torrent: dict):
        """Add a torrent described by a dict with `url` and `save_path` keys."""
        self.client.torrents_add(
            urls=torrent["url"],
            save_path=torrent["save_path"],
            category="Bangumi"
        )

    def move_torrent(self, hashes, location):
        """Move the given torrents to a new save location."""
        self.client.move_torrent(
            hashes=hashes,
            new_location=location
        )

    def add_rss_feed(self, rss_link, item_path):
        """Register an arbitrary RSS feed under *item_path*."""
        self.client.rss_add_feed(url=rss_link, item_path=item_path)
        logger.info("Add RSS Feed successfully.")

    def get_download_rules(self):
        """Return every RSS auto-download rule from the client."""
        return self.client.get_download_rule()

    def get_torrent_path(self, hashes):
        """Return the save path of the torrent identified by *hashes*."""
        return self.client.get_torrent_path(hashes)

View File

@@ -0,0 +1,26 @@
import re
import logging
from bs4 import BeautifulSoup
from module.conf import settings
from module.utils import json_config
logger = logging.getLogger(__name__)
class RSSFilter:
    """Filter RSS items against user-defined include/exclude regex rules."""

    def __init__(self):
        # assumes settings.filter_rule is a path to a JSON list of rules,
        # each with "include" and "exclude" regex keys -- TODO confirm.
        self.filter_rule = json_config.load(settings.filter_rule)

    def filter(self, item: BeautifulSoup):
        """Return (should_download, enclosure_tag) for one RSS <item>."""
        title = item.title.string
        torrent = item.find("enclosure")
        download = False
        for rule in self.filter_rule:
            if re.search(rule["include"], title):
                if not re.search(rule["exclude"], title):
                    download = True
                    logger.debug(f"{title} added")
        return download, torrent

View File

@@ -0,0 +1,11 @@
from module.conf import settings
def getClient():
    """Build the configured download client (currently qBittorrent only)."""
    host = settings.downloader.host
    username = settings.downloader.username
    password = settings.downloader.password
    # TODO: multi-downloader support --
    # read the downloader name from settings and return the matching client.
    from .qb_downloader import QbDownloader
    return QbDownloader(host, username, password)

View File

@@ -0,0 +1,38 @@
import logging
import time
from aria2p import Client, ClientException, API
from conf import settings
from downloader.exceptions import ConflictError
logger = logging.getLogger(__name__)
class QbDownloader:
    """Aria2-backed download client.

    NOTE(review): despite the name, this wraps aria2p, presumably to satisfy
    the common downloader interface -- confirm whether a rename is intended.
    """

    def __init__(self, host, username, password):
        # Retry forever until the aria2 RPC server accepts the connection.
        while True:
            try:
                self._client = API(
                    Client(
                        host=host,
                        port=6800,
                        secret=password
                    )
                )
                break
            except ClientException:
                logger.warning(
                    f"Can't login Aria2 Server {host} by {username}, retry in {settings.connect_retry_interval}"
                )
                time.sleep(settings.connect_retry_interval)

    def torrents_add(self, urls, save_path, category):
        """Add a torrent file for download; paused when dev debug is enabled."""
        return self._client.add_torrent(
            is_paused=settings.dev_debug,
            torrent_file_path=urls,
            save_path=save_path,
            category=category,
        )

View File

@@ -0,0 +1,2 @@
class ConflictError(Exception):
    """Raised when an RSS item being added already exists in the client."""
    pass

View File

@@ -0,0 +1,96 @@
import logging
import time
from qbittorrentapi import Client, LoginFailed
from qbittorrentapi.exceptions import Conflict409Error
from module.conf import settings
from module.ab_decorator import qb_connect_failed_wait
from .exceptions import ConflictError
logger = logging.getLogger(__name__)
class QbDownloader:
    """Thin wrapper around qbittorrentapi.Client with retry/conflict handling."""

    @qb_connect_failed_wait
    def __init__(self, host, username, password):
        self._client = Client(
            host=host,
            username=username,
            password=password,
        )
        # Block until qBittorrent accepts the login; the server may still be
        # starting up when AutoBangumi launches.
        while True:
            try:
                self._client.auth_log_in()
                break
            except LoginFailed:
                logger.debug(
                    f"Can't login qBittorrent Server {host} by {username}, retry in {5} seconds."
                )
                time.sleep(5)

    @qb_connect_failed_wait
    def prefs_init(self, prefs):
        """Apply application preferences."""
        return self._client.app_set_preferences(prefs=prefs)

    @qb_connect_failed_wait
    def get_app_prefs(self):
        """Return the client's application preferences."""
        return self._client.app_preferences()

    @qb_connect_failed_wait
    def torrents_info(self, status_filter, category):
        """List torrents filtered by status and category."""
        return self._client.torrents_info(status_filter, category)

    def torrents_add(self, urls, save_path, category):
        """Add torrents by URL; paused when dev debug mode is on.

        Bug fix: `settings.DEBUG["enable"]` raised AttributeError -- the
        Settings object exposes sections as lowercase attributes. Uses
        `debug.dev_debug` for consistency with the rule `addPaused` flag.
        """
        return self._client.torrents_add(
            is_paused=settings.debug.dev_debug,
            urls=urls,
            save_path=save_path,
            category=category,
        )

    def torrents_delete(self, hash):
        """Delete torrents (keeping downloaded files)."""
        return self._client.torrents_delete(
            delete_files=False,
            torrent_hashes=hash
        )

    def torrents_rename_file(self, torrent_hash, new_file_name, old_path, new_path):
        """Rename one file inside a torrent."""
        self._client.torrents_rename_file(torrent_hash=torrent_hash, new_file_name=new_file_name,
                                          old_path=old_path, new_path=new_path)

    def get_rss_info(self):
        """Return the URL of the main Mikan feed, or None when absent."""
        item = self._client.rss_items().get("Mikan_RSS")
        if item is not None:
            return item.url
        else:
            return None

    def rss_add_feed(self, url, item_path):
        """Add an RSS feed, replacing any stale item at the same path first."""
        try:
            if self.get_rss_info() is not None:
                self.rss_remove_item(item_path)
            self._client.rss_add_feed(url, item_path)
        except Conflict409Error:
            logger.exception("RSS Exist.")

    def rss_remove_item(self, item_path):
        """Remove an RSS item; raises ConflictError when the client refuses."""
        try:
            self._client.rss_remove_item(item_path)
        except Conflict409Error as e:
            logger.debug(e)
            logger.info("Add new RSS")
            raise ConflictError()

    def rss_set_rule(self, rule_name, rule_def):
        """Create or replace an RSS auto-download rule."""
        self._client.rss_set_rule(rule_name, rule_def)

    def move_torrent(self, hashes, new_location):
        """Change the save location of the given torrents."""
        self._client.torrents_set_location(new_location, hashes)

    def get_download_rule(self):
        """Return all RSS auto-download rules."""
        return self._client.rss_rules()

    def get_torrent_path(self, hash):
        """Return the save path of the torrent identified by *hash*."""
        return self._client.torrents_info(hashes=hash)[0].save_path

View File

View File

@@ -0,0 +1,2 @@
from .eps_complete import FullSeasonGet
from .renamer import Renamer

View File

@@ -0,0 +1,83 @@
import os.path
import re
import logging
from module.conf import settings
from module.network import RequestContent
from module.core.download_client import DownloadClient
logger = logging.getLogger(__name__)
SEARCH_KEY = ["group", "title_raw", "season_raw", "subtitle", "source", "dpi"]
class FullSeasonGet:
def __init__(self):
pass
@staticmethod
def init_eps_complete_search_str(data: dict):
test = [data.get(key).strip() for key in SEARCH_KEY if data.get(key) is not None]
search_str_pre = "+".join(test)
search_str = re.sub(r"[\W_ ]", "+", search_str_pre)
return search_str
def get_season_torrents(self, data: dict):
keyword = self.init_eps_complete_search_str(data)
with RequestContent() as req:
torrents = req.get_torrents(f"https://mikanani.me/RSS/Search?searchstr={keyword}")
return torrents
@staticmethod
def collect_season_torrents(data: dict, torrents):
downloads = []
for torrent in torrents:
download_info = {
"url": torrent.torrent_link,
"save_path": os.path.join(
settings.download_path,
data["official_title"],
f"Season {data['season']}")
}
downloads.append(download_info)
return downloads
def download_eps(self, data, download_client: DownloadClient):
logger.info(f"Start collecting {data['official_title']} Season {data['season']}...")
torrents = self.get_season_torrents(data)
downloads = self.collect_season_torrents(data, torrents)
for download in downloads:
download_client.add_torrent(download)
logger.info("Completed!")
data["eps_collect"] = False
def eps_complete(self, bangumi_info, download_client: DownloadClient):
for data in bangumi_info:
if data["eps_collect"]:
self.download_eps(data, download_client)
def download_collection(self, data, link, download_client: DownloadClient):
with RequestContent() as req:
torrents = req.get_torrents(link)
downloads = self.collect_season_torrents(data, torrents)
logger.info(f"Starting download {data.get('official_title')}")
for download in downloads:
download_client.add_torrent(download)
logger.info("Completed!")
if __name__ == "__main__":
a = FullSeasonGet()
data = {
"official_title": "指名!",
"title_raw": "CUE!",
"season": 1,
"season_raw": "",
"group": "喵萌Production",
"dpi": "1080p",
"source": None,
"subtitle": "简日双语",
"added": True,
"eps_collect": True
}
print(a.init_eps_complete_search_str(data))

View File

@@ -0,0 +1,89 @@
import logging
import os.path
import re
from pathlib import PurePath, PureWindowsPath
from module.core.download_client import DownloadClient
from module.conf import settings
from module.parser import TitleParser
from module.network import PostNotification, ServerChanNotification
logger = logging.getLogger(__name__)
class Renamer:
    """Rename completed torrent files to media-library-friendly names."""

    def __init__(self, download_client: DownloadClient):
        self.client = download_client
        self._renamer = TitleParser()

    @staticmethod
    def print_result(torrent_count, rename_count):
        """Log a summary of one rename pass."""
        if rename_count != 0:
            logger.info(f"Finished checking {torrent_count} files' name, renamed {rename_count} files.")
        logger.debug(f"Checked {torrent_count} files")

    def get_torrent_info(self):
        """Return (completed torrents, their count) from the client."""
        recent_info = self.client.get_torrent_info()
        torrent_count = len(recent_info)
        return recent_info, torrent_count

    @staticmethod
    def split_path(path: str):
        """Split a torrent content path into (file name, season, folder, suffix, download path)."""
        suffix = os.path.splitext(path)[-1]
        path = path.replace(settings.downloader.path, "")
        # Fall back to Windows path semantics when POSIX parsing yields no
        # structure (paths reported by a Windows qBittorrent).
        path_parts = PurePath(path).parts \
            if PurePath(path).name != path \
            else PureWindowsPath(path).parts
        path_name = path_parts[-1]
        try:
            if re.search(r"S\d{1,2}|[Ss]eason", path_parts[-2]) is not None:
                season = int(re.search(r"\d{1,2}", path_parts[-2]).group())
            else:
                season = 1
        except Exception as e:
            logger.debug(e)
            logger.debug("No Season info")
            season = 1
        folder_name = path_parts[1] if path_parts[0] == "/" else path_parts[0]
        try:
            download_path = path_parts[1]
        except IndexError:
            download_path = ""
        return path_name, season, folder_name, suffix, download_path

    def run(self):
        """One rename pass over every completed torrent."""
        recent_info, torrent_count = self.get_torrent_info()
        rename_count = 0
        for info in recent_info:
            name = info.name
            torrent_hash = info.hash
            path_name, season, folder_name, suffix, _ = self.split_path(info.content_path)
            # Bug fix: compare string values with `==`, not identity (`is`),
            # which depends on CPython string interning and is unreliable.
            if path_name == folder_name:
                logger.warning("Wrong bangumi path, please check your qbittorrent settings.")
            else:
                try:
                    new_name = self._renamer.download_parser(name, folder_name, season, suffix, settings.bangumi_manage.rename_method)
                    if path_name != new_name:
                        old_name = os.path.basename(info.content_path)
                        self.client.rename_torrent_file(torrent_hash, new_name, old_name, new_name)
                        rename_count += 1
                except Exception as e:
                    logger.warning(f"{path_name} rename failed")
                    logger.warning(f"Folder name: {folder_name}, Season: {season}, Suffix: {suffix}")
                    logger.debug(e)
                    if settings.bangumi_manage.remove_bad_torrent:
                        self.client.delete_torrent(torrent_hash)
        self.print_result(torrent_count, rename_count)

    def set_folder(self):
        """Move each torrent into `<download path>/<folder>/Season N`."""
        recent_info, _ = self.get_torrent_info()
        for info in recent_info:
            torrent_hash = info.hash
            _, season, folder_name, _, download_path = self.split_path(info.content_path)
            new_path = os.path.join(settings.downloader.path, folder_name, f"Season {season}")
            self.client.move_torrent(torrent_hash, new_path)

View File

@@ -0,0 +1,87 @@
import logging
import re
from dataclasses import dataclass
from pathlib import PurePath, PureWindowsPath
from module.core import DownloadClient
from module.conf import settings
from module.utils import json_config
logger = logging.getLogger(__name__)
@dataclass
class RuleInfo:
    """One qBittorrent download rule, flattened for repath comparison."""
    rule_name: str
    contain: str  # the rule's mustContain pattern
    season: int
    folder_name: str
    new_path: str  # the save path the rule should have
@dataclass
class RePathInfo:
    """A batch move: target path plus the torrent hashes to relocate."""
    path: str
    hashes: list
class RePath:
    """Re-locate torrents whose save path no longer matches their rule's season."""

    def __init__(self, download_client: DownloadClient):
        self._client = download_client
        self.re_season = re.compile(r"S\d{1,2}")

    @staticmethod
    def analyse_path(path: str):
        """Extract (season, folder name) from a `<folder>/Season N` save path."""
        path_parts = PurePath(path).parts
        folder_name = path_parts[-2]
        season_folder = path_parts[-1]
        season = int(re.search(r"\d{1,2}", season_folder).group())
        return season, folder_name

    def get_rule(self) -> [RuleInfo]:
        """Read every client download rule into RuleInfo records.

        Bug fix: the rebuilt path uses `settings.downloader.path`; the
        Settings object has no `download_path` attribute.
        """
        rules = self._client.get_download_rules()
        all_rule = []
        for rule in rules:
            path = rules.get(rule).savePath
            must_contain = rules.get(rule).mustContain
            season, folder_name = self.analyse_path(path)
            new_path = str(PurePath(settings.downloader.path, folder_name, f"Season {season}"))
            all_rule.append(RuleInfo(rule, must_contain, season, folder_name, new_path))
        return all_rule

    @staticmethod
    def get_difference(bangumi_data: list, rules: [RuleInfo]) -> [RuleInfo]:
        """Return the rules whose season disagrees with the stored bangumi data."""
        different_data = []
        for data in bangumi_data:
            for rule in rules:
                rule_name = re.sub(r"S\d", "", rule.rule_name).strip()
                if data.get("official_title") == rule_name:
                    if data.get("season") != rule.season:
                        different_data.append(rule)
                    data["season"] = rule.season
                    break
        return different_data

    def get_matched_torrents_list(self, repath_rules: [RuleInfo]) -> [RePathInfo]:
        """Group torrent hashes that need moving, keyed by their target path.

        Bug fix: the old code removed items from the list it was iterating,
        which silently skips the element following each removal.
        """
        infos = list(self._client.get_torrent_info())
        repath_list = []
        for rule in repath_rules:
            hashes = []
            remaining = []
            for info in infos:
                if re.search(rule.contain, info.name) and rule.new_path != info.save_path:
                    hashes.append(info.hash)
                else:
                    remaining.append(info)
            infos = remaining
            if hashes:
                repath_list.append(RePathInfo(rule.new_path, hashes))
        return repath_list

    def re_path(self, repath_info: RePathInfo):
        """Move one batch of torrents to its corrected path."""
        self._client.move_torrent(repath_info.hashes, repath_info.path)

    def run(self):
        """Find and fix every torrent stored under an outdated season path."""
        rules = self.get_rule()
        match_list = self.get_matched_torrents_list(rules)
        # Consistency fix: use the module logger, not the root logger.
        logger.info("Starting repath process.")
        for item in match_list:
            self.re_path(item)

View File

18
src/module/models/api.py Normal file
View File

@@ -0,0 +1,18 @@
from pydantic import BaseModel
class RssLink(BaseModel):
    """Request body: a single RSS feed URL."""
    rss_link: str
class AddRule(BaseModel):
    """Request body for manually adding a bangumi rule."""
    title: str
    season: int
class ChangeConfig(BaseModel):
    """Request body: a full or partial configuration mapping."""
    config: dict
class ChangeRule(BaseModel):
    """Request body: a single download-rule mapping."""
    rule: dict

View File

@@ -0,0 +1,14 @@
from dataclasses import dataclass
@dataclass
class MatchRule:
    """A torrent-matching rule: keyword plus exclusion filters for one feed."""
    keyword: str
    filter: list
    rss_link: str
@dataclass
class GroupFilter:
    """Per-subtitle-group exclusion filter list.

    Bug fix: the @dataclass decorator was missing, so `name` and `filter`
    were bare class annotations and instances had no fields at all.
    """
    name: str
    filter: list

View File

@@ -0,0 +1,2 @@
from .request_contents import RequestContent
from .notification import PostNotification, ServerChanNotification

View File

@@ -0,0 +1,62 @@
import logging
import requests
from module.conf import settings
logger = logging.getLogger(__name__)
class PostNotification:
    """Push a notification via an HTTP GET webhook.

    Bug fix: settings exposes notification options as a section
    (`settings.notification.token`); there is no flat `notification_token`
    attribute on the Settings object.
    """

    def __init__(self):
        self.token = settings.notification.token
        # NOTE(review): this URL shape does not match Pushbullet's documented
        # API -- confirm the intended endpoint.
        self.notification_url = lambda message: f"https://api.pushbullet.com/v2/{self.token}/{message}"

    def ifttt_post(self, message):
        """Fire the webhook; True when the server answers 200."""
        url = self.notification_url(message)
        response = requests.get(url)
        return response.status_code == 200
class TelegramNotification:
    """Send a message through the Telegram Bot API.

    Bug fixes: use the sectioned settings attributes (`settings.notification.*`),
    and actually POST the prepared payload -- the old code built `data`
    and returned without ever sending it.
    """

    def __init__(self):
        self.token = settings.notification.token
        self.notification_url = f"https://api.telegram.org/bot{self.token}/sendMessage"

    def send_msg(self, title: str, desp: str) -> bool:
        """Send one message; False when disabled or on any request failure."""
        if not settings.notification.enable:
            return False
        data = {
            "chat_id": settings.notification.chat_id,
            "text": f"{title}\n{desp}",
        }
        try:
            resp = requests.post(self.notification_url, json=data, timeout=3)
            resp.raise_for_status()
        except requests.RequestException as e:
            logging.error("[TelegramNotification] send fail, error: %s", e)
            return False
        return True
class ServerChanNotification:
    """ServerChan (Server酱) push notification."""

    def __init__(self):
        self.token = settings.notification.token
        self.notification_url = f"https://sctapi.ftqq.com/{self.token}.send"

    def send_msg(self, title: str, desp: str) -> bool:
        """Send one message; False when disabled or on any request failure."""
        if not settings.notification.enable:
            return False
        data = {
            "title": title,
            "desp": desp,
        }
        try:
            resp = requests.post(self.notification_url, json=data, timeout=3)
            resp.raise_for_status()
        except requests.RequestException as e:
            logging.error("[ServerChanNotification] send fail, error: %s" % e)
            return False
        return True
if __name__ == '__main__':
    # Manual smoke test of the ServerChan channel.
    name = "勇者、辞职不干了"
    notification = ServerChanNotification()
    notification.send_msg(f"{name[:10]}》缓存成功", f"[Auto Bangumi]《{name}》缓存成功")

View File

@@ -0,0 +1,43 @@
from dataclasses import dataclass
from bs4 import BeautifulSoup
from .request_url import RequestURL
from module.conf import settings
import re
FILTER = "|".join(settings.rss_parser.filter)
@dataclass
class TorrentInfo:
name: str
torrent_link: str
class RequestContent(RequestURL):
    """RequestURL specialised for Mikan RSS feeds and JSON APIs."""

    # Mikanani RSS
    def get_torrents(self, _url: str) -> [TorrentInfo]:
        """Return every torrent in the feed that does not match FILTER.

        Bug fix: the title and enclosure are now read from the same <item>.
        The old code zipped two independent find_all() lists, so one item
        without an enclosure shifted every following (title, url) pair.
        """
        soup = self.get_xml(_url)
        torrents = []
        for item in soup.find_all("item"):
            _title = item.title.string
            enclosure = item.find("enclosure")
            if enclosure is None:
                continue
            if re.search(FILTER, _title) is None:
                torrents.append(TorrentInfo(_title, enclosure.get("url")))
        return torrents

    def get_torrent(self, _url) -> TorrentInfo:
        """Return the first torrent of the feed."""
        soup = self.get_xml(_url)
        item = soup.find("item")
        enclosure = item.find("enclosure")
        return TorrentInfo(item.title.string, enclosure["url"])

    def get_xml(self, url):
        """Fetch *url* and parse the body as XML."""
        return BeautifulSoup(self.get_url(url).text, "xml")

    # API JSON
    def get_json(self, _url) -> dict:
        """Fetch *url* and decode the body as JSON."""
        return self.get_url(_url).json()

View File

@@ -0,0 +1,55 @@
import time
import requests
import socket
import socks
import logging
from module.conf import settings
logger = logging.getLogger(__name__)
class RequestURL:
    """requests.Session wrapper with retries and optional proxy support.

    Use as a context manager: the session is created in __enter__ and
    closed in __exit__.
    """

    def __init__(self):
        self.header = {
            "user-agent": "Mozilla/5.0",
            "Accept": "application/xml"
        }

    def get_url(self, url):
        """GET *url*, retrying up to 5 times; returns None when all attempts fail."""
        times = 0
        while times < 5:
            try:
                req = self.session.get(url=url, headers=self.header)
                req.raise_for_status()
                return req
            except requests.RequestException as e:
                logger.debug(f"URL: {url}")
                logger.debug(e)
                logger.warning("ERROR with Connection.Please check DNS/Connection settings")
                time.sleep(5)
                times += 1
            except Exception as e:
                # Unexpected (non-HTTP) failure: give up immediately.
                logger.debug(f"URL: {url}")
                logger.debug(e)
                break
        return None

    def __enter__(self):
        self.session = requests.Session()
        if settings.proxy.enable:
            if settings.proxy.type == "http":
                url = f"http://{settings.proxy.host}:{settings.proxy.port}"
                self.session.proxies = {
                    "https": url,
                    "http": url,
                }
            elif settings.proxy.type == "socks5":
                # NOTE(review): this patches socket globally for the whole
                # process, not just this session -- confirm that is intended.
                socks.set_default_proxy(socks.SOCKS5, addr=settings.proxy.host, port=settings.proxy.port, rdns=True,
                                        username=settings.proxy.username, password=settings.proxy.password)
                socket.socket = socks.socksocket
        # Bug fix: __enter__ must return the instance. It previously returned
        # None, so `with RequestContent() as req:` bound req to None and every
        # call on req raised AttributeError.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.session.close()

View File

@@ -0,0 +1 @@
from .title_parser import TitleParser

View File

@@ -0,0 +1,4 @@
from .raw_parser import RawParser
from .rename_parser import DownloadParser
from .tmdb_parser import TMDBMatcher

View File

@@ -0,0 +1,17 @@
from module.network import RequestContent
class BgmAPI:
    """Thin client for the bgm.tv (Bangumi) public search API."""

    def __init__(self):
        # URL builders kept as attributes (lambdas) to mirror TMDBMatcher.
        self.search_url = lambda e: \
            f"https://api.bgm.tv/search/subject/{e}?type=2"
        self.info_url = lambda e: \
            f"https://api.bgm.tv/subject/{e}"

    def search(self, title):
        """Return (name, name_cn) of the first hit, or None when nothing matches."""
        url = self.search_url(title)
        with RequestContent() as req:
            contents = req.get_json(url)["list"]
        # Idiom fix: `not contents` replaces `contents.__len__() == 0`;
        # it also guards against a null "list" field in the response.
        if not contents:
            return None
        return contents[0]["name"], contents[0]["name_cn"]

View File

@@ -0,0 +1,193 @@
import logging
import re
from dataclasses import dataclass
# from parser.episode import Episode
logger = logging.getLogger(__name__)
# Regexes shared by RawParser, pre-compiled once at import time.
EPISODE_RE = re.compile(r"\d+")
TITLE_RE = re.compile(
    r"(.*|\[.*])( -? \d+|\[\d+]|\[\d+.?[vV]\d{1}]|[第]?\d+[话話集]|\[\d+.?END])(.*)"
)
RESOLUTION_RE = re.compile(r"1080|720|2160|4K")
SOURCE_RE = re.compile(r"B-Global|[Bb]aha|[Bb]ilibili|AT-X|Web")
SUB_RE = re.compile(r"[简繁日字幕]|CH|BIG5|GB")
PREFIX_RE = re.compile(r"[^\w\s\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff-]")

# Chinese numerals → ints, for season markers such as 第二季.
# Fix: the keys had been corrupted to duplicate empty strings (which collapse
# to a single useless entry); restored to the intended numeral characters.
CHINESE_NUMBER_MAP = {
    "一": 1,
    "二": 2,
    "三": 3,
    "四": 4,
    "五": 5,
    "六": 6,
    "七": 7,
    "八": 8,
    "九": 9,
    "十": 10,
}
@dataclass
class Episode:
title_en: str or None
title_zh: str or None
title_jp: str or None
season: int
season_raw: str
episode: int
sub: str
group: str
resolution: str
source: str
class RawParser:
@staticmethod
def get_group(name: str) -> str:
return re.split(r"[\[\]]", name)[1]
@staticmethod
def pre_process(raw_name: str) -> str:
return raw_name.replace("", "[").replace("", "]")
@staticmethod
def prefix_process(raw: str, group: str) -> str:
raw = re.sub(f".{group}.", "", raw)
raw_process = PREFIX_RE.sub("/", raw)
arg_group = raw_process.split("/")
for arg in arg_group:
if re.search(r"新番|月?番", arg) and len(arg) <= 5:
raw = re.sub(f".{arg}.", "", raw)
elif re.search(r"港澳台地区", arg):
raw = re.sub(f".{arg}.", "", raw)
return raw
@staticmethod
def season_process(season_info: str):
name_season = season_info
# if re.search(r"新番|月?番", season_info):
# name_season = re.sub(".*新番.", "", season_info)
# # 去除「新番」信息
# name_season = re.sub(r"^[^]】]*[]】]", "", name_season).strip()
season_rule = r"S\d{1,2}|Season \d{1,2}|[第].[季期]"
name_season = re.sub(r"[\[\]]", " ", name_season)
seasons = re.findall(season_rule, name_season)
if not seasons:
return name_season, "", 1
name = re.sub(season_rule, "", name_season)
for season in seasons:
season_raw = season
if re.search(r"Season|S", season) is not None:
season = int(re.sub(r"Season|S", "", season))
break
elif re.search(r"[第 ].*[季期(部分)]|部分", season) is not None:
season_pro = re.sub(r"[第季期 ]", "", season)
try:
season = int(season_pro)
except ValueError:
season = CHINESE_NUMBER_MAP[season_pro]
break
return name, season_raw, season
@staticmethod
def name_process(name: str):
name_en, name_zh, name_jp = None, None, None
name = name.strip()
name = re.sub(r"[(]仅限港澳台地区[)]", "", name)
split = re.split("/|\s{2}|-\s{2}", name)
while "" in split:
split.remove("")
if len(split) == 1:
if re.search("_{1}", name) is not None:
split = re.split("_", name)
elif re.search(" - {1}", name) is not None:
split = re.split("-", name)
if len(split) == 1:
split_space = split[0].split(" ")
for idx, item in enumerate(split_space):
if re.search(r"^[\u4e00-\u9fa5]{2,}", item) is not None:
split_space.remove(item)
split = [item.strip(), " ".join(split_space).strip()]
break
for item in split:
if re.search(r"[\u0800-\u4e00]{2,}", item) and not name_jp:
name_jp = item.strip()
elif re.search(r"[\u4e00-\u9fa5]{2,}", item) and not name_zh:
name_zh = item.strip()
elif re.search(r"[a-zA-Z]{3,}", item) and not name_en:
name_en = item.strip()
return name_en, name_zh, name_jp
@staticmethod
def find_tags(other):
elements = re.sub(r"[\[\]()]", " ", other).split(" ")
# find CHT
sub, resolution, source = None, None, None
for element in filter(lambda x: x != "", elements):
if SUB_RE.search(element):
sub = element
elif RESOLUTION_RE.search(element):
resolution = element
elif SOURCE_RE.search(element):
source = element
return RawParser.clean_sub(sub), resolution, source
@staticmethod
def clean_sub(sub: str | None) -> str | None:
if sub is None:
return sub
return re.sub(r"_MP4|_MKV", "", sub)
def process(self, raw_title: str):
raw_title = raw_title.strip()
content_title = self.pre_process(raw_title)
# 预处理标题
group = self.get_group(content_title)
# 翻译组的名字
match_obj = TITLE_RE.match(content_title)
# 处理标题
season_info, episode_info, other = list(map(
lambda x: x.strip(), match_obj.groups()
))
process_raw = self.prefix_process(season_info, group)
# 处理 前缀
raw_name, season_raw, season = self.season_process(process_raw)
# 处理 第n季
name_en, name_zh, name_jp = "", "", ""
try:
name_en, name_zh, name_jp = self.name_process(raw_name)
# 处理 名字
except ValueError:
pass
# 处理 集数
raw_episode = EPISODE_RE.search(episode_info)
episode = 0
if raw_episode is not None:
episode = int(raw_episode.group())
sub, dpi, source = self.find_tags(other) # 剩余信息处理
return name_en, name_zh, name_jp, season, season_raw, episode, sub, dpi, source, group
def analyse(self, raw: str) -> Episode | None:
ret = self.process(raw)
if ret is None:
logger.error(f"Parser cannot analyse {raw}")
return None
name_en, name_zh, name_jp, season, sr, episode, \
sub, dpi, source, group = ret
return Episode(name_en, name_zh, name_jp, season, sr, episode, sub, group, dpi, source)
# Manual smoke test with a few representative raw titles.
if __name__ == '__main__':
    test_list = [
        "[Lilith-Raws] 关于我在无意间被隔壁的天使变成废柴这件事 / Otonari no Tenshi-sama - 09 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4]",
        "【幻樱字幕组】【4月新番】【古见同学有交流障碍症 第二季 Komi-san wa, Komyushou Desu. S02】【22】【GB_MP4】【1920X1080】",
        "[百冬练习组&LoliHouse] BanG Dream! 少女乐团派对☆PICO FEVER / Garupa Pico: Fever! - 26 [WebRip 1080p HEVC-10bit AAC][简繁内封字幕][END]"
    ]
    parser = RawParser()
    for l in test_list:
        # NOTE(review): analyse() may return None; this would then raise
        # AttributeError — acceptable for a manual test, but confirm.
        ep = parser.analyse(l)
        print(f"en: {ep.title_en}, zh: {ep.title_zh}, jp: {ep.title_jp}, group: {ep.group}")

View File

@@ -0,0 +1,102 @@
import re
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class DownloadInfo:
name: str
season: int
suffix: str
file_name: str
folder_name: str
class DownloadParser:
def __init__(self):
self.rules = [
r"(.*) - (\d{1,4}|\d{1,4}\.\d{1,2})(?:v\d{1,2})?(?: )?(?:END)?(.*)",
r"(.*)[\[ E](\d{1,3}|\d{1,3}\.\d{1,2})(?:v\d{1,2})?(?: )?(?:END)?[\] ](.*)",
r"(.*)\[第(\d*\.*\d*)话(?:END)?\](.*)",
r"(.*)\[第(\d*\.*\d*)話(?:END)?\](.*)",
r"(.*)第(\d*\.*\d*)话(?:END)?(.*)",
r"(.*)第(\d*\.*\d*)話(?:END)?(.*)",
]
@staticmethod
def rename_init(name, folder_name, season, suffix) -> DownloadInfo:
n = re.split(r"[\[\]()【】()]", name)
suffix = suffix if suffix is not None else n[-1]
file_name = name.replace(f"[{n[1]}]", "")
if season < 10:
season = f"0{season}"
return DownloadInfo(name, season, suffix, file_name, folder_name)
def rename_normal(self, info: DownloadInfo):
for rule in self.rules:
match_obj = re.match(rule, info.name, re.I)
if match_obj is not None:
title = re.sub(r"([Ss]|Season )\d{1,3}", "", match_obj.group(1)).strip()
new_name = f"{title} S{info.season}E{match_obj.group(2)}{match_obj.group(3)}"
return new_name
def rename_pn(self, info: DownloadInfo):
for rule in self.rules:
match_obj = re.match(rule, info.file_name, re.I)
if match_obj is not None:
title = re.sub(r"([Ss]|Season )\d{1,3}", "", match_obj.group(1)).strip()
title = title if title != "" else info.folder_name
new_name = re.sub(
r"[\[\]]",
"",
f"{title} S{info.season}E{match_obj.group(2)}{info.suffix}",
)
return new_name
def rename_advance(self, info: DownloadInfo):
for rule in self.rules:
match_obj = re.match(rule, info.file_name, re.I)
if match_obj is not None:
new_name = re.sub(
r"[\[\]]",
"",
f"{info.folder_name} S{info.season}E{match_obj.group(2)}{info.suffix}",
)
return new_name
def rename_no_season_pn(self, info: DownloadInfo):
for rule in self.rules:
match_obj = re.match(rule, info.file_name, re.I)
if match_obj is not None:
title = match_obj.group(1).strip()
new_name = re.sub(
r"[\[\]]",
"",
f"{title} E{match_obj.group(2)}{info.suffix}",
)
return new_name
@staticmethod
def rename_none(info: DownloadInfo):
return info.name
def download_rename(self, name, folder_name, season, suffix, method):
rename_info = self.rename_init(name, folder_name, season, suffix)
method_dict = {
"normal": self.rename_normal,
"pn": self.rename_pn,
"advance": self.rename_advance,
"no_season_pn": self.rename_no_season_pn,
"none": self.rename_none
}
return method_dict[method.lower()](rename_info)
if __name__ == "__main__":
name = "[Lilith-Raws] Tate no Yuusha no Nariagari S02 - 02 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4]"
rename = DownloadParser()
new_name = rename.download_rename(name, "异世界舅舅2022", 1, ".mp4", "normal")
print(new_name)

View File

@@ -0,0 +1,73 @@
import re
import time
from dataclasses import dataclass
from module.network import RequestContent
from module.conf import TMDB_API
@dataclass
class TMDBInfo:
    """Series metadata returned by TMDBMatcher.tmdb_search."""
    id: int           # TMDB TV id
    title_jp: str     # original_name from TMDB
    title_zh: str     # localized (zh-CN) name from TMDB
    season: dict      # NOTE(review): actually built as a list of dicts — confirm
    last_season: int  # season number airing this year; may be None
    year_number: int  # NOTE(review): derived via str.split, so runtime type is str
class TMDBMatcher:
    """Looks up series metadata (titles, seasons, year) on themoviedb.org."""

    def __init__(self):
        # URL builders; TMDB_API is the api key from module.conf.
        self.search_url = lambda e: \
            f"https://api.themoviedb.org/3/search/tv?api_key={TMDB_API}&page=1&query={e}&include_adult=false"
        self.info_url = lambda e: \
            f"https://api.themoviedb.org/3/tv/{e}?api_key={TMDB_API}&language=zh-CN"

    def is_animation(self, tv_id) -> bool:
        """Return True when the TV entry carries TMDB genre id 16 (Animation)."""
        url_info = self.info_url(tv_id)
        with RequestContent() as req:
            type_id = req.get_json(url_info)["genres"]
            # NOTE(review): `type` shadows the builtin; consider renaming.
            for type in type_id:
                if type.get("id") == 16:
                    return True
        return False

    @staticmethod
    def get_season(seasons: list) -> int:
        """Return the number of the season whose air year is the current year.

        Falls off the end (returns None) when no season matches; callers
        treat a falsy result as "use the parsed season instead".
        """
        for season in seasons:
            if re.search(r"\d 季", season.get("season")) is not None:
                date = season.get("air_date").split("-")
                [year, _ , _] = date
                now_year = time.localtime().tm_year
                if int(year) == now_year:
                    return int(re.findall(r"\d", season.get("season"))[0])

    def tmdb_search(self, title) -> TMDBInfo:
        """Search TMDB for *title*, retrying once with spaces removed.

        Raises on network/parse errors; the caller (TitleParser) catches them.
        """
        with RequestContent() as req:
            url = self.search_url(title)
            contents = req.get_json(url).get("results")
            if contents.__len__() == 0:
                url = self.search_url(title.replace(" ", ""))
                contents = req.get_json(url).get("results")
            # Pick the first result that is an animation.
            # NOTE(review): when the result list is empty, `id` below is never
            # bound and this raises NameError — confirm intended failure mode.
            for content in contents:
                id = content["id"]
                if self.is_animation(id):
                    break
            url_info = self.info_url(id)
            info_content = req.get_json(url_info)
            season = [{"season": s.get("name"), "air_date": s.get("air_date")} for s in info_content.get("seasons")]
            last_season = self.get_season(season)
            title_jp = info_content.get("original_name")
            title_zh = info_content.get("name")
            year_number = info_content.get("first_air_date").split("-")[0]
            return TMDBInfo(id, title_jp, title_zh, season, last_season, year_number)

View File

@@ -0,0 +1,52 @@
from thefuzz import fuzz
import logging
from utils import json_config
from conf import settings
logger = logging.getLogger(__name__)
class FuzzMatch:
    """Fuzzy-matches a raw title against the anidb alias dataset."""

    def __init__(self):
        # Refresh the local anidb snapshot; fall back to the cached copy
        # when the download fails (e.g. offline).
        try:
            anidb_data = json_config.get(settings.anidb_url)
            json_config.save(settings.anidb_path, anidb_data)
        except Exception as e:
            logger.debug(e)
            logger.info(f"Fail to get anidb data, reading local data")
            anidb_data = json_config.load(settings.anidb_path)
        self.match_data = anidb_data

    @staticmethod
    def match(title_raw, info: dict):
        """Best fuzz ratio between *title_raw* and every known name in *info*.

        Assumes each entry has the language keys below plus an "other" alias
        list — TODO confirm against the anidb dataset schema.
        """
        compare_value = []
        for tag in ["main", "en", "ja", "zh-Hans", "zh-Hant"]:
            if info[tag] is not None:
                a = fuzz.token_sort_ratio(title_raw.lower(), info[tag].lower())
                compare_value.append(a)
        for compare in info["other"]:
            a = fuzz.token_sort_ratio(title_raw.lower(), compare.lower())
            compare_value.append(a)
        return max(compare_value)

    def find_max_name(self, title_raw):
        """Return (best_score, main_title_of_best_entry).

        NOTE(review): raises TypeError when match_data is empty or every
        score is 0 (max_info stays None) — confirm that cannot happen.
        """
        max_value = 0
        max_info = None
        for info in self.match_data:
            a = self.match(title_raw, info)
            if a > max_value:
                max_value = a
                max_info = info
        return max_value, max_info["main"]
# logger.debug(max(value))
if __name__ == "__main__":
from conf.const_dev import DEV_SETTINGS
settings.init(DEV_SETTINGS)
f = FuzzMatch()
name = "勇者、辞职不干了"
value, title = f.find_max_name(name)
print(f"Raw Name: {name} \n"
f"Match Name: {title} \n"
f"Match Value: {value}")

View File

@@ -0,0 +1,64 @@
import logging
from .analyser import RawParser, DownloadParser, TMDBMatcher
from module.conf import settings
logger = logging.getLogger(__name__)
# Preferred title language for renaming ("zh" or "jp"), read once at import.
LANGUAGE = settings.rss_parser.language
class TitleParser:
    """Facade combining the raw-title, rename and TMDB parsers."""

    def __init__(self):
        self._raw_parser = RawParser()
        self._download_parser = DownloadParser()
        self._tmdb_parser = TMDBMatcher()

    def raw_parser(self, raw: str):
        """Parse a raw torrent title into an Episode (or None)."""
        return self._raw_parser.analyse(raw)

    def download_parser(self, download_raw, folder_name, season, suffix, method=settings.bangumi_manage.method):
        """Build the renamed file name for a downloaded episode."""
        return self._download_parser.download_rename(download_raw, folder_name, season, suffix, method)

    def tmdb_parser(self, title: str, season: int):
        """Look *title* up on TMDB.

        Returns (official_title, season); falls back to the given values
        when TMDB lookup fails.
        """
        official_title, tmdb_season = None, None
        try:
            tmdb_info = self._tmdb_parser.tmdb_search(title)
            logger.debug(f"TMDB Matched, official title is {tmdb_info.title_zh}")
        except Exception as e:
            logger.debug(e)
            logger.warning("Not Matched with TMDB")
            return title, season
        if LANGUAGE == "zh":
            official_title = f"{tmdb_info.title_zh} ({tmdb_info.year_number})"
        elif LANGUAGE == "jp":
            official_title = f"{tmdb_info.title_jp} ({tmdb_info.year_number})"
        tmdb_season = tmdb_info.last_season if tmdb_info.last_season else season
        official_title = official_title if official_title else title
        return official_title, tmdb_season

    def return_dict(self, _raw: str):
        """Parse *_raw* into the bangumi data dict; returns None on failure."""
        try:
            episode = self.raw_parser(_raw)
            title_search = episode.title_zh if episode.title_zh else episode.title_en
            title_raw = episode.title_en if episode.title_en else episode.title_zh
            if settings.rss_parser.enable_tmdb:
                official_title, _season = self.tmdb_parser(title_search, episode.season)
            else:
                official_title = title_search if LANGUAGE == "zh" else title_raw
                _season = episode.season
            data = {
                "official_title": official_title,
                "title_raw": title_raw,
                "season": _season,
                "season_raw": episode.season_raw,
                "group": episode.group,
                "dpi": episode.resolution,
                "source": episode.source,
                "subtitle": episode.sub,
                "added": False,
                # Idiom fix: plain boolean expression instead of
                # `True if ... else False`.
                "eps_collect": episode.episode > 1,
            }
            logger.debug(f"RAW:{_raw} >> {episode.title_en}")
            return data
        except Exception as e:
            # Parsing failures are logged; an explicit None makes the
            # implicit fall-through of the original obvious to callers.
            logger.debug(e)
            return None

View File

@@ -0,0 +1 @@
from .rss_analyser import RSSAnalyser

View File

@@ -0,0 +1,52 @@
import re
import logging
from module.network import RequestContent
from module.parser import TitleParser
from module.conf import settings
from module.core import DownloadClient
logger = logging.getLogger(__name__)
class RSSAnalyser:
    """Turns the configured RSS feed into bangumi data dicts and download rules."""

    def __init__(self):
        self._title_analyser = TitleParser()

    def rss_to_datas(self, bangumi_info: list) -> list:
        """Parse every feed entry and append data for titles not seen before.

        Mutates and returns *bangumi_info*.
        """
        with RequestContent() as req:
            rss_torrents = req.get_torrents(settings.rss_parser.link)
        for torrent in rss_torrents:
            raw_title = torrent.name
            extra_add = True
            # Bug fix: the original guard was `bangumi_info is not []`, an
            # identity comparison with a fresh list literal that is always
            # True; a truthiness check expresses the intent (skip the scan
            # when the list is empty).
            if bangumi_info:
                for d in bangumi_info:
                    if re.search(d["title_raw"], raw_title) is not None:
                        logger.debug(f"Had added {d['title_raw']} in auto_download rule before")
                        extra_add = False
                        break
            if extra_add:
                data = self._title_analyser.return_dict(raw_title)
                # NOTE(review): bangumi_info holds dicts, so this membership
                # test against a title string never matches — confirm whether
                # it should compare against each dict's "official_title".
                if data is not None and data["official_title"] not in bangumi_info:
                    bangumi_info.append(data)
        return bangumi_info

    def rss_to_data(self, url) -> dict:
        """Return parsed data for the first entry of *url* that parses cleanly."""
        with RequestContent() as req:
            rss_torrents = req.get_torrents(url)
        for torrent in rss_torrents:
            try:
                data = self._title_analyser.return_dict(torrent.name)
                return data
            except Exception as e:
                logger.debug(e)

    def run(self, bangumi_info: list, download_client: DownloadClient):
        """Collect feed data and register download rules with the client."""
        logger.info("Start collecting RSS info.")
        try:
            self.rss_to_datas(bangumi_info)
            download_client.add_rules(bangumi_info, rss_link=settings.rss_parser.link)
        except Exception as e:
            logger.debug(e)
        logger.info("Finished")

10
src/module/setID.sh Normal file
View File

@@ -0,0 +1,10 @@
#!/bin/bash
# Align the container user/group with the host-provided PUID/PGID so files
# created in mounted volumes keep the host owner's permissions.
echo "设置文件夹权限"
echo "PUID=${PUID}"
echo "PGID=${PGID}"
# Remap the auto_bangumi group and user to the requested ids (-o allows reuse).
groupmod -o -g "$PGID" auto_bangumi
usermod -o -u "$PUID" auto_bangumi
# Hand ownership of the app, template and config directories to that user.
chown -R auto_bangumi:auto_bangumi /src /templates /config

View File

View File

@@ -0,0 +1,18 @@
import json
import requests
def load(filename):
    """Read *filename* as UTF-8 JSON and return the parsed object."""
    with open(filename, "r", encoding="utf-8") as fp:
        return json.load(fp)
def save(filename, obj):
    """Serialize *obj* to *filename* as pretty-printed JSON, keeping non-ASCII
    characters readable (ensure_ascii=False).
    """
    # Fix: dropped the dead `pass` statement that trailed the original body.
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=4, separators=(",", ": "), ensure_ascii=False)
def get(url):
    """Fetch *url* over HTTP and return the decoded JSON body."""
    return requests.get(url).json()