From 003781e90314eacdb9d9e239a4f9f622ae12cae6 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Sat, 18 Oct 2025 21:55:31 +0800 Subject: [PATCH] add MoviePilot AI agent implementation and workflow manager --- app/agent/__init__.py | 354 ++++++++++++++++++ app/agent/agent.py | 354 ------------------ app/api/endpoints/workflow.py | 2 +- app/chain/message.py | 2 +- app/chain/workflow.py | 2 +- app/startup/agent_initializer.py | 2 +- app/startup/workflow_initializer.py | 3 +- .../workflow.py => workflow/__init__.py} | 2 +- app/{ => workflow}/actions/__init__.py | 0 app/{ => workflow}/actions/add_download.py | 2 +- app/{ => workflow}/actions/add_subscribe.py | 2 +- app/{ => workflow}/actions/fetch_downloads.py | 2 +- app/{ => workflow}/actions/fetch_medias.py | 2 +- app/{ => workflow}/actions/fetch_rss.py | 2 +- app/{ => workflow}/actions/fetch_torrents.py | 2 +- app/{ => workflow}/actions/filter_medias.py | 2 +- app/{ => workflow}/actions/filter_torrents.py | 2 +- app/{ => workflow}/actions/invoke_plugin.py | 2 +- app/{ => workflow}/actions/note.py | 2 +- app/{ => workflow}/actions/scan_file.py | 2 +- app/{ => workflow}/actions/scrape_file.py | 2 +- app/{ => workflow}/actions/send_event.py | 2 +- app/{ => workflow}/actions/send_message.py | 2 +- app/{ => workflow}/actions/transfer_file.py | 2 +- 24 files changed, 375 insertions(+), 376 deletions(-) delete mode 100644 app/agent/agent.py rename app/{core/workflow.py => workflow/__init__.py} (99%) rename app/{ => workflow}/actions/__init__.py (100%) rename app/{ => workflow}/actions/add_download.py (99%) rename app/{ => workflow}/actions/add_subscribe.py (98%) rename app/{ => workflow}/actions/fetch_downloads.py (97%) rename app/{ => workflow}/actions/fetch_medias.py (99%) rename app/{ => workflow}/actions/fetch_rss.py (98%) rename app/{ => workflow}/actions/fetch_torrents.py (98%) rename app/{ => workflow}/actions/filter_medias.py (97%) rename app/{ => workflow}/actions/filter_torrents.py (97%) rename app/{ => workflow}/actions/invoke_plugin.py (98%) rename app/{ => workflow}/actions/note.py (92%) rename app/{ => workflow}/actions/scan_file.py (98%) rename app/{ => workflow}/actions/scrape_file.py (98%) rename app/{ => workflow}/actions/send_event.py (96%) rename app/{ => workflow}/actions/send_message.py (97%) rename app/{ => workflow}/actions/transfer_file.py (99%) diff --git a/app/agent/__init__.py b/app/agent/__init__.py index e69de29b..5011f432 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -0,0 +1,354 @@ +"""MoviePilot AI智能体实现""" + +import asyncio +import threading +from typing import Dict, List, Any + +from langchain.agents import AgentExecutor, create_openai_tools_agent +from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder +from langchain_community.callbacks import get_openai_callback +from langchain_core.callbacks import AsyncCallbackHandler +from langchain_core.chat_history import InMemoryChatMessageHistory +from langchain_core.messages import HumanMessage, AIMessage, ToolCall +from langchain_core.runnables.history import RunnableWithMessageHistory + +from app.agent.memory import ConversationMemoryManager +from app.agent.prompt import PromptManager +from app.agent.tools import MoviePilotToolFactory +from app.core.config import settings +from app.helper.message import MessageHelper +from app.log import logger + + +class StreamingCallbackHandler(AsyncCallbackHandler): + """流式输出回调处理器""" + + def __init__(self, session_id: str): + self._lock = threading.Lock() + self.session_id = session_id + self.current_message = "" + self.message_helper = MessageHelper() + + async def get_message(self): + """获取当前消息内容,获取后清空""" + with self._lock: + if not self.current_message: + return "" + msg = self.current_message + logger.info(f"Agent消息: {msg}") + self.current_message = "" + return msg + + async def on_llm_new_token(self, token: str, **kwargs): + """处理新的token""" + if not token: + return + with self._lock: + # 缓存当前消息 + self.current_message += token + + +class MoviePilotAgent: + """MoviePilot AI智能体""" + + def __init__(self, session_id: str, user_id: str = None): + self.session_id = session_id + self.user_id = user_id + + # 消息助手 + self.message_helper = MessageHelper() + + # 记忆管理器 + self.memory_manager = ConversationMemoryManager() + + # 提示词管理器 + self.prompt_manager = PromptManager() + + # 回调处理器 + self.callback_handler = StreamingCallbackHandler( + session_id=session_id + ) + + # LLM模型 + self.llm = self._initialize_llm() + + # 工具 + self.tools = self._initialize_tools() + + # 会话存储 + self.session_store = self._initialize_session_store() + + # 提示词模板 + self.prompt = self._initialize_prompt() + + # Agent执行器 + self.agent_executor = self._create_agent_executor() + + def _initialize_llm(self): + """初始化LLM模型""" + provider = settings.LLM_PROVIDER.lower() + api_key = settings.LLM_API_KEY + if not api_key: + raise ValueError("未配置 LLM_API_KEY") + + if provider == "google": + from langchain_google_genai import ChatGoogleGenerativeAI + return ChatGoogleGenerativeAI( + model=settings.LLM_MODEL, + google_api_key=api_key, + max_retries=3, + temperature=settings.LLM_TEMPERATURE, + streaming=True, + callbacks=[self.callback_handler] + ) + elif provider == "deepseek": + from langchain_deepseek import ChatDeepSeek + return ChatDeepSeek( + model=settings.LLM_MODEL, + api_key=api_key, + max_retries=3, + temperature=settings.LLM_TEMPERATURE, + streaming=True, + callbacks=[self.callback_handler], + stream_usage=True + ) + else: + from langchain_openai import ChatOpenAI + return ChatOpenAI( + model=settings.LLM_MODEL, + api_key=api_key, + max_retries=3, + base_url=settings.LLM_BASE_URL, + temperature=settings.LLM_TEMPERATURE, + streaming=True, + callbacks=[self.callback_handler], + stream_usage=True + ) + + def _initialize_tools(self) -> List: + """初始化工具列表""" + return MoviePilotToolFactory.create_tools( + session_id=self.session_id, + user_id=self.user_id, + message_helper=self.message_helper + ) + + @staticmethod + def _initialize_session_store() -> Dict[str, InMemoryChatMessageHistory]: + """初始化内存存储""" + return {} + + def get_session_history(self, session_id: str) -> InMemoryChatMessageHistory: + """获取会话历史""" + if session_id not in self.session_store: + chat_history = InMemoryChatMessageHistory() + messages: List[dict] = self.memory_manager.get_recent_messages_for_agent( + session_id=session_id, + user_id=self.user_id + ) + if messages: + for msg in messages: + if msg.get("role") == "user": + chat_history.add_user_message(HumanMessage(content=msg.get("content", ""))) + elif msg.get("role") == "agent": + chat_history.add_ai_message(AIMessage(content=msg.get("content", ""))) + elif msg.get("role") == "tool_call": + metadata = msg.get("metadata", {}) + chat_history.add_ai_message(AIMessage( + content=msg.get("content", ""), + tool_calls=[ToolCall( + id=metadata.get("call_id"), + name=metadata.get("tool_name"), + args=metadata.get("parameters"), + )] + )) + elif msg.get("role") == "tool_result": + chat_history.add_ai_message(AIMessage(content=msg.get("content", ""))) + elif msg.get("role") == "system": + chat_history.add_ai_message(AIMessage(content=msg.get("content", ""))) + self.session_store[session_id] = chat_history + return self.session_store[session_id] + + @staticmethod + def _initialize_prompt() -> ChatPromptTemplate: + """初始化提示词模板""" + try: + prompt = ChatPromptTemplate.from_messages([ + ("system", "{system_prompt}"), + MessagesPlaceholder(variable_name="chat_history"), + ("user", "{input}"), + MessagesPlaceholder(variable_name="agent_scratchpad"), + ]) + logger.info("LangChain提示词模板初始化成功") + return prompt + except Exception as e: + logger.error(f"初始化提示词失败: {e}") + raise e + + def _create_agent_executor(self) -> RunnableWithMessageHistory: + """创建Agent执行器""" + try: + agent = create_openai_tools_agent( + llm=self.llm, + tools=self.tools, + prompt=self.prompt + ) + executor = AgentExecutor( + agent=agent, + tools=self.tools, + verbose=settings.LLM_VERBOSE, + max_iterations=settings.LLM_MAX_ITERATIONS, + return_intermediate_steps=True, + handle_parsing_errors=True, + early_stopping_method="force" + ) + return RunnableWithMessageHistory( + executor, + self.get_session_history, + input_messages_key="input", + history_messages_key="chat_history" + ) + except Exception as e: + logger.error(f"创建Agent执行器失败: {e}") + raise e + + async def process_message(self, message: str) -> str: + """处理用户消息""" + try: + # 添加用户消息到记忆 + await self.memory_manager.add_memory( + self.session_id, + user_id=self.user_id, + role="user", + content=message + ) + + # 构建输入上下文 + input_context = { + "system_prompt": self.prompt_manager.get_agent_prompt(), + "input": message + } + + # 执行Agent + logger.info(f"Agent执行推理: session_id={self.session_id}, input={message}") + await self._execute_agent(input_context) + + # 获取Agent回复 + agent_message = await self.callback_handler.get_message() + + # 发送Agent回复给用户 + self.message_helper.put( + message=agent_message, + role="system" + ) + + # 添加Agent回复到记忆 + await self.memory_manager.add_memory( + session_id=self.session_id, + user_id=self.user_id, + role="agent", + content=agent_message + ) + + return agent_message + + except Exception as e: + error_message = f"处理消息时发生错误: {str(e)}" + logger.error(error_message) + # 发送错误消息给用户 + self.message_helper.put( + message=error_message, + role="system", + title="MoviePilot助手错误" + ) + return error_message + + async def _execute_agent(self, input_context: Dict[str, Any]) -> Dict[str, Any]: + """执行LangChain Agent""" + try: + with get_openai_callback() as cb: + result = await self.agent_executor.ainvoke( + input_context, + config={"configurable": {"session_id": self.session_id}}, + callbacks=[self.callback_handler] + ) + logger.info(f"LLM调用消耗: \n{cb}") + + if cb.total_tokens > 0: + result["token_usage"] = { + "prompt_tokens": cb.prompt_tokens, + "completion_tokens": cb.completion_tokens, + "total_tokens": cb.total_tokens + } + return result + except asyncio.CancelledError: + logger.info(f"Agent执行被取消: session_id={self.session_id}") + return { + "output": "任务已取消", + "intermediate_steps": [], + "token_usage": {} + } + except Exception as e: + logger.error(f"Agent执行失败: {e}") + return { + "output": f"执行过程中发生错误: {str(e)}", + "intermediate_steps": [], + "token_usage": {} + } + + async def cleanup(self): + """清理智能体资源""" + if self.session_id in self.session_store: + del self.session_store[self.session_id] + logger.info(f"MoviePilot智能体已清理: session_id={self.session_id}") + + +class AgentManager: + """AI智能体管理器""" + + def __init__(self): + self.active_agents: Dict[str, MoviePilotAgent] = {} + self.memory_manager = ConversationMemoryManager() + + async def initialize(self): + """初始化管理器""" + await self.memory_manager.initialize() + + async def close(self): + """关闭管理器""" + await self.memory_manager.close() + # 清理所有活跃的智能体 + for agent in self.active_agents.values(): + await agent.cleanup() + self.active_agents.clear() + + async def process_message(self, session_id: str, user_id: str, message: str) -> str: + """处理用户消息""" + # 获取或创建Agent实例 + if session_id not in self.active_agents: + logger.info(f"创建新的AI智能体实例,session_id: {session_id}, user_id: {user_id}") + agent = MoviePilotAgent( + session_id=session_id, + user_id=user_id + ) + agent.memory_manager = self.memory_manager + self.active_agents[session_id] = agent + else: + agent = self.active_agents[session_id] + agent.user_id = user_id # 确保user_id是最新的 + + # 处理消息 + return await agent.process_message(message) + + async def clear_session(self, session_id: str, user_id: str): + """清空会话""" + if session_id in self.active_agents: + agent = self.active_agents[session_id] + await agent.cleanup() + del self.active_agents[session_id] + await self.memory_manager.clear_memory(session_id, user_id) + logger.info(f"会话 {session_id} 的记忆已清空") + + +# 全局智能体管理器实例 +agent_manager = AgentManager() diff --git a/app/agent/agent.py b/app/agent/agent.py deleted file mode 100644 index 5011f432..00000000 --- a/app/agent/agent.py +++ /dev/null @@ -1,354 +0,0 @@ -"""MoviePilot AI智能体实现""" - -import asyncio -import threading -from typing import Dict, List, Any - -from langchain.agents import AgentExecutor, create_openai_tools_agent -from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder -from langchain_community.callbacks import get_openai_callback -from langchain_core.callbacks import AsyncCallbackHandler -from langchain_core.chat_history import InMemoryChatMessageHistory -from langchain_core.messages import HumanMessage, AIMessage, ToolCall -from langchain_core.runnables.history import RunnableWithMessageHistory - -from app.agent.memory import ConversationMemoryManager -from app.agent.prompt import PromptManager -from app.agent.tools import MoviePilotToolFactory -from app.core.config import settings -from app.helper.message import MessageHelper -from app.log import logger - - -class StreamingCallbackHandler(AsyncCallbackHandler): - """流式输出回调处理器""" - - def __init__(self, session_id: str): - self._lock = threading.Lock() - self.session_id = session_id - self.current_message = "" - self.message_helper = MessageHelper() - - async def get_message(self): - """获取当前消息内容,获取后清空""" - with self._lock: - if not self.current_message: - return "" - msg = self.current_message - logger.info(f"Agent消息: {msg}") - self.current_message = "" - return msg - - async def on_llm_new_token(self, token: str, **kwargs): - """处理新的token""" - if not token: - return - with self._lock: - # 缓存当前消息 - self.current_message += token - - -class MoviePilotAgent: - """MoviePilot AI智能体""" - - def __init__(self, session_id: str, user_id: str = None): - self.session_id = session_id - self.user_id = user_id - - # 消息助手 - self.message_helper = MessageHelper() - - # 记忆管理器 - self.memory_manager = ConversationMemoryManager() - - # 提示词管理器 - self.prompt_manager = PromptManager() - - # 回调处理器 - self.callback_handler = StreamingCallbackHandler( - session_id=session_id - ) - - # LLM模型 - self.llm = self._initialize_llm() - - # 工具 - self.tools = self._initialize_tools() - - # 会话存储 - self.session_store = self._initialize_session_store() - - # 提示词模板 - self.prompt = self._initialize_prompt() - - # Agent执行器 - self.agent_executor = self._create_agent_executor() - - def _initialize_llm(self): - """初始化LLM模型""" - provider = settings.LLM_PROVIDER.lower() - api_key = settings.LLM_API_KEY - if not api_key: - raise ValueError("未配置 LLM_API_KEY") - - if provider == "google": - from langchain_google_genai import ChatGoogleGenerativeAI - return ChatGoogleGenerativeAI( - model=settings.LLM_MODEL, - google_api_key=api_key, - max_retries=3, - temperature=settings.LLM_TEMPERATURE, - streaming=True, - callbacks=[self.callback_handler] - ) - elif provider == "deepseek": - from langchain_deepseek import ChatDeepSeek - return ChatDeepSeek( - model=settings.LLM_MODEL, - api_key=api_key, - max_retries=3, - temperature=settings.LLM_TEMPERATURE, - streaming=True, - callbacks=[self.callback_handler], - stream_usage=True - ) - else: - from langchain_openai import ChatOpenAI - return ChatOpenAI( - model=settings.LLM_MODEL, - api_key=api_key, - max_retries=3, - base_url=settings.LLM_BASE_URL, - temperature=settings.LLM_TEMPERATURE, - streaming=True, - callbacks=[self.callback_handler], - stream_usage=True - ) - - def _initialize_tools(self) -> List: - """初始化工具列表""" - return MoviePilotToolFactory.create_tools( - session_id=self.session_id, - user_id=self.user_id, - message_helper=self.message_helper - ) - - @staticmethod - def _initialize_session_store() -> Dict[str, InMemoryChatMessageHistory]: - """初始化内存存储""" - return {} - - def get_session_history(self, session_id: str) -> InMemoryChatMessageHistory: - """获取会话历史""" - if session_id not in self.session_store: - chat_history = InMemoryChatMessageHistory() - messages: List[dict] = self.memory_manager.get_recent_messages_for_agent( - session_id=session_id, - user_id=self.user_id - ) - if messages: - for msg in messages: - if msg.get("role") == "user": - chat_history.add_user_message(HumanMessage(content=msg.get("content", ""))) - elif msg.get("role") == "agent": - chat_history.add_ai_message(AIMessage(content=msg.get("content", ""))) - elif msg.get("role") == "tool_call": - metadata = msg.get("metadata", {}) - chat_history.add_ai_message(AIMessage( - content=msg.get("content", ""), - tool_calls=[ToolCall( - id=metadata.get("call_id"), - name=metadata.get("tool_name"), - args=metadata.get("parameters"), - )] - )) - elif msg.get("role") == "tool_result": - chat_history.add_ai_message(AIMessage(content=msg.get("content", ""))) - elif msg.get("role") == "system": - chat_history.add_ai_message(AIMessage(content=msg.get("content", ""))) - self.session_store[session_id] = chat_history - return self.session_store[session_id] - - @staticmethod - def _initialize_prompt() -> ChatPromptTemplate: - """初始化提示词模板""" - try: - prompt = ChatPromptTemplate.from_messages([ - ("system", "{system_prompt}"), - MessagesPlaceholder(variable_name="chat_history"), - ("user", "{input}"), - MessagesPlaceholder(variable_name="agent_scratchpad"), - ]) - logger.info("LangChain提示词模板初始化成功") - return prompt - except Exception as e: - logger.error(f"初始化提示词失败: {e}") - raise e - - def _create_agent_executor(self) -> RunnableWithMessageHistory: - """创建Agent执行器""" - try: - agent = create_openai_tools_agent( - llm=self.llm, - tools=self.tools, - prompt=self.prompt - ) - executor = AgentExecutor( - agent=agent, - tools=self.tools, - verbose=settings.LLM_VERBOSE, - max_iterations=settings.LLM_MAX_ITERATIONS, - return_intermediate_steps=True, - handle_parsing_errors=True, - early_stopping_method="force" - ) - return RunnableWithMessageHistory( - executor, - self.get_session_history, - input_messages_key="input", - history_messages_key="chat_history" - ) - except Exception as e: - logger.error(f"创建Agent执行器失败: {e}") - raise e - - async def process_message(self, message: str) -> str: - """处理用户消息""" - try: - # 添加用户消息到记忆 - await self.memory_manager.add_memory( - self.session_id, - user_id=self.user_id, - role="user", - content=message - ) - - # 构建输入上下文 - input_context = { - "system_prompt": self.prompt_manager.get_agent_prompt(), - "input": message - } - - # 执行Agent - logger.info(f"Agent执行推理: session_id={self.session_id}, input={message}") - await self._execute_agent(input_context) - - # 获取Agent回复 - agent_message = await self.callback_handler.get_message() - - # 发送Agent回复给用户 - self.message_helper.put( - message=agent_message, - role="system" - ) - - # 添加Agent回复到记忆 - await self.memory_manager.add_memory( - session_id=self.session_id, - user_id=self.user_id, - role="agent", - content=agent_message - ) - - return agent_message - - except Exception as e: - error_message = f"处理消息时发生错误: {str(e)}" - logger.error(error_message) - # 发送错误消息给用户 - self.message_helper.put( - message=error_message, - role="system", - title="MoviePilot助手错误" - ) - return error_message - - async def _execute_agent(self, input_context: Dict[str, Any]) -> Dict[str, Any]: - """执行LangChain Agent""" - try: - with get_openai_callback() as cb: - result = await self.agent_executor.ainvoke( - input_context, - config={"configurable": {"session_id": self.session_id}}, - callbacks=[self.callback_handler] - ) - logger.info(f"LLM调用消耗: \n{cb}") - - if cb.total_tokens > 0: - result["token_usage"] = { - "prompt_tokens": cb.prompt_tokens, - "completion_tokens": cb.completion_tokens, - "total_tokens": cb.total_tokens - } - return result - except asyncio.CancelledError: - logger.info(f"Agent执行被取消: session_id={self.session_id}") - return { - "output": "任务已取消", - "intermediate_steps": [], - "token_usage": {} - } - except Exception as e: - logger.error(f"Agent执行失败: {e}") - return { - "output": f"执行过程中发生错误: {str(e)}", - "intermediate_steps": [], - "token_usage": {} - } - - async def cleanup(self): - """清理智能体资源""" - if self.session_id in self.session_store: - del self.session_store[self.session_id] - logger.info(f"MoviePilot智能体已清理: session_id={self.session_id}") - - -class AgentManager: - """AI智能体管理器""" - - def __init__(self): - self.active_agents: Dict[str, MoviePilotAgent] = {} - self.memory_manager = ConversationMemoryManager() - - async def initialize(self): - """初始化管理器""" - await self.memory_manager.initialize() - - async def close(self): - """关闭管理器""" - await self.memory_manager.close() - # 清理所有活跃的智能体 - for agent in self.active_agents.values(): - await agent.cleanup() - self.active_agents.clear() - - async def process_message(self, session_id: str, user_id: str, message: str) -> str: - """处理用户消息""" - # 获取或创建Agent实例 - if session_id not in self.active_agents: - logger.info(f"创建新的AI智能体实例,session_id: {session_id}, user_id: {user_id}") - agent = MoviePilotAgent( - session_id=session_id, - user_id=user_id - ) - agent.memory_manager = self.memory_manager - self.active_agents[session_id] = agent - else: - agent = self.active_agents[session_id] - agent.user_id = user_id # 确保user_id是最新的 - - # 处理消息 - return await agent.process_message(message) - - async def clear_session(self, session_id: str, user_id: str): - """清空会话""" - if session_id in self.active_agents: - agent = self.active_agents[session_id] - await agent.cleanup() - del self.active_agents[session_id] - await self.memory_manager.clear_memory(session_id, user_id) - logger.info(f"会话 {session_id} 的记忆已清空") - - -# 全局智能体管理器实例 -agent_manager = AgentManager() diff --git a/app/api/endpoints/workflow.py b/app/api/endpoints/workflow.py index 2dfdb6be..a6854b2a 100644 --- a/app/api/endpoints/workflow.py +++ b/app/api/endpoints/workflow.py @@ -11,7 +11,7 @@ from app.chain.workflow import WorkflowChain from app.core.config import global_vars from app.core.plugin import PluginManager from app.core.security import verify_token -from app.core.workflow import WorkFlowManager +from app.workflow import WorkFlowManager from app.db import get_async_db, get_db from app.db.models import Workflow from app.db.systemconfig_oper import SystemConfigOper diff --git a/app/chain/message.py b/app/chain/message.py index ed98b5d1..572fa8fc 100644 --- a/app/chain/message.py +++ b/app/chain/message.py @@ -847,7 +847,7 @@ class MessageChain(ChainBase): # 异步处理AI智能体请求 import asyncio - from app.agent.agent import agent_manager + from app.agent import agent_manager # 生成会话ID session_id = f"user_{userid}_{hash(user_message) % 10000}" diff --git a/app/chain/workflow.py b/app/chain/workflow.py index 5c5836da..1248205f 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -11,7 +11,7 @@ from pydantic.fields import Callable from app.chain import ChainBase from app.core.config import global_vars from app.core.event import Event, eventmanager -from app.core.workflow import WorkFlowManager +from app.workflow import WorkFlowManager from app.db.models import Workflow from app.db.workflow_oper import WorkflowOper from app.log import logger diff --git a/app/startup/agent_initializer.py b/app/startup/agent_initializer.py index 366f9b3c..35b6d34e 100644 --- a/app/startup/agent_initializer.py +++ b/app/startup/agent_initializer.py @@ -26,7 +26,7 @@ class AgentInitializer: logger.info("AI智能体功能未启用") return True - from app.agent.agent import agent_manager + from app.agent import agent_manager self.agent_manager = agent_manager await agent_manager.initialize() diff --git a/app/startup/workflow_initializer.py b/app/startup/workflow_initializer.py index 8ef0a37d..e29dab94 100644 --- a/app/startup/workflow_initializer.py +++ b/app/startup/workflow_initializer.py @@ -1,5 +1,4 @@ -from app.core.workflow import WorkFlowManager -from app.chain.workflow import WorkflowChain +from app.workflow import WorkFlowManager def init_workflow(): diff --git a/app/core/workflow.py b/app/workflow/__init__.py similarity index 99% rename from app/core/workflow.py rename to app/workflow/__init__.py index 48be9b29..1823f308 100644 --- a/app/core/workflow.py +++ b/app/workflow/__init__.py @@ -46,7 +46,7 @@ class WorkFlowManager(metaclass=Singleton): # 加载所有动作 self._actions = {} actions = ModuleHelper.load( - "app.actions", + "app.workflow.actions", filter_func=lambda _, obj: filter_func(obj) ) for action in actions: diff --git a/app/actions/__init__.py b/app/workflow/actions/__init__.py similarity index 100% rename from app/actions/__init__.py rename to app/workflow/actions/__init__.py diff --git a/app/actions/add_download.py b/app/workflow/actions/add_download.py similarity index 99% rename from app/actions/add_download.py rename to app/workflow/actions/add_download.py index aea4082f..425875a7 100644 --- a/app/actions/add_download.py +++ b/app/workflow/actions/add_download.py @@ -2,7 +2,7 @@ from typing import Optional from pydantic import Field -from app.actions import BaseAction +from app.workflow.actions import BaseAction from app.chain.download import DownloadChain from app.chain.media import MediaChain from app.core.config import global_vars diff --git a/app/actions/add_subscribe.py b/app/workflow/actions/add_subscribe.py similarity index 98% rename from app/actions/add_subscribe.py rename to app/workflow/actions/add_subscribe.py index 84d30548..2d022d76 100644 --- a/app/actions/add_subscribe.py +++ b/app/workflow/actions/add_subscribe.py @@ -1,4 +1,4 @@ -from app.actions import BaseAction +from app.workflow.actions import BaseAction from app.chain.subscribe import SubscribeChain from app.core.config import settings, global_vars from app.core.context import MediaInfo diff --git a/app/actions/fetch_downloads.py b/app/workflow/actions/fetch_downloads.py similarity index 97% rename from app/actions/fetch_downloads.py rename to app/workflow/actions/fetch_downloads.py index 1fe2f240..ab12a04f 100644 --- a/app/actions/fetch_downloads.py +++ b/app/workflow/actions/fetch_downloads.py @@ -1,4 +1,4 @@ -from app.actions import BaseAction, ActionChain +from app.workflow.actions import BaseAction, ActionChain from app.core.config import global_vars from app.schemas import ActionParams, ActionContext from app.log import logger diff --git a/app/actions/fetch_medias.py b/app/workflow/actions/fetch_medias.py similarity index 99% rename from app/actions/fetch_medias.py rename to app/workflow/actions/fetch_medias.py index a8532e70..174e022a 100644 --- a/app/actions/fetch_medias.py +++ b/app/workflow/actions/fetch_medias.py @@ -2,7 +2,7 @@ from typing import List, Optional from pydantic import Field -from app.actions import BaseAction +from app.workflow.actions import BaseAction from app.chain.recommend import RecommendChain from app.schemas import ActionParams, ActionContext from app.core.config import settings, global_vars diff --git a/app/actions/fetch_rss.py b/app/workflow/actions/fetch_rss.py similarity index 98% rename from app/actions/fetch_rss.py rename to app/workflow/actions/fetch_rss.py index 7d429e6b..a0b299fa 100644 --- a/app/actions/fetch_rss.py +++ b/app/workflow/actions/fetch_rss.py @@ -2,7 +2,7 @@ from typing import Optional from pydantic import Field -from app.actions import BaseAction, ActionChain +from app.workflow.actions import BaseAction, ActionChain from app.core.config import settings, global_vars from app.core.context import Context from app.core.metainfo import MetaInfo diff --git a/app/actions/fetch_torrents.py b/app/workflow/actions/fetch_torrents.py similarity index 98% rename from app/actions/fetch_torrents.py rename to app/workflow/actions/fetch_torrents.py index da3a6ff4..64f1db53 100644 --- a/app/actions/fetch_torrents.py +++ b/app/workflow/actions/fetch_torrents.py @@ -4,7 +4,7 @@ from typing import Optional, List from pydantic import Field -from app.actions import BaseAction +from app.workflow.actions import BaseAction from app.chain.search import SearchChain from app.core.config import global_vars from app.log import logger diff --git a/app/actions/filter_medias.py b/app/workflow/actions/filter_medias.py similarity index 97% rename from app/actions/filter_medias.py rename to app/workflow/actions/filter_medias.py index 7abddaf6..4f1ac90c 100644 --- a/app/actions/filter_medias.py +++ b/app/workflow/actions/filter_medias.py @@ -2,7 +2,7 @@ from typing import Optional from pydantic import Field -from app.actions import BaseAction +from app.workflow.actions import BaseAction from app.core.config import global_vars from app.log import logger from app.schemas import ActionParams, ActionContext diff --git a/app/actions/filter_torrents.py b/app/workflow/actions/filter_torrents.py similarity index 97% rename from app/actions/filter_torrents.py rename to app/workflow/actions/filter_torrents.py index 1aa37092..ed59443f 100644 --- a/app/actions/filter_torrents.py +++ b/app/workflow/actions/filter_torrents.py @@ -2,7 +2,7 @@ from typing import Optional, List from pydantic import Field -from app.actions import BaseAction, ActionChain +from app.workflow.actions import BaseAction, ActionChain from app.core.config import global_vars from app.helper.torrent import TorrentHelper from app.log import logger diff --git a/app/actions/invoke_plugin.py b/app/workflow/actions/invoke_plugin.py similarity index 98% rename from app/actions/invoke_plugin.py rename to app/workflow/actions/invoke_plugin.py index cc60df72..63b26c7e 100644 --- a/app/actions/invoke_plugin.py +++ b/app/workflow/actions/invoke_plugin.py @@ -1,6 +1,6 @@ from pydantic import Field -from app.actions import BaseAction +from app.workflow.actions import BaseAction from app.core.plugin import PluginManager from app.log import logger from app.schemas import ActionParams, ActionContext diff --git a/app/actions/note.py b/app/workflow/actions/note.py similarity index 92% rename from app/actions/note.py rename to app/workflow/actions/note.py index 49443905..13086757 100644 --- a/app/actions/note.py +++ b/app/workflow/actions/note.py @@ -1,4 +1,4 @@ -from app.actions import BaseAction +from app.workflow.actions import BaseAction from app.schemas import ActionContext diff --git a/app/actions/scan_file.py b/app/workflow/actions/scan_file.py similarity index 98% rename from app/actions/scan_file.py rename to app/workflow/actions/scan_file.py index 533b8cc5..859fcb49 100644 --- a/app/actions/scan_file.py +++ b/app/workflow/actions/scan_file.py @@ -3,7 +3,7 @@ from typing import Optional from pydantic import Field -from app.actions import BaseAction +from app.workflow.actions import BaseAction from app.chain.storage import StorageChain from app.core.config import global_vars, settings from app.log import logger diff --git a/app/actions/scrape_file.py b/app/workflow/actions/scrape_file.py similarity index 98% rename from app/actions/scrape_file.py rename to app/workflow/actions/scrape_file.py index 2b4152df..d0ed15ea 100644 --- a/app/actions/scrape_file.py +++ b/app/workflow/actions/scrape_file.py @@ -1,6 +1,6 @@ from pathlib import Path -from app.actions import BaseAction +from app.workflow.actions import BaseAction from app.core.config import global_vars from app.schemas import ActionParams, ActionContext from app.chain.media import MediaChain diff --git a/app/actions/send_event.py b/app/workflow/actions/send_event.py similarity index 96% rename from app/actions/send_event.py rename to app/workflow/actions/send_event.py index 2f4966a6..d71a5b57 100644 --- a/app/actions/send_event.py +++ b/app/workflow/actions/send_event.py @@ -1,4 +1,4 @@ -from app.actions import BaseAction +from app.workflow.actions import BaseAction from app.core.event import eventmanager from app.schemas import ActionParams, ActionContext from app.schemas.types import ChainEventType diff --git a/app/actions/send_message.py b/app/workflow/actions/send_message.py similarity index 97% rename from app/actions/send_message.py rename to app/workflow/actions/send_message.py index f3e218c0..4c046eb5 100644 --- a/app/actions/send_message.py +++ b/app/workflow/actions/send_message.py @@ -2,7 +2,7 @@ from typing import List, Optional, Union from pydantic import Field -from app.actions import BaseAction, ActionChain +from app.workflow.actions import BaseAction, ActionChain from app.schemas import ActionParams, ActionContext, Notification from app.core.config import settings diff --git a/app/actions/transfer_file.py b/app/workflow/actions/transfer_file.py similarity index 99% rename from app/actions/transfer_file.py rename to app/workflow/actions/transfer_file.py index 5963226c..e1cfaaa6 100644 --- a/app/actions/transfer_file.py +++ b/app/workflow/actions/transfer_file.py @@ -4,7 +4,7 @@ from typing import Optional from pydantic import Field -from app.actions import BaseAction +from app.workflow.actions import BaseAction from app.core.config import global_vars from app.db.transferhistory_oper import TransferHistoryOper from app.schemas import ActionParams, ActionContext