fix(core): improve program lifecycle and background task management

- Use per-task stop events instead of shared stop_event to prevent
  stopping one task from killing all others
- Track running state via _tasks_started flag instead of stop_event
- Add error handling in RSS, rename, scan, and calendar loops
- Make restart() resilient to stop failures (catch and continue)
- Cache downloader status check with 60s TTL
- Fix _startup_done set before start() completes (race condition)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Estrella Pan
2026-02-23 11:46:22 +01:00
parent c7c709fa66
commit ec4aca5aba
3 changed files with 97 additions and 44 deletions

View File

@@ -48,7 +48,6 @@ class Program(RenameThread, RSSThread, OffsetScanThread, CalendarRefreshThread):
# Prevent duplicate startup due to nested router lifespan events
if self._startup_done:
return
self._startup_done = True
self.__start_info()
if not self.database:
first_run()
@@ -75,9 +74,9 @@ class Program(RenameThread, RSSThread, OffsetScanThread, CalendarRefreshThread):
logger.info("[Core] No image cache exists, create image cache.")
await cache_image()
await self.start()
self._startup_done = True
async def start(self):
self.stop_event.clear()
settings.load()
max_retries = 10
retry_count = 0
@@ -103,6 +102,7 @@ class Program(RenameThread, RSSThread, OffsetScanThread, CalendarRefreshThread):
self.scan_start()
# Start calendar refresh (every 24 hours)
self.calendar_start()
self._tasks_started = True
logger.info("Program running.")
return ResponseModel(
status=True,
@@ -113,11 +113,11 @@ class Program(RenameThread, RSSThread, OffsetScanThread, CalendarRefreshThread):
async def stop(self):
if self.is_running:
self.stop_event.set()
await self.rename_stop()
await self.rss_stop()
await self.scan_stop()
await self.calendar_stop()
self._tasks_started = False
return ResponseModel(
status=True,
status_code=200,
@@ -133,14 +133,39 @@ class Program(RenameThread, RSSThread, OffsetScanThread, CalendarRefreshThread):
)
async def restart(self):
await self.stop()
await self.start()
return ResponseModel(
status=True,
status_code=200,
msg_en="Program restarted.",
msg_zh="程序重启成功。",
)
stop_ok = True
try:
await self.stop()
except Exception as e:
logger.warning(f"[Core] Error during stop in restart: {e}")
stop_ok = False
start_ok = True
try:
await self.start()
except Exception as e:
logger.error(f"[Core] Error during start in restart: {e}")
start_ok = False
if start_ok and stop_ok:
return ResponseModel(
status=True,
status_code=200,
msg_en="Program restarted.",
msg_zh="程序重启成功。",
)
elif start_ok:
return ResponseModel(
status=True,
status_code=200,
msg_en="Program restarted (stop had warnings).",
msg_zh="程序重启成功(停止时有警告)。",
)
else:
return ResponseModel(
status=False,
status_code=500,
msg_en="Program failed to restart.",
msg_zh="程序重启失败。",
)
def update_database(self):
need_update, _ = self.version_update

View File

@@ -1,8 +1,11 @@
import asyncio
import time
from module.checker import Checker
from module.conf import LEGACY_DATA_PATH
DOWNLOADER_STATUS_TTL = 60
class ProgramStatus(Checker):
def __init__(self):
@@ -10,27 +13,34 @@ class ProgramStatus(Checker):
self.stop_event = asyncio.Event()
self.lock = asyncio.Lock()
self._downloader_status = False
self._downloader_last_check: float = 0
self._torrents_status = False
self._tasks_started = False
self.event = asyncio.Event()
@property
def is_running(self):
if self.stop_event.is_set() or self.check_first_run():
if not self._tasks_started or self.check_first_run():
return False
else:
return True
@property
def is_stopped(self):
return self.stop_event.is_set()
return not self._tasks_started
@property
def downloader_status(self):
return self._downloader_status
async def check_downloader_status(self) -> bool:
if not self._downloader_status:
now = time.monotonic()
if (
not self._downloader_status
or (now - self._downloader_last_check) >= DOWNLOADER_STATUS_TTL
):
self._downloader_status = await self.check_downloader()
self._downloader_last_check = now
return self._downloader_status
@property

View File

@@ -20,34 +20,39 @@ class RSSThread(ProgramStatus):
def __init__(self):
super().__init__()
self._rss_task: asyncio.Task | None = None
self._rss_stop_event = asyncio.Event()
self.analyser = RSSAnalyser()
async def rss_loop(self):
while not self.stop_event.is_set():
async with DownloadClient() as client:
with RSSEngine() as engine:
# Analyse RSS
rss_list = engine.rss.search_aggregate()
for rss in rss_list:
await self.analyser.rss_to_data(rss, engine)
# Run RSS Engine
await engine.refresh_rss(client)
if settings.bangumi_manage.eps_complete:
await eps_complete()
while not self._rss_stop_event.is_set():
try:
async with DownloadClient() as client:
with RSSEngine() as engine:
# Analyse RSS
rss_list = engine.rss.search_aggregate()
for rss in rss_list:
await self.analyser.rss_to_data(rss, engine)
# Run RSS Engine
await engine.refresh_rss(client)
if settings.bangumi_manage.eps_complete:
await eps_complete()
except Exception as e:
logger.error(f"[RSSThread] Error during RSS loop: {e}")
try:
await asyncio.wait_for(
self.stop_event.wait(),
self._rss_stop_event.wait(),
timeout=settings.program.rss_time,
)
except asyncio.TimeoutError:
pass
def rss_start(self):
self._rss_stop_event.clear()
self._rss_task = asyncio.create_task(self.rss_loop())
async def rss_stop(self):
if self._rss_task and not self._rss_task.done():
self.stop_event.set()
self._rss_stop_event.set()
self._rss_task.cancel()
try:
await self._rss_task
@@ -60,29 +65,34 @@ class RenameThread(ProgramStatus):
def __init__(self):
super().__init__()
self._rename_task: asyncio.Task | None = None
self._rename_stop_event = asyncio.Event()
async def rename_loop(self):
while not self.stop_event.is_set():
async with Renamer() as renamer:
renamed_info = await renamer.rename()
if settings.notification.enable and renamed_info:
manager = NotificationManager()
for info in renamed_info:
await manager.send_all(info)
while not self._rename_stop_event.is_set():
try:
async with Renamer() as renamer:
renamed_info = await renamer.rename()
if settings.notification.enable and renamed_info:
manager = NotificationManager()
for info in renamed_info:
await manager.send_all(info)
except Exception as e:
logger.error(f"[RenameThread] Error during rename loop: {e}")
try:
await asyncio.wait_for(
self.stop_event.wait(),
self._rename_stop_event.wait(),
timeout=settings.program.rename_time,
)
except asyncio.TimeoutError:
pass
def rename_start(self):
self._rename_stop_event.clear()
self._rename_task = asyncio.create_task(self.rename_loop())
async def rename_stop(self):
if self._rename_task and not self._rename_task.done():
self.stop_event.set()
self._rename_stop_event.set()
self._rename_task.cancel()
try:
await self._rename_task
@@ -101,34 +111,38 @@ class OffsetScanThread(ProgramStatus):
def __init__(self):
super().__init__()
self._scan_task: asyncio.Task | None = None
self._scan_stop_event = asyncio.Event()
self._scanner = OffsetScanner()
async def scan_loop(self):
# Initial delay to let the system stabilize
await asyncio.sleep(60)
while not self.stop_event.is_set():
while not self._scan_stop_event.is_set():
try:
flagged = await self._scanner.scan_all()
logger.info(f"[OffsetScanThread] Scan complete, flagged {flagged} bangumi")
logger.info(
f"[OffsetScanThread] Scan complete, flagged {flagged} bangumi"
)
except Exception as e:
logger.error(f"[OffsetScanThread] Error during scan: {e}")
try:
await asyncio.wait_for(
self.stop_event.wait(),
self._scan_stop_event.wait(),
timeout=OFFSET_SCAN_INTERVAL,
)
except asyncio.TimeoutError:
pass
def scan_start(self):
self._scan_stop_event.clear()
self._scan_task = asyncio.create_task(self.scan_loop())
logger.info("[OffsetScanThread] Started offset scanner")
async def scan_stop(self):
if self._scan_task and not self._scan_task.done():
self.stop_event.set()
self._scan_stop_event.set()
self._scan_task.cancel()
try:
await self._scan_task
@@ -144,17 +158,20 @@ class CalendarRefreshThread(ProgramStatus):
def __init__(self):
super().__init__()
self._calendar_task: asyncio.Task | None = None
self._calendar_stop_event = asyncio.Event()
async def calendar_loop(self):
# Initial delay to let the system stabilize
await asyncio.sleep(120)
while not self.stop_event.is_set():
while not self._calendar_stop_event.is_set():
try:
with TorrentManager() as manager:
resp = await manager.refresh_calendar()
if resp.status:
logger.info("[CalendarRefreshThread] Calendar refresh completed")
logger.info(
"[CalendarRefreshThread] Calendar refresh completed"
)
else:
logger.warning(
f"[CalendarRefreshThread] Calendar refresh failed: {resp.msg_en}"
@@ -164,19 +181,20 @@ class CalendarRefreshThread(ProgramStatus):
try:
await asyncio.wait_for(
self.stop_event.wait(),
self._calendar_stop_event.wait(),
timeout=CALENDAR_REFRESH_INTERVAL,
)
except asyncio.TimeoutError:
pass
def calendar_start(self):
self._calendar_stop_event.clear()
self._calendar_task = asyncio.create_task(self.calendar_loop())
logger.info("[CalendarRefreshThread] Started calendar refresh (every 24h)")
async def calendar_stop(self):
if self._calendar_task and not self._calendar_task.done():
self.stop_event.set()
self._calendar_stop_event.set()
self._calendar_task.cancel()
try:
await self._calendar_task