diff --git a/WORKFLOW_SHARE_README.md b/WORKFLOW_SHARE_README.md index e8385d6d..52d0f7af 100644 --- a/WORKFLOW_SHARE_README.md +++ b/WORKFLOW_SHARE_README.md @@ -18,7 +18,10 @@ - `app/helper/workflow.py` - 新增工作流分享helper类 ### 修改文件 -- `app/api/endpoints/workflow.py` - 新增工作流分享相关API接口 +- `app/api/endpoints/workflow.py` - 新增工作流分享相关API接口,使用WorkflowOper进行数据库操作 +- `app/db/workflow_oper.py` - 新增list方法 +- `app/db/models/workflow.py` - 新增list静态方法 +- `app/core/config.py` - 新增WORKFLOW_STATISTIC_SHARE配置项 ## API接口 @@ -112,7 +115,10 @@ GET /api/v1/workflow/shares?name=关键词&page=1&count=30 ## 配置说明 -工作流分享功能复用了订阅分享的配置项 `SUBSCRIBE_STATISTIC_SHARE`,当该配置为 `true` 时,工作流分享功能才会启用。 +工作流分享功能使用独立的配置项 `WORKFLOW_STATISTIC_SHARE`,当该配置为 `true` 时,工作流分享功能才会启用。 + +### 配置项 +- `WORKFLOW_STATISTIC_SHARE`: 工作流数据共享开关,默认为 `true` ## 服务器接口 @@ -153,12 +159,19 @@ def workflow_fork(shareid: int, db: Session = Depends(get_db)): ## 使用说明 -1. **启用功能**: 确保 `SUBSCRIBE_STATISTIC_SHARE` 配置为 `true` +1. **启用功能**: 确保 `WORKFLOW_STATISTIC_SHARE` 配置为 `true` 2. **分享工作流**: 通过API接口分享本地工作流到公共服务器 3. **查看分享**: 查询公共服务器上的工作流分享 4. **复用工作流**: 将其他用户分享的工作流复制到本地使用 5. **管理分享**: 删除自己分享的工作流 +## 技术改进 + +1. **独立配置**: 工作流分享功能使用独立的配置开关,不再依赖订阅分享配置 +2. **数据访问层**: 使用WorkflowOper进行数据库操作,提高代码的可维护性和一致性 +3. **错误处理**: 完善的错误处理和参数验证 +4. **类型安全**: 修复了所有类型相关的linter错误 + ## 注意事项 1. 工作流分享功能需要网络连接才能访问公共服务器 diff --git a/app/api/endpoints/workflow.py b/app/api/endpoints/workflow.py index a09b89ec..aabe5b78 100644 --- a/app/api/endpoints/workflow.py +++ b/app/api/endpoints/workflow.py @@ -26,8 +26,8 @@ def list_workflows(db: Session = Depends(get_db), """ 获取工作流列表 """ - from app.db.models.workflow import Workflow as WorkflowModel - return WorkflowModel.list(db) + from app.db.workflow_oper import WorkflowOper + return WorkflowOper().list() @router.post("/", summary="创建工作流", response_model=schemas.Response) @@ -37,13 +37,14 @@ def create_workflow(workflow: schemas.Workflow, """ 创建工作流 """ - from app.db.models.workflow import Workflow as WorkflowModel - if workflow.name and WorkflowModel.get_by_name(db, workflow.name): + from app.db.workflow_oper import WorkflowOper + if workflow.name and WorkflowOper().get_by_name(workflow.name): return schemas.Response(success=False, message="已存在相同名称的工作流") if not workflow.add_time: workflow.add_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S") if not workflow.state: workflow.state = "P" + from app.db.models.workflow import Workflow as WorkflowModel WorkflowModel(**workflow.dict()).create(db) return schemas.Response(success=True, message="创建工作流成功") @@ -71,8 +72,8 @@ def get_workflow(workflow_id: int, """ 获取工作流详情 """ - from app.db.models.workflow import Workflow as WorkflowModel - return WorkflowModel.get(db, workflow_id) + from app.db.workflow_oper import WorkflowOper + return WorkflowOper().get(workflow_id) @router.put("/{workflow_id}", summary="更新工作流", response_model=schemas.Response) @@ -82,10 +83,10 @@ def update_workflow(workflow: schemas.Workflow, """ 更新工作流 """ - from app.db.models.workflow import Workflow as WorkflowModel + from app.db.workflow_oper import WorkflowOper if not workflow.id: return schemas.Response(success=False, message="工作流ID不能为空") - wf = WorkflowModel.get(db, workflow.id) + wf = WorkflowOper().get(workflow.id) if not wf: return schemas.Response(success=False, message="工作流不存在") wf.update(db, workflow.dict()) @@ -99,13 +100,14 @@ def delete_workflow(workflow_id: int, """ 删除工作流 """ - from app.db.models.workflow import Workflow as WorkflowModel - workflow = WorkflowModel.get(db, workflow_id) + from app.db.workflow_oper import WorkflowOper + workflow = WorkflowOper().get(workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") # 删除定时任务 Scheduler().remove_workflow_job(workflow) # 删除工作流 + from app.db.models.workflow import Workflow as WorkflowModel WorkflowModel.delete(db, workflow_id) # 删除缓存 SystemConfigOper().delete(f"WorkflowCache-{workflow_id}") @@ -162,12 +164,14 @@ def workflow_fork( } # 检查名称是否重复 - db = next(get_db()) - from app.db.models.workflow import Workflow as WorkflowModel - if WorkflowModel.get_by_name(db, workflow_dict["name"]): + from app.db.workflow_oper import WorkflowOper + if WorkflowOper().get_by_name(workflow_dict["name"]): return schemas.Response(success=False, message="已存在相同名称的工作流") # 创建新工作流 + from app.db.models.workflow import Workflow as WorkflowModel + from app.db import get_db + db = next(get_db()) workflow = WorkflowModel(**workflow_dict) workflow.create(db) @@ -210,8 +214,8 @@ def start_workflow(workflow_id: int, """ 启用工作流 """ - from app.db.models.workflow import Workflow as WorkflowModel - workflow = WorkflowModel.get(db, workflow_id) + from app.db.workflow_oper import WorkflowOper + workflow = WorkflowOper().get(workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") # 添加定时任务 @@ -228,8 +232,8 @@ def pause_workflow(workflow_id: int, """ 停用工作流 """ - from app.db.models.workflow import Workflow as WorkflowModel - workflow = WorkflowModel.get(db, workflow_id) + from app.db.workflow_oper import WorkflowOper + workflow = WorkflowOper().get(workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") # 删除定时任务 @@ -248,8 +252,8 @@ def reset_workflow(workflow_id: int, """ 重置工作流 """ - from app.db.models.workflow import Workflow as WorkflowModel - workflow = WorkflowModel.get(db, workflow_id) + from app.db.workflow_oper import WorkflowOper + workflow = WorkflowOper().get(workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") # 停止工作流 diff --git a/app/core/config.py b/app/core/config.py index 428f8ad1..61035cd1 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -201,6 +201,8 @@ class ConfigModel(BaseModel): SUBSCRIBE_RSS_INTERVAL: int = 30 # 订阅数据共享 SUBSCRIBE_STATISTIC_SHARE: bool = True + # 工作流数据共享 + WORKFLOW_STATISTIC_SHARE: bool = True # 订阅搜索开关 SUBSCRIBE_SEARCH: bool = False # 检查本地媒体库是否存在资源开关 diff --git a/app/db/models/workflow.py b/app/db/models/workflow.py index dd514b06..78c6c2db 100644 --- a/app/db/models/workflow.py +++ b/app/db/models/workflow.py @@ -37,6 +37,11 @@ class Workflow(Base): # 最后执行时间 last_time = Column(String) + @staticmethod + @db_query + def list(db): + return db.query(Workflow).all() + @staticmethod @db_query def get_enabled_workflows(db): diff --git a/app/db/workflow_oper.py b/app/db/workflow_oper.py index 691bd539..d55f940a 100644 --- a/app/db/workflow_oper.py +++ b/app/db/workflow_oper.py @@ -25,6 +25,12 @@ class WorkflowOper(DbOper): """ return Workflow.get(self._db, wid) + def list(self) -> List[Workflow]: + """ + 获取所有工作流列表 + """ + return Workflow.list(self._db) + def list_enabled(self) -> List[Workflow]: """ 获取启用的工作流列表 diff --git a/app/helper/workflow.py b/app/helper/workflow.py index fd427a47..5cf49dc4 100644 --- a/app/helper/workflow.py +++ b/app/helper/workflow.py @@ -4,7 +4,7 @@ import json from app.core.cache import cached, cache_backend from app.core.config import settings -from app.db.models.workflow import Workflow as WorkflowModel +from app.db.workflow_oper import WorkflowOper from app.db.systemconfig_oper import SystemConfigOper from app.log import logger from app.schemas.types import SystemConfigKey @@ -36,13 +36,11 @@ class WorkflowHelper(metaclass=WeakSingleton): """ 分享工作流 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: # 复用订阅分享的配置 - return False, "当前没有开启数据共享功能" + if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 + return False, "当前没有开启工作流数据共享功能" # 获取工作流信息 - from app.db import get_db - db = next(get_db()) - workflow = WorkflowModel.get(db, workflow_id) + workflow = WorkflowOper().get(workflow_id) if not workflow: return False, "工作流不存在" @@ -75,8 +73,8 @@ class WorkflowHelper(metaclass=WeakSingleton): """ 删除分享 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: # 复用订阅分享的配置 - return False, "当前没有开启数据共享功能" + if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 + return False, "当前没有开启工作流数据共享功能" res = RequestUtils(proxies=settings.PROXY or {}, timeout=5).delete_res(f"{self._workflow_share}/{share_id}", @@ -94,8 +92,8 @@ class WorkflowHelper(metaclass=WeakSingleton): """ 复用分享的工作流 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: # 复用订阅分享的配置 - return False, "当前没有开启数据共享功能" + if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 + return False, "当前没有开启工作流数据共享功能" res = RequestUtils(proxies=settings.PROXY or {}, timeout=5, headers={ "Content-Type": "application/json" @@ -112,7 +110,7 @@ class WorkflowHelper(metaclass=WeakSingleton): """ 获取工作流分享数据 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: # 复用订阅分享的配置 + if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 return [] res = RequestUtils(proxies=settings.PROXY or {}, timeout=15).get_res(self._workflow_shares, params={