mirror of
https://github.com/EstrellaXD/Auto_Bangumi.git
synced 2026-04-14 02:20:53 +08:00
feat: add rss engine
This commit is contained in:
@@ -1 +1,3 @@
|
||||
from .bangumi import BangumiDatabase
|
||||
from .rss import RSSDatabase
|
||||
from .torrent import TorrentDatabase
|
||||
@@ -1,17 +1,24 @@
|
||||
import logging
|
||||
|
||||
from .connector import DataConnector
|
||||
from .orm import Connector
|
||||
from module.models import TorrentData
|
||||
from module.conf import DATA_PATH
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TorrentDatabase(DataConnector):
|
||||
def update_table(self):
|
||||
table_name = "torrent"
|
||||
db_data = self.__data_to_db()
|
||||
self._update_table(table_name, db_data)
|
||||
class TorrentDatabase(Connector):
|
||||
def __init__(self, database: str = DATA_PATH):
|
||||
super().__init__(
|
||||
table_name="torrent",
|
||||
data=TorrentData().dict(),
|
||||
database=DATA_PATH
|
||||
)
|
||||
|
||||
def __data_to_db(self, data: SaveTorrent):
|
||||
def update_table(self):
|
||||
self.update.table()
|
||||
|
||||
def __data_to_db(self, data: TorrentData) -> dict:
|
||||
db_data = data.dict()
|
||||
for key, value in db_data.items():
|
||||
if isinstance(value, bool):
|
||||
@@ -20,28 +27,18 @@ class TorrentDatabase(DataConnector):
|
||||
db_data[key] = ",".join(value)
|
||||
return db_data
|
||||
|
||||
def __db_to_data(self, db_data: dict):
|
||||
def __db_to_data(self, db_data: dict) -> TorrentData:
|
||||
for key, item in db_data.items():
|
||||
if isinstance(item, int):
|
||||
if key not in ["id", "offset", "season", "year"]:
|
||||
db_data[key] = bool(item)
|
||||
db_data[key] = bool(item)
|
||||
elif key in ["filter", "rss_link"]:
|
||||
db_data[key] = item.split(",")
|
||||
return SaveTorrent(**db_data)
|
||||
return TorrentData(**db_data)
|
||||
|
||||
def if_downloaded(self, torrent_url: str, torrent_name: str) -> bool:
|
||||
self._cursor.execute(
|
||||
"SELECT * FROM torrent WHERE torrent_url = ? OR torrent_name = ?",
|
||||
(torrent_url, torrent_name),
|
||||
)
|
||||
return bool(self._cursor.fetchone())
|
||||
def insert_many(self, data_list: list[TorrentData]):
|
||||
dict_datas = [self.__data_to_db(data) for data in data_list]
|
||||
self.insert.many(dict_datas)
|
||||
|
||||
def insert(self, data: SaveTorrent):
|
||||
db_data = self.__data_to_db(data)
|
||||
columns = ", ".join(db_data.keys())
|
||||
values = ", ".join([f":{key}" for key in db_data.keys()])
|
||||
self._cursor.execute(
|
||||
f"INSERT INTO torrent ({columns}) VALUES ({values})", db_data
|
||||
)
|
||||
logger.debug(f"Add {data.torrent_name} into database.")
|
||||
self._conn.commit()
|
||||
def get_all(self) -> list[TorrentData]:
|
||||
dict_datas = self.select.all()
|
||||
return [self.__db_to_data(data) for data in dict_datas]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from .bangumi import *
|
||||
from .bangumi import BangumiData
|
||||
from .config import Config
|
||||
from .rss import RSSTorrents
|
||||
from .rss import RSSItem, TorrentData
|
||||
from .torrent import EpisodeFile, SubtitleFile, TorrentBase
|
||||
from .user import UserLogin
|
||||
|
||||
@@ -1,9 +1,18 @@
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class RSSTorrents(BaseModel):
|
||||
name: str = Field(..., alias="item_path")
|
||||
class RSSItem(BaseModel):
|
||||
id: int = Field(0, alias="id", title="id")
|
||||
item_path: str = Field("example path", alias="item_path")
|
||||
url: str = Field("https://mikanani.me", alias="url")
|
||||
combine: bool = Field(True, alias="combine")
|
||||
enabled: bool = Field(True, alias="enabled")
|
||||
|
||||
|
||||
class TorrentData(BaseModel):
|
||||
id: int = Field(0, alias="id")
|
||||
name: str = Field(..., alias="name")
|
||||
url: str = Field(..., alias="url")
|
||||
analyze: bool = Field(..., alias="analyze")
|
||||
enabled: bool = Field(..., alias="enabled")
|
||||
torrents: list[str] = Field(..., alias="torrents")
|
||||
matched: bool = Field(..., alias="matched")
|
||||
downloaded: bool = Field(..., alias="downloaded")
|
||||
save_path: str = Field(..., alias="save_path")
|
||||
|
||||
@@ -95,3 +95,12 @@ class RequestContent(RequestURL):
|
||||
|
||||
def check_connection(self, _url):
|
||||
return self.check_url(_url)
|
||||
|
||||
def get_rss_title(self, _url):
|
||||
soup = self.get_xml(_url)
|
||||
return soup.find("title").text
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
with RequestContent() as req:
|
||||
req.get_xml("https://mikanani.me/RSS/Classic")
|
||||
|
||||
@@ -7,3 +7,7 @@ def mikan_parser(soup):
|
||||
torrent_urls.append(item.find("enclosure").attrib["url"])
|
||||
torrent_homepage.append(item.find("link").text)
|
||||
return torrent_titles, torrent_urls, torrent_homepage
|
||||
|
||||
|
||||
def mikan_title(soup):
|
||||
return soup.find("title").text
|
||||
|
||||
75
backend/src/module/rss/engine.py
Normal file
75
backend/src/module/rss/engine.py
Normal file
@@ -0,0 +1,75 @@
|
||||
import re
|
||||
|
||||
from module.database import RSSDatabase, BangumiDatabase, TorrentDatabase
|
||||
from module.models import BangumiData, RSSItem, TorrentData
|
||||
from module.network import RequestContent, TorrentInfo
|
||||
|
||||
|
||||
class RSSEngine(RequestContent):
|
||||
|
||||
@staticmethod
|
||||
def _get_rss_items() -> list[RSSItem]:
|
||||
with RSSDatabase() as db:
|
||||
return db.get_all()
|
||||
|
||||
@staticmethod
|
||||
def _get_bangumi_data(rss_link: str) -> list[BangumiData]:
|
||||
with BangumiDatabase() as db:
|
||||
return db.get_rss_data(rss_link)
|
||||
|
||||
def add_rss(self, rss_link: str, name: str, combine: bool):
|
||||
if not name:
|
||||
name = self.get_rss_title(rss_link)
|
||||
insert_data = RSSItem(item_path=name, url=rss_link, combine=combine)
|
||||
with RSSDatabase() as db:
|
||||
db.insert_one(insert_data)
|
||||
|
||||
def pull_rss(self, rss_item: RSSItem) -> list[TorrentInfo]:
|
||||
torrents = self.get_torrents(rss_item.url)
|
||||
return torrents
|
||||
|
||||
@staticmethod
|
||||
def match_torrent(torrent: TorrentInfo) -> TorrentData | None:
|
||||
with BangumiDatabase() as db:
|
||||
bangumi_data = db.match_torrent(torrent.name)
|
||||
if bangumi_data:
|
||||
_filter = "|".join(bangumi_data.filter)
|
||||
if re.search(_filter, torrent.name):
|
||||
return None
|
||||
else:
|
||||
return TorrentData(
|
||||
name=torrent.name,
|
||||
url=torrent.torrent_link,
|
||||
)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def filter_torrent(torrents: list[TorrentInfo]) -> list[TorrentInfo]:
|
||||
with TorrentDatabase() as db:
|
||||
in_db_torrents = db.get_all()
|
||||
in_db_torrents = [x.name for x in in_db_torrents]
|
||||
i = 0
|
||||
while i < len(torrents):
|
||||
torrent = torrents[i]
|
||||
if torrent.name in in_db_torrents:
|
||||
torrents.pop(i)
|
||||
i += 1
|
||||
return torrents
|
||||
|
||||
def run(self):
|
||||
# Get All RSS Items
|
||||
rss_items: list[RSSItem] = self._get_rss_items()
|
||||
# From RSS Items, get all torrents
|
||||
for rss_item in rss_items:
|
||||
torrents = self.get_torrents(rss_item.url)
|
||||
self.filter_torrent(torrents)
|
||||
# Get all enabled bangumi data
|
||||
matched_torrents = []
|
||||
for torrent in torrents:
|
||||
matched_torrent = self.match_torrent(torrent)
|
||||
if matched_torrent:
|
||||
matched_torrents.append(matched_torrent)
|
||||
# Add to database
|
||||
with TorrentDatabase() as db:
|
||||
db.insert_many(matched_torrents)
|
||||
return matched_torrents
|
||||
@@ -1,26 +0,0 @@
|
||||
import re
|
||||
|
||||
from module.database import RSSDatabase
|
||||
from module.models import BangumiData, RSSTorrents
|
||||
from module.network import RequestContent, TorrentInfo
|
||||
|
||||
|
||||
class RSSPoller(RSSDatabase):
|
||||
@staticmethod
|
||||
def polling(rss_link, req: RequestContent) -> list[TorrentInfo]:
|
||||
return req.get_torrents(rss_link)
|
||||
|
||||
@staticmethod
|
||||
def filter_torrent(data: BangumiData, torrent: TorrentInfo) -> bool:
|
||||
if data.title_raw in torrent.name:
|
||||
_filter = "|".join(data.filter)
|
||||
if not re.search(_filter, torrent.name):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def foo(self):
|
||||
rss_datas: list[RSSTorrents] = self.get_rss_data()
|
||||
with RequestContent() as req:
|
||||
for rss_data in rss_datas:
|
||||
self.polling(rss_data.url, req)
|
||||
Reference in New Issue
Block a user