feat(telegram): 保持正在输入状态直到消息发送完成

This commit is contained in:
jxxghp
2026-03-29 09:04:42 +08:00
parent cdf40a7046
commit a1a72df6c6

View File

@@ -1,6 +1,7 @@
import asyncio
import re
import threading
import time
from typing import Optional, List, Dict, Callable, Union
from urllib.parse import urljoin, quote
@@ -39,6 +40,8 @@ class Telegram:
str, str
] = {} # userid -> chat_id mapping for reply targeting
_bot_username: Optional[str] = None # Bot username for mention detection
_typing_tasks: Dict[str, threading.Thread] = {} # chat_id -> typing任务
_typing_stop_flags: Dict[str, bool] = {} # chat_id -> 停止标志
def __init__(
self,
@@ -105,11 +108,8 @@ class Telegram:
# Check if we should process this message
if self._should_process_message(message):
# 发送正在输入状态
try:
_bot.send_chat_action(message.chat.id, "typing")
except Exception as err:
logger.error(f"发送Telegram正在输入状态失败{err}")
# 启动持续发送正在输入状态
self._start_typing_task(message.chat.id)
RequestUtils(timeout=15).post_res(self._ds_url, json=message.json)
@_bot.callback_query_handler(func=lambda call: True)
@@ -147,11 +147,8 @@ class Telegram:
# 先确认回调避免用户看到loading状态
_bot.answer_callback_query(call.id)
# 发送正在输入状态
try:
_bot.send_chat_action(call.message.chat.id, "typing")
except Exception as e:
logger.error(f"发送Telegram正在输入状态失败{e}")
# 启动持续发送正在输入状态
self._start_typing_task(call.message.chat.id)
# 发送给主程序处理
RequestUtils(timeout=15).post_res(self._ds_url, json=callback_json)
@@ -256,6 +253,47 @@ class Telegram:
"""
return self._bot is not None
def _start_typing_task(self, chat_id: Union[str, int]) -> None:
"""
启动持续发送正在输入状态的任务
"""
chat_id_str = str(chat_id)
# 如果已有任务在运行,先停止
if chat_id_str in self._typing_tasks:
self._stop_typing_task(chat_id_str)
# 设置停止标志
self._typing_stop_flags[chat_id_str] = False
def typing_worker():
"""定期发送typing状态的后台线程"""
while not self._typing_stop_flags.get(chat_id_str, True):
try:
if self._bot:
self._bot.send_chat_action(chat_id, "typing")
except Exception as e:
logger.debug(f"发送typing状态失败: {e}")
# 每5秒发送一次Telegram客户端会在约5-6秒后消失状态
for _ in range(50):
if self._typing_stop_flags.get(chat_id_str, True):
break
time.sleep(0.1)
thread = threading.Thread(target=typing_worker, daemon=True)
thread.start()
self._typing_tasks[chat_id_str] = thread
def _stop_typing_task(self, chat_id: Union[str, int]) -> None:
"""
停止正在输入状态的任务
"""
chat_id_str = str(chat_id)
self._typing_stop_flags[chat_id_str] = True
if chat_id_str in self._typing_tasks:
task = self._typing_tasks.pop(chat_id_str, None)
if task and task.is_alive():
task.join(timeout=1)
def send_msg(
self,
title: str,
@@ -317,6 +355,7 @@ class Telegram:
result = self.__edit_message(
original_chat_id, original_message_id, caption, buttons, image
)
self._stop_typing_task(chat_id)
return {
"success": bool(result),
"message_id": original_message_id,
@@ -330,6 +369,7 @@ class Telegram:
caption=caption,
reply_markup=reply_markup,
)
self._stop_typing_task(chat_id)
if sent and hasattr(sent, "message_id"):
return {
"success": True,
@@ -342,6 +382,7 @@ class Telegram:
except Exception as msg_e:
logger.error(f"发送消息失败:{msg_e}")
self._stop_typing_task(chat_id)
return {"success": False}
def _determine_target_chat_id(
@@ -838,6 +879,9 @@ class Telegram:
"""
停止Telegram消息接收服务
"""
# 停止所有typing任务
for chat_id in list(self._typing_tasks.keys()):
self._stop_typing_task(chat_id)
if self._bot:
self._bot.stop_polling()
self._polling_thread.join()