diff --git a/app/api/endpoints/workflow.py b/app/api/endpoints/workflow.py index b236e651..a1652b95 100644 --- a/app/api/endpoints/workflow.py +++ b/app/api/endpoints/workflow.py @@ -3,18 +3,19 @@ from datetime import datetime from typing import List, Any, Optional from fastapi import APIRouter, Depends +from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Session from app import schemas from app.chain.workflow import WorkflowChain from app.core.config import global_vars from app.core.plugin import PluginManager +from app.core.security import verify_token from app.core.workflow import WorkFlowManager -from app.db import get_db +from app.db import get_async_db, get_db from app.db.models import Workflow from app.db.systemconfig_oper import SystemConfigOper -from app.db.user_oper import get_current_active_user -from app.db.workflow_oper import WorkflowOper +from app.db.workflow_oper import AsyncWorkflowOper, WorkflowOper from app.helper.workflow import WorkflowHelper from app.scheduler import Scheduler from app.schemas.types import EventType, EVENT_TYPE_NAMES @@ -23,23 +24,22 @@ router = APIRouter() @router.get("/", summary="所有工作流", response_model=List[schemas.Workflow]) -def list_workflows(db: Session = Depends(get_db), - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: +async def list_workflows(db: AsyncSession = Depends(get_async_db), + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 获取工作流列表 """ - from app.db.workflow_oper import WorkflowOper - return WorkflowOper(db).list() + return await AsyncWorkflowOper(db).list() @router.post("/", summary="创建工作流", response_model=schemas.Response) -def create_workflow(workflow: schemas.Workflow, - db: Session = Depends(get_db), - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: +async def create_workflow(workflow: schemas.Workflow, + db: AsyncSession = Depends(get_async_db), + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 创建工作流 """ - if workflow.name and WorkflowOper(db).get_by_name(workflow.name): + if workflow.name and await AsyncWorkflowOper(db).get_by_name(workflow.name): return schemas.Response(success=False, message="已存在相同名称的工作流") if not workflow.add_time: workflow.add_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S") @@ -47,12 +47,13 @@ def create_workflow(workflow: schemas.Workflow, workflow.state = "P" if not workflow.trigger_type: workflow.trigger_type = "timer" - Workflow(**workflow.dict()).create(db) + workflow_obj = Workflow(**workflow.dict()) + await workflow_obj.async_create(db) return schemas.Response(success=True, message="创建工作流成功") @router.get("/plugin/actions", summary="查询插件动作", response_model=List[dict]) -def list_plugin_actions(plugin_id: str = None, _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: +def list_plugin_actions(plugin_id: str = None, _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 获取所有动作 """ @@ -60,7 +61,7 @@ def list_plugin_actions(plugin_id: str = None, _: schemas.TokenPayload = Depends @router.get("/actions", summary="所有动作", response_model=List[dict]) -def list_actions(_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: +async def list_actions(_: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 获取所有动作 """ @@ -68,7 +69,7 @@ def list_actions(_: schemas.TokenPayload = Depends(get_current_active_user)) -> @router.get("/event_types", summary="获取所有事件类型", response_model=List[dict]) -def get_event_types(_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: +async def get_event_types(_: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 获取所有事件类型 """ @@ -79,38 +80,38 @@ def get_event_types(_: schemas.TokenPayload = Depends(get_current_active_user)) @router.post("/share", summary="分享工作流", response_model=schemas.Response) -def workflow_share( +async def workflow_share( workflow: schemas.WorkflowShare, - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 分享工作流 """ if not workflow.id or not workflow.share_title or not workflow.share_user: return schemas.Response(success=False, message="请填写工作流ID、分享标题和分享人") - state, errmsg = WorkflowHelper().workflow_share(workflow_id=workflow.id, - share_title=workflow.share_title or "", - share_comment=workflow.share_comment or "", - share_user=workflow.share_user or "") + state, errmsg = await WorkflowHelper().async_workflow_share(workflow_id=workflow.id, + share_title=workflow.share_title or "", + share_comment=workflow.share_comment or "", + share_user=workflow.share_user or "") return schemas.Response(success=state, message=errmsg) @router.delete("/share/{share_id}", summary="删除分享", response_model=schemas.Response) -def workflow_share_delete( +async def workflow_share_delete( share_id: int, - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 删除分享 """ - state, errmsg = WorkflowHelper().share_delete(share_id=share_id) + state, errmsg = await WorkflowHelper().async_share_delete(share_id=share_id) return schemas.Response(success=state, message=errmsg) @router.post("/fork", summary="复用工作流", response_model=schemas.Response) -def workflow_fork( +async def workflow_fork( workflow: schemas.WorkflowShare, - db: Session = Depends(get_db), - _: schemas.User = Depends(get_current_active_user)) -> Any: + db: AsyncSession = Depends(get_async_db), + _: schemas.User = Depends(verify_token)) -> Any: """ 复用工作流 """ @@ -148,36 +149,40 @@ def workflow_fork( } # 检查名称是否重复 - if Workflow.get_by_name(db, workflow_dict["name"]): + workflow_oper = AsyncWorkflowOper(db) + if await workflow_oper.get_by_name(workflow_dict["name"]): return schemas.Response(success=False, message="已存在相同名称的工作流") # 创建新工作流 - workflow = Workflow(**workflow_dict) - workflow.create(db) + workflow_obj = Workflow(**workflow_dict) + await workflow_obj.async_create(db) + + # 获取工作流ID(在数据库会话有效时) + workflow = await workflow_oper.get_by_name(workflow_dict["name"]) # 更新复用次数 - if workflow.id: - WorkflowHelper().workflow_fork(share_id=workflow.id) + if workflow: + await WorkflowHelper().async_workflow_fork(share_id=workflow.id) return schemas.Response(success=True, message="复用成功") @router.get("/shares", summary="查询分享的工作流", response_model=List[schemas.WorkflowShare]) -def workflow_shares( +async def workflow_shares( name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30, - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 查询分享的工作流 """ - return WorkflowHelper().get_shares(name=name, page=page, count=count) + return await WorkflowHelper().async_get_shares(name=name, page=page, count=count) @router.post("/{workflow_id}/run", summary="执行工作流", response_model=schemas.Response) def run_workflow(workflow_id: int, from_begin: Optional[bool] = True, - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 执行工作流 """ @@ -190,11 +195,10 @@ def run_workflow(workflow_id: int, @router.post("/{workflow_id}/start", summary="启用工作流", response_model=schemas.Response) def start_workflow(workflow_id: int, db: Session = Depends(get_db), - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 启用工作流 """ - from app.db.workflow_oper import WorkflowOper workflow = WorkflowOper(db).get(workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") @@ -212,7 +216,7 @@ def start_workflow(workflow_id: int, @router.post("/{workflow_id}/pause", summary="停用工作流", response_model=schemas.Response) def pause_workflow(workflow_id: int, db: Session = Depends(get_db), - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 停用工作流 """ @@ -234,53 +238,52 @@ def pause_workflow(workflow_id: int, @router.post("/{workflow_id}/reset", summary="重置工作流", response_model=schemas.Response) -def reset_workflow(workflow_id: int, - db: Session = Depends(get_db), - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: +async def reset_workflow(workflow_id: int, + db: AsyncSession = Depends(get_async_db), + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 重置工作流 """ - from app.db.workflow_oper import WorkflowOper - workflow = WorkflowOper(db).get(workflow_id) + workflow = await AsyncWorkflowOper(db).get(workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") # 停止工作流 global_vars.stop_workflow(workflow_id) # 重置工作流 - workflow.reset(db, workflow_id, reset_count=True) + await Workflow.async_reset(db, workflow_id, reset_count=True) # 删除缓存 SystemConfigOper().delete(f"WorkflowCache-{workflow_id}") return schemas.Response(success=True) @router.get("/{workflow_id}", summary="工作流详情", response_model=schemas.Workflow) -def get_workflow(workflow_id: int, - db: Session = Depends(get_db), - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: +async def get_workflow(workflow_id: int, + db: AsyncSession = Depends(get_async_db), + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 获取工作流详情 """ - from app.db.workflow_oper import WorkflowOper - return WorkflowOper(db).get(workflow_id) + return await AsyncWorkflowOper(db).get(workflow_id) @router.put("/{workflow_id}", summary="更新工作流", response_model=schemas.Response) def update_workflow(workflow: schemas.Workflow, db: Session = Depends(get_db), - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 更新工作流 """ if not workflow.id: return schemas.Response(success=False, message="工作流ID不能为空") - wf = WorkflowOper(db).get(workflow.id) + workflow_oper = WorkflowOper(db) + wf = workflow_oper.get(workflow.id) if not wf: return schemas.Response(success=False, message="工作流不存在") if not wf.trigger_type: workflow.trigger_type = "timer" wf.update(db, workflow.dict()) # 更新后的工作流对象 - updated_workflow = wf.get(workflow.id) + updated_workflow = workflow_oper.get(workflow.id) # 更新定时任务 Scheduler().update_workflow_job(updated_workflow) # 更新事件注册 @@ -291,7 +294,7 @@ def update_workflow(workflow: schemas.Workflow, @router.delete("/{workflow_id}", summary="删除工作流", response_model=schemas.Response) def delete_workflow(workflow_id: int, db: Session = Depends(get_db), - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 删除工作流 """ diff --git a/app/db/models/workflow.py b/app/db/models/workflow.py index 7770c26f..e1e7a0d0 100644 --- a/app/db/models/workflow.py +++ b/app/db/models/workflow.py @@ -1,9 +1,10 @@ from datetime import datetime from typing import Optional -from sqlalchemy import Column, Integer, JSON, Sequence, String, and_, or_ +from sqlalchemy import Column, Integer, JSON, Sequence, String, and_, or_, select +from sqlalchemy.ext.asyncio import AsyncSession -from app.db import Base, db_query, db_update +from app.db import Base, db_query, db_update, async_db_query, async_db_update class Workflow(Base): @@ -48,11 +49,23 @@ class Workflow(Base): def list(db): return db.query(Workflow).all() + @staticmethod + @async_db_query + async def async_list(db: AsyncSession): + result = await db.execute(select(Workflow)) + return result.scalars().all() + @staticmethod @db_query def get_enabled_workflows(db): return db.query(Workflow).filter(Workflow.state != 'P').all() + @staticmethod + @async_db_query + async def async_get_enabled_workflows(db: AsyncSession): + result = await db.execute(select(Workflow).where(Workflow.state != 'P')) + return result.scalars().all() + @staticmethod @db_query def get_timer_triggered_workflows(db): @@ -67,6 +80,21 @@ class Workflow(Base): ) ).all() + @staticmethod + @async_db_query + async def async_get_timer_triggered_workflows(db: AsyncSession): + """异步获取定时触发的工作流""" + result = await db.execute(select(Workflow).where( + and_( + or_( + Workflow.trigger_type == 'timer', + not Workflow.trigger_type + ), + Workflow.state != 'P' + ) + )) + return result.scalars().all() + @staticmethod @db_query def get_event_triggered_workflows(db): @@ -78,17 +106,42 @@ class Workflow(Base): ) ).all() + @staticmethod + @async_db_query + async def async_get_event_triggered_workflows(db: AsyncSession): + """异步获取事件触发的工作流""" + result = await db.execute(select(Workflow).where( + and_( + Workflow.trigger_type == 'event', + Workflow.state != 'P' + ) + )) + return result.scalars().all() + @staticmethod @db_query def get_by_name(db, name: str): return db.query(Workflow).filter(Workflow.name == name).first() + @staticmethod + @async_db_query + async def async_get_by_name(db: AsyncSession, name: str): + result = await db.execute(select(Workflow).where(Workflow.name == name)) + return result.scalars().first() + @staticmethod @db_update def update_state(db, wid: int, state: str): db.query(Workflow).filter(Workflow.id == wid).update({"state": state}) return True + @staticmethod + @async_db_update + async def async_update_state(db: AsyncSession, wid: int, state: str): + from sqlalchemy import update + await db.execute(update(Workflow).where(Workflow.id == wid).values(state=state)) + return True + @staticmethod @db_update def start(db, wid: int): @@ -97,6 +150,13 @@ class Workflow(Base): }) return True + @staticmethod + @async_db_update + async def async_start(db: AsyncSession, wid: int): + from sqlalchemy import update + await db.execute(update(Workflow).where(Workflow.id == wid).values(state='R')) + return True + @staticmethod @db_update def fail(db, wid: int, result: str): @@ -107,6 +167,19 @@ class Workflow(Base): }) return True + @staticmethod + @async_db_update + async def async_fail(db: AsyncSession, wid: int, result: str): + from sqlalchemy import update + await db.execute(update(Workflow).where( + and_(Workflow.id == wid, Workflow.state != "P") + ).values( + state='F', + result=result, + last_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S') + )) + return True + @staticmethod @db_update def success(db, wid: int, result: Optional[str] = None): @@ -118,6 +191,20 @@ class Workflow(Base): }) return True + @staticmethod + @async_db_update + async def async_success(db: AsyncSession, wid: int, result: Optional[str] = None): + from sqlalchemy import update + await db.execute(update(Workflow).where( + and_(Workflow.id == wid, Workflow.state != "P") + ).values( + state='S', + result=result, + run_count=Workflow.run_count + 1, + last_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S') + )) + return True + @staticmethod @db_update def reset(db, wid: int, reset_count: Optional[bool] = False): @@ -129,6 +216,18 @@ class Workflow(Base): }) return True + @staticmethod + @async_db_update + async def async_reset(db: AsyncSession, wid: int, reset_count: Optional[bool] = False): + from sqlalchemy import update + await db.execute(update(Workflow).where(Workflow.id == wid).values( + state='W', + result=None, + current_action=None, + run_count=0 if reset_count else Workflow.run_count, + )) + return True + @staticmethod @db_update def update_current_action(db, wid: int, action_id: str, context: dict): @@ -137,3 +236,18 @@ class Workflow(Base): "context": context }) return True + + @staticmethod + @async_db_update + async def async_update_current_action(db: AsyncSession, wid: int, action_id: str, context: dict): + from sqlalchemy import update + # 先获取当前current_action + result = await db.execute(select(Workflow.current_action).where(Workflow.id == wid)) + current_action = result.scalar() + new_current_action = current_action + f",{action_id}" if current_action else action_id + + await db.execute(update(Workflow).where(Workflow.id == wid).values( + current_action=new_current_action, + context=context + )) + return True diff --git a/app/db/workflow_oper.py b/app/db/workflow_oper.py index d71b99aa..66ea26c9 100644 --- a/app/db/workflow_oper.py +++ b/app/db/workflow_oper.py @@ -1,6 +1,6 @@ -from typing import List, Tuple, Optional +from typing import List, Tuple, Optional, Any, Coroutine, Sequence -from app.db import DbOper +from app.db import DbOper, AsyncDbOper from app.db.models.workflow import Workflow @@ -84,3 +84,27 @@ class WorkflowOper(DbOper): 重置 """ return Workflow.reset(self._db, wid, reset_count=reset_count) + + +class AsyncWorkflowOper(AsyncDbOper): + """ + 异步工作流管理 + """ + + async def get(self, wid: int) -> Workflow: + """ + 异步查询单个工作流 + """ + return await Workflow.async_get(self._db, wid) + + async def list(self) -> Coroutine[Any, Any, Sequence[Any]]: + """ + 异步获取所有工作流列表 + """ + return await Workflow.async_list(self._db) + + async def get_by_name(self, name: str) -> Workflow: + """ + 异步按名称获取工作流 + """ + return await Workflow.async_get_by_name(self._db, name) diff --git a/app/helper/workflow.py b/app/helper/workflow.py index e411eb74..3e9f7e40 100644 --- a/app/helper/workflow.py +++ b/app/helper/workflow.py @@ -3,9 +3,9 @@ from typing import List, Tuple, Optional from app.core.cache import cached, cache_backend from app.core.config import settings -from app.db.workflow_oper import WorkflowOper +from app.db.workflow_oper import WorkflowOper, AsyncWorkflowOper from app.log import logger -from app.utils.http import RequestUtils +from app.utils.http import RequestUtils, AsyncRequestUtils from app.utils.singleton import WeakSingleton from app.utils.system import SystemUtils @@ -35,7 +35,7 @@ class WorkflowHelper(metaclass=WeakSingleton): """ if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 return False, "当前没有开启工作流数据共享功能" - + # 获取工作流信息 workflow = WorkflowOper().get(workflow_id) if not workflow: @@ -51,7 +51,8 @@ class WorkflowHelper(metaclass=WeakSingleton): workflow_dict['flows'] = json.dumps(workflow_dict['flows'] or []) # 发送分享请求 - res = RequestUtils(proxies=settings.PROXY or {}, content_type="application/json", + res = RequestUtils(proxies=settings.PROXY or {}, + content_type="application/json", timeout=10).post(self._workflow_share, json={ "share_title": share_title, @@ -69,13 +70,55 @@ class WorkflowHelper(metaclass=WeakSingleton): else: return False, res.json().get("message") + async def async_workflow_share(self, workflow_id: int, + share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]: + """ + 异步分享工作流 + """ + if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 + return False, "当前没有开启工作流数据共享功能" + + # 获取工作流信息 + workflow = await AsyncWorkflowOper().get(workflow_id) + if not workflow: + return False, "工作流不存在" + + if not workflow.actions or not workflow.flows: + return False, "请分享有动作和流程的工作流" + + workflow_dict = workflow.to_dict() + workflow_dict.pop("id", None) + workflow_dict.pop("context", None) + workflow_dict['actions'] = json.dumps(workflow_dict['actions'] or []) + workflow_dict['flows'] = json.dumps(workflow_dict['flows'] or []) + + # 发送分享请求 + res = await AsyncRequestUtils(proxies=settings.PROXY or {}, + content_type="application/json", + timeout=10).post(self._workflow_share, + json={ + "share_title": share_title, + "share_comment": share_comment, + "share_user": share_user, + "share_uid": self._share_user_id, + **workflow_dict + }) + if res is None: + return False, "连接MoviePilot服务器失败" + if res.status_code == 200: + # 清除 get_shares 的缓存,以便实时看到结果 + cache_backend.clear(region=self._shares_cache_region) + return True, "" + else: + return False, res.json().get("message") + def share_delete(self, share_id: int) -> Tuple[bool, str]: """ 删除分享 """ if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 return False, "当前没有开启工作流数据共享功能" - + res = RequestUtils(proxies=settings.PROXY or {}, timeout=5).delete_res(f"{self._workflow_share}/{share_id}", params={"share_uid": self._share_user_id}) @@ -88,13 +131,32 @@ class WorkflowHelper(metaclass=WeakSingleton): else: return False, res.json().get("message") + async def async_share_delete(self, share_id: int) -> Tuple[bool, str]: + """ + 异步删除分享 + """ + if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 + return False, "当前没有开启工作流数据共享功能" + + res = await AsyncRequestUtils(proxies=settings.PROXY or {}, + timeout=5).delete_res(f"{self._workflow_share}/{share_id}", + params={"share_uid": self._share_user_id}) + if res is None: + return False, "连接MoviePilot服务器失败" + if res.status_code == 200: + # 清除 get_shares 的缓存,以便实时看到结果 + cache_backend.clear(region=self._shares_cache_region) + return True, "" + else: + return False, res.json().get("message") + def workflow_fork(self, share_id: int) -> Tuple[bool, str]: """ 复用分享的工作流 """ if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 return False, "当前没有开启工作流数据共享功能" - + res = RequestUtils(proxies=settings.PROXY or {}, timeout=5, headers={ "Content-Type": "application/json" }).get_res(self._workflow_fork % share_id) @@ -105,6 +167,25 @@ class WorkflowHelper(metaclass=WeakSingleton): else: return False, res.json().get("message") + async def async_workflow_fork(self, share_id: int) -> Tuple[bool, str]: + """ + 异步复用分享的工作流 + """ + if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 + return False, "当前没有开启工作流数据共享功能" + + res = await AsyncRequestUtils(proxies=settings.PROXY or {}, + timeout=5, + headers={ + "Content-Type": "application/json" + }).get_res(self._workflow_fork % share_id) + if res is None: + return False, "连接MoviePilot服务器失败" + if res.status_code == 200: + return True, "" + else: + return False, res.json().get("message") + @cached(region=_shares_cache_region, maxsize=1, skip_empty=True) def get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]: """ @@ -112,7 +193,7 @@ class WorkflowHelper(metaclass=WeakSingleton): """ if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 return [] - + res = RequestUtils(proxies=settings.PROXY or {}, timeout=15).get_res(self._workflow_shares, params={ "name": name, "page": page, @@ -122,6 +203,24 @@ class WorkflowHelper(metaclass=WeakSingleton): return res.json() return [] + @cached(region=_shares_cache_region, maxsize=1, skip_empty=True) + async def async_get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> \ + List[dict]: + """ + 异步获取工作流分享数据 + """ + if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 + return [] + + res = await AsyncRequestUtils(proxies=settings.PROXY or {}, timeout=15).get_res(self._workflow_shares, params={ + "name": name, + "page": page, + "count": count + }) + if res and res.status_code == 200: + return res.json() + return [] + def get_user_uuid(self) -> str: """ 获取用户uuid @@ -129,4 +228,4 @@ class WorkflowHelper(metaclass=WeakSingleton): if not self._share_user_id: self._share_user_id = SystemUtils.generate_user_unique_id() logger.info(f"当前用户UUID: {self._share_user_id}") - return self._share_user_id or "" \ No newline at end of file + return self._share_user_id or ""