feat: 流式输出消息超长时自动分段发送,消息长度限制纳入渠道能力管理

This commit is contained in:
jxxghp
2026-03-26 01:56:11 +08:00
parent a6f16dcf8f
commit 4b4b808b76
2 changed files with 97 additions and 21 deletions

View File

@@ -29,6 +29,7 @@ class StreamingHandler:
3. 定时器周期性调用 _flush()
- 第一次有内容时发送新消息(通过 send_direct_message 获取 message_id
- 后续有新内容时编辑同一条消息(通过 edit_message
- 当消息长度接近渠道限制时,冻结当前消息并发送新消息继续输出
4. 工具调用时:
- 流式渠道:工具消息直接 emit() 追加到 buffer与 Agent 文字合并为同一条流式消息
- 非流式渠道:调用 take() 取出已积累的文字,与工具消息合并独立发送
@@ -49,6 +50,10 @@ class StreamingHandler:
self._message_response: Optional[MessageResponse] = None
# 已发送给用户的文本(用于追踪增量)
self._sent_text = ""
# 当前消息的起始偏移量buffer 中属于当前消息的起始位置)
self._msg_start_offset = 0
# 当前渠道的单条消息最大长度0 表示不限制)
self._max_message_length = 0
# 消息发送所需的上下文信息
self._channel: Optional[str] = None
self._source: Optional[str] = None
@@ -91,6 +96,7 @@ class StreamingHandler:
self._buffer = ""
self._sent_text = ""
self._message_response = None
self._msg_start_offset = 0
async def start_streaming(
self,
@@ -122,6 +128,16 @@ class StreamingHandler:
self._streaming_enabled = True
self._sent_text = ""
self._message_response = None
self._msg_start_offset = 0
# 从渠道能力中获取单条消息最大长度
try:
channel_enum = MessageChannel(self._channel)
self._max_message_length = ChannelCapabilityManager.get_max_message_length(
channel_enum
)
except (ValueError, KeyError):
self._max_message_length = 0
# 启动异步定时刷新任务
self._flush_task = asyncio.create_task(self._flush_loop())
@@ -148,16 +164,19 @@ class StreamingHandler:
# 检查是否所有缓冲内容都已发送
with self._lock:
# 当前消息的文本 = buffer 中从 _msg_start_offset 开始的部分
current_msg_text = self._buffer[self._msg_start_offset :]
all_sent = (
self._message_response is not None
and self._sent_text
and self._buffer == self._sent_text
and current_msg_text == self._sent_text
)
# 保留最终文本用于返回
final_text = self._sent_text if all_sent else ""
# 保留最终文本用于返回(返回完整 buffer 内容,包含所有分段消息)
final_text = self._buffer if all_sent else ""
# 重置状态
self._sent_text = ""
self._message_response = None
self._msg_start_offset = 0
if all_sent:
# 所有内容已通过流式发送,清空缓冲区
self._buffer = ""
@@ -208,9 +227,11 @@ class StreamingHandler:
将当前缓冲区内容刷新到用户消息
- 如果还没有发送过消息先发送一条新消息并记录message_id
- 如果已经发送过消息,编辑该消息为最新的完整内容
- 如果当前消息内容超过长度限制,冻结当前消息并发送新消息继续输出
"""
with self._lock:
current_text = self._buffer
# 当前消息的文本 = buffer 中从 _msg_start_offset 开始的部分
current_text = self._buffer[self._msg_start_offset :]
if not current_text or current_text == self._sent_text:
# 没有新内容需要刷新
return
@@ -243,25 +264,64 @@ class StreamingHandler:
)
self._streaming_enabled = False
else:
# 后续更新:编辑已有消息
try:
channel_enum = MessageChannel(self._channel)
except (ValueError, KeyError):
return
success = chain.edit_message(
channel=channel_enum,
source=self._message_response.source,
message_id=self._message_response.message_id,
chat_id=self._message_response.chat_id,
text=current_text,
title=self._title,
)
if success:
# 检查当前消息内容是否超过长度限制
if (
self._max_message_length
and len(current_text) > self._max_message_length
):
# 消息过长,冻结当前消息(保持最后一次成功编辑的内容)
# 将 offset 移动到已发送文本之后,开启新消息
logger.debug(
f"流式消息长度 {len(current_text)} 超过限制 {self._max_message_length},启用新消息"
)
with self._lock:
self._sent_text = current_text
self._msg_start_offset += len(self._sent_text)
current_text = self._buffer[self._msg_start_offset :]
self._message_response = None
self._sent_text = ""
# 如果偏移后还有新内容,立即发送为新消息
if current_text:
response = chain.send_direct_message(
Notification(
channel=self._channel,
source=self._source,
userid=self._user_id,
username=self._username,
title=self._title,
text=current_text,
)
)
if response and response.success and response.message_id:
self._message_response = response
with self._lock:
self._sent_text = current_text
logger.debug(
f"流式输出新消息已发送: message_id={response.message_id}"
)
else:
logger.debug("流式输出新消息发送失败,降级为非流式输出")
self._streaming_enabled = False
else:
logger.debug("流式输出消息编辑失败")
# 后续更新:编辑已有消息
try:
channel_enum = MessageChannel(self._channel)
except (ValueError, KeyError):
return
success = chain.edit_message(
channel=channel_enum,
source=self._message_response.source,
message_id=self._message_response.message_id,
chat_id=self._message_response.chat_id,
text=current_text,
title=self._title,
)
if success:
with self._lock:
self._sent_text = current_text
else:
logger.debug("流式输出消息编辑失败")
except Exception as e:
logger.error(f"流式输出刷新失败: {e}")

View File

@@ -194,6 +194,8 @@ class ChannelCapabilities:
max_buttons_per_row: int = 5
max_button_rows: int = 10
max_button_text_length: int = 30
# 单条消息最大长度0 表示不限制),用于流式输出时自动分段
max_message_length: int = 0
fallback_enabled: bool = True
@@ -219,6 +221,8 @@ class ChannelCapabilityManager:
max_buttons_per_row=4,
max_button_rows=10,
max_button_text_length=30,
# Telegram 文本消息限制 4096 字符,预留空间给 MarkdownV2 转义和标题
max_message_length=3500,
),
MessageChannel.Wechat: ChannelCapabilities(
channel=MessageChannel.Wechat,
@@ -244,6 +248,8 @@ class ChannelCapabilityManager:
max_buttons_per_row=3,
max_button_rows=8,
max_button_text_length=25,
# Slack 消息限制 40000 字符,预留空间给格式化
max_message_length=39000,
fallback_enabled=True,
),
MessageChannel.Discord: ChannelCapabilities(
@@ -260,6 +266,8 @@ class ChannelCapabilityManager:
max_buttons_per_row=5,
max_button_rows=5,
max_button_text_length=80,
# Discord 消息限制 2000 字符
max_message_length=1800,
fallback_enabled=True,
),
MessageChannel.SynologyChat: ChannelCapabilities(
@@ -376,6 +384,14 @@ class ChannelCapabilityManager:
channel_caps = cls.get_capabilities(channel)
return channel_caps.max_button_text_length if channel_caps else 20
@classmethod
def get_max_message_length(cls, channel: MessageChannel) -> int:
"""
获取单条消息最大长度0 表示不限制)
"""
channel_caps = cls.get_capabilities(channel)
return channel_caps.max_message_length if channel_caps else 0
@classmethod
def should_use_fallback(cls, channel: MessageChannel) -> bool:
"""