From 91a124ab8fd635d440f81d63322ee08b3d67350e Mon Sep 17 00:00:00 2001 From: jxxghp Date: Mon, 25 Aug 2025 20:44:38 +0800 Subject: [PATCH] =?UTF-8?q?fix=20=E5=BC=82=E6=AD=A5=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/scheduler.py | 17 +++++++-- app/utils/asyncio.py | 83 -------------------------------------------- 2 files changed, 14 insertions(+), 86 deletions(-) delete mode 100644 app/utils/asyncio.py diff --git a/app/scheduler.py b/app/scheduler.py index 877b3e22..db0236f8 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -1,3 +1,4 @@ +import asyncio import inspect import threading import traceback @@ -28,7 +29,6 @@ from app.helper.wallpaper import WallpaperHelper from app.log import logger from app.schemas import Notification, NotificationType, Workflow, ConfigChangeEventData from app.schemas.types import EventType, SystemConfigKey -from app.utils.asyncio import AsyncUtils from app.utils.singleton import Singleton from app.utils.timer import TimerUtils @@ -449,6 +449,17 @@ class Scheduler(metaclass=Singleton): """ 启动定时服务 """ + + def __start_coro(coro): + """ + 启动协程 + """ + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = asyncio.get_event_loop() + return asyncio.run_coroutine_threadsafe(coro, loop) + # 获取定时任务 job = self.__prepare_job(job_id) if not job: @@ -461,7 +472,7 @@ class Scheduler(metaclass=Singleton): if not func: return if inspect.iscoroutinefunction(func): - AsyncUtils.run_async(func(*args, **kwargs)) + __start_coro(func(*args, **kwargs)) else: job["func"](*args, **kwargs) except Exception as e: @@ -565,7 +576,7 @@ class Scheduler(metaclass=Singleton): except JobLookupError: pass if job_removed: - logger.info(f"移除插件服务({plugin_name}):{service.get('name')}") + logger.info(f"移除插件服务({plugin_name}):{service.get('name')}") # noqa except Exception as e: logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}") SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务移除失败", diff --git a/app/utils/asyncio.py b/app/utils/asyncio.py deleted file mode 100644 index 91ec6148..00000000 --- a/app/utils/asyncio.py +++ /dev/null @@ -1,83 +0,0 @@ -import asyncio -import threading -from concurrent.futures import ThreadPoolExecutor -from typing import Coroutine, Any, TypeVar - -T = TypeVar('T') - - -class AsyncUtils: - """ - 异步工具类,用于在同步环境中调用异步方法 - """ - - @staticmethod - def run_async(coro: Coroutine[Any, Any, T]) -> T: - """ - 在同步环境中安全地执行异步协程 - - :param coro: 要执行的协程 - :return: 协程的返回值 - :raises: 协程执行过程中的任何异常 - """ - try: - # 尝试获取当前运行的事件循环 - asyncio.get_running_loop() - # 如果有运行中的事件循环,在新线程中执行 - return AsyncUtils._run_in_thread(coro) - except RuntimeError: - # 没有运行中的事件循环,直接使用 asyncio.run - return asyncio.run(coro) - - @staticmethod - def _run_in_thread(coro: Coroutine[Any, Any, T]) -> T: - """ - 在新线程中创建事件循环并执行协程 - - :param coro: 要执行的协程 - :return: 协程的返回值 - """ - result = None - exception = None - - def _run(): - nonlocal result, exception - try: - # 在新线程中创建新的事件循环 - new_loop = asyncio.new_event_loop() - asyncio.set_event_loop(new_loop) - try: - result = new_loop.run_until_complete(coro) - finally: - new_loop.close() - except Exception as e: - exception = e - - # 在新线程中执行 - thread = threading.Thread(target=_run) - thread.start() - thread.join() - - if exception: - raise exception - - return result - - @staticmethod - def run_async_in_executor(coro: Coroutine[Any, Any, T]) -> T: - """ - 使用线程池执行器在新线程中运行异步协程 - - :param coro: 要执行的协程 - :return: 协程的返回值 - """ - try: - # 检查是否有运行中的事件循环 - asyncio.get_running_loop() - # 有运行中的事件循环,使用线程池 - with ThreadPoolExecutor() as executor: - future = executor.submit(asyncio.run, coro) - return future.result() - except RuntimeError: - # 没有运行中的事件循环,直接运行 - return asyncio.run(coro)