diff --git a/app/actions/__init__.py b/app/actions/__init__.py index 1b15d5d9..aa70125d 100644 --- a/app/actions/__init__.py +++ b/app/actions/__init__.py @@ -27,16 +27,18 @@ class BaseAction(BaseModel, ABC): """ raise NotImplementedError + @property @abstractmethod - def is_done(self, context: ActionContext) -> bool: + def done(self) -> bool: """ 判断动作是否完成 """ - raise NotImplementedError + pass + @property @abstractmethod - def is_success(self, context: ActionContext) -> bool: + def success(self) -> bool: """ 判断动作是否成功 """ - raise NotImplementedError + pass diff --git a/app/actions/fetch_rss.py b/app/actions/fetch_rss.py index dcbeceba..95897214 100644 --- a/app/actions/fetch_rss.py +++ b/app/actions/fetch_rss.py @@ -33,8 +33,10 @@ class FetchRssAction(BaseAction): async def execute(self, params: FetchRssParams, context: ActionContext) -> ActionContext: pass - def is_done(self, context: ActionContext) -> bool: - pass + @property + def done(self) -> bool: + return True - def is_success(self, context: ActionContext) -> bool: - pass + @property + def success(self) -> bool: + return True diff --git a/app/actions/search_torrents.py b/app/actions/search_torrents.py index 6fa0b36f..fb27bf1c 100644 --- a/app/actions/search_torrents.py +++ b/app/actions/search_torrents.py @@ -30,11 +30,13 @@ class SearchTorrentsAction(BaseAction): def description(self) -> str: return "根据关键字搜索站点种子资源" + @property + def done(self) -> bool: + return True + + @property + def success(self) -> bool: + return True + async def execute(self, params: SearchTorrentsParams, context: ActionContext) -> ActionContext: pass - - def is_done(self, context: ActionContext) -> bool: - pass - - def is_success(self, context: ActionContext) -> bool: - pass diff --git a/app/chain/workflow.py b/app/chain/workflow.py new file mode 100644 index 00000000..5bca510f --- /dev/null +++ b/app/chain/workflow.py @@ -0,0 +1,22 @@ +from typing import List + +from app.chain import ChainBase +from app.schemas import Workflow + + +class WorkflowChain(ChainBase): + """ + 工作流链 + """ + + def process(self, workflow_id: int) -> bool: + """ + 处理工作流 + """ + pass + + def get_workflows(self) -> List[Workflow]: + """ + 获取工作流列表 + """ + pass diff --git a/app/core/workflow.py b/app/core/workflow.py index 67237a77..f6233716 100644 --- a/app/core/workflow.py +++ b/app/core/workflow.py @@ -1,8 +1,10 @@ -from typing import Dict, Any +from time import sleep +from typing import Dict, Any, Tuple from app.actions import BaseAction from app.helper.module import ModuleHelper from app.log import logger +from app.schemas import Action, ActionContext from app.utils.singleton import Singleton @@ -30,6 +32,8 @@ class WorkFlowManager(metaclass=Singleton): return False if not hasattr(obj, 'execute') or not hasattr(obj, "name"): return False + if obj.__name__ == "BaseAction": + return False return obj.__module__.startswith("app.actions") # 加载所有动作 @@ -47,3 +51,27 @@ class WorkFlowManager(metaclass=Singleton): 停止 """ pass + + def excute(self, action: Action, context: ActionContext = None) -> Tuple[bool, ActionContext]: + """ + 执行工作流动作 + """ + if not context: + context = ActionContext() + if action.id in self._actions: + action_obj = self._actions[action.id] + logger.info(f"执行动作: {action.id} - {action.name}") + result_context = action_obj.execute(action.params, context) + logger.info(f"{action.name} 执行结果: {action_obj.success}") + if action.loop and action.loop_interval: + while not action_obj.done: + logger.info(f"{action.name} 等待 {action.loop_interval} 秒后继续执行") + sleep(action.loop_interval) + logger.info(f"继续执行动作: {action.id} - {action.name}") + result_context = action_obj.execute(action.params, result_context) + logger.info(f"{action.name} 执行结果: {action_obj.success}") + logger.info(f"{action.name} 执行完成") + return action_obj.success, result_context + else: + logger.error(f"未找到动作: {action.id} - {action.name}") + return False, context diff --git a/app/scheduler.py b/app/scheduler.py index 67ebd2a4..d9512db6 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -7,6 +7,7 @@ import pytz from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.base import JobLookupError from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger from app import schemas from app.chain import ChainBase @@ -16,13 +17,14 @@ from app.chain.site import SiteChain from app.chain.subscribe import SubscribeChain from app.chain.tmdb import TmdbChain from app.chain.transfer import TransferChain +from app.chain.workflow import WorkflowChain from app.core.config import settings from app.core.event import EventManager from app.core.plugin import PluginManager from app.db.systemconfig_oper import SystemConfigOper from app.helper.sites import SitesHelper from app.log import logger -from app.schemas import Notification, NotificationType +from app.schemas import Notification, NotificationType, Workflow from app.schemas.types import EventType, SystemConfigKey from app.utils.singleton import Singleton from app.utils.timer import TimerUtils @@ -345,6 +347,10 @@ class Scheduler(metaclass=Singleton): } ) + # 初始化工作流服务 + self.init_workflow_jobs() + + # 初始化插件服务 self.init_plugin_jobs() # 打印服务 @@ -401,52 +407,42 @@ class Scheduler(metaclass=Singleton): for pid in PluginManager().get_running_plugin_ids(): self.update_plugin_job(pid) - def update_plugin_job(self, pid: str): + def init_workflow_jobs(self): """ - 更新插件定时服务 + 初始化工作流定时服务 """ - if not self._scheduler or not pid: + for workflow in WorkflowChain().get_workflows() or []: + self.update_workflow_job(workflow) + + def remove_workflow_job(self, workflow: Workflow): + """ + 移除工作流服务 + """ + if not self._scheduler: return - # 移除该插件的全部服务 - self.remove_plugin_job(pid) - # 获取插件服务列表 with self._lock: - try: - plugin_services = PluginManager().get_plugin_services(pid=pid) - except Exception as e: - logger.error(f"运行插件 {pid} 服务失败:{str(e)} - {traceback.format_exc()}") + job_id = f"workflow-{workflow.id}" + service = self._jobs.pop(job_id, None) + if not service: return - # 获取插件名称 - plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name") - # 开始注册插件服务 - for service in plugin_services: - try: - sid = f"{service['id']}" - job_id = sid.split("|")[0] - self.remove_plugin_job(pid, job_id) - self._jobs[job_id] = { - "func": service["func"], - "name": service["name"], - "pid": pid, - "plugin_name": plugin_name, - "kwargs": service.get("func_kwargs") or {}, - "running": False, - } - self._scheduler.add_job( - self.start, - service["trigger"], - id=sid, - name=service["name"], - **(service.get("kwargs") or {}), - kwargs={"job_id": job_id}, - replace_existing=True - ) - logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}") - except Exception as e: - logger.error(f"注册插件{plugin_name}服务失败:{str(e)} - {service}") - SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务注册失败", - message=str(e), - role="system") + try: + # 在调度器中查找并移除对应的 job + job_removed = False + for job in list(self._scheduler.get_jobs()): + if job_id == job.id: + try: + self._scheduler.remove_job(job.id) + job_removed = True + except JobLookupError: + pass + break + if job_removed: + logger.info(f"移除工作流服务:{service.get('name')}") + except Exception as e: + logger.error(f"移除工作流服务失败:{str(e)} - {job_id}: {service}") + SchedulerChain().messagehelper.put(title=f"工作流 {workflow.name} 服务移除失败", + message=str(e), + role="system") def remove_plugin_job(self, pid: str, job_id: str = None): """ @@ -494,6 +490,83 @@ class Scheduler(metaclass=Singleton): message=str(e), role="system") + def update_workflow_job(self, workflow: Workflow): + """ + 更新工作流定时服务 + """ + # 移除该工作流的全部服务 + self.remove_workflow_job(workflow) + # 添加工作流服务 + with self._lock: + try: + job_id = f"workflow-{workflow.id}" + self._jobs[job_id] = { + "func": WorkflowChain().process, + "name": workflow.name, + "running": False, + } + self._scheduler.add_job( + self.start, + trigger=CronTrigger.from_crontab(workflow.timer), + id=job_id, + name=workflow.name, + kwargs={"job_id": job_id, "workflow_id": job_id}, + replace_existing=True + ) + logger.info(f"注册工作流服务:{workflow.name} - {workflow.timer}") + except Exception as e: + logger.error(f"注册工作流服务失败:{workflow.name} - {str(e)}") + SchedulerChain().messagehelper.put(title=f"工作流 {workflow.name} 服务注册失败", + message=str(e), + role="system") + + def update_plugin_job(self, pid: str): + """ + 更新插件定时服务 + """ + if not self._scheduler or not pid: + return + # 移除该插件的全部服务 + self.remove_plugin_job(pid) + # 获取插件服务列表 + with self._lock: + try: + plugin_services = PluginManager().get_plugin_services(pid=pid) + except Exception as e: + logger.error(f"运行插件 {pid} 服务失败:{str(e)} - {traceback.format_exc()}") + return + # 获取插件名称 + plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name") + # 开始注册插件服务 + for service in plugin_services: + try: + sid = f"{service['id']}" + job_id = sid.split("|")[0] + self.remove_plugin_job(pid, job_id) + self._jobs[job_id] = { + "func": service["func"], + "name": service["name"], + "pid": pid, + "plugin_name": plugin_name, + "kwargs": service.get("func_kwargs") or {}, + "running": False, + } + self._scheduler.add_job( + self.start, + service["trigger"], + id=sid, + name=service["name"], + **(service.get("kwargs") or {}), + kwargs={"job_id": job_id}, + replace_existing=True + ) + logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}") + except Exception as e: + logger.error(f"注册插件{plugin_name}服务失败:{str(e)} - {service}") + SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务注册失败", + message=str(e), + role="system") + def list(self) -> List[schemas.ScheduleInfo]: """ 当前所有任务 diff --git a/app/schemas/workflow.py b/app/schemas/workflow.py index 6b4efa60..45a90906 100644 --- a/app/schemas/workflow.py +++ b/app/schemas/workflow.py @@ -14,6 +14,7 @@ class Workflow(BaseModel): """ 工作流信息 """ + id: Optional[str] = Field(None, description="工作流ID") name: Optional[str] = Field(None, description="工作流名称") description: Optional[str] = Field(None, description="工作流描述") timer: Optional[str] = Field(None, description="定时器") @@ -26,15 +27,23 @@ class Workflow(BaseModel): last_time: Optional[str] = Field(None, description="最后执行时间") +class ActionParams(BaseModel): + """ + 动作基础参数 + """ + pass + + class Action(BaseModel): """ 动作信息 """ + id: Optional[str] = Field(None, description="动作ID (类名)") name: Optional[str] = Field(None, description="动作名称") description: Optional[str] = Field(None, description="动作描述") loop: Optional[bool] = Field(False, description="是否需要循环") loop_interval: Optional[int] = Field(0, description="循环间隔 (秒)") - params: Optional[dict] = Field({}, description="参数") + params: Optional[ActionParams] = Field({}, description="参数") class ActionContext(BaseModel): @@ -49,10 +58,3 @@ class ActionContext(BaseModel): sites: Optional[List[Site]] = Field([], description="站点列表") subscribes: Optional[List[Subscribe]] = Field([], description="订阅列表") messages: Optional[List[Notification]] = Field([], description="消息列表") - - -class ActionParams(BaseModel): - """ - 动作基础参数 - """ - pass