diff --git a/app/agent/callback/__init__.py b/app/agent/callback/__init__.py index c8d837e0..e2d8ae6f 100644 --- a/app/agent/callback/__init__.py +++ b/app/agent/callback/__init__.py @@ -29,6 +29,7 @@ class StreamingHandler: 3. 定时器周期性调用 _flush(): - 第一次有内容时发送新消息(通过 send_direct_message 获取 message_id) - 后续有新内容时编辑同一条消息(通过 edit_message) + - 当消息长度接近渠道限制时,冻结当前消息并发送新消息继续输出 4. 工具调用时: - 流式渠道:工具消息直接 emit() 追加到 buffer,与 Agent 文字合并为同一条流式消息 - 非流式渠道:调用 take() 取出已积累的文字,与工具消息合并独立发送 @@ -49,6 +50,10 @@ class StreamingHandler: self._message_response: Optional[MessageResponse] = None # 已发送给用户的文本(用于追踪增量) self._sent_text = "" + # 当前消息的起始偏移量(buffer 中属于当前消息的起始位置) + self._msg_start_offset = 0 + # 当前渠道的单条消息最大长度(0 表示不限制) + self._max_message_length = 0 # 消息发送所需的上下文信息 self._channel: Optional[str] = None self._source: Optional[str] = None @@ -91,6 +96,7 @@ class StreamingHandler: self._buffer = "" self._sent_text = "" self._message_response = None + self._msg_start_offset = 0 async def start_streaming( self, @@ -122,6 +128,16 @@ class StreamingHandler: self._streaming_enabled = True self._sent_text = "" self._message_response = None + self._msg_start_offset = 0 + + # 从渠道能力中获取单条消息最大长度 + try: + channel_enum = MessageChannel(self._channel) + self._max_message_length = ChannelCapabilityManager.get_max_message_length( + channel_enum + ) + except (ValueError, KeyError): + self._max_message_length = 0 # 启动异步定时刷新任务 self._flush_task = asyncio.create_task(self._flush_loop()) @@ -148,16 +164,19 @@ class StreamingHandler: # 检查是否所有缓冲内容都已发送 with self._lock: + # 当前消息的文本 = buffer 中从 _msg_start_offset 开始的部分 + current_msg_text = self._buffer[self._msg_start_offset :] all_sent = ( self._message_response is not None and self._sent_text - and self._buffer == self._sent_text + and current_msg_text == self._sent_text ) - # 保留最终文本用于返回 - final_text = self._sent_text if all_sent else "" + # 保留最终文本用于返回(返回完整 buffer 内容,包含所有分段消息) + final_text = self._buffer if all_sent else "" # 重置状态 self._sent_text = "" self._message_response = None + self._msg_start_offset = 0 if all_sent: # 所有内容已通过流式发送,清空缓冲区 self._buffer = "" @@ -208,9 +227,11 @@ class StreamingHandler: 将当前缓冲区内容刷新到用户消息 - 如果还没有发送过消息,先发送一条新消息并记录message_id - 如果已经发送过消息,编辑该消息为最新的完整内容 + - 如果当前消息内容超过长度限制,冻结当前消息并发送新消息继续输出 """ with self._lock: - current_text = self._buffer + # 当前消息的文本 = buffer 中从 _msg_start_offset 开始的部分 + current_text = self._buffer[self._msg_start_offset :] if not current_text or current_text == self._sent_text: # 没有新内容需要刷新 return @@ -243,25 +264,64 @@ class StreamingHandler: ) self._streaming_enabled = False else: - # 后续更新:编辑已有消息 - try: - channel_enum = MessageChannel(self._channel) - except (ValueError, KeyError): - return - - success = chain.edit_message( - channel=channel_enum, - source=self._message_response.source, - message_id=self._message_response.message_id, - chat_id=self._message_response.chat_id, - text=current_text, - title=self._title, - ) - if success: + # 检查当前消息内容是否超过长度限制 + if ( + self._max_message_length + and len(current_text) > self._max_message_length + ): + # 消息过长,冻结当前消息(保持最后一次成功编辑的内容) + # 将 offset 移动到已发送文本之后,开启新消息 + logger.debug( + f"流式消息长度 {len(current_text)} 超过限制 {self._max_message_length},启用新消息" + ) with self._lock: - self._sent_text = current_text + self._msg_start_offset += len(self._sent_text) + current_text = self._buffer[self._msg_start_offset :] + self._message_response = None + self._sent_text = "" + + # 如果偏移后还有新内容,立即发送为新消息 + if current_text: + response = chain.send_direct_message( + Notification( + channel=self._channel, + source=self._source, + userid=self._user_id, + username=self._username, + title=self._title, + text=current_text, + ) + ) + if response and response.success and response.message_id: + self._message_response = response + with self._lock: + self._sent_text = current_text + logger.debug( + f"流式输出新消息已发送: message_id={response.message_id}" + ) + else: + logger.debug("流式输出新消息发送失败,降级为非流式输出") + self._streaming_enabled = False else: - logger.debug("流式输出消息编辑失败") + # 后续更新:编辑已有消息 + try: + channel_enum = MessageChannel(self._channel) + except (ValueError, KeyError): + return + + success = chain.edit_message( + channel=channel_enum, + source=self._message_response.source, + message_id=self._message_response.message_id, + chat_id=self._message_response.chat_id, + text=current_text, + title=self._title, + ) + if success: + with self._lock: + self._sent_text = current_text + else: + logger.debug("流式输出消息编辑失败") except Exception as e: logger.error(f"流式输出刷新失败: {e}") diff --git a/app/schemas/message.py b/app/schemas/message.py index 18c3557e..17c54e6a 100644 --- a/app/schemas/message.py +++ b/app/schemas/message.py @@ -194,6 +194,8 @@ class ChannelCapabilities: max_buttons_per_row: int = 5 max_button_rows: int = 10 max_button_text_length: int = 30 + # 单条消息最大长度(0 表示不限制),用于流式输出时自动分段 + max_message_length: int = 0 fallback_enabled: bool = True @@ -219,6 +221,8 @@ class ChannelCapabilityManager: max_buttons_per_row=4, max_button_rows=10, max_button_text_length=30, + # Telegram 文本消息限制 4096 字符,预留空间给 MarkdownV2 转义和标题 + max_message_length=3500, ), MessageChannel.Wechat: ChannelCapabilities( channel=MessageChannel.Wechat, @@ -244,6 +248,8 @@ class ChannelCapabilityManager: max_buttons_per_row=3, max_button_rows=8, max_button_text_length=25, + # Slack 消息限制 40000 字符,预留空间给格式化 + max_message_length=39000, fallback_enabled=True, ), MessageChannel.Discord: ChannelCapabilities( @@ -260,6 +266,8 @@ class ChannelCapabilityManager: max_buttons_per_row=5, max_button_rows=5, max_button_text_length=80, + # Discord 消息限制 2000 字符 + max_message_length=1800, fallback_enabled=True, ), MessageChannel.SynologyChat: ChannelCapabilities( @@ -376,6 +384,14 @@ class ChannelCapabilityManager: channel_caps = cls.get_capabilities(channel) return channel_caps.max_button_text_length if channel_caps else 20 + @classmethod + def get_max_message_length(cls, channel: MessageChannel) -> int: + """ + 获取单条消息最大长度(0 表示不限制) + """ + channel_caps = cls.get_capabilities(channel) + return channel_caps.max_message_length if channel_caps else 0 + @classmethod def should_use_fallback(cls, channel: MessageChannel) -> bool: """