New TorrentInfo class.

This commit is contained in:
EstrellaXD
2023-05-16 22:10:20 +08:00
parent f6e08f61bc
commit e4d17ee60b
8 changed files with 62 additions and 34 deletions

View File

@@ -11,18 +11,13 @@ def check_status() -> bool:
if settings.rss_parser.token in ["", "token"]:
logger.warning("Please set RSS token")
return False
if check_rss():
if check_downloader():
logger.debug("All check passed")
return True
if check_downloader():
logger.debug("All check passed")
return True
return False
def check_downloader():
with RequestContent() as req:
if not req.check_connection():
logger.warning("Can't connect to downloader")
return False
with DownloadClient() as client:
if client.authed:
logger.debug("Downloader is running")

View File

@@ -157,7 +157,7 @@ class BangumiDatabase(DataConnector):
return poster_link
return ""
def match_list(self, title_dict: dict, rss_link: str) -> dict:
def match_list(self, torrent_list: list, rss_link: str) -> list:
# Match title_raw in database
self._cursor.execute(
"""
@@ -166,17 +166,21 @@ class BangumiDatabase(DataConnector):
)
data = self._cursor.fetchall()
if not data:
return title_dict
return torrent_list
# Match title
for title in title_dict.copy().keys():
i = 0
while i < len(torrent_list):
torrent = torrent_list[i]
for title_raw, rss_set in data:
if title_raw in title:
if title_raw in torrent.name:
if rss_link not in rss_set:
rss_set += "," + rss_link
self.update_rss(title_raw, rss_set)
title_dict.pop(title)
torrent_list.pop(i)
break
return title_dict
else:
i += 1
return torrent_list
def not_complete(self) -> list[BangumiData]:
# Find eps_complete = False

View File

@@ -0,0 +1 @@
from .inspector import Inspector

View File

@@ -0,0 +1,10 @@
class Inspector:
    """Stub for health/connectivity checks (downloader and links).

    NOTE(review): every method below is an unimplemented placeholder;
    despite the ``-> bool`` annotations they currently return ``None``
    (which is falsy) — confirm callers treat that as "check failed".
    """

    def __init__(self):
        # No state yet; placeholder constructor.
        pass

    def check_downloader(self) -> bool:
        """Check that the download client is reachable. Not implemented yet."""
        pass

    def check_link(self, url) -> bool:
        """Check that *url* is reachable/valid. Not implemented yet."""
        pass

View File

@@ -4,6 +4,9 @@ from pydantic import BaseModel, Field
class TorrentInfo(BaseModel):
    """Validated torrent metadata record (pydantic model).

    ``Field(...)`` (Ellipsis default) marks a field as required;
    ``Field(None)`` makes it optional with a ``None`` default.
    """

    # Raw torrent name as it appears in the feed (required).
    name: str = Field(...)
    # Direct torrent/magnet link (required).
    link: str = Field(...)
    # Source page of the torrent, if known (optional).
    homepage: str | None = Field(None)
    # URL of the poster image (required).
    poster_link: str = Field(...)
    # Canonical series title (required).
    official_title: str = Field(...)
class FileSet(BaseModel):

View File

@@ -12,6 +12,22 @@ class TorrentInfo:
name: str
torrent_link: str
homepage: str = None
_poster_link: str = None
_official_title: str = None
    @property
    def poster_link(self) -> str:
        """Poster image URL, fetched lazily from the torrent's homepage.

        The first access performs one network request via RequestContent
        and caches both the poster link and the official title, so a later
        read of either property costs nothing.
        NOTE(review): if get_mikan_info returns None for the poster this
        re-requests on every access — confirm it always yields a value.
        """
        if self._poster_link is None:
            with RequestContent() as req:
                self._poster_link, self._official_title = req.get_mikan_info(self.homepage)
        return self._poster_link
    @property
    def official_title(self) -> str:
        """Canonical series title, fetched lazily from the torrent's homepage.

        Mirrors the poster_link property: one RequestContent call fills
        both caches, so whichever property is read first pays the network
        cost for both.
        NOTE(review): a None result is never cached, so a missing title
        would trigger a request per access — confirm upstream behavior.
        """
        if self._official_title is None:
            with RequestContent() as req:
                self._poster_link, self._official_title = req.get_mikan_info(self.homepage)
        return self._official_title
class RequestContent(RequestURL):
@@ -67,5 +83,5 @@ class RequestContent(RequestURL):
def get_content(self, _url):
return self.get_url(_url).content
def check_connection(self, _url=settings.downloader.host):
def check_connection(self, _url):
return self.check_url(_url)

View File

@@ -37,35 +37,33 @@ class RSSAnalyser:
rss_torrents = req.get_torrents(rss_link, "\\d+-\\d+")
return rss_torrents
def get_new_data_list(self, new_dict: dict, rss_link: str, _id: int, full_parse: bool = True) -> list:
def get_new_data_list(self, torrents: list, rss_link: str, _id: int, full_parse: bool = True) -> list:
new_data = []
with RequestContent() as req:
for raw_title, homepage in new_dict.items():
data = self._title_analyser.raw_parser(
raw=raw_title, rss_link=rss_link, _id=_id
)
if data and data.title_raw not in [i.title_raw for i in new_data]:
poster_link, mikan_title = req.get_mikan_info(homepage)
data.poster_link = poster_link
self.official_title_parser(data, mikan_title)
if not full_parse:
return [data]
new_data.append(data)
_id += 1
logger.debug(f"New title found: {data.official_title}")
for torrent in torrents:
data = self._title_analyser.raw_parser(
raw=torrent.name, rss_link=rss_link, _id=_id
)
if data and data.title_raw not in [i.title_raw for i in new_data]:
poster_link, mikan_title = torrent.poster_link, torrent.official_title
data.poster_link = poster_link
self.official_title_parser(data, mikan_title)
if not full_parse:
return [data]
new_data.append(data)
_id += 1
logger.debug(f"New title found: {data.official_title}")
return new_data
def rss_to_data(self, rss_link: str, full_parse: bool = True) -> list[BangumiData]:
rss_torrents = self.get_rss_torrents(rss_link, full_parse)
title_dict = {torrent.name: torrent.homepage for torrent in rss_torrents}
with BangumiDatabase() as database:
new_dict = database.match_list(title_dict, rss_link)
if not new_dict:
torrents_to_add = database.match_list(rss_torrents, rss_link)
if not torrents_to_add:
logger.debug("No new title found.")
return []
_id = database.gen_id()
# New List
new_data = self.get_new_data_list(new_dict, rss_link, _id, full_parse)
new_data = self.get_new_data_list(torrents_to_add, rss_link, _id, full_parse)
database.insert_list(new_data)
return new_data
@@ -75,4 +73,5 @@ class RSSAnalyser:
self.rss_to_data(rss_link)
except Exception as e:
logger.debug(e)
print(e)
logger.error("Failed to collect RSS info.")

View File