From 76c84f9bac9ab88495db3e5e4ed17994a12d16c7 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Wed, 6 Nov 2024 19:37:22 +0800 Subject: [PATCH] fix(scheduler): optimize job registration and removal logic --- app/scheduler.py | 66 +++++++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/app/scheduler.py b/app/scheduler.py index 1e71cc44..d851a0f0 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -1,4 +1,3 @@ -import logging import threading import traceback from datetime import datetime, timedelta @@ -27,12 +26,6 @@ from app.schemas.types import EventType from app.utils.singleton import Singleton from app.utils.timer import TimerUtils -# 获取 apscheduler 的日志记录器 -scheduler_logger = logging.getLogger('apscheduler') - -# 设置日志级别为 WARNING -scheduler_logger.setLevel(logging.WARNING) - class SchedulerChain(ChainBase): pass @@ -436,23 +429,23 @@ class Scheduler(metaclass=Singleton): try: sid = f"{service['id']}" job_id = sid.split("|")[0] - if job_id not in self._jobs: - self._jobs[job_id] = { - "func": service["func"], - "name": service["name"], - "pid": pid, - "plugin_name": plugin_name, - "running": False, - } - self._scheduler.add_job( - self.start, - service["trigger"], - id=sid, - name=service["name"], - **service["kwargs"], - kwargs={"job_id": job_id} - ) - logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}") + self._jobs[job_id] = { + "func": service["func"], + "name": service["name"], + "pid": pid, + "plugin_name": plugin_name, + "running": False, + } + self._scheduler.add_job( + self.start, + service["trigger"], + id=sid, + name=service["name"], + **service["kwargs"], + kwargs={"job_id": job_id}, + replace_existing=True + ) + logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}") except Exception as e: logger.error(f"注册插件{plugin_name}服务失败:{str(e)} - {service}") SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务注册失败", @@ -468,14 +461,25 @@ class Scheduler(metaclass=Singleton): with self._lock: # 获取插件名称 plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name") - for job_id, service in self._jobs.copy().items(): + # 先从 _jobs 中查找匹配的服务 + jobs_to_remove = [(job_id, service) for job_id, service in self._jobs.items() if service.get("pid") == pid] + if not jobs_to_remove: + return + for job_id, service in jobs_to_remove: try: - if service.get("pid") == pid: - self._jobs.pop(job_id, None) - try: - self._scheduler.remove_job(job_id) - except JobLookupError: - pass + # 移除服务 + self._jobs.pop(job_id, None) + # 在调度器中查找并移除对应的 job + job_removed = False + for job in list(self._scheduler.get_jobs()): + job_id_from_service = job.id.split("|")[0] + if job_id == job_id_from_service: + try: + self._scheduler.remove_job(job.id) + job_removed = True + except JobLookupError: + pass + if job_removed: logger.info(f"移除插件服务({plugin_name}):{service.get('name')}") except Exception as e: logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}")