Mirror of https://github.com/EstrellaXD/Auto_Bangumi.git, synced 2026-05-11 10:35:50 +08:00
Rewrite rss, remove api class
@@ -1,2 +1 @@
-from .api_func import APIProcess
 from .sub_thread import *
@@ -1,100 +0,0 @@
-import re
-import logging
-
-from module.downloader import DownloadClient
-from module.manager import FullSeasonGet
-from module.rss import RSSAnalyser
-from module.utils import json_config
-from module.conf import DATA_PATH, settings
-from module.models import Config
-from module.network import RequestContent
-
-from module.ab_decorator import api_failed
-
-logger = logging.getLogger(__name__)
-
-
-class APIProcess:
-    def __init__(self):
-        self._rss_analyser = RSSAnalyser()
-        self._client = DownloadClient()
-        self._full_season_get = FullSeasonGet()
-        self._custom_url = settings.rss_parser.custom_url
-
-    def link_process(self, link):
-        return self._rss_analyser.rss_to_data(link)
-
-    @api_failed
-    def download_collection(self, link):
-        if not self._client.authed:
-            self._client.auth()
-        data = self.link_process(link)
-        self._full_season_get.download_collection(data, link)
-        return data
-
-    @api_failed
-    def add_subscribe(self, link):
-        if not self._client.authed:
-            self._client.auth()
-        data = self.link_process(link)
-        self._client.add_rss_feed(link, data.official_title)
-        self._client.set_rule(data, link)
-        return data
-
-    @staticmethod
-    def reset_rule():
-        data = json_config.load(DATA_PATH)
-        data["bangumi_info"] = []
-        json_config.save(DATA_PATH, data)
-        return "Success"
-
-    @staticmethod
-    def remove_rule(_id: int):
-        datas = json_config.load(DATA_PATH)["bangumi_info"]
-        for data in datas:
-            if data["id"] == _id:
-                datas.remove(data)
-                break
-        json_config.save(DATA_PATH, datas)
-        return "Success"
-
-    @staticmethod
-    def add_rule(title, season):
-        data = json_config.load(DATA_PATH)
-        extra_data = {
-            "official_title": title,
-            "title_raw": title,
-            "season": season,
-            "season_raw": "",
-            "dpi": "",
-            "group": "",
-            "eps_complete": False,
-            "added": False,
-        }
-        data["bangumi_info"].append(extra_data)
-        json_config.save(DATA_PATH, data)
-        return "Success"
-
-    @staticmethod
-    def update_config(config: Config):
-        settings.load()
-        return {"message": "Success"}
-
-    @staticmethod
-    def get_config() -> dict:
-        return settings.dict()
-
-    def get_rss(self, full_path: str):
-        url = f"https://mikanani.me/RSS/{full_path}"
-        custom_url = self._custom_url
-        if "://" not in custom_url:
-            custom_url = f"https://{custom_url}"
-        with RequestContent() as request:
-            content = request.get_html(url)
-        return re.sub(r"https://mikanani.me", custom_url, content)
-
-    @staticmethod
-    def get_torrent(full_path):
-        url = f"https://mikanani.me/Download/{full_path}"
-        with RequestContent() as request:
-            return request.get_content(url)
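The deleted class was a thin orchestration layer: link_process delegated to RSSAnalyser.rss_to_data, the download paths to DownloadClient and FullSeasonGet, and the rule helpers to json_config. For orientation, a minimal sketch of an equivalent direct call after the removal, assuming the interfaces shown above and in the rewritten analyser below; the add_subscribe helper itself is hypothetical:

# Hypothetical replacement for APIProcess.add_subscribe; interfaces are
# assumed from the deleted code above and the rewritten RSSAnalyser below.
from module.downloader import DownloadClient
from module.rss import RSSAnalyser


def add_subscribe(link: str):
    # The rewritten rss_to_data returns a list of BangumiData;
    # full_parse=False stops after the first parsed title.
    datas = RSSAnalyser().rss_to_data(link, full_parse=False)
    client = DownloadClient()
    if not client.authed:
        client.auth()
    for data in datas:
        # Register the feed and a matching download rule, as the
        # deleted APIProcess.add_subscribe did.
        client.add_rss_feed(link, data.official_title)
        client.set_rule(data, link)
    return datas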
@@ -6,8 +6,6 @@ from bs4 import BeautifulSoup
 from .request_url import RequestURL
 from module.conf import settings
 
-FILTER = "|".join(settings.rss_parser.filter)
-
 
 @dataclass
 class TorrentInfo:
@@ -18,7 +16,10 @@ class TorrentInfo:
 
 class RequestContent(RequestURL):
     # Mikanani RSS
-    def get_torrents(self, _url: str, _filter: bool = True) -> [TorrentInfo]:
+    def get_torrents(
+            self,
+            _url: str,
+            _filter: str = "|".join(settings.rss_parser.filter)) -> [TorrentInfo]:
         soup = self.get_xml(_url)
         torrent_titles = []
         torrent_urls = []
@@ -31,10 +32,7 @@ class RequestContent(RequestURL):
 
         torrents = []
         for _title, torrent_url, homepage in zip(torrent_titles, torrent_urls, torrent_homepage):
-            if _filter:
-                if re.search(FILTER, _title) is None:
-                    torrents.append(TorrentInfo(_title, torrent_url, homepage))
-            else:
-                torrents.append(TorrentInfo(_title, torrent_url, homepage))
+            if re.search(_filter, _title) is None:
+                torrents.append(TorrentInfo(_title, torrent_url, homepage))
         return torrents
 
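Net effect of the two hunks above: _filter goes from a bool toggle (with the pattern hardwired in the module-level FILTER constant) to a regex string that defaults to the configured filter, so each caller picks its own pattern. A minimal usage sketch, assuming the RequestContent context manager shown above; the feed URL is a placeholder:

# Usage sketch for the new get_torrents signature; the URL is a placeholder.
rss_url = "https://mikanani.me/RSS/example"

with RequestContent() as req:
    # Default: drop titles matching the regex built from settings.rss_parser.filter.
    torrents = req.get_torrents(rss_url)
    # Custom pattern: skip batch releases such as "01-12", as the rewritten
    # RSS analyser below does when full_parse is False.
    episodes_only = req.get_torrents(rss_url, "\\d+-\\d+")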
@@ -28,9 +28,35 @@ class RSSAnalyser:
         else:
             pass
 
-    def rss_to_data(self, rss_link: str, full_parse: bool = True) -> list[BangumiData]:
+    @staticmethod
+    def get_rss_torrents(rss_link: str, full_parse: bool = True) -> list:
         with RequestContent() as req:
-            rss_torrents = req.get_torrents(rss_link)
+            if full_parse:
+                rss_torrents = req.get_torrents(rss_link)
+            else:
+                rss_torrents = req.get_torrents(rss_link, "\\d+-\\d+")
+        return rss_torrents
+
+    def get_new_data_list(self, new_dict: dict, rss_link: str, _id: int, full_parse: bool = True) -> list:
+        new_data = []
+        with RequestContent() as req:
+            for raw_title, homepage in new_dict.items():
+                data = self._title_analyser.raw_parser(
+                    raw=raw_title, rss_link=rss_link, _id=_id
+                )
+                if data and data.title_raw not in [i.title_raw for i in new_data]:
+                    poster_link, mikan_title = req.get_mikan_info(homepage)
+                    data.poster_link = poster_link
+                    self.official_title_parser(data, mikan_title)
+                    if not full_parse:
+                        return [data]
+                    new_data.append(data)
+                    _id += 1
+                    logger.debug(f"New title found: {data.official_title}")
+        return new_data
+
+    def rss_to_data(self, rss_link: str, full_parse: bool = True) -> list[BangumiData]:
+        rss_torrents = self.get_rss_torrents(rss_link, full_parse)
         title_dict = {torrent.name: torrent.homepage for torrent in rss_torrents}
         with BangumiDatabase() as database:
             new_dict = database.match_list(title_dict, rss_link)
@@ -38,26 +64,10 @@ class RSSAnalyser:
                 logger.debug("No new title found.")
                 return []
             _id = database.gen_id()
-            new_data = []
-            # New List
-            with RequestContent() as req:
-                for raw_title, homepage in new_dict.items():
-                    data = self._title_analyser.raw_parser(
-                        raw=raw_title, rss_link=rss_link, _id=_id
-                    )
-                    if data and data.title_raw not in [i.title_raw for i in new_data]:
-                        poster_link, mikan_title = req.get_mikan_info(homepage)
-                        data.poster_link = poster_link
-                        # Official title type
-                        self.official_title_parser(data, mikan_title)
-                        if not full_parse:
-                            database.insert(data)
-                            return [data]
-                        new_data.append(data)
-                        _id += 1
-                        logger.debug(f"New title found: {data.official_title}")
-            database.insert_list(new_data)
-            return new_data
+            new_data = self.get_new_data_list(new_dict, rss_link, _id, full_parse)
+            database.insert_list(new_data)
+            return new_data
 
     def run(self, rss_link: str):
         logger.info("Start collecting RSS info.")
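Taken together, the rewrite splits the old monolithic rss_to_data into get_rss_torrents (fetch the feed, optionally filtering out batch torrents) and get_new_data_list (parse raw titles, fetch Mikan poster info, deduplicate), leaving rss_to_data as a short orchestrator around BangumiDatabase. A minimal sketch of the resulting call flow; the feed link is a placeholder:

# Sketch of the refactored pipeline; the feed link is a placeholder.
analyser = RSSAnalyser()
link = "https://mikanani.me/RSS/example"

# rss_to_data now runs: get_rss_torrents -> BangumiDatabase.match_list
# -> get_new_data_list -> BangumiDatabase.insert_list.
for data in analyser.rss_to_data(link, full_parse=True):
    print(data.official_title)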