From 491456b0a28d6268ed2ffce944b99a0830fa11f7 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Mon, 25 Nov 2024 16:30:11 +0800 Subject: [PATCH 1/3] feat(scheduler): support plugin replacement for system services --- app/scheduler.py | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/app/scheduler.py b/app/scheduler.py index 73e11c95..3e979b39 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -41,7 +41,7 @@ class Scheduler(metaclass=Singleton): # 退出事件 _event = threading.Event() # 锁 - _lock = threading.Lock() + _lock = threading.RLock() # 各服务的运行状态 _jobs = {} # 用户认证失败次数 @@ -434,6 +434,7 @@ class Scheduler(metaclass=Singleton): try: sid = f"{service['id']}" job_id = sid.split("|")[0] + self.remove_plugin_job(pid, job_id) self._jobs[job_id] = { "func": service["func"], "name": service["name"], @@ -446,7 +447,7 @@ class Scheduler(metaclass=Singleton): service["trigger"], id=sid, name=service["name"], - **service["kwargs"], + **(service.get("kwargs") or {}), kwargs={"job_id": job_id}, replace_existing=True ) @@ -457,23 +458,34 @@ class Scheduler(metaclass=Singleton): message=str(e), role="system") - def remove_plugin_job(self, pid: str): + def remove_plugin_job(self, pid: str, job_id: str = None): """ - 移除插件定时服务 + 移除定时服务,可以是单个服务(包括默认服务)或整个插件的所有服务 + :param pid: 插件 ID + :param job_id: 可选,指定要移除的单个服务的 job_id。如果不提供,则移除该插件的所有服务,当移除单个服务时,默认服务也包含在内 """ if not self._scheduler: return with self._lock: - # 获取插件名称 - plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name") - # 先从 _jobs 中查找匹配的服务 - jobs_to_remove = [(job_id, service) for job_id, service in self._jobs.items() if service.get("pid") == pid] + if job_id: + # 移除单个服务 + service = self._jobs.pop(job_id, None) + if not service: + return + jobs_to_remove = [(job_id, service)] + else: + # 移除插件的所有服务 + jobs_to_remove = [ + (job_id, service) for job_id, service in self._jobs.items() if service.get("pid") == pid + ] + for job_id, _ in jobs_to_remove: + self._jobs.pop(job_id, None) if not jobs_to_remove: return + plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name") + # 遍历移除任务 for job_id, service in jobs_to_remove: try: - # 移除服务 - self._jobs.pop(job_id, None) # 在调度器中查找并移除对应的 job job_removed = False for job in list(self._scheduler.get_jobs()): From b83c7a56568f0f2754ec00d0d93121529731cd8f Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Mon, 25 Nov 2024 16:31:30 +0800 Subject: [PATCH 2/3] feat(scheduler): support plugin method arguments via func_kwargs --- app/core/plugin.py | 3 ++- app/scheduler.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/app/core/plugin.py b/app/core/plugin.py index c5d90e50..89f499c3 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -526,7 +526,8 @@ class PluginManager(metaclass=Singleton): "name": "服务名称", "trigger": "触发器:cron、interval、date、CronTrigger.from_crontab()", "func": self.xxx, - "kwargs": {} # 定时器参数 + "kwargs": {} # 定时器参数, + "func_kwargs": {} # 方法参数 }] """ ret_services = [] diff --git a/app/scheduler.py b/app/scheduler.py index 3e979b39..43f91efd 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -440,6 +440,7 @@ class Scheduler(metaclass=Singleton): "name": service["name"], "pid": pid, "plugin_name": plugin_name, + "kwargs": service.get("func_kwargs") or {}, "running": False, } self._scheduler.add_job( From aed68253e9bce0e0ee04634c5073805a71d15d91 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Mon, 25 Nov 2024 16:33:17 +0800 Subject: [PATCH 3/3] feat(scheduler): expose internal methods for external invocation --- app/scheduler.py | 98 ++++++++++++++++++++++++------------------------ 1 file changed, 49 insertions(+), 49 deletions(-) diff --git a/app/scheduler.py b/app/scheduler.py index 43f91efd..3dc87174 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -54,53 +54,6 @@ class Scheduler(metaclass=Singleton): """ 初始化定时服务 """ - - def clear_cache(): - """ - 清理缓存 - """ - TorrentsChain().clear_cache() - SchedulerChain().clear_cache() - - def user_auth(): - """ - 用户认证检查 - """ - if SitesHelper().auth_level >= 2: - return - # 最大重试次数 - __max_try__ = 30 - if self._auth_count > __max_try__: - SchedulerChain().messagehelper.put(title=f"用户认证失败", - message="用户认证失败次数过多,将不再尝试认证!", - role="system") - return - logger.info("用户未认证,正在尝试认证...") - auth_conf = SystemConfigOper().get(SystemConfigKey.UserSiteAuthParams) - if auth_conf: - status, msg = SitesHelper().check_user(**auth_conf) - else: - status, msg = SitesHelper().check_user() - if status: - self._auth_count = 0 - logger.info(f"{msg} 用户认证成功") - SchedulerChain().post_message( - Notification( - mtype=NotificationType.Manual, - title="MoviePilot用户认证成功", - text=f"使用站点:{msg}", - link=settings.MP_DOMAIN('#/site') - ) - ) - PluginManager().init_config() - self.init_plugin_jobs() - - else: - self._auth_count += 1 - logger.error(f"用户认证失败:{msg},共失败 {self._auth_count} 次") - if self._auth_count >= __max_try__: - logger.error("用户认证失败次数过多,将不再尝试认证!") - # 各服务的运行状态 self._jobs = { "cookiecloud": { @@ -146,12 +99,12 @@ class Scheduler(metaclass=Singleton): }, "clear_cache": { "name": "缓存清理", - "func": clear_cache, + "func": self.clear_cache, "running": False, }, "user_auth": { "name": "用户认证检查", - "func": user_auth, + "func": self.user_auth, "running": False, }, "scheduler_job": { @@ -570,3 +523,50 @@ class Scheduler(metaclass=Singleton): logger.info("定时任务停止完成") except Exception as e: logger.error(f"停止定时任务失败::{str(e)} - {traceback.format_exc()}") + + @staticmethod + def clear_cache(): + """ + 清理缓存 + """ + TorrentsChain().clear_cache() + SchedulerChain().clear_cache() + + def user_auth(self): + """ + 用户认证检查 + """ + if SitesHelper().auth_level >= 2: + return + # 最大重试次数 + __max_try__ = 30 + if self._auth_count > __max_try__: + SchedulerChain().messagehelper.put(title=f"用户认证失败", + message="用户认证失败次数过多,将不再尝试认证!", + role="system") + return + logger.info("用户未认证,正在尝试认证...") + auth_conf = SystemConfigOper().get(SystemConfigKey.UserSiteAuthParams) + if auth_conf: + status, msg = SitesHelper().check_user(**auth_conf) + else: + status, msg = SitesHelper().check_user() + if status: + self._auth_count = 0 + logger.info(f"{msg} 用户认证成功") + SchedulerChain().post_message( + Notification( + mtype=NotificationType.Manual, + title="MoviePilot用户认证成功", + text=f"使用站点:{msg}", + link=settings.MP_DOMAIN('#/site') + ) + ) + PluginManager().init_config() + self.init_plugin_jobs() + + else: + self._auth_count += 1 + logger.error(f"用户认证失败:{msg},共失败 {self._auth_count} 次") + if self._auth_count >= __max_try__: + logger.error("用户认证失败次数过多,将不再尝试认证!")