chore: remove TorrentInfo class. Remove filter.py

This commit is contained in:
EstrellaXD
2023-08-07 14:40:27 +08:00
parent 2e46676fa9
commit baa4d54dea
6 changed files with 16 additions and 120 deletions

View File

@@ -2,7 +2,7 @@ import logging
from module.conf import settings
from module.models import Bangumi, Torrent
from module.network import RequestContent, TorrentInfo
from module.network import RequestContent
from .path import TorrentPath
@@ -114,9 +114,7 @@ class DownloadClient(TorrentPath):
self.client.torrents_delete(hashes)
logger.info("[Downloader] Remove torrents.")
def add_torrent(
self, torrent: Torrent | TorrentInfo | list, bangumi: Bangumi
) -> bool:
def add_torrent(self, torrent: Torrent | list, bangumi: Bangumi) -> bool:
if not bangumi.save_path:
bangumi.save_path = self._gen_save_path(bangumi)
with RequestContent() as req:

View File

@@ -1 +1 @@
from .request_contents import RequestContent, TorrentInfo
from .request_contents import RequestContent

View File

@@ -1,58 +1,32 @@
import re
import xml.etree.ElementTree
from dataclasses import dataclass
from bs4 import BeautifulSoup
from module.conf import settings
from module.models import Torrent
from .request_url import RequestURL
from .site import mikan_parser
@dataclass
class TorrentInfo:
name: str
url: str
homepage: str
_poster_link: str | None = None
_official_title: str | None = None
def __fetch_mikan_info(self):
if self._poster_link is None or self._official_title is None:
with RequestContent() as req:
self._poster_link, self._official_title = req.get_mikan_info(
self.homepage
)
@property
def poster_link(self) -> str:
self.__fetch_mikan_info()
return self._poster_link
@property
def official_title(self) -> str:
self.__fetch_mikan_info()
return self._official_title
class RequestContent(RequestURL):
def get_torrents(
self,
_url: str,
_filter: str = "|".join(settings.rss_parser.filter),
retry: int = 3,
) -> list[TorrentInfo]:
) -> list[Torrent]:
try:
soup = self.get_xml(_url, retry)
torrent_titles, torrent_urls, torrent_homepage = mikan_parser(soup)
torrents: list[TorrentInfo] = []
torrents: list[Torrent] = []
for _title, torrent_url, homepage in zip(
torrent_titles, torrent_urls, torrent_homepage
):
if re.search(_filter, _title) is None:
torrents.append(
TorrentInfo(name=_title, url=torrent_url, homepage=homepage)
Torrent(name=_title, url=torrent_url, homepage=homepage)
)
return torrents
except ConnectionError:

View File

@@ -4,8 +4,8 @@ import re
from .engine import RSSEngine
from module.conf import settings
from module.models import Bangumi
from module.network import RequestContent, TorrentInfo
from module.models import Bangumi, Torrent
from module.network import RequestContent
from module.parser import TitleParser
logger = logging.getLogger(__name__)
@@ -27,7 +27,7 @@ class RSSAnalyser(TitleParser):
data.official_title = re.sub(r"[/:.\\]", " ", data.official_title)
@staticmethod
def get_rss_torrents(rss_link: str, full_parse: bool = True) -> list[TorrentInfo]:
def get_rss_torrents(rss_link: str, full_parse: bool = True) -> list[Torrent]:
with RequestContent() as req:
if full_parse:
rss_torrents = req.get_torrents(rss_link)
@@ -42,13 +42,8 @@ class RSSAnalyser(TitleParser):
for torrent in torrents:
data = self.raw_parser(raw=torrent.name)
if data and data.title_raw not in [i.title_raw for i in new_data]:
try:
poster_link, mikan_title = (
torrent.poster_link,
torrent.official_title,
)
except AttributeError:
poster_link, mikan_title = None, None
with RequestContent() as req:
poster_link, mikan_title = req.get_mikan_info(torrent.homepage)
data.poster_link = poster_link
data.rss_link = rss_link
self.official_title_parser(data, mikan_title)
@@ -58,16 +53,11 @@ class RSSAnalyser(TitleParser):
logger.debug(f"[RSS] New title found: {data.official_title}")
return new_data
def torrent_to_data(self, torrent: TorrentInfo) -> Bangumi:
def torrent_to_data(self, torrent: Torrent) -> Bangumi:
data = self.raw_parser(raw=torrent.name)
if data:
try:
poster_link, mikan_title = (
torrent.poster_link,
torrent.official_title,
)
except AttributeError:
poster_link, mikan_title = None, None
with RequestContent() as req:
poster_link, mikan_title = req.get_mikan_info(torrent.homepage)
data.poster_link = poster_link
self.official_title_parser(data, mikan_title)
return data

View File

@@ -19,16 +19,7 @@ class RSSEngine(Database):
@staticmethod
def _get_torrents(rss_link: str) -> list[Torrent]:
with RequestContent() as req:
torrent_infos = req.get_torrents(rss_link)
torrents: list[Torrent] = []
for torrent_info in torrent_infos:
torrents.append(
Torrent(
name=torrent_info.name,
url=torrent_info.url,
homepage=torrent_info.homepage,
)
)
torrents = req.get_torrents(rss_link)
return torrents
def get_combine_rss(self) -> list[RSSItem]:

View File

@@ -1,57 +0,0 @@
import logging
from module.conf import settings
from module.database import BangumiDatabase
from module.downloader import DownloadClient
from module.models import Bangumi
from module.network import RequestContent
logger = logging.getLogger(__name__)
def matched(torrent_title: str):
with BangumiDatabase() as db:
return db.match_torrent(torrent_title)
def save_path(data: Bangumi):
folder = (
f"{data.official_title}({data.year})" if data.year else f"{data.official_title}"
)
season = f"Season {data.season}"
return path.join(
settings.downloader.path,
folder,
season,
)
def add_download(data: BangumiData, torrent: TorrentInfo):
torrent = {
"url": torrent.url,
"save_path": save_path(data),
}
with DownloadClient() as client:
client.add_torrent(torrent)
with TorrentDatabase() as db:
db.add_torrent(torrent)
def downloaded(torrent: TorrentInfo):
with TorrentDatabase() as db:
return db.if_downloaded(torrent)
def get_downloads(rss_link: str):
with RequestContent() as req:
torrents = req.get_torrents(rss_link)
for torrent in torrents:
if not downloaded(torrent):
data = matched(torrent.title)
if data:
add_download(data, torrent)
logger.info(f"Add {torrent.title} to download list")
else:
logger.debug(f"{torrent.title} not matched")
else:
logger.debug(f"{torrent.title} already downloaded")