Files
Auto_Bangumi/backend/src/module/rss/engine.py
Estrella Pan c5f4919e15 feat(rss): add connection status tracking and display
Track RSS feed reachability during refresh cycles. Each feed now stores
connection_status (healthy/error), last_checked_at, and last_error.
The RSS management page shows a green "Connected" tag for healthy feeds
and a red "Error" tag with tooltip for failed feeds.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 21:45:04 +01:00

184 lines
6.6 KiB
Python

import asyncio
import logging
import re
from datetime import datetime, timezone
from typing import Optional
from module.database import Database, engine
from module.downloader import DownloadClient
from module.models import Bangumi, ResponseModel, RSSItem, Torrent
from module.network import RequestContent
logger = logging.getLogger(__name__)
class RSSEngine(Database):
    """RSS engine: fetches feeds, tracks their connection status, matches new
    torrents against subscribed bangumi and hands matches to the download client.

    Inherits the Database wrapper, which provides the `rss`, `torrent` and
    `bangumi` repositories plus `add`/`commit` used below.
    """

    def __init__(self, _engine=engine):
        # _engine defaults to the module-level database engine; callers
        # (e.g. tests) may inject a different engine.
        super().__init__(_engine)
        # NOTE(review): never read or set elsewhere in this file —
        # presumably a deferred-refresh flag; confirm against callers.
        self._to_refresh = False
@staticmethod
async def _get_torrents(rss: RSSItem) -> list[Torrent]:
    """Download the torrent list for *rss* and stamp each entry with the
    feed's id so it can later be traced back to its source feed."""
    async with RequestContent() as session:
        fetched = await session.get_torrents(rss.url)
    # Tag every torrent with the feed it came from.
    for entry in fetched:
        entry.rss_id = rss.id
    return fetched
def get_rss_torrents(self, rss_id: int) -> list[Torrent]:
    """Return the stored torrents for feed *rss_id*, or [] if the feed
    does not exist."""
    if not self.rss.search_id(rss_id):
        return []
    return self.torrent.search_rss(rss_id)
async def add_rss(
    self,
    rss_link: str,
    name: str | None = None,
    aggregate: bool = True,
    parser: str = "mikan",
):
    """Register a new RSS feed.

    When *name* is omitted the feed title is resolved from the feed itself;
    failure to resolve a title, or to store the feed, yields an error
    ResponseModel (406).
    """
    resolved = name
    if not resolved:
        # No display name supplied — ask the feed for its title.
        async with RequestContent() as req:
            resolved = await req.get_rss_title(rss_link)
        if not resolved:
            return ResponseModel(
                status=False,
                status_code=406,
                msg_en="Failed to get RSS title.",
                msg_zh="无法获取 RSS 标题。",
            )
    item = RSSItem(name=resolved, url=rss_link, aggregate=aggregate, parser=parser)
    if not self.rss.add(item):
        return ResponseModel(
            status=False,
            status_code=406,
            msg_en="RSS added failed.",
            msg_zh="RSS 添加失败。",
        )
    return ResponseModel(
        status=True,
        status_code=200,
        msg_en="RSS added successfully.",
        msg_zh="RSS 添加成功。",
    )
def disable_list(self, rss_id_list: list[int]):
    """Disable every feed in *rss_id_list* in one batch operation."""
    self.rss.disable_batch(rss_id_list)
    response = ResponseModel(
        status=True,
        status_code=200,
        msg_en="Disable RSS successfully.",
        msg_zh="禁用 RSS 成功。",
    )
    return response
def enable_list(self, rss_id_list: list[int]):
    """Re-enable every feed in *rss_id_list* in one batch operation."""
    self.rss.enable_batch(rss_id_list)
    response = ResponseModel(
        status=True,
        status_code=200,
        msg_en="Enable RSS successfully.",
        msg_zh="启用 RSS 成功。",
    )
    return response
def delete_list(self, rss_id_list: list[int]):
    """Delete the given feeds one by one (the repository exposes no batch
    delete, unlike enable/disable)."""
    for feed_id in rss_id_list:
        self.rss.delete(feed_id)
    response = ResponseModel(
        status=True,
        status_code=200,
        msg_en="Delete RSS successfully.",
        msg_zh="删除 RSS 成功。",
    )
    return response
async def pull_rss(self, rss_item: RSSItem) -> list[Torrent]:
    """Fetch *rss_item* and return only the torrents not yet in the database."""
    fetched = await self._get_torrents(rss_item)
    return self.torrent.check_new(fetched)
async def _pull_rss_with_status(
    self, rss_item: RSSItem
) -> tuple[list[Torrent], Optional[str]]:
    """Like pull_rss, but never raises.

    Returns (new_torrents, None) on success or ([], error_message) on any
    failure — the error string is what refresh_rss records as last_error.
    """
    try:
        return await self.pull_rss(rss_item), None
    except Exception as e:
        # Deliberate catch-all: one dead feed must not abort the whole
        # refresh cycle; the failure is logged and surfaced via status.
        logger.warning(f"[Engine] Failed to fetch RSS {rss_item.name}: {e}")
        return [], str(e)
# Class-level cache of compiled filter regexes, shared by all instances.
# NOTE: grows unbounded with distinct filter strings (fine for the small,
# user-managed set of bangumi filters).
_filter_cache: dict[str, re.Pattern] = {}

def _get_filter_pattern(self, filter_str: str) -> re.Pattern:
    """Return a compiled, case-insensitive alternation regex for the
    comma-separated *filter_str*, compiling it at most once."""
    pattern = self._filter_cache.get(filter_str)
    if pattern is None:
        # "a,b,c" becomes the regex "a|b|c".
        pattern = re.compile(filter_str.replace(",", "|"), re.IGNORECASE)
        self._filter_cache[filter_str] = pattern
    return pattern
def match_torrent(self, torrent: Torrent) -> Optional[Bangumi]:
    """Match *torrent* against the subscribed bangumi.

    Returns the matched Bangumi — stamping ``torrent.bangumi_id`` so the
    stored torrent stays linked to it — when the torrent name matches a
    subscription and is not rejected by the subscription's filter.
    Returns None on no match or on a filter hit.
    """
    matched: Bangumi = self.bangumi.match_torrent(torrent.name)
    if not matched:
        return None
    if matched.filter == "":
        # Fix: previously bangumi_id was only set on the filtered branch,
        # so torrents accepted via an empty filter were persisted without
        # their bangumi link.
        torrent.bangumi_id = matched.id
        return matched
    # filter is a comma-separated exclusion list compiled to an alternation
    # regex; a hit on any term rejects the torrent.
    pattern = self._get_filter_pattern(matched.filter)
    if not pattern.search(torrent.name):
        torrent.bangumi_id = matched.id
        return matched
    return None
async def refresh_rss(self, client: DownloadClient, rss_id: Optional[int] = None):
    """Refresh one feed (when *rss_id* is given) or all active feeds.

    For each feed: fetch new torrents concurrently, record its connection
    status (healthy/error, timestamp, last error), send matched torrents to
    *client*, and persist everything.
    """
    # Get All RSS Items
    if not rss_id:
        rss_items: list[RSSItem] = self.rss.search_active()
    else:
        rss_item = self.rss.search_id(rss_id)
        rss_items = [rss_item] if rss_item else []
    # From RSS Items, fetch all torrents concurrently
    logger.debug(f"[Engine] Get {len(rss_items)} RSS items")
    results = await asyncio.gather(
        *[self._pull_rss_with_status(rss_item) for rss_item in rss_items]
    )
    # One UTC ISO-8601 timestamp shared by the whole refresh cycle.
    now = datetime.now(timezone.utc).isoformat()
    # Process results sequentially (DB operations)
    for rss_item, (new_torrents, error) in zip(rss_items, results):
        # Update connection status (drives the Connected/Error tag in the UI).
        rss_item.connection_status = "error" if error else "healthy"
        rss_item.last_checked_at = now
        rss_item.last_error = error  # None clears any previous error
        self.add(rss_item)
        for torrent in new_torrents:
            matched_data = self.match_torrent(torrent)
            if matched_data:
                if await client.add_torrent(torrent, matched_data):
                    logger.debug(f"[Engine] Add torrent {torrent.name} to client")
                    # Mark as handed off so it is stored as downloaded.
                    torrent.downloaded = True
        # Add all torrents to database
        self.torrent.add_all(new_torrents)
    self.commit()
async def download_bangumi(self, bangumi: Bangumi):
    """Fetch torrents from *bangumi*'s own RSS link, filtered by its
    exclusion filter, and hand them to the download client.

    Returns a ResponseModel: 200 on success, 406 when no torrents were found.
    """
    async with RequestContent() as req:
        # filter is comma-separated; converted to a regex alternation for
        # the request layer — TODO confirm get_torrents' filter semantics.
        torrents = await req.get_torrents(
            bangumi.rss_link, bangumi.filter.replace(",", "|")
        )
    if torrents:
        async with DownloadClient() as client:
            # NOTE(review): a list is passed here, while refresh_rss passes
            # a single Torrent to add_torrent — confirm the client accepts both.
            await client.add_torrent(torrents, bangumi)
            self.torrent.add_all(torrents)
            # NOTE(review): no self.commit() here, unlike refresh_rss —
            # verify whether persistence relies on autocommit.
        return ResponseModel(
            status=True,
            status_code=200,
            msg_en=f"[Engine] Download {bangumi.official_title} successfully.",
            msg_zh=f"下载 {bangumi.official_title} 成功。",
        )
    else:
        return ResponseModel(
            status=False,
            status_code=406,
            msg_en=f"[Engine] Download {bangumi.official_title} failed.",
            msg_zh=f"[Engine] 下载 {bangumi.official_title} 失败。",
        )