diff --git a/app/scheduler.py b/app/scheduler.py index 73e11c95..3e979b39 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -41,7 +41,7 @@ class Scheduler(metaclass=Singleton): # 退出事件 _event = threading.Event() # 锁 - _lock = threading.Lock() + _lock = threading.RLock() # 各服务的运行状态 _jobs = {} # 用户认证失败次数 @@ -434,6 +434,7 @@ class Scheduler(metaclass=Singleton): try: sid = f"{service['id']}" job_id = sid.split("|")[0] + self.remove_plugin_job(pid, job_id) self._jobs[job_id] = { "func": service["func"], "name": service["name"], @@ -446,7 +447,7 @@ class Scheduler(metaclass=Singleton): service["trigger"], id=sid, name=service["name"], - **service["kwargs"], + **(service.get("kwargs") or {}), kwargs={"job_id": job_id}, replace_existing=True ) @@ -457,23 +458,34 @@ class Scheduler(metaclass=Singleton): message=str(e), role="system") - def remove_plugin_job(self, pid: str): + def remove_plugin_job(self, pid: str, job_id: str = None): """ - 移除插件定时服务 + 移除定时服务,可以是单个服务(包括默认服务)或整个插件的所有服务 + :param pid: 插件 ID + :param job_id: 可选,指定要移除的单个服务的 job_id。如果不提供,则移除该插件的所有服务,当移除单个服务时,默认服务也包含在内 """ if not self._scheduler: return with self._lock: - # 获取插件名称 - plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name") - # 先从 _jobs 中查找匹配的服务 - jobs_to_remove = [(job_id, service) for job_id, service in self._jobs.items() if service.get("pid") == pid] + if job_id: + # 移除单个服务 + service = self._jobs.pop(job_id, None) + if not service: + return + jobs_to_remove = [(job_id, service)] + else: + # 移除插件的所有服务 + jobs_to_remove = [ + (job_id, service) for job_id, service in self._jobs.items() if service.get("pid") == pid + ] + for job_id, _ in jobs_to_remove: + self._jobs.pop(job_id, None) if not jobs_to_remove: return + plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name") + # 遍历移除任务 for job_id, service in jobs_to_remove: try: - # 移除服务 - self._jobs.pop(job_id, None) # 在调度器中查找并移除对应的 job job_removed = False for job in list(self._scheduler.get_jobs()):