def search_ids(self, ids: list[int]) -> list[Bangumi]:
    """Batch-fetch Bangumi rows by primary key.

    Method of ``BangumiDatabase`` (reconstructed from the patch hunk).
    Returns an empty list without touching the session when *ids* is empty.
    """
    if not ids:
        return []
    rows = self.session.execute(select(Bangumi).where(Bangumi.id.in_(ids)))
    return list(rows.scalars().all())


def search_by_qb_hashes(self, qb_hashes: list[str]) -> list[Torrent]:
    """Batch-fetch Torrent rows by their qBittorrent hashes.

    Method of ``TorrentDatabase`` (reconstructed from the patch hunk).
    Returns an empty list without querying when *qb_hashes* is empty.
    """
    if not qb_hashes:
        return []
    rows = self.session.execute(
        select(Torrent).where(Torrent.qb_hash.in_(qb_hashes))
    )
    return list(rows.scalars().all())
async def torrents_rename_file(
    self, torrent_hash, old_path, new_path, verify: bool = True
) -> bool:
    """Rename a file inside a torrent via the qBittorrent WebUI API.

    Args:
        torrent_hash: hash identifying the torrent.
        old_path: current file path inside the torrent.
        new_path: desired file path.
        verify: when True, poll the torrent's file list afterwards to confirm
            the rename actually landed (qBittorrent can answer 200 yet defer
            the rename, e.g. while seeding).

    Returns:
        True if the rename succeeded (or was accepted and verification is
        skipped), False on HTTP failure, connection error, or when the file
        is still unrenamed after all verification attempts.
    """
    try:
        resp = await self._client.post(
            self._url("torrents/renameFile"),
            # Payload per the qBittorrent WebUI API; the original context
            # lines are elided in the hunk — confirm against the file.
            data={"hash": torrent_hash, "oldPath": old_path, "newPath": new_path},
        )
        if resp.status_code != 200:
            return False
        if not verify:
            return True
        # Poll with exponential backoff: 0.1s, 0.2s, 0.4s (max 3 attempts).
        # BUG FIX: the previous version used `continue` inside the inner
        # `for f in files` loop, which only advanced to the next file; the
        # trailing `return True` then fired, so an unrenamed file on the
        # first attempt was reported as success and the retries never ran.
        for attempt in range(3):
            await asyncio.sleep(0.1 * (2 ** attempt))
            files = await self.torrents_files(torrent_hash)
            names = {f.get("name") for f in files}
            if new_path in names:
                return True
            if old_path not in names:
                # Neither name present: the listing changed out from under
                # us; treat as success rather than retrying forever.
                return True
            # old_path still listed -> rename has not landed yet; back off.
        logger.debug(
            f"[Downloader] Rename API returned 200 but file unchanged: {old_path}"
        )
        return False
    except (httpx.ConnectError, httpx.RequestError, httpx.TimeoutException) as e:
        logger.warning(f"[Downloader] Failed to rename file {old_path}: {e}")
        return False
# Renames that returned HTTP 200 but were not actually applied by qBittorrent,
# keyed by (torrent_hash, old_path, new_path) -> timestamp of last attempt.
# Prevents spamming the same rename while qBittorrent is stalling it.
_pending_renames: dict[tuple[str, str, str], float] = {}
_PENDING_RENAME_COOLDOWN = 300  # 5 minutes cooldown before retrying same rename
_CLEANUP_INTERVAL = 60  # Clean up pending cache at most once per minute
_last_cleanup_time: float = 0


def _cleanup_pending_cache():
    """Drop long-expired entries from the pending-renames cache (throttled).

    No-op unless at least _CLEANUP_INTERVAL seconds have passed since the
    previous sweep; entries older than twice the retry cooldown are removed.
    (Defined as a ``@staticmethod`` on ``Renamer`` in the original file.)
    """
    global _last_cleanup_time
    now = time.time()
    if now - _last_cleanup_time < _CLEANUP_INTERVAL:
        return
    _last_cleanup_time = now
    cutoff = now - _PENDING_RENAME_COOLDOWN * 2
    # Snapshot items so we can delete while iterating.
    for key, stamp in list(_pending_renames.items()):
        if stamp < cutoff:
            _pending_renames.pop(key, None)
{media_path} parse failed") if settings.bangumi_manage.remove_bad_torrent: @@ -216,8 +232,12 @@ class Renamer(DownloadClient): season_offset=season_offset, ) if subtitle_path != new_path: + # Skip verification for subtitles to reduce latency renamed = await self.rename_torrent_file( - _hash=_hash, old_path=subtitle_path, new_path=new_path + _hash=_hash, + old_path=subtitle_path, + new_path=new_path, + verify=False, ) if not renamed: logger.warning(f"[Renamer] {subtitle_path} rename failed") @@ -249,6 +269,99 @@ class Renamer(DownloadClient): # Remove trailing slashes return normalized.rstrip("/") + def _batch_lookup_offsets( + self, torrents_info: list[dict] + ) -> dict[str, tuple[int, int]]: + """Batch lookup offsets for all torrents in a single database session. + + Returns a dict mapping torrent_hash to (episode_offset, season_offset). + """ + result: dict[str, tuple[int, int]] = {} + if not torrents_info: + return result + + try: + with Database() as db: + # Collect all hashes for batch query + hashes = [info["hash"] for info in torrents_info] + torrent_records = db.torrent.search_by_qb_hashes(hashes) + hash_to_bangumi_id = { + r.qb_hash: r.bangumi_id for r in torrent_records if r.bangumi_id + } + + # Collect unique bangumi IDs to fetch + bangumi_ids_to_fetch = set(hash_to_bangumi_id.values()) + + # Also collect bangumi IDs from tags + tag_bangumi_ids = {} + for info in torrents_info: + tags = info.get("tags", "") + bangumi_id = self._parse_bangumi_id_from_tags(tags) + if bangumi_id: + tag_bangumi_ids[info["hash"]] = bangumi_id + bangumi_ids_to_fetch.add(bangumi_id) + + # Batch fetch all bangumi records + bangumi_map = {} + if bangumi_ids_to_fetch: + bangumi_records = db.bangumi.search_ids(list(bangumi_ids_to_fetch)) + bangumi_map = { + b.id: b for b in bangumi_records if b and not b.deleted + } + + # Now resolve offsets for each torrent + for info in torrents_info: + torrent_hash = info["hash"] + torrent_name = info["name"] + save_path = info["save_path"] + + 
# 1. Try by qb_hash + bangumi_id = hash_to_bangumi_id.get(torrent_hash) + if bangumi_id and bangumi_id in bangumi_map: + b = bangumi_map[bangumi_id] + result[torrent_hash] = (b.episode_offset, b.season_offset) + continue + + # 2. Try by tag + bangumi_id = tag_bangumi_ids.get(torrent_hash) + if bangumi_id and bangumi_id in bangumi_map: + b = bangumi_map[bangumi_id] + result[torrent_hash] = (b.episode_offset, b.season_offset) + continue + + # 3. Try by torrent name (individual query, but less common path) + bangumi = db.bangumi.match_torrent(torrent_name) + if bangumi: + result[torrent_hash] = ( + bangumi.episode_offset, + bangumi.season_offset, + ) + continue + + # 4. Try by save_path (individual query, fallback) + normalized_save_path = self._normalize_path(save_path) + bangumi = db.bangumi.match_by_save_path(save_path) + if not bangumi: + bangumi = db.bangumi.match_by_save_path(normalized_save_path) + if bangumi: + result[torrent_hash] = ( + bangumi.episode_offset, + bangumi.season_offset, + ) + continue + + # Default: no offset + result[torrent_hash] = (0, 0) + + except Exception as e: + logger.debug(f"[Renamer] Batch offset lookup failed: {e}") + # Fall back to individual lookups on error + for info in torrents_info: + if info["hash"] not in result: + result[info["hash"]] = (0, 0) + + return result + def _lookup_offsets( self, torrent_hash: str, torrent_name: str, save_path: str, tags: str = "" ) -> tuple[int, int]: @@ -331,17 +444,16 @@ class Renamer(DownloadClient): all_files = await asyncio.gather( *[self.get_torrent_files(info["hash"]) for info in torrents_info] ) + # Batch lookup all offsets in a single database session + offset_map = self._batch_lookup_offsets(torrents_info) for info, files in zip(torrents_info, all_files): torrent_hash = info["hash"] torrent_name = info["name"] save_path = info["save_path"] - tags = info.get("tags", "") media_list, subtitle_list = self.check_files(files) bangumi_name, season = self._path_to_bangumi(save_path) - # Look up 
offsets from database (use hash/tags/bangumi_id for accurate matching) - episode_offset, season_offset = self._lookup_offsets( - torrent_hash, torrent_name, save_path, tags - ) + # Use pre-fetched offsets + episode_offset, season_offset = offset_map.get(torrent_hash, (0, 0)) kwargs = { "torrent_name": torrent_name, "bangumi_name": bangumi_name,