diff --git a/app/actions/__init__.py b/app/actions/__init__.py index c9855cb5..1b15d5d9 100644 --- a/app/actions/__init__.py +++ b/app/actions/__init__.py @@ -21,5 +21,22 @@ class BaseAction(BaseModel, ABC): pass @abstractmethod - async def execute(self, params: ActionParams, context: ActionContext) -> ActionContext: + def execute(self, params: ActionParams, context: ActionContext) -> ActionContext: + """ + 执行动作 + """ + raise NotImplementedError + + @abstractmethod + def is_done(self, context: ActionContext) -> bool: + """ + 判断动作是否完成 + """ + raise NotImplementedError + + @abstractmethod + def is_success(self, context: ActionContext) -> bool: + """ + 判断动作是否成功 + """ raise NotImplementedError diff --git a/app/actions/fetch_rss.py b/app/actions/fetch_rss.py index e152785e..dcbeceba 100644 --- a/app/actions/fetch_rss.py +++ b/app/actions/fetch_rss.py @@ -21,6 +21,7 @@ class FetchRssAction(BaseAction): """ 获取RSS资源列表 """ + @property def name(self) -> str: return "获取RSS资源列表" @@ -31,3 +32,9 @@ class FetchRssAction(BaseAction): async def execute(self, params: FetchRssParams, context: ActionContext) -> ActionContext: pass + + def is_done(self, context: ActionContext) -> bool: + pass + + def is_success(self, context: ActionContext) -> bool: + pass diff --git a/app/actions/search_torrents.py b/app/actions/search_torrents.py index 550808e2..6fa0b36f 100644 --- a/app/actions/search_torrents.py +++ b/app/actions/search_torrents.py @@ -32,3 +32,9 @@ class SearchTorrentsAction(BaseAction): async def execute(self, params: SearchTorrentsParams, context: ActionContext) -> ActionContext: pass + + def is_done(self, context: ActionContext) -> bool: + pass + + def is_success(self, context: ActionContext) -> bool: + pass diff --git a/app/core/workflow.py b/app/core/workflow.py index 15cbf79e..38267b40 100644 --- a/app/core/workflow.py +++ b/app/core/workflow.py @@ -1,24 +1,46 @@ +from typing import Dict, Any + +from app.actions import BaseAction +from app.helper.module import ModuleHelper +from app.log import logger +from app.utils.singleton import Singleton -class WorkFlowManager: +class WorkFlowManager(metaclass=Singleton): """ 工作流管理器 """ + + # 所有动作定义 + _actions: Dict[str, BaseAction] = {} + def __init__(self): - self.workflows = {} + self.init() - def register(self, workflow): + def init(self): """ - 注册工作流 - :param workflow: 工作流对象 - :return: + 初始化 """ - self.workflows[workflow.name] = workflow + def check_module(module: Any): + """ + 检查模块 + """ + if not hasattr(module, 'execute') or not hasattr(module, "name"): + return False + return True - def get_workflow(self, name): + # 加载所有动作 + self._actions = {} + actions = ModuleHelper.load( + "app.actions", + filter_func=lambda _, obj: check_module(obj) + ) + for action in actions: + logger.debug(f"加载动作: {action.__name__}") + self._actions[action.__name__] = action + + def stop(self): """ - 获取工作流 - :param name: 工作流名称 - :return: + 停止 """ - return self.workflows.get(name) + pass diff --git a/app/schemas/workflow.py b/app/schemas/workflow.py index fa37f6a7..6b4efa60 100644 --- a/app/schemas/workflow.py +++ b/app/schemas/workflow.py @@ -1,8 +1,13 @@ -from typing import Optional, List, Tuple +from typing import Optional, List from pydantic import BaseModel, Field -from app.schemas import Context, MediaInfo, FileItem, Site, Subscribe, Notification, DownloadTask +from app.schemas.context import Context, MediaInfo +from app.schemas.file import FileItem +from app.schemas.download import DownloadTask +from app.schemas.site import Site +from app.schemas.subscribe import Subscribe +from app.schemas.message import Notification class Workflow(BaseModel): diff --git a/app/startup/lifecycle.py b/app/startup/lifecycle.py index 9a6fd09e..df0f2ab7 100644 --- a/app/startup/lifecycle.py +++ b/app/startup/lifecycle.py @@ -3,6 +3,7 @@ from contextlib import asynccontextmanager from fastapi import FastAPI +from app.startup.workflow_initializer import init_workflow, stop_workflow from app.startup.modules_initializer import shutdown_modules, start_modules from app.startup.plugins_initializer import init_plugins_async from app.startup.routers_initializer import init_routers @@ -16,6 +17,8 @@ async def lifespan(app: FastAPI): print("Starting up...") # 启动模块 start_modules(app) + # 初始化工作流动作 + init_workflow(app) # 初始化路由 init_routers(app) # 初始化插件 @@ -35,3 +38,6 @@ async def lifespan(app: FastAPI): print(f"Error during plugin installation shutdown: {e}") # 清理模块 shutdown_modules(app) + # 关闭工作流 + stop_workflow(app) + diff --git a/app/startup/workflow_initializer.py b/app/startup/workflow_initializer.py new file mode 100644 index 00000000..a0b937ea --- /dev/null +++ b/app/startup/workflow_initializer.py @@ -0,0 +1,17 @@ +from fastapi import FastAPI + +from app.core.workflow import WorkFlowManager + + +def init_workflow(_: FastAPI): + """ + 初始化动作 + """ + WorkFlowManager() + + +def stop_workflow(_: FastAPI): + """ + 停止动作 + """ + WorkFlowManager().stop()