Merge pull request #3016 from InfinityPacer/feature/scheduler

This commit is contained in:
jxxghp
2024-11-06 20:01:14 +08:00
committed by GitHub

View File

@@ -1,4 +1,3 @@
import logging
import threading
import traceback
from datetime import datetime, timedelta
@@ -27,12 +26,6 @@ from app.schemas.types import EventType
from app.utils.singleton import Singleton
from app.utils.timer import TimerUtils
# 获取 apscheduler 的日志记录器
scheduler_logger = logging.getLogger('apscheduler')
# 设置日志级别为 WARNING
scheduler_logger.setLevel(logging.WARNING)
class SchedulerChain(ChainBase):
pass
@@ -436,23 +429,23 @@ class Scheduler(metaclass=Singleton):
try:
sid = f"{service['id']}"
job_id = sid.split("|")[0]
if job_id not in self._jobs:
self._jobs[job_id] = {
"func": service["func"],
"name": service["name"],
"pid": pid,
"plugin_name": plugin_name,
"running": False,
}
self._scheduler.add_job(
self.start,
service["trigger"],
id=sid,
name=service["name"],
**service["kwargs"],
kwargs={"job_id": job_id}
)
logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}")
self._jobs[job_id] = {
"func": service["func"],
"name": service["name"],
"pid": pid,
"plugin_name": plugin_name,
"running": False,
}
self._scheduler.add_job(
self.start,
service["trigger"],
id=sid,
name=service["name"],
**service["kwargs"],
kwargs={"job_id": job_id},
replace_existing=True
)
logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}")
except Exception as e:
logger.error(f"注册插件{plugin_name}服务失败:{str(e)} - {service}")
SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务注册失败",
@@ -468,14 +461,25 @@ class Scheduler(metaclass=Singleton):
with self._lock:
# 获取插件名称
plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name")
for job_id, service in self._jobs.copy().items():
# 先从 _jobs 中查找匹配的服务
jobs_to_remove = [(job_id, service) for job_id, service in self._jobs.items() if service.get("pid") == pid]
if not jobs_to_remove:
return
for job_id, service in jobs_to_remove:
try:
if service.get("pid") == pid:
self._jobs.pop(job_id, None)
try:
self._scheduler.remove_job(job_id)
except JobLookupError:
pass
# 移除服务
self._jobs.pop(job_id, None)
# 在调度器中查找并移除对应的 job
job_removed = False
for job in list(self._scheduler.get_jobs()):
job_id_from_service = job.id.split("|")[0]
if job_id == job_id_from_service:
try:
self._scheduler.remove_job(job.id)
job_removed = True
except JobLookupError:
pass
if job_removed:
logger.info(f"移除插件服务({plugin_name}){service.get('name')}")
except Exception as e:
logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}")