From dc773337d320d00caf927038e8f2c47488f9f470 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Mon, 22 Jun 2026 22:18:12 +0800 Subject: [PATCH] fix: bind browser sessions to dedicated worker threads --- app/helper/browser.py | 291 +++++++++++++++++++++++++++++++---- tests/test_browser_helper.py | 55 +++++++ 2 files changed, 314 insertions(+), 32 deletions(-) diff --git a/app/helper/browser.py b/app/helper/browser.py index c1ec55fb..fd1448bb 100644 --- a/app/helper/browser.py +++ b/app/helper/browser.py @@ -2,6 +2,7 @@ import ipaddress import threading import time import uuid +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from typing import Any, Callable, Optional, Protocol from urllib.parse import urlparse @@ -144,8 +145,11 @@ class BrowserSessionHelper: PRIVATE_HOST_SUFFIXES = (".localhost", ".local", ".lan", ".home", ".internal") PRIVATE_HOSTNAMES = {"localhost", "ip6-localhost", "ip6-loopback"} REF_ATTRIBUTE = "data-moviepilot-agent-ref" + SESSION_WORKER_NAME_PREFIX = "browser-session" _sessions: dict[str, _BrowserSessionState] = {} + _session_executors: dict[str, ThreadPoolExecutor] = {} + _session_thread_ids: dict[str, int] = {} _sessions_lock = threading.RLock() def __init__(self, headless: bool = True, viewport: Optional[dict[str, int]] = None): @@ -211,7 +215,9 @@ class BrowserSessionHelper: 关闭所有 Agent 浏览器会话。 """ with cls._sessions_lock: - session_keys = list(cls._sessions.keys()) + session_keys = list( + set(cls._sessions.keys()) | set(cls._session_executors.keys()) + ) for session_key in session_keys: cls.close_session(session_key) @@ -223,12 +229,27 @@ class BrowserSessionHelper: :param session_key: 会话标识 :return: 找到并关闭会话时返回 True """ - with cls._sessions_lock: - session = cls._sessions.pop(session_key, None) - if not session: - return False - cls._close_session_state(session) - return True + if cls._is_current_session_thread(session_key): + closed = cls._close_session_in_thread(session_key) + cls._shutdown_session_executor(session_key, wait=False) + return closed + + executor = cls._get_existing_session_executor(session_key) + if executor: + future = executor.submit( + cls._run_session_task, + session_key, + cls._close_session_in_thread, + session_key, + ) + try: + return future.result() + finally: + cls._shutdown_session_executor(session_key) + + closed = cls._close_session_in_thread(session_key) + cls._shutdown_session_executor(session_key) + return closed def with_session( self, @@ -249,6 +270,34 @@ class BrowserSessionHelper: :return: 回调函数返回值 """ self._prune_sessions() + return self._run_in_session_thread( + session_key, + self._with_session_in_thread, + session_key, + callback, + user_agent=user_agent, + cookies=cookies, + timeout=timeout, + ) + + def _with_session_in_thread( + self, + session_key: str, + callback: Callable[[_BrowserSessionState], Any], + user_agent: Optional[str] = None, + cookies: Optional[str] = None, + timeout: Optional[int] = 30, + ) -> Any: + """ + 在会话专属线程内获取浏览器会话并执行回调。 + + :param session_key: 会话标识 + :param callback: 使用浏览器会话执行操作的回调函数 + :param user_agent: 新建会话时使用的 User-Agent + :param cookies: 本次操作要注入的 Cookie 请求头 + :param timeout: 默认操作超时时间,单位秒 + :return: 回调函数返回值 + """ session = self._get_or_create_session( session_key=session_key, user_agent=user_agent, @@ -263,6 +312,164 @@ class BrowserSessionHelper: session.active_page.set_extra_http_headers({"cookie": cookies}) return callback(session) + @classmethod + def _run_in_session_thread( + cls, + session_key: str, + callback: Callable[..., Any], + *args: Any, + **kwargs: Any, + ) -> Any: + """ + 将浏览器同步 API 调用投递到当前会话固定的单线程执行器。 + + :param session_key: 会话标识 + :param callback: 需要在会话线程中运行的回调 + :param args: 回调位置参数 + :param kwargs: 回调关键字参数 + :return: 回调返回值 + """ + if cls._is_current_session_thread(session_key): + return callback(*args, **kwargs) + + for _ in range(2): + executor = cls._get_session_executor(session_key) + try: + future = executor.submit( + cls._run_session_task, + session_key, + callback, + *args, + **kwargs, + ) + except RuntimeError: + cls._discard_session_executor(session_key, executor) + continue + try: + return future.result() + except Exception: + with cls._sessions_lock: + has_session = session_key in cls._sessions + if not has_session: + cls._shutdown_session_executor(session_key) + raise + + raise RuntimeError("浏览器会话线程已关闭") + + @classmethod + def _run_session_task( + cls, + session_key: str, + callback: Callable[..., Any], + *args: Any, + **kwargs: Any, + ) -> Any: + """ + 记录当前会话工作线程后执行实际任务。 + + :param session_key: 会话标识 + :param callback: 需要执行的回调 + :param args: 回调位置参数 + :param kwargs: 回调关键字参数 + :return: 回调返回值 + """ + with cls._sessions_lock: + cls._session_thread_ids[session_key] = threading.get_ident() + return callback(*args, **kwargs) + + @classmethod + def _is_current_session_thread(cls, session_key: str) -> bool: + """ + 判断当前代码是否已运行在指定会话的固定线程内。 + + :param session_key: 会话标识 + :return: 当前线程是会话工作线程时返回 True + """ + with cls._sessions_lock: + thread_id = cls._session_thread_ids.get(session_key) + return thread_id == threading.get_ident() + + @classmethod + def _get_session_executor(cls, session_key: str) -> ThreadPoolExecutor: + """ + 获取或创建指定会话的单线程执行器。 + + :param session_key: 会话标识 + :return: 会话专属执行器 + """ + with cls._sessions_lock: + executor = cls._session_executors.get(session_key) + if executor: + return executor + executor = ThreadPoolExecutor( + max_workers=1, + thread_name_prefix=cls.SESSION_WORKER_NAME_PREFIX, + ) + cls._session_executors[session_key] = executor + return executor + + @classmethod + def _get_existing_session_executor( + cls, session_key: str + ) -> Optional[ThreadPoolExecutor]: + """ + 获取指定会话已存在的执行器。 + + :param session_key: 会话标识 + :return: 已存在的执行器,不存在时返回 None + """ + with cls._sessions_lock: + return cls._session_executors.get(session_key) + + @classmethod + def _discard_session_executor( + cls, + session_key: str, + executor: ThreadPoolExecutor, + ) -> None: + """ + 丢弃已经关闭的会话执行器。 + + :param session_key: 会话标识 + :param executor: 需要从缓存中移除的执行器 + """ + with cls._sessions_lock: + if cls._session_executors.get(session_key) is executor: + cls._session_executors.pop(session_key, None) + + @classmethod + def _shutdown_session_executor( + cls, + session_key: str, + wait: bool = True, + ) -> None: + """ + 关闭并移除指定会话的执行器。 + + :param session_key: 会话标识 + :param wait: 是否等待工作线程退出 + """ + with cls._sessions_lock: + executor = cls._session_executors.pop(session_key, None) + cls._session_thread_ids.pop(session_key, None) + if executor: + executor.shutdown(wait=wait, cancel_futures=True) + + @classmethod + def _close_session_in_thread(cls, session_key: str) -> bool: + """ + 在会话固定线程内关闭并移除浏览器会话。 + + :param session_key: 会话标识 + :return: 找到并关闭会话时返回 True + """ + with cls._sessions_lock: + session = cls._sessions.pop(session_key, None) + if not session: + return False + cls._close_session_state(session) + return True + def open_tab( self, session: _BrowserSessionState, @@ -479,24 +686,29 @@ class BrowserSessionHelper: if session: return session - context = self._launch_context( - headless=self.headless, - user_agent=user_agent, - viewport=self.viewport, - ) - page = context.new_page() - if cookies: - page.set_extra_http_headers({"cookie": cookies}) - session = _BrowserSessionState( - session_key=session_key, - context=context, - pages=[page], - user_agent=user_agent, - cookies=cookies, - ) + context = self._launch_context( + headless=self.headless, + user_agent=user_agent, + viewport=self.viewport, + ) + page = context.new_page() + if cookies: + page.set_extra_http_headers({"cookie": cookies}) + session = _BrowserSessionState( + session_key=session_key, + context=context, + pages=[page], + user_agent=user_agent, + cookies=cookies, + ) + with self._sessions_lock: + existing_session = self._sessions.get(session_key) + if existing_session: + self._close_session_state(session) + return existing_session self._sessions[session_key] = session - self._enforce_session_limit() - return session + self._enforce_session_limit(protect_session_key=session_key) + return session @classmethod def _prune_sessions(cls) -> None: @@ -511,14 +723,29 @@ class BrowserSessionHelper: cls.close_session(session_key) @classmethod - def _enforce_session_limit(cls) -> None: - while len(cls._sessions) > cls.MAX_SESSIONS: - oldest_key = min( - cls._sessions, - key=lambda key: cls._sessions[key].last_used_at, - ) - session = cls._sessions.pop(oldest_key) - cls._close_session_state(session) + def _enforce_session_limit(cls, protect_session_key: Optional[str] = None) -> None: + """ + 清理超过数量上限的旧会话。 + + :param protect_session_key: 本次刚创建、需要优先保留的会话标识 + """ + while True: + with cls._sessions_lock: + if len(cls._sessions) <= cls.MAX_SESSIONS: + return + candidate_keys = [ + session_key + for session_key in cls._sessions + if session_key != protect_session_key + ] + if not candidate_keys: + return + oldest_key = min( + candidate_keys, + key=lambda key: cls._sessions[key].last_used_at, + ) + if not cls.close_session(oldest_key): + return @staticmethod def _close_session_state(session: _BrowserSessionState) -> None: diff --git a/tests/test_browser_helper.py b/tests/test_browser_helper.py index 110891ef..c6f0191a 100644 --- a/tests/test_browser_helper.py +++ b/tests/test_browser_helper.py @@ -1,6 +1,8 @@ from __future__ import annotations import json +import threading +from concurrent.futures import ThreadPoolExecutor from typing import Optional from unittest.mock import patch @@ -45,6 +47,7 @@ class _FakePage: self.clicks = [] self.fills = [] self.selects = [] + self.close_thread_id = None def set_extra_http_headers(self, headers: dict[str, str]) -> None: """记录额外请求头。""" @@ -124,6 +127,7 @@ class _FakePage: def close(self) -> None: """记录页面关闭状态。""" + self.close_thread_id = threading.get_ident() self.closed = True @@ -133,6 +137,7 @@ class _FakeContext: def __init__(self, pages: Optional[list[_FakePage]] = None) -> None: self.pages = pages or [_FakePage()] self.closed = False + self.close_thread_id = None def new_page(self) -> _FakePage: """返回或创建模拟页面。""" @@ -146,6 +151,7 @@ class _FakeContext: def close(self) -> None: """记录上下文关闭状态。""" + self.close_thread_id = threading.get_ident() self.closed = True @@ -250,6 +256,55 @@ def test_browser_session_helper_reuses_page_within_session(): assert not context.closed +def test_browser_session_helper_runs_same_session_on_one_worker_thread(): + """同一 session_key 的浏览器操作应固定在同一个工作线程。""" + page = _FakePage() + context = _FakeContext([page]) + helper = BrowserSessionHelper() + caller_thread_ids = set() + session_thread_ids = [] + barrier = threading.Barrier(2) + + def _run_from_caller_thread() -> int: + """从外部调用线程进入同一个浏览器会话。""" + caller_thread_ids.add(threading.get_ident()) + barrier.wait(timeout=1) + return helper.with_session( + "session-1", + lambda _session: threading.get_ident(), + ) + + with patch.object(BrowserSessionHelper, "_launch_context", return_value=context): + with ThreadPoolExecutor(max_workers=2) as executor: + futures = [ + executor.submit(_run_from_caller_thread), + executor.submit(_run_from_caller_thread), + ] + session_thread_ids = [future.result(timeout=1) for future in futures] + + assert len(caller_thread_ids) == 2 + assert len(set(session_thread_ids)) == 1 + assert session_thread_ids[0] not in caller_thread_ids + + +def test_browser_session_helper_closes_session_on_worker_thread(): + """关闭会话时应在创建浏览器对象的工作线程内释放资源。""" + page = _FakePage() + context = _FakeContext([page]) + helper = BrowserSessionHelper() + + with patch.object(BrowserSessionHelper, "_launch_context", return_value=context): + session_thread_id = helper.with_session( + "session-1", + lambda _session: threading.get_ident(), + ) + closed = BrowserSessionHelper.close_session("session-1") + + assert closed is True + assert page.close_thread_id == session_thread_id + assert context.close_thread_id == session_thread_id + + def test_browse_webpage_returns_snapshot_with_refs_after_goto(): """goto 后应返回包含可交互元素 ref 的页面快照。""" page = _FakePage()