diff --git a/app/agent/tools/impl/scrape_metadata.py b/app/agent/tools/impl/scrape_metadata.py index 86337c70..085d37cd 100644 --- a/app/agent/tools/impl/scrape_metadata.py +++ b/app/agent/tools/impl/scrape_metadata.py @@ -1,6 +1,5 @@ """刮削媒体元数据工具""" -import asyncio import json from pathlib import Path from typing import Optional, Type @@ -9,6 +8,7 @@ from pydantic import BaseModel, Field from app.agent.tools.base import MoviePilotTool from app.chain.media import MediaChain +from app.core.config import GlobalVar from app.core.metainfo import MetaInfoPath from app.log import logger from app.schemas import FileItem @@ -83,8 +83,7 @@ class ScrapeMetadataTool(MoviePilotTool): }, ensure_ascii=False) # 在线程池中执行同步的刮削操作 - loop = asyncio.get_event_loop() - await loop.run_in_executor( + await GlobalVar.CURRENT_EVENT_LOOP.run_in_executor( None, lambda: media_chain.scrape_metadata( fileitem=fileitem, diff --git a/app/agent/tools/impl/search_subscribe.py b/app/agent/tools/impl/search_subscribe.py index f9a7711c..9e601c16 100644 --- a/app/agent/tools/impl/search_subscribe.py +++ b/app/agent/tools/impl/search_subscribe.py @@ -1,6 +1,5 @@ """搜索订阅缺失剧集工具""" -import asyncio import json from typing import Optional, Type, List @@ -8,6 +7,7 @@ from pydantic import BaseModel, Field from app.agent.tools.base import MoviePilotTool from app.chain.subscribe import SubscribeChain +from app.core.config import GlobalVar from app.db.subscribe_oper import SubscribeOper from app.log import logger @@ -85,8 +85,7 @@ class SearchSubscribeTool(MoviePilotTool): # 在线程池中执行同步的搜索操作 # 当 sid 有值时,state 参数会被忽略,直接处理该订阅 - loop = asyncio.get_event_loop() - await loop.run_in_executor( + await GlobalVar.CURRENT_EVENT_LOOP.run_in_executor( None, lambda: subscribe_chain.search( sid=subscribe_id, diff --git a/app/chain/message.py b/app/chain/message.py index 4bf8300a..865c5c28 100644 --- a/app/chain/message.py +++ b/app/chain/message.py @@ -10,7 +10,7 @@ from app.chain.download import DownloadChain from app.chain.media import MediaChain from app.chain.search import SearchChain from app.chain.subscribe import SubscribeChain -from app.core.config import settings +from app.core.config import settings, GlobalVar from app.core.context import MediaInfo, Context from app.core.meta import MetaBase from app.db.user_oper import UserOper @@ -881,21 +881,12 @@ class MessageChain(ChainBase): # 如果有会话ID,同时清除智能体的会话记忆 if session_id: try: - try: - loop = asyncio.get_event_loop() - loop.run_until_complete( - agent_manager.clear_session( - session_id=session_id, - user_id=str(userid) - ) - ) - except RuntimeError: - asyncio.run( - agent_manager.clear_session( - session_id=session_id, - user_id=str(userid) - ) + GlobalVar.CURRENT_EVENT_LOOP.run_until_complete( + agent_manager.clear_session( + session_id=session_id, + user_id=str(userid) ) + ) except Exception as e: logger.warning(f"清除智能体会话记忆失败: {e}") @@ -958,8 +949,7 @@ class MessageChain(ChainBase): # 在事件循环中处理 try: - loop = asyncio.get_event_loop() - loop.run_until_complete( + GlobalVar.CURRENT_EVENT_LOOP.run_until_complete( agent_manager.process_message( session_id=session_id, user_id=str(userid), diff --git a/app/core/config.py b/app/core/config.py index e05a2c54..0f57536a 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -1,3 +1,4 @@ +import asyncio import copy import json import os @@ -6,6 +7,7 @@ import re import secrets import sys import threading +from asyncio import AbstractEventLoop from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Type from urllib.parse import urlparse @@ -852,6 +854,8 @@ class GlobalVar(object): EMERGENCY_STOP_WORKFLOWS: List[int] = [] # 需应急停止文件整理 EMERGENCY_STOP_TRANSFER: List[str] = [] + # 当前事件循环 + CURRENT_EVENT_LOOP: AbstractEventLoop = asyncio.get_event_loop() def stop_system(self): """ diff --git a/app/core/event.py b/app/core/event.py index d079cdc5..a4daeb66 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -11,6 +11,7 @@ from typing import Callable, Dict, List, Optional, Tuple, Union, Any from fastapi.concurrency import run_in_threadpool +from app.core.config import GlobalVar from app.helper.thread import ThreadHelper from app.log import logger from app.schemas import ChainEventData @@ -90,8 +91,6 @@ class EventManager(metaclass=Singleton): self.__lock = threading.Lock() # 退出事件 self.__event = threading.Event() - # 当前事件循环 - self.loop = asyncio.get_event_loop() def start(self): """ @@ -454,7 +453,7 @@ class EventManager(metaclass=Singleton): # 对于异步函数,直接在事件循环中运行 asyncio.run_coroutine_threadsafe( self.__safe_invoke_handler_async(handler, isolated_event), - self.loop + GlobalVar.CURRENT_EVENT_LOOP ) else: # 对于同步函数,在线程池中运行 diff --git a/app/scheduler.py b/app/scheduler.py index 1bbcd47f..3c6552b5 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -21,7 +21,7 @@ from app.chain.site import SiteChain from app.chain.subscribe import SubscribeChain from app.chain.transfer import TransferChain from app.chain.workflow import WorkflowChain -from app.core.config import settings +from app.core.config import settings, GlobalVar from app.core.event import eventmanager, Event from app.core.plugin import PluginManager from app.db.systemconfig_oper import SystemConfigOper @@ -60,8 +60,7 @@ class Scheduler(metaclass=SingletonClass): self._auth_count = 0 # 用户认证失败消息发送 self._auth_message = False - # 当前事件循环 - self.loop = asyncio.get_event_loop() + # 初始化 self.init() @eventmanager.register(EventType.ConfigChanged) @@ -475,7 +474,7 @@ class Scheduler(metaclass=SingletonClass): """ 启动协程 """ - return asyncio.run_coroutine_threadsafe(coro, self.loop) + return asyncio.run_coroutine_threadsafe(coro, GlobalVar.CURRENT_EVENT_LOOP) # 获取定时任务 job = self.__prepare_job(job_id) diff --git a/app/startup/plugins_initializer.py b/app/startup/plugins_initializer.py index 0c59f631..36ee1d61 100644 --- a/app/startup/plugins_initializer.py +++ b/app/startup/plugins_initializer.py @@ -1,5 +1,4 @@ -import asyncio - +from app.core.config import GlobalVar from app.core.plugin import PluginManager from app.log import logger @@ -9,7 +8,7 @@ async def sync_plugins() -> bool: 初始化安装插件,并动态注册后台任务及API """ try: - loop = asyncio.get_event_loop() + loop = GlobalVar.CURRENT_EVENT_LOOP plugin_manager = PluginManager() sync_result = await execute_task(loop, plugin_manager.sync, "插件同步到本地")