feat(scheduler): support plugin replacement for system services

This commit is contained in:
InfinityPacer
2024-11-25 16:30:11 +08:00
parent 9acbcf4922
commit 491456b0a2

View File

@@ -41,7 +41,7 @@ class Scheduler(metaclass=Singleton):
# 退出事件
_event = threading.Event()
# 锁
_lock = threading.Lock()
_lock = threading.RLock()
# 各服务的运行状态
_jobs = {}
# 用户认证失败次数
@@ -434,6 +434,7 @@ class Scheduler(metaclass=Singleton):
try:
sid = f"{service['id']}"
job_id = sid.split("|")[0]
self.remove_plugin_job(pid, job_id)
self._jobs[job_id] = {
"func": service["func"],
"name": service["name"],
@@ -446,7 +447,7 @@ class Scheduler(metaclass=Singleton):
service["trigger"],
id=sid,
name=service["name"],
**service["kwargs"],
**(service.get("kwargs") or {}),
kwargs={"job_id": job_id},
replace_existing=True
)
@@ -457,23 +458,34 @@ class Scheduler(metaclass=Singleton):
message=str(e),
role="system")
def remove_plugin_job(self, pid: str):
def remove_plugin_job(self, pid: str, job_id: str = None):
"""
移除插件定时服务
移除定时服务,可以是单个服务(包括默认服务)或整个插件的所有服务
:param pid: 插件 ID
:param job_id: 可选,指定要移除的单个服务的 job_id。如果不提供则移除该插件的所有服务当移除单个服务时默认服务也包含在内
"""
if not self._scheduler:
return
with self._lock:
# 获取插件名称
plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name")
# 先从 _jobs 中查找匹配的服务
jobs_to_remove = [(job_id, service) for job_id, service in self._jobs.items() if service.get("pid") == pid]
if job_id:
# 移除单个服务
service = self._jobs.pop(job_id, None)
if not service:
return
jobs_to_remove = [(job_id, service)]
else:
# 移除插件的所有服务
jobs_to_remove = [
(job_id, service) for job_id, service in self._jobs.items() if service.get("pid") == pid
]
for job_id, _ in jobs_to_remove:
self._jobs.pop(job_id, None)
if not jobs_to_remove:
return
plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name")
# 遍历移除任务
for job_id, service in jobs_to_remove:
try:
# 移除服务
self._jobs.pop(job_id, None)
# 在调度器中查找并移除对应的 job
job_removed = False
for job in list(self._scheduler.get_jobs()):