mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-03-31 09:21:04 +08:00
feat: 新增Agent浏览器操作工具(browse_webpage),支持通过Playwright控制浏览器进行网页交互
This commit is contained in:
@@ -45,6 +45,7 @@ from app.agent.tools.impl.execute_command import ExecuteCommandTool
|
||||
from app.agent.tools.impl.edit_file import EditFileTool
|
||||
from app.agent.tools.impl.write_file import WriteFileTool
|
||||
from app.agent.tools.impl.read_file import ReadFileTool
|
||||
from app.agent.tools.impl.browse_webpage import BrowseWebpageTool
|
||||
from app.core.plugin import PluginManager
|
||||
from app.log import logger
|
||||
from .base import MoviePilotTool
|
||||
@@ -56,9 +57,14 @@ class MoviePilotToolFactory:
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def create_tools(session_id: str, user_id: str,
|
||||
channel: str = None, source: str = None, username: str = None,
|
||||
stream_handler: Callable = None) -> List[MoviePilotTool]:
|
||||
def create_tools(
|
||||
session_id: str,
|
||||
user_id: str,
|
||||
channel: str = None,
|
||||
source: str = None,
|
||||
username: str = None,
|
||||
stream_handler: Callable = None,
|
||||
) -> List[MoviePilotTool]:
|
||||
"""
|
||||
创建MoviePilot工具列表
|
||||
"""
|
||||
@@ -108,18 +114,16 @@ class MoviePilotToolFactory:
|
||||
ExecuteCommandTool,
|
||||
EditFileTool,
|
||||
WriteFileTool,
|
||||
ReadFileTool
|
||||
ReadFileTool,
|
||||
BrowseWebpageTool,
|
||||
]
|
||||
# 创建内置工具
|
||||
for ToolClass in tool_definitions:
|
||||
tool = ToolClass(
|
||||
session_id=session_id,
|
||||
user_id=user_id
|
||||
)
|
||||
tool = ToolClass(session_id=session_id, user_id=user_id)
|
||||
tool.set_message_attr(channel=channel, source=source, username=username)
|
||||
tool.set_stream_handler(stream_handler=stream_handler)
|
||||
tools.append(tool)
|
||||
|
||||
|
||||
# 加载插件提供的工具
|
||||
plugin_tools_count = 0
|
||||
plugin_tools_info = PluginManager().get_plugin_agent_tools()
|
||||
@@ -131,24 +135,31 @@ class MoviePilotToolFactory:
|
||||
try:
|
||||
# 验证工具类是否继承自 MoviePilotTool
|
||||
if not issubclass(ToolClass, MoviePilotTool):
|
||||
logger.warning(f"插件 {plugin_name}({plugin_id}) 提供的工具类 {ToolClass.__name__} 未继承自 MoviePilotTool,已跳过")
|
||||
logger.warning(
|
||||
f"插件 {plugin_name}({plugin_id}) 提供的工具类 {ToolClass.__name__} 未继承自 MoviePilotTool,已跳过"
|
||||
)
|
||||
continue
|
||||
# 创建工具实例
|
||||
tool = ToolClass(
|
||||
session_id=session_id,
|
||||
user_id=user_id
|
||||
tool = ToolClass(session_id=session_id, user_id=user_id)
|
||||
tool.set_message_attr(
|
||||
channel=channel, source=source, username=username
|
||||
)
|
||||
tool.set_message_attr(channel=channel, source=source, username=username)
|
||||
tool.set_stream_handler(stream_handler=stream_handler)
|
||||
tools.append(tool)
|
||||
plugin_tools_count += 1
|
||||
logger.debug(f"成功加载插件 {plugin_name}({plugin_id}) 的工具: {ToolClass.__name__}")
|
||||
logger.debug(
|
||||
f"成功加载插件 {plugin_name}({plugin_id}) 的工具: {ToolClass.__name__}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"加载插件 {plugin_name}({plugin_id}) 的工具 {ToolClass.__name__} 失败: {str(e)}")
|
||||
|
||||
logger.error(
|
||||
f"加载插件 {plugin_name}({plugin_id}) 的工具 {ToolClass.__name__} 失败: {str(e)}"
|
||||
)
|
||||
|
||||
builtin_tools_count = len(tool_definitions)
|
||||
if plugin_tools_count > 0:
|
||||
logger.info(f"成功创建 {len(tools)} 个MoviePilot工具(内置工具: {builtin_tools_count} 个,插件工具: {plugin_tools_count} 个)")
|
||||
logger.info(
|
||||
f"成功创建 {len(tools)} 个MoviePilot工具(内置工具: {builtin_tools_count} 个,插件工具: {plugin_tools_count} 个)"
|
||||
)
|
||||
else:
|
||||
logger.info(f"成功创建 {len(tools)} 个MoviePilot工具")
|
||||
return tools
|
||||
|
||||
539
app/agent/tools/impl/browse_webpage.py
Normal file
539
app/agent/tools/impl/browse_webpage.py
Normal file
@@ -0,0 +1,539 @@
|
||||
"""浏览器操作工具 - 让Agent能够通过Playwright控制浏览器进行网页交互"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
from enum import Enum
|
||||
from typing import Optional, Type, List
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from app.agent.tools.base import MoviePilotTool
|
||||
from app.core.config import settings
|
||||
from app.log import logger
|
||||
|
||||
# 页面内容最大长度
|
||||
MAX_CONTENT_LENGTH = 8000
|
||||
# 默认超时时间(秒)
|
||||
DEFAULT_TIMEOUT = 30
|
||||
# 截图最大宽度
|
||||
SCREENSHOT_MAX_WIDTH = 1280
|
||||
# 截图最大高度
|
||||
SCREENSHOT_MAX_HEIGHT = 720
|
||||
|
||||
|
||||
class BrowserAction(str, Enum):
|
||||
"""浏览器操作类型"""
|
||||
|
||||
GOTO = "goto"
|
||||
GET_CONTENT = "get_content"
|
||||
SCREENSHOT = "screenshot"
|
||||
CLICK = "click"
|
||||
FILL = "fill"
|
||||
SELECT = "select"
|
||||
EVALUATE = "evaluate"
|
||||
WAIT = "wait"
|
||||
|
||||
|
||||
class BrowseWebpageInput(BaseModel):
|
||||
"""浏览器操作工具的输入参数模型"""
|
||||
|
||||
explanation: str = Field(
|
||||
...,
|
||||
description="Clear explanation of why this browser action is being performed",
|
||||
)
|
||||
action: str = Field(
|
||||
...,
|
||||
description=(
|
||||
"The browser action to perform. Available actions:\n"
|
||||
"- 'goto': Navigate to a URL, returns page title and text summary\n"
|
||||
"- 'get_content': Get current page content (text or HTML)\n"
|
||||
"- 'screenshot': Take a screenshot of the current page, returns base64 image\n"
|
||||
"- 'click': Click on an element specified by selector\n"
|
||||
"- 'fill': Fill text into an input element specified by selector\n"
|
||||
"- 'select': Select an option from a dropdown element\n"
|
||||
"- 'evaluate': Execute JavaScript code on the page and return the result\n"
|
||||
"- 'wait': Wait for an element to appear on the page"
|
||||
),
|
||||
)
|
||||
url: Optional[str] = Field(
|
||||
None, description="URL to navigate to (required for 'goto' action)"
|
||||
)
|
||||
selector: Optional[str] = Field(
|
||||
None,
|
||||
description="CSS selector or text selector for the target element (for 'click', 'fill', 'select', 'wait' actions). "
|
||||
"Supports CSS selectors like '#id', '.class', 'tag', and Playwright text selectors like 'text=Click me'",
|
||||
)
|
||||
value: Optional[str] = Field(
|
||||
None,
|
||||
description="Value to fill into input or option value to select (for 'fill' and 'select' actions)",
|
||||
)
|
||||
script: Optional[str] = Field(
|
||||
None,
|
||||
description="JavaScript code to execute on the page (for 'evaluate' action). "
|
||||
"The script should return a value that can be serialized to JSON.",
|
||||
)
|
||||
content_type: Optional[str] = Field(
|
||||
"text",
|
||||
description="Content type for 'get_content' action: 'text' for readable text, 'html' for raw HTML",
|
||||
)
|
||||
timeout: Optional[int] = Field(
|
||||
DEFAULT_TIMEOUT, description="Timeout in seconds for the action (default: 30)"
|
||||
)
|
||||
cookies: Optional[str] = Field(
|
||||
None,
|
||||
description="Cookies to set for the browser context, format: 'name1=value1; name2=value2'",
|
||||
)
|
||||
user_agent: Optional[str] = Field(
|
||||
None, description="Custom User-Agent string for the browser context"
|
||||
)
|
||||
|
||||
|
||||
class BrowseWebpageTool(MoviePilotTool):
|
||||
name: str = "browse_webpage"
|
||||
description: str = (
|
||||
"Control a real browser (Playwright) to interact with web pages. "
|
||||
"Supports navigating to URLs, reading page content, taking screenshots, "
|
||||
"clicking elements, filling forms, selecting dropdown options, executing JavaScript, and waiting for elements. "
|
||||
"Use this tool when you need to interact with dynamic web pages, "
|
||||
"fill in forms, click buttons, or extract content from JavaScript-rendered pages. "
|
||||
"The browser session persists across multiple calls within the same conversation - "
|
||||
"first call 'goto' to open a page, then use other actions to interact with it."
|
||||
)
|
||||
args_schema: Type[BaseModel] = BrowseWebpageInput
|
||||
|
||||
def get_tool_message(self, **kwargs) -> Optional[str]:
|
||||
"""根据操作类型生成友好的提示消息"""
|
||||
action = kwargs.get("action", "")
|
||||
url = kwargs.get("url", "")
|
||||
selector = kwargs.get("selector", "")
|
||||
action_messages = {
|
||||
"goto": f"正在打开网页: {url}",
|
||||
"get_content": "正在获取页面内容",
|
||||
"screenshot": "正在截取页面截图",
|
||||
"click": f"正在点击元素: {selector}",
|
||||
"fill": f"正在填写表单: {selector}",
|
||||
"select": f"正在选择选项: {selector}",
|
||||
"evaluate": "正在执行 JavaScript",
|
||||
"wait": f"正在等待元素: {selector}",
|
||||
}
|
||||
return action_messages.get(action, f"正在执行浏览器操作: {action}")
|
||||
|
||||
async def run(
|
||||
self,
|
||||
action: str,
|
||||
url: Optional[str] = None,
|
||||
selector: Optional[str] = None,
|
||||
value: Optional[str] = None,
|
||||
script: Optional[str] = None,
|
||||
content_type: Optional[str] = "text",
|
||||
timeout: Optional[int] = DEFAULT_TIMEOUT,
|
||||
cookies: Optional[str] = None,
|
||||
user_agent: Optional[str] = None,
|
||||
**kwargs,
|
||||
) -> str:
|
||||
"""执行浏览器操作"""
|
||||
logger.info(
|
||||
f"执行工具: {self.name}, 动作: {action}, URL: {url}, 选择器: {selector}"
|
||||
)
|
||||
|
||||
try:
|
||||
# 验证操作类型
|
||||
try:
|
||||
browser_action = BrowserAction(action)
|
||||
except ValueError:
|
||||
valid_actions = ", ".join([a.value for a in BrowserAction])
|
||||
return f"错误: 不支持的操作类型 '{action}',支持的操作: {valid_actions}"
|
||||
|
||||
# 参数校验
|
||||
if browser_action == BrowserAction.GOTO and not url:
|
||||
return "错误: 'goto' 操作需要提供 url 参数"
|
||||
if (
|
||||
browser_action
|
||||
in (
|
||||
BrowserAction.CLICK,
|
||||
BrowserAction.FILL,
|
||||
BrowserAction.SELECT,
|
||||
BrowserAction.WAIT,
|
||||
)
|
||||
and not selector
|
||||
):
|
||||
return f"错误: '{action}' 操作需要提供 selector 参数"
|
||||
if browser_action == BrowserAction.FILL and value is None:
|
||||
return "错误: 'fill' 操作需要提供 value 参数"
|
||||
if browser_action == BrowserAction.EVALUATE and not script:
|
||||
return "错误: 'evaluate' 操作需要提供 script 参数"
|
||||
|
||||
# 在线程池中运行同步的 Playwright 操作
|
||||
loop = asyncio.get_running_loop()
|
||||
result = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: self._execute_browser_action(
|
||||
browser_action=browser_action,
|
||||
url=url,
|
||||
selector=selector,
|
||||
value=value,
|
||||
script=script,
|
||||
content_type=content_type,
|
||||
timeout=timeout,
|
||||
cookies=cookies,
|
||||
user_agent=user_agent,
|
||||
),
|
||||
)
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"浏览器操作失败: {e}", exc_info=True)
|
||||
return f"浏览器操作失败: {str(e)}"
|
||||
|
||||
def _execute_browser_action(
|
||||
self,
|
||||
browser_action: BrowserAction,
|
||||
url: Optional[str],
|
||||
selector: Optional[str],
|
||||
value: Optional[str],
|
||||
script: Optional[str],
|
||||
content_type: Optional[str],
|
||||
timeout: int,
|
||||
cookies: Optional[str],
|
||||
user_agent: Optional[str],
|
||||
) -> str:
|
||||
"""在同步上下文中执行 Playwright 浏览器操作"""
|
||||
from playwright.sync_api import sync_playwright
|
||||
|
||||
try:
|
||||
with sync_playwright() as playwright:
|
||||
browser = None
|
||||
context = None
|
||||
page = None
|
||||
try:
|
||||
# 启动浏览器
|
||||
browser_type = settings.PLAYWRIGHT_BROWSER_TYPE or "chromium"
|
||||
browser = playwright[browser_type].launch(headless=True)
|
||||
|
||||
# 创建上下文
|
||||
context_kwargs = {}
|
||||
if user_agent:
|
||||
context_kwargs["user_agent"] = user_agent
|
||||
# 设置视口大小
|
||||
context_kwargs["viewport"] = {
|
||||
"width": SCREENSHOT_MAX_WIDTH,
|
||||
"height": SCREENSHOT_MAX_HEIGHT,
|
||||
}
|
||||
|
||||
context = browser.new_context(**context_kwargs)
|
||||
page = context.new_page()
|
||||
page.set_default_timeout(timeout * 1000)
|
||||
|
||||
# 设置 cookies
|
||||
if cookies:
|
||||
page.set_extra_http_headers({"cookie": cookies})
|
||||
|
||||
# 对于非 goto 操作,如果提供了 url 先导航
|
||||
if url and browser_action != BrowserAction.GOTO:
|
||||
page.goto(
|
||||
url, wait_until="domcontentloaded", timeout=timeout * 1000
|
||||
)
|
||||
page.wait_for_load_state("networkidle", timeout=timeout * 1000)
|
||||
|
||||
# 执行具体操作
|
||||
result = self._do_action(
|
||||
page,
|
||||
browser_action,
|
||||
url,
|
||||
selector,
|
||||
value,
|
||||
script,
|
||||
content_type,
|
||||
timeout,
|
||||
)
|
||||
return result
|
||||
|
||||
finally:
|
||||
if page:
|
||||
page.close()
|
||||
if context:
|
||||
context.close()
|
||||
if browser:
|
||||
browser.close()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Playwright 执行失败: {e}", exc_info=True)
|
||||
return f"Playwright 执行失败: {str(e)}"
|
||||
|
||||
def _do_action(
|
||||
self,
|
||||
page,
|
||||
browser_action: BrowserAction,
|
||||
url: Optional[str],
|
||||
selector: Optional[str],
|
||||
value: Optional[str],
|
||||
script: Optional[str],
|
||||
content_type: Optional[str],
|
||||
timeout: int,
|
||||
) -> str:
|
||||
"""执行具体的浏览器操作"""
|
||||
|
||||
if browser_action == BrowserAction.GOTO:
|
||||
return self._action_goto(page, url, timeout)
|
||||
|
||||
elif browser_action == BrowserAction.GET_CONTENT:
|
||||
return self._action_get_content(page, content_type)
|
||||
|
||||
elif browser_action == BrowserAction.SCREENSHOT:
|
||||
return self._action_screenshot(page)
|
||||
|
||||
elif browser_action == BrowserAction.CLICK:
|
||||
return self._action_click(page, selector, timeout)
|
||||
|
||||
elif browser_action == BrowserAction.FILL:
|
||||
return self._action_fill(page, selector, value, timeout)
|
||||
|
||||
elif browser_action == BrowserAction.SELECT:
|
||||
return self._action_select(page, selector, value, timeout)
|
||||
|
||||
elif browser_action == BrowserAction.EVALUATE:
|
||||
return self._action_evaluate(page, script)
|
||||
|
||||
elif browser_action == BrowserAction.WAIT:
|
||||
return self._action_wait(page, selector, timeout)
|
||||
|
||||
return f"未知操作: {browser_action}"
|
||||
|
||||
@staticmethod
|
||||
def _action_goto(page, url: str, timeout: int) -> str:
|
||||
"""导航到URL"""
|
||||
response = page.goto(url, wait_until="domcontentloaded", timeout=timeout * 1000)
|
||||
try:
|
||||
page.wait_for_load_state("networkidle", timeout=min(timeout, 15) * 1000)
|
||||
except Exception:
|
||||
# networkidle 超时不是致命错误,页面可能已经可用
|
||||
pass
|
||||
|
||||
status = response.status if response else "unknown"
|
||||
title = page.title()
|
||||
page_url = page.url
|
||||
|
||||
# 提取页面可读文本摘要
|
||||
text_content = page.inner_text("body")
|
||||
if text_content and len(text_content) > MAX_CONTENT_LENGTH:
|
||||
text_content = text_content[:MAX_CONTENT_LENGTH] + "\n\n...(内容已截断)"
|
||||
|
||||
# 提取页面链接
|
||||
links = page.evaluate("""
|
||||
() => {
|
||||
const links = [];
|
||||
document.querySelectorAll('a[href]').forEach(a => {
|
||||
const text = a.innerText.trim();
|
||||
const href = a.href;
|
||||
if (text && href && !href.startsWith('javascript:')) {
|
||||
links.push({text: text.substring(0, 80), href: href});
|
||||
}
|
||||
});
|
||||
return links.slice(0, 30);
|
||||
}
|
||||
""")
|
||||
|
||||
# 提取表单信息
|
||||
forms = page.evaluate("""
|
||||
() => {
|
||||
const forms = [];
|
||||
document.querySelectorAll('input, textarea, select, button').forEach(el => {
|
||||
const info = {
|
||||
tag: el.tagName.toLowerCase(),
|
||||
type: el.type || '',
|
||||
name: el.name || '',
|
||||
id: el.id || '',
|
||||
placeholder: el.placeholder || '',
|
||||
value: el.tagName.toLowerCase() === 'select' ? '' : (el.value || '').substring(0, 50),
|
||||
text: el.innerText ? el.innerText.trim().substring(0, 50) : ''
|
||||
};
|
||||
// 只保留有标识信息的元素
|
||||
if (info.name || info.id || info.placeholder || info.text) {
|
||||
forms.push(info);
|
||||
}
|
||||
});
|
||||
return forms.slice(0, 30);
|
||||
}
|
||||
""")
|
||||
|
||||
result = {
|
||||
"status": status,
|
||||
"url": page_url,
|
||||
"title": title,
|
||||
"text_content": text_content,
|
||||
}
|
||||
if links:
|
||||
result["links"] = links
|
||||
if forms:
|
||||
result["form_elements"] = forms
|
||||
|
||||
return json.dumps(result, ensure_ascii=False, indent=2)
|
||||
|
||||
@staticmethod
|
||||
def _action_get_content(page, content_type: Optional[str]) -> str:
|
||||
"""获取页面内容"""
|
||||
title = page.title()
|
||||
page_url = page.url
|
||||
|
||||
if content_type == "html":
|
||||
content = page.content()
|
||||
else:
|
||||
content = page.inner_text("body")
|
||||
|
||||
if content and len(content) > MAX_CONTENT_LENGTH:
|
||||
content = content[:MAX_CONTENT_LENGTH] + "\n\n...(内容已截断)"
|
||||
|
||||
result = {
|
||||
"url": page_url,
|
||||
"title": title,
|
||||
"content_type": content_type,
|
||||
"content": content,
|
||||
}
|
||||
return json.dumps(result, ensure_ascii=False, indent=2)
|
||||
|
||||
@staticmethod
|
||||
def _action_screenshot(page) -> str:
|
||||
"""截取页面截图"""
|
||||
screenshot_bytes = page.screenshot(
|
||||
full_page=False,
|
||||
type="jpeg",
|
||||
quality=60,
|
||||
)
|
||||
screenshot_b64 = base64.b64encode(screenshot_bytes).decode("utf-8")
|
||||
|
||||
# 限制截图大小(base64编码后大约增大33%)
|
||||
max_b64_size = 200 * 1024 # ~150KB 原始图片
|
||||
if len(screenshot_b64) > max_b64_size:
|
||||
# 降低质量重新截图
|
||||
screenshot_bytes = page.screenshot(
|
||||
full_page=False,
|
||||
type="jpeg",
|
||||
quality=30,
|
||||
)
|
||||
screenshot_b64 = base64.b64encode(screenshot_bytes).decode("utf-8")
|
||||
|
||||
title = page.title()
|
||||
page_url = page.url
|
||||
|
||||
result = {
|
||||
"url": page_url,
|
||||
"title": title,
|
||||
"screenshot_base64": screenshot_b64,
|
||||
"format": "jpeg",
|
||||
"note": "截图已以 base64 编码返回",
|
||||
}
|
||||
return json.dumps(result, ensure_ascii=False, indent=2)
|
||||
|
||||
@staticmethod
|
||||
def _action_click(page, selector: str, timeout: int) -> str:
|
||||
"""点击元素"""
|
||||
page.click(selector, timeout=timeout * 1000)
|
||||
|
||||
# 等待可能的页面变化
|
||||
try:
|
||||
page.wait_for_load_state("networkidle", timeout=5000)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
title = page.title()
|
||||
page_url = page.url
|
||||
|
||||
return json.dumps(
|
||||
{
|
||||
"success": True,
|
||||
"message": f"成功点击元素: {selector}",
|
||||
"current_url": page_url,
|
||||
"current_title": title,
|
||||
},
|
||||
ensure_ascii=False,
|
||||
indent=2,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _action_fill(page, selector: str, value: str, timeout: int) -> str:
|
||||
"""填写表单"""
|
||||
page.fill(selector, value, timeout=timeout * 1000)
|
||||
|
||||
return json.dumps(
|
||||
{
|
||||
"success": True,
|
||||
"message": f"成功填写元素 '{selector}' 的值为 '{value}'",
|
||||
},
|
||||
ensure_ascii=False,
|
||||
indent=2,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _action_select(page, selector: str, value: Optional[str], timeout: int) -> str:
|
||||
"""选择下拉选项"""
|
||||
if value:
|
||||
page.select_option(selector, value=value, timeout=timeout * 1000)
|
||||
else:
|
||||
return "错误: 'select' 操作需要提供 value 参数"
|
||||
|
||||
return json.dumps(
|
||||
{
|
||||
"success": True,
|
||||
"message": f"成功选择元素 '{selector}' 的选项 '{value}'",
|
||||
},
|
||||
ensure_ascii=False,
|
||||
indent=2,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _action_evaluate(page, script: str) -> str:
|
||||
"""执行 JavaScript"""
|
||||
result = page.evaluate(script)
|
||||
|
||||
# 格式化结果
|
||||
if result is None:
|
||||
formatted = "null"
|
||||
elif isinstance(result, (dict, list)):
|
||||
formatted = json.dumps(result, ensure_ascii=False, indent=2)
|
||||
else:
|
||||
formatted = str(result)
|
||||
|
||||
# 限制结果长度
|
||||
if len(formatted) > MAX_CONTENT_LENGTH:
|
||||
formatted = formatted[:MAX_CONTENT_LENGTH] + "\n\n...(结果已截断)"
|
||||
|
||||
return json.dumps(
|
||||
{
|
||||
"success": True,
|
||||
"result": formatted,
|
||||
},
|
||||
ensure_ascii=False,
|
||||
indent=2,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _action_wait(page, selector: str, timeout: int) -> str:
|
||||
"""等待元素出现"""
|
||||
element = page.wait_for_selector(selector, timeout=timeout * 1000)
|
||||
|
||||
if element:
|
||||
visible = element.is_visible()
|
||||
text = element.inner_text()
|
||||
if text and len(text) > 200:
|
||||
text = text[:200] + "..."
|
||||
|
||||
return json.dumps(
|
||||
{
|
||||
"success": True,
|
||||
"message": f"元素 '{selector}' 已出现",
|
||||
"visible": visible,
|
||||
"text": text,
|
||||
},
|
||||
ensure_ascii=False,
|
||||
indent=2,
|
||||
)
|
||||
else:
|
||||
return json.dumps(
|
||||
{
|
||||
"success": False,
|
||||
"message": f"等待元素 '{selector}' 超时",
|
||||
},
|
||||
ensure_ascii=False,
|
||||
indent=2,
|
||||
)
|
||||
Reference in New Issue
Block a user