"""文件读取工具""" from pathlib import Path from typing import Optional, Type from anyio import Path as AsyncPath from pydantic import BaseModel, Field from app.agent.tools.base import MoviePilotTool from app.log import logger # 最大读取大小 50KB MAX_READ_SIZE = 50 * 1024 class ReadFileInput(BaseModel): """Input parameters for read file tool""" file_path: str = Field(..., description="The absolute path of the file to read") start_line: Optional[int] = Field(None, description="The starting line number (1-based, inclusive). If not provided, reading starts from the beginning of the file.") end_line: Optional[int] = Field(None, description="The ending line number (1-based, inclusive). If not provided, reading goes until the end of the file.") class ReadFileTool(MoviePilotTool): name: str = "read_file" description: str = "Read the content of a text file. Supports reading by line range. Each read is limited to 50KB; content exceeding this limit will be truncated." args_schema: Type[BaseModel] = ReadFileInput def get_tool_message(self, **kwargs) -> Optional[str]: """根据参数生成友好的提示消息""" file_path = kwargs.get("file_path", "") file_name = Path(file_path).name if file_path else "未知文件" return f"正在读取文件: {file_name}" async def run(self, file_path: str, start_line: Optional[int] = None, end_line: Optional[int] = None, **kwargs) -> str: logger.info(f"执行工具: {self.name}, 参数: file_path={file_path}, start_line={start_line}, end_line={end_line}") try: path = AsyncPath(file_path) if not await path.exists(): return f"错误:文件 {file_path} 不存在" if not await path.is_file(): return f"错误:{file_path} 不是一个文件" content = await path.read_text(encoding="utf-8") truncated = False if start_line is not None or end_line is not None: lines = content.splitlines(keepends=True) total_lines = len(lines) # 将行号转换为索引(1-based -> 0-based) s = (start_line - 1) if start_line and start_line >= 1 else 0 e = end_line if end_line and end_line >= 1 else total_lines # 确保范围有效 s = max(0, min(s, total_lines)) e = max(s, min(e, total_lines)) content = "".join(lines[s:e]) # 检查大小限制 content_bytes = content.encode("utf-8") if len(content_bytes) > MAX_READ_SIZE: content = content_bytes[:MAX_READ_SIZE].decode("utf-8", errors="ignore") truncated = True if truncated: return f"{content}\n\n[警告:文件内容已超过50KB限制,以上内容已被截断。请使用 start_line/end_line 参数分段读取。]" return content except PermissionError: return f"错误:没有权限读取 {file_path}" except UnicodeDecodeError: return f"错误:{file_path} 不是文本文件,无法读取" except Exception as e: logger.error(f"读取文件 {file_path} 时发生错误: {str(e)}", exc_info=True) return f"操作失败: {str(e)}"