mirror of
https://github.com/EstrellaXD/Auto_Bangumi.git
synced 2026-04-25 19:12:11 +08:00
@@ -5,7 +5,7 @@ from module.core import DownloadClient
|
||||
from module.manager import FullSeasonGet
|
||||
from module.rss import RSSAnalyser
|
||||
from module.utils import json_config
|
||||
from module.conf import DATA_PATH, settings
|
||||
from module.conf import DATA_PATH
|
||||
from module.conf.config import save_config_to_file, CONFIG_PATH
|
||||
from module.models import Config
|
||||
from module.network import RequestContent
|
||||
@@ -16,13 +16,14 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class APIProcess:
|
||||
def __init__(self):
|
||||
self._rss_analyser = RSSAnalyser()
|
||||
self._client = DownloadClient()
|
||||
self._full_season_get = FullSeasonGet()
|
||||
def __init__(self, settings: Config):
|
||||
self._rss_analyser = RSSAnalyser(settings)
|
||||
self._client = DownloadClient(settings)
|
||||
self._full_season_get = FullSeasonGet(settings)
|
||||
self._custom_url = settings.rss_parser.custom_url
|
||||
|
||||
def link_process(self, link):
|
||||
return self._rss_analyser.rss_to_data(link, filter=False)
|
||||
return self._rss_analyser.rss_to_data(link, _filter=False)
|
||||
|
||||
@api_failed
|
||||
def download_collection(self, link):
|
||||
@@ -85,10 +86,9 @@ class APIProcess:
|
||||
def get_config() -> dict:
|
||||
return json_config.load(CONFIG_PATH)
|
||||
|
||||
@staticmethod
|
||||
def get_rss(full_path: str):
|
||||
def get_rss(self, full_path: str):
|
||||
url = f"https://mikanani.me/RSS/{full_path}"
|
||||
custom_url = settings.rss_parser.custom_url
|
||||
custom_url = self._custom_url
|
||||
if "://" not in custom_url:
|
||||
custom_url = f"https://{custom_url}"
|
||||
with RequestContent() as request:
|
||||
|
||||
@@ -3,25 +3,22 @@ import logging
|
||||
import os
|
||||
|
||||
from module.downloader import getClient
|
||||
from module.conf import settings
|
||||
from module.models import BangumiData
|
||||
from module.models import BangumiData, Config
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DownloadClient:
|
||||
def __init__(self):
|
||||
self.client = getClient()
|
||||
def __init__(self, settings: Config):
|
||||
self.client = getClient(settings)
|
||||
self.authed = False
|
||||
self.download_path = settings.downloader.path
|
||||
self.group_tag = settings.bangumi_manage.group_tag
|
||||
|
||||
def auth(self):
|
||||
host, username, password = settings.downloader.host, settings.downloader.username, settings.downloader.password
|
||||
try:
|
||||
self.client.auth(host, username, password)
|
||||
self.authed = True
|
||||
except Exception as e:
|
||||
logger.error(f"Can't login {host} by {username}, {e}")
|
||||
self.client.auth()
|
||||
self.authed = True
|
||||
|
||||
def init_downloader(self):
|
||||
prefs = {
|
||||
@@ -36,16 +33,16 @@ class DownloadClient:
|
||||
except Exception as e:
|
||||
logger.warning("Cannot add new category, maybe already exists.")
|
||||
logger.debug(e)
|
||||
if settings.downloader.path == "":
|
||||
if self.download_path == "":
|
||||
prefs = self.client.get_app_prefs()
|
||||
settings.downloader.path = os.path.join(prefs["save_path"], "Bangumi")
|
||||
self.download_path = os.path.join(prefs["save_path"], "Bangumi")
|
||||
|
||||
def set_rule(self, info: BangumiData, rss_link):
|
||||
official_name, raw_name, season, group = info.official_title, info.title_raw, info.season, info.group
|
||||
rule = {
|
||||
"enable": True,
|
||||
"mustContain": raw_name,
|
||||
"mustNotContain": "|".join(settings.rss_parser.filter),
|
||||
"mustNotContain": "|".join(info.filter),
|
||||
"useRegex": True,
|
||||
"episodeFilter": "",
|
||||
"smartFilter": False,
|
||||
@@ -57,28 +54,23 @@ class DownloadClient:
|
||||
"assignedCategory": "Bangumi",
|
||||
"savePath": str(
|
||||
os.path.join(
|
||||
settings.downloader.path,
|
||||
self.download_path,
|
||||
re.sub(r"[:/.]", " ", official_name).strip(),
|
||||
f"Season {season}",
|
||||
)
|
||||
),
|
||||
}
|
||||
rule_name = f"[{group}] {official_name}" if settings.bangumi_manage.group_tag else official_name
|
||||
rule_name = f"[{group}] {official_name}" if self.group_tag else official_name
|
||||
self.client.rss_set_rule(rule_name=f"{rule_name} S{season}", rule_def=rule)
|
||||
logger.info(f"Add {official_name} Season {season}")
|
||||
|
||||
def rss_feed(self, rss_link, item_path="Mikan_RSS"):
|
||||
# TODO: 定时刷新 RSS
|
||||
if self.client.get_rss_info(rss_link):
|
||||
logger.info("RSS Already exists.")
|
||||
else:
|
||||
logger.info("No feed exists, start adding feed.")
|
||||
self.client.rss_add_feed(url=rss_link, item_path="Mikan_RSS")
|
||||
logger.info("Add RSS Feed successfully.")
|
||||
self.client.rss_add_feed(url=rss_link, item_path=item_path)
|
||||
|
||||
def add_collection_feed(self, rss_link, item_path):
|
||||
self.client.rss_add_feed(url=rss_link, item_path=item_path)
|
||||
logger.info("Add RSS Feed successfully.")
|
||||
logger.info("Add Collection RSS Feed successfully.")
|
||||
|
||||
def add_rules(self, bangumi_info: list[BangumiData], rss_link: str):
|
||||
logger.debug("Start adding rules.")
|
||||
|
||||
Reference in New Issue
Block a user