feat: add read_file tool for agent with line range and 50KB size limit support

Co-authored-by: jxxghp <51039935+jxxghp@users.noreply.github.com>
Agent-Logs-Url: https://github.com/jxxghp/MoviePilot/sessions/615dcf93-c017-4d3f-a96f-5cdad426b9a4
This commit is contained in:
copilot-swe-agent[bot]
2026-03-22 16:32:19 +00:00
parent d1503f9df3
commit bdcbb5cce6
2 changed files with 84 additions and 1 deletions

View File

@@ -43,6 +43,7 @@ from app.agent.tools.impl.transfer_file import TransferFileTool
from app.agent.tools.impl.execute_command import ExecuteCommandTool
from app.agent.tools.impl.edit_file import EditFileTool
from app.agent.tools.impl.write_file import WriteFileTool
from app.agent.tools.impl.read_file import ReadFileTool
from app.core.plugin import PluginManager
from app.log import logger
from .base import MoviePilotTool
@@ -104,7 +105,8 @@ class MoviePilotToolFactory:
RunWorkflowTool,
ExecuteCommandTool,
EditFileTool,
WriteFileTool
WriteFileTool,
ReadFileTool
]
# 创建内置工具
for ToolClass in tool_definitions:

View File

@@ -0,0 +1,81 @@
"""文件读取工具"""
from pathlib import Path
from typing import Optional, Type
from anyio import Path as AsyncPath
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.log import logger
# 最大读取大小 50KB
MAX_READ_SIZE = 50 * 1024
class ReadFileInput(BaseModel):
"""Input parameters for read file tool"""
file_path: str = Field(..., description="The absolute path of the file to read")
start_line: Optional[int] = Field(None, description="The starting line number (1-based, inclusive). If not provided, reading starts from the beginning of the file.")
end_line: Optional[int] = Field(None, description="The ending line number (1-based, inclusive). If not provided, reading goes until the end of the file.")
class ReadFileTool(MoviePilotTool):
name: str = "read_file"
description: str = "Read the content of a text file. Supports reading by line range. Each read is limited to 50KB; content exceeding this limit will be truncated."
args_schema: Type[BaseModel] = ReadFileInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据参数生成友好的提示消息"""
file_path = kwargs.get("file_path", "")
file_name = Path(file_path).name if file_path else "未知文件"
return f"正在读取文件: {file_name}"
async def run(self, file_path: str, start_line: Optional[int] = None,
end_line: Optional[int] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: file_path={file_path}, start_line={start_line}, end_line={end_line}")
try:
path = AsyncPath(file_path)
if not await path.exists():
return f"错误:文件 {file_path} 不存在"
if not await path.is_file():
return f"错误:{file_path} 不是一个文件"
content = await path.read_text(encoding="utf-8")
truncated = False
if start_line is not None or end_line is not None:
lines = content.splitlines(keepends=True)
total_lines = len(lines)
# 将行号转换为索引1-based -> 0-based
s = (start_line - 1) if start_line and start_line >= 1 else 0
e = end_line if end_line and end_line >= 1 else total_lines
# 确保范围有效
s = max(0, min(s, total_lines))
e = max(s, min(e, total_lines))
content = "".join(lines[s:e])
# 检查大小限制
content_bytes = content.encode("utf-8")
if len(content_bytes) > MAX_READ_SIZE:
content = content_bytes[:MAX_READ_SIZE].decode("utf-8", errors="ignore")
truncated = True
if truncated:
return f"{content}\n\n[警告文件内容已超过50KB限制以上内容已被截断。请使用 start_line/end_line 参数分段读取。]"
return content
except PermissionError:
return f"错误:没有权限读取 {file_path}"
except UnicodeDecodeError:
return f"错误:{file_path} 不是文本文件,无法读取"
except Exception as e:
logger.error(f"读取文件 {file_path} 时发生错误: {str(e)}", exc_info=True)
return f"操作失败: {str(e)}"