diff --git a/src/main.py b/src/main.py index a6b803e5..bb3a8837 100644 --- a/src/main.py +++ b/src/main.py @@ -2,7 +2,6 @@ import os import logging import uvicorn - from module.api import router from module.conf import settings from module.conf.uvicorn_logging import logging_config diff --git a/src/module/core/__init__.py b/src/module/core/__init__.py index 14c45acb..8ba882af 100644 --- a/src/module/core/__init__.py +++ b/src/module/core/__init__.py @@ -1,2 +1 @@ -from .api_func import APIProcess from .sub_thread import * diff --git a/src/module/core/api_func.py b/src/module/core/api_func.py deleted file mode 100644 index 51d54a1b..00000000 --- a/src/module/core/api_func.py +++ /dev/null @@ -1,100 +0,0 @@ -import re -import logging - -from module.downloader import DownloadClient -from module.manager import FullSeasonGet -from module.rss import RSSAnalyser -from module.utils import json_config -from module.conf import DATA_PATH, settings -from module.models import Config -from module.network import RequestContent - -from module.ab_decorator import api_failed - -logger = logging.getLogger(__name__) - - -class APIProcess: - def __init__(self): - self._rss_analyser = RSSAnalyser() - self._client = DownloadClient() - self._full_season_get = FullSeasonGet() - self._custom_url = settings.rss_parser.custom_url - - def link_process(self, link): - return self._rss_analyser.rss_to_data(link) - - @api_failed - def download_collection(self, link): - if not self._client.authed: - self._client.auth() - data = self.link_process(link) - self._full_season_get.download_collection(data, link) - return data - - @api_failed - def add_subscribe(self, link): - if not self._client.authed: - self._client.auth() - data = self.link_process(link) - self._client.add_rss_feed(link, data.official_title) - self._client.set_rule(data, link) - return data - - @staticmethod - def reset_rule(): - data = json_config.load(DATA_PATH) - data["bangumi_info"] = [] - json_config.save(DATA_PATH, data) - return "Success" - - @staticmethod - def remove_rule(_id: int): - datas = json_config.load(DATA_PATH)["bangumi_info"] - for data in datas: - if data["id"] == _id: - datas.remove(data) - break - json_config.save(DATA_PATH, datas) - return "Success" - - @staticmethod - def add_rule(title, season): - data = json_config.load(DATA_PATH) - extra_data = { - "official_title": title, - "title_raw": title, - "season": season, - "season_raw": "", - "dpi": "", - "group": "", - "eps_complete": False, - "added": False, - } - data["bangumi_info"].append(extra_data) - json_config.save(DATA_PATH, data) - return "Success" - - @staticmethod - def update_config(config: Config): - settings.load() - return {"message": "Success"} - - @staticmethod - def get_config() -> dict: - return settings.dict() - - def get_rss(self, full_path: str): - url = f"https://mikanani.me/RSS/{full_path}" - custom_url = self._custom_url - if "://" not in custom_url: - custom_url = f"https://{custom_url}" - with RequestContent() as request: - content = request.get_html(url) - return re.sub(r"https://mikanani.me", custom_url, content) - - @staticmethod - def get_torrent(full_path): - url = f"https://mikanani.me/Download/{full_path}" - with RequestContent() as request: - return request.get_content(url) diff --git a/src/module/network/request_contents.py b/src/module/network/request_contents.py index d63eae4f..a6efa0f4 100644 --- a/src/module/network/request_contents.py +++ b/src/module/network/request_contents.py @@ -6,8 +6,6 @@ from bs4 import BeautifulSoup from .request_url import RequestURL from module.conf import settings -FILTER = "|".join(settings.rss_parser.filter) - @dataclass class TorrentInfo: @@ -18,7 +16,10 @@ class TorrentInfo: class RequestContent(RequestURL): # Mikanani RSS - def get_torrents(self, _url: str, _filter: bool = True) -> [TorrentInfo]: + def get_torrents( + self, + _url: str, + _filter: str = "|".join(settings.rss_parser.filter)) -> [TorrentInfo]: soup = self.get_xml(_url) torrent_titles = [] torrent_urls = [] @@ -31,10 +32,7 @@ class RequestContent(RequestURL): torrents = [] for _title, torrent_url, homepage in zip(torrent_titles, torrent_urls, torrent_homepage): - if _filter: - if re.search(FILTER, _title) is None: - torrents.append(TorrentInfo(_title, torrent_url, homepage)) - else: + if re.search(_filter, _title) is None: torrents.append(TorrentInfo(_title, torrent_url, homepage)) return torrents diff --git a/src/module/rss/rss_analyser.py b/src/module/rss/rss_analyser.py index bbbfda16..5ccfac84 100644 --- a/src/module/rss/rss_analyser.py +++ b/src/module/rss/rss_analyser.py @@ -28,9 +28,35 @@ class RSSAnalyser: else: pass - def rss_to_data(self, rss_link: str, full_parse: bool = True) -> list[BangumiData]: + @staticmethod + def get_rss_torrents(rss_link: str, full_parse: bool = True) -> list: with RequestContent() as req: - rss_torrents = req.get_torrents(rss_link) + if full_parse: + rss_torrents = req.get_torrents(rss_link) + else: + rss_torrents = req.get_torrents(rss_link, "\\d+-\\d+") + return rss_torrents + + def get_new_data_list(self, new_dict: dict, rss_link: str, _id: int, full_parse: bool = True) -> list: + new_data = [] + with RequestContent() as req: + for raw_title, homepage in new_dict.items(): + data = self._title_analyser.raw_parser( + raw=raw_title, rss_link=rss_link, _id=_id + ) + if data and data.title_raw not in [i.title_raw for i in new_data]: + poster_link, mikan_title = req.get_mikan_info(homepage) + data.poster_link = poster_link + self.official_title_parser(data, mikan_title) + if not full_parse: + return [data] + new_data.append(data) + _id += 1 + logger.debug(f"New title found: {data.official_title}") + return new_data + + def rss_to_data(self, rss_link: str, full_parse: bool = True) -> list[BangumiData]: + rss_torrents = self.get_rss_torrents(rss_link, full_parse) title_dict = {torrent.name: torrent.homepage for torrent in rss_torrents} with BangumiDatabase() as database: new_dict = database.match_list(title_dict, rss_link) @@ -38,26 +64,10 @@ class RSSAnalyser: logger.debug("No new title found.") return [] _id = database.gen_id() - new_data = [] # New List - with RequestContent() as req: - for raw_title, homepage in new_dict.items(): - data = self._title_analyser.raw_parser( - raw=raw_title, rss_link=rss_link, _id=_id - ) - if data and data.title_raw not in [i.title_raw for i in new_data]: - poster_link, mikan_title = req.get_mikan_info(homepage) - data.poster_link = poster_link - # Official title type - self.official_title_parser(data, mikan_title) - if not full_parse: - database.insert(data) - return [data] - new_data.append(data) - _id += 1 - logger.debug(f"New title found: {data.official_title}") - database.insert_list(new_data) - return new_data + new_data = self.get_new_data_list(new_dict, rss_link, _id, full_parse) + database.insert_list(new_data) + return new_data def run(self, rss_link: str): logger.info("Start collecting RSS info.")