diff --git a/backend/src/module/core/program.py b/backend/src/module/core/program.py index 77f07ea0..64b1acf5 100644 --- a/backend/src/module/core/program.py +++ b/backend/src/module/core/program.py @@ -48,7 +48,6 @@ class Program(RenameThread, RSSThread, OffsetScanThread, CalendarRefreshThread): # Prevent duplicate startup due to nested router lifespan events if self._startup_done: return - self._startup_done = True self.__start_info() if not self.database: first_run() @@ -75,9 +74,9 @@ class Program(RenameThread, RSSThread, OffsetScanThread, CalendarRefreshThread): logger.info("[Core] No image cache exists, create image cache.") await cache_image() await self.start() + self._startup_done = True async def start(self): - self.stop_event.clear() settings.load() max_retries = 10 retry_count = 0 @@ -103,6 +102,7 @@ class Program(RenameThread, RSSThread, OffsetScanThread, CalendarRefreshThread): self.scan_start() # Start calendar refresh (every 24 hours) self.calendar_start() + self._tasks_started = True logger.info("Program running.") return ResponseModel( status=True, @@ -113,11 +113,11 @@ class Program(RenameThread, RSSThread, OffsetScanThread, CalendarRefreshThread): async def stop(self): if self.is_running: - self.stop_event.set() await self.rename_stop() await self.rss_stop() await self.scan_stop() await self.calendar_stop() + self._tasks_started = False return ResponseModel( status=True, status_code=200, @@ -133,14 +133,39 @@ class Program(RenameThread, RSSThread, OffsetScanThread, CalendarRefreshThread): ) async def restart(self): - await self.stop() - await self.start() - return ResponseModel( - status=True, - status_code=200, - msg_en="Program restarted.", - msg_zh="程序重启成功。", - ) + stop_ok = True + try: + await self.stop() + except Exception as e: + logger.warning(f"[Core] Error during stop in restart: {e}") + stop_ok = False + start_ok = True + try: + await self.start() + except Exception as e: + logger.error(f"[Core] Error during start in restart: {e}") + start_ok = False + if start_ok and stop_ok: + return ResponseModel( + status=True, + status_code=200, + msg_en="Program restarted.", + msg_zh="程序重启成功。", + ) + elif start_ok: + return ResponseModel( + status=True, + status_code=200, + msg_en="Program restarted (stop had warnings).", + msg_zh="程序重启成功(停止时有警告)。", + ) + else: + return ResponseModel( + status=False, + status_code=500, + msg_en="Program failed to restart.", + msg_zh="程序重启失败。", + ) def update_database(self): need_update, _ = self.version_update diff --git a/backend/src/module/core/status.py b/backend/src/module/core/status.py index c632ca18..442b8eea 100644 --- a/backend/src/module/core/status.py +++ b/backend/src/module/core/status.py @@ -1,8 +1,11 @@ import asyncio +import time from module.checker import Checker from module.conf import LEGACY_DATA_PATH +DOWNLOADER_STATUS_TTL = 60 + class ProgramStatus(Checker): def __init__(self): @@ -10,27 +13,34 @@ class ProgramStatus(Checker): self.stop_event = asyncio.Event() self.lock = asyncio.Lock() self._downloader_status = False + self._downloader_last_check: float = 0 self._torrents_status = False + self._tasks_started = False self.event = asyncio.Event() @property def is_running(self): - if self.stop_event.is_set() or self.check_first_run(): + if not self._tasks_started or self.check_first_run(): return False else: return True @property def is_stopped(self): - return self.stop_event.is_set() + return not self._tasks_started @property def downloader_status(self): return self._downloader_status async def check_downloader_status(self) -> bool: - if not self._downloader_status: + now = time.monotonic() + if ( + not self._downloader_status + or (now - self._downloader_last_check) >= DOWNLOADER_STATUS_TTL + ): self._downloader_status = await self.check_downloader() + self._downloader_last_check = now return self._downloader_status @property diff --git a/backend/src/module/core/sub_thread.py b/backend/src/module/core/sub_thread.py index bc49d4c8..a1d3c9ec 100644 --- a/backend/src/module/core/sub_thread.py +++ b/backend/src/module/core/sub_thread.py @@ -20,34 +20,39 @@ class RSSThread(ProgramStatus): def __init__(self): super().__init__() self._rss_task: asyncio.Task | None = None + self._rss_stop_event = asyncio.Event() self.analyser = RSSAnalyser() async def rss_loop(self): - while not self.stop_event.is_set(): - async with DownloadClient() as client: - with RSSEngine() as engine: - # Analyse RSS - rss_list = engine.rss.search_aggregate() - for rss in rss_list: - await self.analyser.rss_to_data(rss, engine) - # Run RSS Engine - await engine.refresh_rss(client) - if settings.bangumi_manage.eps_complete: - await eps_complete() + while not self._rss_stop_event.is_set(): + try: + async with DownloadClient() as client: + with RSSEngine() as engine: + # Analyse RSS + rss_list = engine.rss.search_aggregate() + for rss in rss_list: + await self.analyser.rss_to_data(rss, engine) + # Run RSS Engine + await engine.refresh_rss(client) + if settings.bangumi_manage.eps_complete: + await eps_complete() + except Exception as e: + logger.error(f"[RSSThread] Error during RSS loop: {e}") try: await asyncio.wait_for( - self.stop_event.wait(), + self._rss_stop_event.wait(), timeout=settings.program.rss_time, ) except asyncio.TimeoutError: pass def rss_start(self): + self._rss_stop_event.clear() self._rss_task = asyncio.create_task(self.rss_loop()) async def rss_stop(self): if self._rss_task and not self._rss_task.done(): - self.stop_event.set() + self._rss_stop_event.set() self._rss_task.cancel() try: await self._rss_task @@ -60,29 +65,34 @@ class RenameThread(ProgramStatus): def __init__(self): super().__init__() self._rename_task: asyncio.Task | None = None + self._rename_stop_event = asyncio.Event() async def rename_loop(self): - while not self.stop_event.is_set(): - async with Renamer() as renamer: - renamed_info = await renamer.rename() - if settings.notification.enable and renamed_info: - manager = NotificationManager() - for info in renamed_info: - await manager.send_all(info) + while not self._rename_stop_event.is_set(): + try: + async with Renamer() as renamer: + renamed_info = await renamer.rename() + if settings.notification.enable and renamed_info: + manager = NotificationManager() + for info in renamed_info: + await manager.send_all(info) + except Exception as e: + logger.error(f"[RenameThread] Error during rename loop: {e}") try: await asyncio.wait_for( - self.stop_event.wait(), + self._rename_stop_event.wait(), timeout=settings.program.rename_time, ) except asyncio.TimeoutError: pass def rename_start(self): + self._rename_stop_event.clear() self._rename_task = asyncio.create_task(self.rename_loop()) async def rename_stop(self): if self._rename_task and not self._rename_task.done(): - self.stop_event.set() + self._rename_stop_event.set() self._rename_task.cancel() try: await self._rename_task @@ -101,34 +111,38 @@ class OffsetScanThread(ProgramStatus): def __init__(self): super().__init__() self._scan_task: asyncio.Task | None = None + self._scan_stop_event = asyncio.Event() self._scanner = OffsetScanner() async def scan_loop(self): # Initial delay to let the system stabilize await asyncio.sleep(60) - while not self.stop_event.is_set(): + while not self._scan_stop_event.is_set(): try: flagged = await self._scanner.scan_all() - logger.info(f"[OffsetScanThread] Scan complete, flagged {flagged} bangumi") + logger.info( + f"[OffsetScanThread] Scan complete, flagged {flagged} bangumi" + ) except Exception as e: logger.error(f"[OffsetScanThread] Error during scan: {e}") try: await asyncio.wait_for( - self.stop_event.wait(), + self._scan_stop_event.wait(), timeout=OFFSET_SCAN_INTERVAL, ) except asyncio.TimeoutError: pass def scan_start(self): + self._scan_stop_event.clear() self._scan_task = asyncio.create_task(self.scan_loop()) logger.info("[OffsetScanThread] Started offset scanner") async def scan_stop(self): if self._scan_task and not self._scan_task.done(): - self.stop_event.set() + self._scan_stop_event.set() self._scan_task.cancel() try: await self._scan_task @@ -144,17 +158,20 @@ class CalendarRefreshThread(ProgramStatus): def __init__(self): super().__init__() self._calendar_task: asyncio.Task | None = None + self._calendar_stop_event = asyncio.Event() async def calendar_loop(self): # Initial delay to let the system stabilize await asyncio.sleep(120) - while not self.stop_event.is_set(): + while not self._calendar_stop_event.is_set(): try: with TorrentManager() as manager: resp = await manager.refresh_calendar() if resp.status: - logger.info("[CalendarRefreshThread] Calendar refresh completed") + logger.info( + "[CalendarRefreshThread] Calendar refresh completed" + ) else: logger.warning( f"[CalendarRefreshThread] Calendar refresh failed: {resp.msg_en}" @@ -164,19 +181,20 @@ class CalendarRefreshThread(ProgramStatus): try: await asyncio.wait_for( - self.stop_event.wait(), + self._calendar_stop_event.wait(), timeout=CALENDAR_REFRESH_INTERVAL, ) except asyncio.TimeoutError: pass def calendar_start(self): + self._calendar_stop_event.clear() self._calendar_task = asyncio.create_task(self.calendar_loop()) logger.info("[CalendarRefreshThread] Started calendar refresh (every 24h)") async def calendar_stop(self): if self._calendar_task and not self._calendar_task.done(): - self.stop_event.set() + self._calendar_stop_event.set() self._calendar_task.cancel() try: await self._calendar_task