fix workflow

This commit is contained in:
jxxghp
2025-02-17 11:40:32 +08:00
parent 68cba44476
commit 77ae40e3d6
7 changed files with 196 additions and 65 deletions

View File

@@ -27,16 +27,18 @@ class BaseAction(BaseModel, ABC):
"""
raise NotImplementedError
@property
@abstractmethod
def is_done(self, context: ActionContext) -> bool:
def done(self) -> bool:
"""
判断动作是否完成
"""
raise NotImplementedError
pass
@property
@abstractmethod
def is_success(self, context: ActionContext) -> bool:
def success(self) -> bool:
"""
判断动作是否成功
"""
raise NotImplementedError
pass

View File

@@ -33,8 +33,10 @@ class FetchRssAction(BaseAction):
async def execute(self, params: FetchRssParams, context: ActionContext) -> ActionContext:
pass
def is_done(self, context: ActionContext) -> bool:
pass
@property
def done(self) -> bool:
return True
def is_success(self, context: ActionContext) -> bool:
pass
@property
def success(self) -> bool:
return True

View File

@@ -30,11 +30,13 @@ class SearchTorrentsAction(BaseAction):
def description(self) -> str:
return "根据关键字搜索站点种子资源"
@property
def done(self) -> bool:
return True
@property
def success(self) -> bool:
return True
async def execute(self, params: SearchTorrentsParams, context: ActionContext) -> ActionContext:
pass
def is_done(self, context: ActionContext) -> bool:
pass
def is_success(self, context: ActionContext) -> bool:
pass

22
app/chain/workflow.py Normal file
View File

@@ -0,0 +1,22 @@
from typing import List
from app.chain import ChainBase
from app.schemas import Workflow
class WorkflowChain(ChainBase):
"""
工作流链
"""
def process(self, workflow_id: int) -> bool:
"""
处理工作流
"""
pass
def get_workflows(self) -> List[Workflow]:
"""
获取工作流列表
"""
pass

View File

@@ -1,8 +1,10 @@
from typing import Dict, Any
from time import sleep
from typing import Dict, Any, Tuple
from app.actions import BaseAction
from app.helper.module import ModuleHelper
from app.log import logger
from app.schemas import Action, ActionContext
from app.utils.singleton import Singleton
@@ -30,6 +32,8 @@ class WorkFlowManager(metaclass=Singleton):
return False
if not hasattr(obj, 'execute') or not hasattr(obj, "name"):
return False
if obj.__name__ == "BaseAction":
return False
return obj.__module__.startswith("app.actions")
# 加载所有动作
@@ -47,3 +51,27 @@ class WorkFlowManager(metaclass=Singleton):
停止
"""
pass
def excute(self, action: Action, context: ActionContext = None) -> Tuple[bool, ActionContext]:
"""
执行工作流动作
"""
if not context:
context = ActionContext()
if action.id in self._actions:
action_obj = self._actions[action.id]
logger.info(f"执行动作: {action.id} - {action.name}")
result_context = action_obj.execute(action.params, context)
logger.info(f"{action.name} 执行结果: {action_obj.success}")
if action.loop and action.loop_interval:
while not action_obj.done:
logger.info(f"{action.name} 等待 {action.loop_interval} 秒后继续执行")
sleep(action.loop_interval)
logger.info(f"继续执行动作: {action.id} - {action.name}")
result_context = action_obj.execute(action.params, result_context)
logger.info(f"{action.name} 执行结果: {action_obj.success}")
logger.info(f"{action.name} 执行完成")
return action_obj.success, result_context
else:
logger.error(f"未找到动作: {action.id} - {action.name}")
return False, context

View File

@@ -7,6 +7,7 @@ import pytz
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.base import JobLookupError
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from app import schemas
from app.chain import ChainBase
@@ -16,13 +17,14 @@ from app.chain.site import SiteChain
from app.chain.subscribe import SubscribeChain
from app.chain.tmdb import TmdbChain
from app.chain.transfer import TransferChain
from app.chain.workflow import WorkflowChain
from app.core.config import settings
from app.core.event import EventManager
from app.core.plugin import PluginManager
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.sites import SitesHelper
from app.log import logger
from app.schemas import Notification, NotificationType
from app.schemas import Notification, NotificationType, Workflow
from app.schemas.types import EventType, SystemConfigKey
from app.utils.singleton import Singleton
from app.utils.timer import TimerUtils
@@ -345,6 +347,10 @@ class Scheduler(metaclass=Singleton):
}
)
# 初始化工作流服务
self.init_workflow_jobs()
# 初始化插件服务
self.init_plugin_jobs()
# 打印服务
@@ -401,52 +407,42 @@ class Scheduler(metaclass=Singleton):
for pid in PluginManager().get_running_plugin_ids():
self.update_plugin_job(pid)
def update_plugin_job(self, pid: str):
def init_workflow_jobs(self):
"""
更新插件定时服务
初始化工作流定时服务
"""
if not self._scheduler or not pid:
for workflow in WorkflowChain().get_workflows() or []:
self.update_workflow_job(workflow)
def remove_workflow_job(self, workflow: Workflow):
"""
移除工作流服务
"""
if not self._scheduler:
return
# 移除该插件的全部服务
self.remove_plugin_job(pid)
# 获取插件服务列表
with self._lock:
try:
plugin_services = PluginManager().get_plugin_services(pid=pid)
except Exception as e:
logger.error(f"运行插件 {pid} 服务失败:{str(e)} - {traceback.format_exc()}")
job_id = f"workflow-{workflow.id}"
service = self._jobs.pop(job_id, None)
if not service:
return
# 获取插件名称
plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name")
# 开始注册插件服务
for service in plugin_services:
try:
sid = f"{service['id']}"
job_id = sid.split("|")[0]
self.remove_plugin_job(pid, job_id)
self._jobs[job_id] = {
"func": service["func"],
"name": service["name"],
"pid": pid,
"plugin_name": plugin_name,
"kwargs": service.get("func_kwargs") or {},
"running": False,
}
self._scheduler.add_job(
self.start,
service["trigger"],
id=sid,
name=service["name"],
**(service.get("kwargs") or {}),
kwargs={"job_id": job_id},
replace_existing=True
)
logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}")
except Exception as e:
logger.error(f"注册插件{plugin_name}服务失败:{str(e)} - {service}")
SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务注册失败",
message=str(e),
role="system")
try:
# 在调度器中查找并移除对应的 job
job_removed = False
for job in list(self._scheduler.get_jobs()):
if job_id == job.id:
try:
self._scheduler.remove_job(job.id)
job_removed = True
except JobLookupError:
pass
break
if job_removed:
logger.info(f"移除工作流服务:{service.get('name')}")
except Exception as e:
logger.error(f"移除工作流服务失败:{str(e)} - {job_id}: {service}")
SchedulerChain().messagehelper.put(title=f"工作流 {workflow.name} 服务移除失败",
message=str(e),
role="system")
def remove_plugin_job(self, pid: str, job_id: str = None):
"""
@@ -494,6 +490,83 @@ class Scheduler(metaclass=Singleton):
message=str(e),
role="system")
def update_workflow_job(self, workflow: Workflow):
"""
更新工作流定时服务
"""
# 移除该工作流的全部服务
self.remove_workflow_job(workflow)
# 添加工作流服务
with self._lock:
try:
job_id = f"workflow-{workflow.id}"
self._jobs[job_id] = {
"func": WorkflowChain().process,
"name": workflow.name,
"running": False,
}
self._scheduler.add_job(
self.start,
trigger=CronTrigger.from_crontab(workflow.timer),
id=job_id,
name=workflow.name,
kwargs={"job_id": job_id, "workflow_id": job_id},
replace_existing=True
)
logger.info(f"注册工作流服务:{workflow.name} - {workflow.timer}")
except Exception as e:
logger.error(f"注册工作流服务失败:{workflow.name} - {str(e)}")
SchedulerChain().messagehelper.put(title=f"工作流 {workflow.name} 服务注册失败",
message=str(e),
role="system")
def update_plugin_job(self, pid: str):
"""
更新插件定时服务
"""
if not self._scheduler or not pid:
return
# 移除该插件的全部服务
self.remove_plugin_job(pid)
# 获取插件服务列表
with self._lock:
try:
plugin_services = PluginManager().get_plugin_services(pid=pid)
except Exception as e:
logger.error(f"运行插件 {pid} 服务失败:{str(e)} - {traceback.format_exc()}")
return
# 获取插件名称
plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name")
# 开始注册插件服务
for service in plugin_services:
try:
sid = f"{service['id']}"
job_id = sid.split("|")[0]
self.remove_plugin_job(pid, job_id)
self._jobs[job_id] = {
"func": service["func"],
"name": service["name"],
"pid": pid,
"plugin_name": plugin_name,
"kwargs": service.get("func_kwargs") or {},
"running": False,
}
self._scheduler.add_job(
self.start,
service["trigger"],
id=sid,
name=service["name"],
**(service.get("kwargs") or {}),
kwargs={"job_id": job_id},
replace_existing=True
)
logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}")
except Exception as e:
logger.error(f"注册插件{plugin_name}服务失败:{str(e)} - {service}")
SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务注册失败",
message=str(e),
role="system")
def list(self) -> List[schemas.ScheduleInfo]:
"""
当前所有任务

View File

@@ -14,6 +14,7 @@ class Workflow(BaseModel):
"""
工作流信息
"""
id: Optional[str] = Field(None, description="工作流ID")
name: Optional[str] = Field(None, description="工作流名称")
description: Optional[str] = Field(None, description="工作流描述")
timer: Optional[str] = Field(None, description="定时器")
@@ -26,15 +27,23 @@ class Workflow(BaseModel):
last_time: Optional[str] = Field(None, description="最后执行时间")
class ActionParams(BaseModel):
"""
动作基础参数
"""
pass
class Action(BaseModel):
"""
动作信息
"""
id: Optional[str] = Field(None, description="动作ID (类名)")
name: Optional[str] = Field(None, description="动作名称")
description: Optional[str] = Field(None, description="动作描述")
loop: Optional[bool] = Field(False, description="是否需要循环")
loop_interval: Optional[int] = Field(0, description="循环间隔 (秒)")
params: Optional[dict] = Field({}, description="参数")
params: Optional[ActionParams] = Field({}, description="参数")
class ActionContext(BaseModel):
@@ -49,10 +58,3 @@ class ActionContext(BaseModel):
sites: Optional[List[Site]] = Field([], description="站点列表")
subscribes: Optional[List[Subscribe]] = Field([], description="订阅列表")
messages: Optional[List[Notification]] = Field([], description="消息列表")
class ActionParams(BaseModel):
"""
动作基础参数
"""
pass