mirror of
https://github.com/EstrellaXD/Auto_Bangumi.git
synced 2026-04-13 11:39:45 +08:00
Move rss_analyser.py to database
This commit is contained in:
@@ -115,7 +115,7 @@ class DataOperator(DataConnector):
|
||||
self._conn.commit()
|
||||
return self._cursor.rowcount == 1
|
||||
|
||||
def search(self, _id: int) -> BangumiData | None:
|
||||
def search_id(self, _id: int) -> BangumiData | None:
|
||||
self._cursor.execute('''
|
||||
SELECT * FROM bangumi WHERE id = :id
|
||||
''', {"id": _id})
|
||||
@@ -126,6 +126,17 @@ class DataOperator(DataConnector):
|
||||
dict_data = dict(zip(keys, values))
|
||||
return self.db_to_data(dict_data)
|
||||
|
||||
def search_official_title(self, official_title: str) -> BangumiData | None:
    """Fetch the first ``bangumi`` row whose ``official_title`` equals the argument.

    Args:
        official_title: Title to compare against the ``official_title`` column.

    Returns:
        The result of ``self.db_to_data`` applied to a ``{column: value}``
        mapping of the row, or ``None`` when the query yields no row.
    """
    cursor = self._cursor
    cursor.execute('''
        SELECT * FROM bangumi WHERE official_title = :official_title
        ''', {"official_title": official_title})
    row = cursor.fetchone()
    if row is not None:
        # Column names come from the cursor description of the query just run.
        columns = [column[0] for column in cursor.description]
        return self.db_to_data(dict(zip(columns, row)))
    return None
|
||||
|
||||
def match_title(self, title: str) -> bool:
|
||||
# Select all title_raw
|
||||
self._cursor.execute('''
|
||||
@@ -138,6 +149,19 @@ class DataOperator(DataConnector):
|
||||
return True
|
||||
return False
|
||||
|
||||
def not_exist_titles(self, titles: list[str]) -> list[str]:
    """Drop every title that already matches a stored bangumi.

    A title counts as already known when any ``title_raw`` value read from
    the ``bangumi`` table occurs in it as a substring.

    Args:
        titles: Candidate titles. The list is filtered in place, as the
            previous implementation did, so callers holding a reference
            observe the same result.

    Returns:
        The same list object, holding only the unmatched titles in their
        original order.
    """
    # Select all title_raw
    self._cursor.execute('''
        SELECT title_raw FROM bangumi
        ''')
    # Skip NULL/empty values: None would raise in the substring test and an
    # empty string is a substring of every title.
    title_raws = [row[0] for row in self._cursor.fetchall() if row[0]]
    # Rebuild the list instead of calling remove() while iterating over it;
    # removing during iteration skips the element that follows each removal,
    # which let matching titles slip through.
    titles[:] = [
        title for title in titles
        if not any(title_raw in title for title_raw in title_raws)
    ]
    return titles
|
||||
|
||||
def gen_id(self) -> int:
|
||||
self._cursor.execute('''
|
||||
SELECT id FROM bangumi ORDER BY id DESC LIMIT 1
|
||||
|
||||
@@ -3,7 +3,7 @@ from dataclasses import dataclass
|
||||
|
||||
|
||||
class BangumiData(BaseModel):
|
||||
id: int | None = Field(None, alias="id", title="番剧ID")
|
||||
id: int = Field(..., alias="id", title="番剧ID")
|
||||
official_title: str = Field(..., alias="official_title", title="番剧中文名")
|
||||
year: int | None = Field(None, alias="year", title="番剧年份")
|
||||
title_raw: str = Field(..., alias="title_raw", title="番剧原名")
|
||||
@@ -13,11 +13,11 @@ class BangumiData(BaseModel):
|
||||
dpi: str | None = Field(None, alias="dpi", title="分辨率")
|
||||
source: str | None = Field(None, alias="source", title="来源")
|
||||
subtitle: str | None = Field(None, alias="subtitle", title="字幕")
|
||||
added: bool = Field(False, alias="added", title="是否已添加")
|
||||
eps_collect: bool = Field(False, alias="eps_collect", title="是否已收集")
|
||||
offset: int = Field(0, alias="offset", title="番剧偏移量")
|
||||
filter: list[str] = Field(..., alias="filter", title="番剧过滤器")
|
||||
rss: list[str] = Field(None, alias="rss", title="番剧RSS链接")
|
||||
poster_link: str | None = Field(None, alias="poster_link", title="番剧海报链接")
|
||||
|
||||
|
||||
class ProgramData(BaseModel):
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import re
|
||||
import xml.etree.ElementTree
|
||||
from dataclasses import dataclass
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from .request_url import RequestURL
|
||||
from module.conf import settings
|
||||
@@ -20,10 +21,12 @@ class RequestContent(RequestURL):
|
||||
soup = self.get_xml(_url)
|
||||
torrent_titles = []
|
||||
torrent_urls = []
|
||||
torrent_homepage = []
|
||||
|
||||
for item in soup.findall("./channel/item"):
|
||||
torrent_titles.append(item.find("title").text)
|
||||
torrent_urls.append(item.find("enclosure").attrib['url'])
|
||||
torrent_homepage.append(item.find("link").text)
|
||||
|
||||
torrents = []
|
||||
for _title, torrent_url in zip(torrent_titles, torrent_urls):
|
||||
@@ -34,7 +37,16 @@ class RequestContent(RequestURL):
|
||||
torrents.append(TorrentInfo(_title, torrent_url))
|
||||
return torrents
|
||||
|
||||
def get_xml(self, _url) -> xml.etree.ElementTree.ElementTree:
|
||||
def get_poster(self, _url):
    """Fetch a page and pull the poster URL out of an inline ``style``.

    The page is retrieved with ``self.get_html``, parsed with BeautifulSoup,
    and the first ``<div class="bangumi-poster">`` is inspected for a
    ``url(...)`` value in its ``style`` attribute.

    Args:
        _url: Address handed to ``self.get_html``.

    Returns:
        The text inside ``url(...)`` with surrounding quotes removed, or
        ``None`` when the div is missing, has no style, or the style has no
        ``url(...)`` value.
    """
    content = self.get_html(_url).text
    soup = BeautifulSoup(content, 'html.parser')
    div = soup.find('div', {'class': 'bangumi-poster'})
    # find() yields None when nothing matches; calling .get on it would raise.
    if div is None:
        return None
    style = div.get('style')
    if not style:
        return None
    # Quoted form first so a ')' inside the quotes stays part of the URL,
    # matching what the previous split on "url('" / "')" returned.
    quoted = re.search(r"""url\(\s*(['"])(.*?)\1\s*\)""", style)
    if quoted:
        return quoted.group(2)
    # Unquoted url(...) previously raised IndexError.
    bare = re.search(r"""url\(\s*([^'"\s)][^)]*?)\s*\)""", style)
    return bare.group(1) if bare else None
|
||||
|
||||
def get_xml(self, _url) -> xml.etree.ElementTree.Element:
    """Fetch ``_url`` with ``self.get_url`` and parse the response text as XML.

    Returns:
        The root element produced by ``xml.etree.ElementTree.fromstring``.
    """
    response = self.get_url(_url)
    root = xml.etree.ElementTree.fromstring(response.text)
    return root
|
||||
|
||||
# API JSON
|
||||
@@ -52,3 +64,4 @@ class RequestContent(RequestURL):
|
||||
|
||||
def get_content(self, _url):
    """Fetch ``_url`` with ``self.get_url`` and return the response's ``content``."""
    response = self.get_url(_url)
    return response.content
|
||||
|
||||
|
||||
@@ -43,7 +43,7 @@ class TitleParser:
|
||||
self,
|
||||
raw: str,
|
||||
settings: Config,
|
||||
_id: int | None = None
|
||||
_id: int = 0
|
||||
) -> BangumiData:
|
||||
language = settings.rss_parser.language
|
||||
try:
|
||||
@@ -74,10 +74,10 @@ class TitleParser:
|
||||
dpi=episode.resolution,
|
||||
source=episode.source,
|
||||
subtitle=episode.sub,
|
||||
added=False,
|
||||
eps_collect=True if episode.episode > 1 else False,
|
||||
offset=0,
|
||||
filter=settings.rss_parser.filter
|
||||
filter=settings.rss_parser.filter,
|
||||
rss=rss_link,
|
||||
)
|
||||
logger.debug(f"RAW:{raw} >> {episode.title_en}")
|
||||
return data
|
||||
|
||||
@@ -3,8 +3,8 @@ import logging
|
||||
|
||||
from module.network import RequestContent
|
||||
from module.parser import TitleParser
|
||||
from module.core import DownloadClient
|
||||
from module.models import BangumiData, Config
|
||||
from module.database import DataOperator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -14,37 +14,24 @@ class RSSAnalyser:
|
||||
self._title_analyser = TitleParser()
|
||||
self.settings = settings
|
||||
|
||||
@staticmethod
|
||||
def find_id(bangumi_info: list[BangumiData]) -> int:
|
||||
_id = 0
|
||||
for info in bangumi_info:
|
||||
if info.id > _id:
|
||||
_id = info.id
|
||||
return _id
|
||||
|
||||
def rss_to_datas(self, bangumi_info: list[BangumiData], rss_link: str) -> list[BangumiData]:
|
||||
def rss_to_datas(self, rss_link: str) -> list[BangumiData]:
    """Parse an RSS feed and store the bangumi it introduces.

    Torrent names are read from the feed, names already matching a stored
    entry are discarded via ``DataOperator.not_exist_titles``, and each
    remaining name is parsed with the title analyser. Parsed entries whose
    official title is not yet stored are collected, given consecutive ids
    starting at ``DataOperator.gen_id()``, and inserted in one call.

    Args:
        rss_link: Address of the RSS feed to read.

    Returns:
        The newly collected entries, in feed order. Empty when the feed
        introduces nothing new.
    """
    with RequestContent() as req:
        rss_torrents = req.get_torrents(rss_link)
    title_list = [torrent.name for torrent in rss_torrents]
    data_list = []
    # Official titles accepted during this call. The database only learns
    # about them after the loop, so several torrents of one new show would
    # otherwise each produce an entry.
    seen_titles = set()
    with DataOperator() as op:
        add_title_list = op.not_exist_titles(title_list)
        _id = op.gen_id()
        for raw_title in add_title_list:
            data = self._title_analyser.raw_parser(
                raw=raw_title,
                _id=_id,
                settings=self.settings)
            if data is None or data.official_title in seen_titles:
                continue
            # match_title() returns a bool, so comparing it with None never
            # succeeded and nothing was ever collected. search_official_title()
            # returns None when no stored row has this title.
            if op.search_official_title(data.official_title) is None:
                seen_titles.add(data.official_title)
                data_list.append(data)
                # Advance only when an id has actually been consumed.
                _id += 1
        if data_list:
            op.insert_list(data_list)
    return data_list
|
||||
|
||||
def rss_to_data(self, url, _filter: bool = True) -> BangumiData:
|
||||
with RequestContent() as req:
|
||||
@@ -59,10 +46,9 @@ class RSSAnalyser:
|
||||
except Exception as e:
|
||||
logger.debug(e)
|
||||
|
||||
def run(self, bangumi_info: list[BangumiData], rss_link: str):
|
||||
def run(self, rss_link: str):
    """Collect new bangumi from an RSS feed, logging start and end.

    Args:
        rss_link: Address of the RSS feed passed to ``rss_to_datas``.

    Returns:
        Whatever ``rss_to_datas`` returns, or ``None`` when it raises.
    """
    logger.info("Start collecting RSS info.")
    try:
        return self.rss_to_datas(rss_link)
    except Exception as e:
        # Best-effort collection: the failure is logged and the caller gets None.
        logger.debug(e)
    finally:
        # In a finally clause so the closing message is also emitted on the
        # success path, where the early return used to skip it.
        logger.info("Finished")
|
||||
|
||||
Reference in New Issue
Block a user