diff --git a/examples/sandbox/extensions/README.md b/examples/sandbox/extensions/README.md
index 837d9dfa28..05c2ba07b6 100644
--- a/examples/sandbox/extensions/README.md
+++ b/examples/sandbox/extensions/README.md
@@ -211,14 +211,17 @@ uv run python examples/sandbox/extensions/vercel_runner.py --stream
Useful flags:
+- `--demo pty`
+- `--demo port`
+- `--demo backend-checks`
- `--workspace-persistence tar`
- `--workspace-persistence snapshot`
- `--runtime node22`
- `--timeout-ms 120000`
-The Vercel example stays on the non-PTY path on purpose. It covers command
-execution, workspace materialization, and persistence verification without
-depending on interactive websocket support.
+The default Vercel run mirrors the other cloud runners and stays focused on the
+standard agent flow. Use `--demo` for backend-specific checks such as PTY,
+resume verification, and exposed-port validation.
## Daytona
diff --git a/examples/sandbox/extensions/vercel_runner.py b/examples/sandbox/extensions/vercel_runner.py
index 9d33bf1fe4..8a9ef0b102 100644
--- a/examples/sandbox/extensions/vercel_runner.py
+++ b/examples/sandbox/extensions/vercel_runner.py
@@ -1,9 +1,5 @@
"""
-Minimal Vercel-backed sandbox example for manual validation.
-
-This mirrors the other cloud extension examples: it creates a tiny workspace,
-verifies stop/resume persistence, then asks a sandboxed agent to inspect the
-workspace through one shell tool.
+Vercel-backed sandbox example for manual validation.
"""
from __future__ import annotations
@@ -20,18 +16,19 @@
from pathlib import Path
from typing import Literal, cast
-from openai.types.responses import ResponseTextDeltaEvent
+from openai.types.responses import ResponseCompletedEvent, ResponseTextDeltaEvent
-from agents import ModelSettings, Runner
-from agents.models.openai_provider import OpenAIProvider
+from agents import ModelSettings, MultiProvider, Runner
from agents.run import RunConfig
from agents.sandbox import LocalSnapshotSpec, Manifest, SandboxAgent, SandboxRunConfig
+from agents.sandbox.capabilities import Shell
+from agents.sandbox.entries import File
from agents.sandbox.session import BaseSandboxSession
if __package__ is None or __package__ == "":
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
-from examples.sandbox.misc.example_support import text_manifest
+from examples.sandbox.misc.example_support import text_manifest, tool_call_name
from examples.sandbox.misc.workspace_shell import WorkspaceShellCapability
try:
@@ -43,11 +40,18 @@
) from exc
+DEFAULT_MODEL = "gpt-5.4"
DEFAULT_QUESTION = "Summarize this cloud sandbox workspace in 2 sentences."
+DEFAULT_PTY_QUESTION = (
+ "Start an interactive Python session with `tty=true`. In that same session, compute "
+ "`5 + 5`, then add 5 more to the previous result. Briefly report the outputs and "
+ "confirm that you stayed in one Python process."
+)
SNAPSHOT_CHECK_PATH = Path("snapshot-check.txt")
SNAPSHOT_CHECK_CONTENT = "vercel snapshot round-trip ok\n"
LIVE_RESUME_CHECK_PATH = Path("live-resume-check.txt")
LIVE_RESUME_CHECK_CONTENT = "vercel live resume ok\n"
+PTY_CHECK_VALUE = "vercel pty round-trip ok"
EXPOSED_PORT = 3000
PORT_CHECK_CONTENT = "vercel exposed port ok\n"
PORT_CHECK_NODE_SERVER_PATH = Path(".port-check-server.js")
@@ -93,8 +97,8 @@ def _build_manifest() -> Manifest:
"handoff.md": (
"# Handoff\n\n"
"- Customer: Northwind Traders.\n"
- "- Goal: validate Vercel sandbox exec and persistence flows.\n"
- "- Current status: non-PTY backend slice is wired and under test.\n"
+ "- Goal: validate Vercel sandbox exec, PTY, and persistence flows.\n"
+ "- Current status: backend slice is wired and under test.\n"
),
"todo.md": (
"# Todo\n\n"
@@ -105,6 +109,36 @@ def _build_manifest() -> Manifest:
)
+def _build_pty_manifest() -> Manifest:
+ return Manifest(
+ entries={
+ "README.md": File(
+ content=(
+ b"# Vercel PTY Agent Example\n\n"
+ b"This workspace is used by the Vercel PTY demo.\n"
+ )
+ ),
+ }
+ )
+
+
+def _stream_event_banner(event_name: str, raw_item: object) -> str | None:
+ _ = raw_item
+ if event_name == "tool_called":
+ return "[tool call]"
+ if event_name == "tool_output":
+ return "[tool output]"
+ return None
+
+
+def _raw_item_call_id(raw_item: object) -> str | None:
+ if isinstance(raw_item, dict):
+ call_id = raw_item.get("call_id") or raw_item.get("id")
+ else:
+ call_id = getattr(raw_item, "call_id", None) or getattr(raw_item, "id", None)
+ return call_id if isinstance(call_id, str) and call_id else None
+
+
async def _read_text(session: BaseSandboxSession, path: Path) -> str:
data = await session.read(path)
text = cast(str | bytes, data.read())
@@ -134,6 +168,31 @@ def _require_vercel_credentials() -> None:
)
+def _build_options(
+ *,
+ runtime: str | None,
+ timeout_ms: int | None,
+ workspace_persistence: Literal["tar", "snapshot"],
+ interactive: bool = False,
+ exposed_ports: tuple[int, ...] = (),
+) -> VercelSandboxClientOptions:
+ return VercelSandboxClientOptions(
+ runtime=runtime,
+ timeout_ms=timeout_ms,
+ workspace_persistence=workspace_persistence,
+ interactive=interactive,
+ exposed_ports=exposed_ports,
+ )
+
+
+def _build_run_config(*, sandbox: BaseSandboxSession, workflow_name: str) -> RunConfig:
+ return RunConfig(
+ sandbox=SandboxRunConfig(session=sandbox),
+ workflow_name=workflow_name,
+ model_provider=MultiProvider(openai_prefix_mode="model_id"),
+ )
+
+
async def _verify_stop_resume(
*,
manifest: Manifest,
@@ -142,7 +201,7 @@ async def _verify_stop_resume(
workspace_persistence: Literal["tar", "snapshot"],
) -> None:
client = VercelSandboxClient()
- options = VercelSandboxClientOptions(
+ options = _build_options(
runtime=runtime,
timeout_ms=timeout_ms,
workspace_persistence=workspace_persistence,
@@ -189,7 +248,7 @@ async def _verify_resume_running_sandbox(
client = VercelSandboxClient()
sandbox = await client.create(
manifest=manifest,
- options=VercelSandboxClientOptions(
+ options=_build_options(
runtime=runtime,
timeout_ms=timeout_ms,
workspace_persistence=workspace_persistence,
@@ -249,7 +308,7 @@ async def _verify_exposed_port(
client = VercelSandboxClient()
sandbox = await client.create(
manifest=manifest,
- options=VercelSandboxClientOptions(
+ options=_build_options(
runtime=runtime,
timeout_ms=timeout_ms,
workspace_persistence=workspace_persistence,
@@ -267,10 +326,7 @@ async def _verify_exposed_port(
PORT_CHECK_PYTHON_SERVER_PATH,
io.BytesIO(PORT_CHECK_PYTHON_SERVER_CONTENT.encode("utf-8")),
)
- result = await sandbox.exec(
- _port_check_server_command(),
- shell=True,
- )
+ result = await sandbox.exec(_port_check_server_command(), shell=True)
if not result.ok():
raise RuntimeError(
f"Failed to start HTTP server for exposed port check: {result.stderr!r}"
@@ -298,39 +354,173 @@ async def _verify_exposed_port(
await sandbox.shutdown()
-async def main(
+async def _verify_pty_direct(
*,
- model: str,
- question: str,
+ manifest: Manifest,
runtime: str | None,
timeout_ms: int | None,
workspace_persistence: Literal["tar", "snapshot"],
- stream: bool,
) -> None:
- _require_env("OPENAI_API_KEY")
- _require_vercel_credentials()
-
- manifest = _build_manifest()
-
- await _verify_stop_resume(
+ client = VercelSandboxClient()
+ sandbox = await client.create(
manifest=manifest,
- runtime=runtime,
- timeout_ms=timeout_ms,
- workspace_persistence=workspace_persistence,
+ options=_build_options(
+ runtime=runtime,
+ timeout_ms=timeout_ms,
+ workspace_persistence=workspace_persistence,
+ interactive=True,
+ ),
)
- await _verify_resume_running_sandbox(
- manifest=manifest,
- runtime=runtime,
- timeout_ms=timeout_ms,
- workspace_persistence=workspace_persistence,
+
+ try:
+ await sandbox.start()
+ if not sandbox.supports_pty():
+ raise RuntimeError("Interactive Vercel sandbox did not report PTY support.")
+
+ started = await sandbox.pty_exec_start("sh", shell=False, yield_time_s=0.25)
+ process_id = started.process_id
+ if process_id is None:
+ raise RuntimeError(
+ f"PTY session exited too early during startup: output={started.output!r}, "
+ f"exit_code={started.exit_code!r}"
+ )
+
+ await sandbox.pty_write_stdin(
+ session_id=process_id,
+ chars=f"export PTY_CHECK_VALUE={PTY_CHECK_VALUE!r}\n",
+ yield_time_s=0.25,
+ )
+ completed = await sandbox.pty_write_stdin(
+ session_id=process_id,
+ chars='printf "%s\\n" "$PTY_CHECK_VALUE"\nexit\n',
+ yield_time_s=0.5,
+ )
+
+ if completed.exit_code != 0:
+ raise RuntimeError(
+ f"PTY verification exited with {completed.exit_code}: {completed.output!r}"
+ )
+ if PTY_CHECK_VALUE not in completed.output.decode("utf-8", errors="replace"):
+ raise RuntimeError(
+ f"PTY verification did not observe persisted shell state: {completed.output!r}"
+ )
+ finally:
+ try:
+ await sandbox.pty_terminate_all()
+ finally:
+ await sandbox.shutdown()
+
+ print(f"pty round-trip ok ({workspace_persistence})")
+
+
+async def _run_pty_demo(
+ *,
+ model: str,
+ question: str,
+ runtime: str | None,
+ timeout_ms: int | None,
+ workspace_persistence: Literal["tar", "snapshot"],
+) -> None:
+ agent = SandboxAgent(
+ name="Vercel PTY Demo",
+ model=model,
+ instructions=(
+ "Complete the task by interacting with the sandbox through the shell capability. "
+ "Keep the final answer concise. "
+ "Preserve process state when the task depends on it. If you start an interactive "
+ "program, continue using that same process instead of launching a second one."
+ ),
+ default_manifest=_build_pty_manifest(),
+ capabilities=[Shell()],
+ model_settings=ModelSettings(tool_choice="required"),
)
- await _verify_exposed_port(
- manifest=manifest,
- runtime=runtime,
- timeout_ms=timeout_ms,
- workspace_persistence=workspace_persistence,
+
+ client = VercelSandboxClient()
+ sandbox = await client.create(
+ manifest=agent.default_manifest,
+ options=_build_options(
+ runtime=runtime,
+ timeout_ms=timeout_ms,
+ workspace_persistence=workspace_persistence,
+ interactive=True,
+ ),
)
+ try:
+ async with sandbox:
+ result = Runner.run_streamed(
+ agent,
+ question,
+ run_config=_build_run_config(
+ sandbox=sandbox,
+ workflow_name="Vercel PTY sandbox example",
+ ),
+ )
+
+ saw_text_delta = False
+ saw_any_text = False
+ tool_names_by_call_id: dict[str, str] = {}
+
+ async for event in result.stream_events():
+ if event.type == "raw_response_event" and isinstance(
+ event.data, ResponseTextDeltaEvent
+ ):
+ if not saw_text_delta:
+ print("assistant> ", end="", flush=True)
+ saw_text_delta = True
+ print(event.data.delta, end="", flush=True)
+ saw_any_text = True
+ continue
+ if event.type == "raw_response_event" and isinstance(
+ event.data, ResponseCompletedEvent
+ ):
+ continue
+
+ if event.type != "run_item_stream_event":
+ continue
+
+ raw_item = event.item.raw_item
+ banner = _stream_event_banner(event.name, raw_item)
+ if banner is None:
+ continue
+
+ if saw_text_delta:
+ print()
+ saw_text_delta = False
+
+ if event.name == "tool_called":
+ tool_name = tool_call_name(raw_item)
+ call_id = _raw_item_call_id(raw_item)
+ if call_id is not None and tool_name:
+ tool_names_by_call_id[call_id] = tool_name
+ if tool_name:
+ banner = f"{banner} {tool_name}"
+ elif event.name == "tool_output":
+ call_id = _raw_item_call_id(raw_item)
+ output_tool_name = tool_names_by_call_id.get(call_id or "")
+ if output_tool_name:
+ banner = f"{banner} {output_tool_name}"
+
+ print(banner)
+
+ if saw_text_delta:
+ print()
+ if not saw_any_text:
+ print(result.final_output)
+ finally:
+ await client.delete(sandbox)
+
+
+async def _run_standard_agent(
+ *,
+ model: str,
+ question: str,
+ runtime: str | None,
+ timeout_ms: int | None,
+ workspace_persistence: Literal["tar", "snapshot"],
+ stream: bool,
+) -> None:
+ manifest = _build_manifest()
agent = SandboxAgent(
name="Vercel Sandbox Assistant",
model=model,
@@ -348,24 +538,19 @@ async def main(
client = VercelSandboxClient()
sandbox = await client.create(
manifest=manifest,
- options=VercelSandboxClientOptions(
+ options=_build_options(
runtime=runtime,
timeout_ms=timeout_ms,
workspace_persistence=workspace_persistence,
),
)
- run_config = RunConfig(
- model_provider=OpenAIProvider(),
- sandbox=SandboxRunConfig(session=sandbox),
- # Disable tracing because it does not currently work reliably with alternate
- # upstreams such as AI Gateway, and provider config already comes from env.
- tracing_disabled=True,
- workflow_name="Vercel sandbox example",
- )
-
try:
async with sandbox:
+ run_config = _build_run_config(
+ sandbox=sandbox,
+ workflow_name="Vercel sandbox example",
+ )
if not stream:
result = await Runner.run(agent, question, run_config=run_config)
print(result.final_output)
@@ -381,6 +566,10 @@ async def main(
print("assistant> ", end="", flush=True)
saw_text_delta = True
print(event.data.delta, end="", flush=True)
+ elif event.type == "raw_response_event" and isinstance(
+ event.data, ResponseCompletedEvent
+ ):
+ continue
if saw_text_delta:
print()
@@ -388,10 +577,109 @@ async def main(
await client.delete(sandbox)
+async def main(
+ *,
+ model: str,
+ question: str,
+ runtime: str | None,
+ timeout_ms: int | None,
+ workspace_persistence: Literal["tar", "snapshot"],
+ stream: bool,
+ demo: str,
+) -> None:
+ _require_env("OPENAI_API_KEY")
+ _require_vercel_credentials()
+
+ manifest = _build_manifest()
+
+ if demo == "snapshot":
+ await _verify_stop_resume(
+ manifest=manifest,
+ runtime=runtime,
+ timeout_ms=timeout_ms,
+ workspace_persistence=workspace_persistence,
+ )
+ return
+
+ if demo == "resume":
+ await _verify_resume_running_sandbox(
+ manifest=manifest,
+ runtime=runtime,
+ timeout_ms=timeout_ms,
+ workspace_persistence=workspace_persistence,
+ )
+ return
+
+ if demo == "port":
+ await _verify_exposed_port(
+ manifest=manifest,
+ runtime=runtime,
+ timeout_ms=timeout_ms,
+ workspace_persistence=workspace_persistence,
+ )
+ return
+
+ if demo == "pty":
+ await _run_pty_demo(
+ model=model,
+ question=question,
+ runtime=runtime,
+ timeout_ms=timeout_ms,
+ workspace_persistence=workspace_persistence,
+ )
+ return
+
+ if demo == "backend-checks":
+ await _verify_stop_resume(
+ manifest=manifest,
+ runtime=runtime,
+ timeout_ms=timeout_ms,
+ workspace_persistence=workspace_persistence,
+ )
+ await _verify_resume_running_sandbox(
+ manifest=manifest,
+ runtime=runtime,
+ timeout_ms=timeout_ms,
+ workspace_persistence=workspace_persistence,
+ )
+ await _verify_exposed_port(
+ manifest=manifest,
+ runtime=runtime,
+ timeout_ms=timeout_ms,
+ workspace_persistence=workspace_persistence,
+ )
+ await _verify_pty_direct(
+ manifest=manifest,
+ runtime=runtime,
+ timeout_ms=timeout_ms,
+ workspace_persistence=workspace_persistence,
+ )
+ return
+
+ await _run_standard_agent(
+ model=model,
+ question=question,
+ runtime=runtime,
+ timeout_ms=timeout_ms,
+ workspace_persistence=workspace_persistence,
+ stream=stream,
+ )
+
+
if __name__ == "__main__":
- parser = argparse.ArgumentParser()
- parser.add_argument("--model", default="gpt-5.4", help="Model name to use.")
+ parser = argparse.ArgumentParser(
+ description=(
+ "Run a Vercel sandbox agent with optional resume, exposed-port, and PTY demos."
+ )
+ )
+ parser.add_argument("--model", default=DEFAULT_MODEL, help="Model name to use.")
parser.add_argument("--question", default=DEFAULT_QUESTION, help="Prompt to send to the agent.")
+ parser.add_argument(
+ "--demo",
+ default="agent",
+ choices=["agent", "snapshot", "resume", "port", "pty", "backend-checks"],
+ help="Which demo to run (default: agent).",
+ )
parser.add_argument(
"--runtime",
default=None,
@@ -407,18 +695,22 @@ async def main(
"--workspace-persistence",
choices=("tar", "snapshot"),
default="tar",
- help="Workspace persistence mode to verify before the agent run.",
+ help="Workspace persistence mode for the Vercel sandbox.",
)
parser.add_argument("--stream", action="store_true", default=False, help="Stream the response.")
args = parser.parse_args()
+ default_question = (
+ DEFAULT_PTY_QUESTION if args.demo == "pty" and args.question == DEFAULT_QUESTION else None
+ )
asyncio.run(
main(
model=args.model,
- question=args.question,
+ question=default_question or args.question,
runtime=args.runtime,
timeout_ms=args.timeout_ms,
workspace_persistence=cast(Literal["tar", "snapshot"], args.workspace_persistence),
stream=args.stream,
+ demo=args.demo,
)
)
diff --git a/pyproject.toml b/pyproject.toml
index b016977639..ab6c5f4616 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -51,7 +51,7 @@ cloudflare = ["aiohttp>=3.12,<4"]
e2b = ["e2b==2.20.0", "e2b-code-interpreter==2.4.1"]
modal = ["modal==1.3.5"]
runloop = ["runloop_api_client>=1.16.0,<2.0.0"]
-vercel = ["vercel>=0.5.6,<0.6"]
+vercel = ["vercel>=0.5.7,<0.6"]
s3 = ["boto3>=1.34"]
temporal = [
"temporalio==1.25.0",
diff --git a/src/agents/extensions/sandbox/vercel/sandbox.py b/src/agents/extensions/sandbox/vercel/sandbox.py
index 6bc14876a3..58e317a9e7 100644
--- a/src/agents/extensions/sandbox/vercel/sandbox.py
+++ b/src/agents/extensions/sandbox/vercel/sandbox.py
@@ -14,10 +14,14 @@
import asyncio
import io
import json
+import logging
import os
import tarfile
+import time
import uuid
+from collections import deque
from collections.abc import Awaitable, Callable
+from dataclasses import dataclass, field
from pathlib import Path, PurePosixPath
from typing import Any, Literal, cast
from urllib.parse import urlsplit
@@ -31,6 +35,7 @@
SandboxStatus,
SnapshotSource,
)
+from vercel.sandbox.pty import AsyncPTYSession
from ....sandbox.errors import (
ConfigurationError,
@@ -51,6 +56,16 @@
from ....sandbox.session.base_sandbox_session import BaseSandboxSession
from ....sandbox.session.dependencies import Dependencies
from ....sandbox.session.manager import Instrumentation
+from ....sandbox.session.pty_types import (
+ PTY_PROCESSES_MAX,
+ PTY_PROCESSES_WARNING,
+ PtyExecUpdate,
+ allocate_pty_process_id,
+ clamp_pty_yield_time_ms,
+ process_id_to_prune_from_meta,
+ resolve_pty_write_yield_time_ms,
+ truncate_text_by_tokens,
+)
from ....sandbox.session.sandbox_client import BaseSandboxClient, BaseSandboxClientOptions
from ....sandbox.snapshot import SnapshotBase, SnapshotSpec, resolve_snapshot
from ....sandbox.types import ExecResult, ExposedPortEndpoint, User
@@ -77,6 +92,19 @@
httpx.NetworkError,
httpx.ProtocolError,
)
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class _VercelPtySessionEntry:
+ session: Any
+ output_chunks: deque[bytes] = field(default_factory=deque)
+ output_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
+ output_notify: asyncio.Event = field(default_factory=asyncio.Event)
+ reader_task: asyncio.Task[None] | None = None
+ done: bool = False
+ exit_code: int | None = None
+ last_used: float = field(default_factory=time.monotonic)
def _is_transient_create_error(exc: BaseException) -> bool:
@@ -245,6 +273,9 @@ class VercelSandboxSession(BaseSandboxSession):
state: VercelSandboxSessionState
_sandbox: Any | None
_token: str | None
+ _pty_lock: asyncio.Lock
+ _pty_sessions: dict[int, _VercelPtySessionEntry]
+ _reserved_pty_process_ids: set[int]
def __init__(
self,
@@ -256,6 +287,9 @@ def __init__(
self.state = state
self._sandbox = sandbox
self._token = token
+ self._pty_lock = asyncio.Lock()
+ self._pty_sessions = {}
+ self._reserved_pty_process_ids = set()
@classmethod
def from_state(
@@ -268,7 +302,7 @@ def from_state(
return cls(state=state, sandbox=sandbox, token=token)
def supports_pty(self) -> bool:
- return False
+ return self.state.interactive
def _reject_user_arg(self, *, op: Literal["exec", "read", "write"], user: str | User) -> None:
user_name = user.name if isinstance(user, User) else user
@@ -453,8 +487,243 @@ async def running(self) -> bool:
return bool(sandbox.status == SandboxStatus.RUNNING)
async def shutdown(self) -> None:
+ await self.pty_terminate_all()
await self._stop_attached_sandbox()
+ async def pty_exec_start(
+ self,
+ *command: str | Path,
+ timeout: float | None = None,
+ shell: bool | list[str] = True,
+ user: str | User | None = None,
+ tty: bool = False,
+ yield_time_s: float | None = None,
+ max_output_tokens: int | None = None,
+ ) -> PtyExecUpdate:
+ _ = tty
+ sandbox = await self._ensure_sandbox()
+ sanitized = self._prepare_exec_command(*command, shell=shell, user=user)
+ connect_timeout = 30.0 if timeout is None else timeout
+
+ entry = _VercelPtySessionEntry(session=None)
+ registered = False
+ pruned: _VercelPtySessionEntry | None = None
+ process_count = 0
+
+ try:
+ session = await AsyncPTYSession.open(
+ sandbox,
+ sanitized,
+ cwd=self.state.manifest.root,
+ _connection_timeout=connect_timeout,
+ )
+ await session.ready()
+ entry.session = session
+ entry.reader_task = asyncio.create_task(self._pty_reader(entry))
+
+ async with self._pty_lock:
+ process_id = allocate_pty_process_id(self._reserved_pty_process_ids)
+ self._reserved_pty_process_ids.add(process_id)
+ pruned = self._prune_pty_sessions_if_needed()
+ self._pty_sessions[process_id] = entry
+ process_count = len(self._pty_sessions)
+ registered = True
+ except TimeoutError as e:
+ if not registered:
+ await self._terminate_pty_entry(entry)
+ raise ExecTimeoutError(command=command, timeout_s=timeout, cause=e) from e
+ except ExecTimeoutError:
+ raise
+ except Exception as e:
+ if not registered:
+ await self._terminate_pty_entry(entry)
+ raise ExecTransportError(
+ command=command,
+ context={"backend": "vercel", "sandbox_id": self.state.sandbox_id},
+ cause=e,
+ ) from e
+
+ if pruned is not None:
+ await self._terminate_pty_entry(pruned)
+
+ if process_count >= PTY_PROCESSES_WARNING:
+ logger.warning(
+ "PTY process count reached warning threshold: %s active sessions",
+ process_count,
+ )
+
+ yield_time_ms = 10_000 if yield_time_s is None else int(yield_time_s * 1000)
+ output, original_token_count = await self._collect_pty_output(
+ entry=entry,
+ yield_time_ms=clamp_pty_yield_time_ms(yield_time_ms),
+ max_output_tokens=max_output_tokens,
+ )
+ return await self._finalize_pty_update(
+ process_id=process_id,
+ entry=entry,
+ output=output,
+ original_token_count=original_token_count,
+ )
+
+ async def pty_write_stdin(
+ self,
+ *,
+ session_id: int,
+ chars: str,
+ yield_time_s: float | None = None,
+ max_output_tokens: int | None = None,
+ ) -> PtyExecUpdate:
+ async with self._pty_lock:
+ entry = self._resolve_pty_session_entry(
+ pty_processes=self._pty_sessions,
+ session_id=session_id,
+ )
+
+ try:
+ if chars:
+ await entry.session.stream.send(chars.encode("utf-8"))
+ except Exception as e:
+ raise ExecTransportError(
+ command=("pty_write_stdin",),
+ context={
+ "backend": "vercel",
+ "sandbox_id": self.state.sandbox_id,
+ "session_id": session_id,
+ },
+ cause=e,
+ ) from e
+
+ yield_time_ms = 250 if yield_time_s is None else int(yield_time_s * 1000)
+ output, original_token_count = await self._collect_pty_output(
+ entry=entry,
+ yield_time_ms=resolve_pty_write_yield_time_ms(
+ yield_time_ms=yield_time_ms,
+ input_empty=chars == "",
+ ),
+ max_output_tokens=max_output_tokens,
+ )
+ entry.last_used = time.monotonic()
+ return await self._finalize_pty_update(
+ process_id=session_id,
+ entry=entry,
+ output=output,
+ original_token_count=original_token_count,
+ )
+
+ async def pty_terminate_all(self) -> None:
+ async with self._pty_lock:
+ entries = list(self._pty_sessions.values())
+ self._pty_sessions.clear()
+ self._reserved_pty_process_ids.clear()
+ for entry in entries:
+ await self._terminate_pty_entry(entry)
+
+ async def _pty_reader(self, entry: _VercelPtySessionEntry) -> None:
+ """Stream PTY output into the session buffer and capture process exit."""
+ try:
+ async for data in entry.session.stream:
+ if not data:
+ continue
+ async with entry.output_lock:
+ entry.output_chunks.append(data)
+ entry.output_notify.set()
+ except Exception as e:
+ logger.debug("PTY reader output stream terminated with error: %s", e)
+ try:
+ finished = await entry.session.command.wait()
+ entry.exit_code = finished.exit_code
+ except Exception as e:
+ logger.debug("PTY reader terminated with error: %s", e)
+ finally:
+ entry.done = True
+ entry.output_notify.set()
+
+ async def _collect_pty_output(
+ self,
+ *,
+ entry: _VercelPtySessionEntry,
+ yield_time_ms: int,
+ max_output_tokens: int | None,
+ ) -> tuple[bytes, int | None]:
+ deadline = time.monotonic() + (yield_time_ms / 1000)
+ output = bytearray()
+
+ while True:
+ async with entry.output_lock:
+ while entry.output_chunks:
+ output.extend(entry.output_chunks.popleft())
+
+ if time.monotonic() >= deadline:
+ break
+ if entry.done:
+ async with entry.output_lock:
+ while entry.output_chunks:
+ output.extend(entry.output_chunks.popleft())
+ break
+
+ remaining_s = deadline - time.monotonic()
+ if remaining_s <= 0:
+ break
+ try:
+ await asyncio.wait_for(entry.output_notify.wait(), timeout=remaining_s)
+ except asyncio.TimeoutError:
+ break
+ entry.output_notify.clear()
+
+ text = output.decode("utf-8", errors="replace")
+ truncated, original_token_count = truncate_text_by_tokens(text, max_output_tokens)
+ return truncated.encode("utf-8", errors="replace"), original_token_count
+
+ async def _finalize_pty_update(
+ self,
+ *,
+ process_id: int,
+ entry: _VercelPtySessionEntry,
+ output: bytes,
+ original_token_count: int | None,
+ ) -> PtyExecUpdate:
+ exit_code = entry.exit_code if entry.done else None
+ live_process_id: int | None = process_id
+
+ if entry.done:
+ async with self._pty_lock:
+ removed = self._pty_sessions.pop(process_id, None)
+ self._reserved_pty_process_ids.discard(process_id)
+ if removed is not None:
+ await self._terminate_pty_entry(removed)
+ live_process_id = None
+
+ return PtyExecUpdate(
+ process_id=live_process_id,
+ output=output,
+ exit_code=exit_code,
+ original_token_count=original_token_count,
+ )
+
+ def _prune_pty_sessions_if_needed(self) -> _VercelPtySessionEntry | None:
+ if len(self._pty_sessions) < PTY_PROCESSES_MAX:
+ return None
+ meta = [(pid, entry.last_used, entry.done) for pid, entry in self._pty_sessions.items()]
+ pid = process_id_to_prune_from_meta(meta)
+ if pid is None:
+ return None
+ self._reserved_pty_process_ids.discard(pid)
+ return self._pty_sessions.pop(pid, None)
+
+ async def _terminate_pty_entry(self, entry: _VercelPtySessionEntry) -> None:
+ if entry.session is None:
+ return
+ try:
+ if entry.reader_task is not None and not entry.reader_task.done():
+ entry.reader_task.cancel()
+ try:
+ await entry.reader_task
+ except (asyncio.CancelledError, Exception):
+ pass
+ await entry.session.close()
+ except Exception as e:
+ logger.debug("PTY entry termination error (non-fatal): %s", e)
+
async def _persist_with_ephemeral_mounts_removed(
self,
operation: Callable[[], Awaitable[io.IOBase]],
diff --git a/tests/extensions/test_sandbox_vercel.py b/tests/extensions/test_sandbox_vercel.py
index 55460823dc..4ebd0204bf 100644
--- a/tests/extensions/test_sandbox_vercel.py
+++ b/tests/extensions/test_sandbox_vercel.py
@@ -1,16 +1,19 @@
from __future__ import annotations
+import asyncio
import builtins
import importlib
import io
import sys
import tarfile
import types
+from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal, cast
import httpx
import pytest
+from anyio import EndOfStream
from pydantic import BaseModel, PrivateAttr
from agents.sandbox import Manifest
@@ -24,6 +27,8 @@
from agents.sandbox.snapshot import NoopSnapshot, SnapshotBase
from agents.sandbox.types import User
+_ASYNCIO = asyncio
+
class _FakeNetworkPolicyRule(BaseModel):
pass
@@ -90,6 +95,98 @@ async def stderr(self) -> str:
return self._stderr
+@dataclass
+class _FakePtyCommandFinished:
+ exit_code: int
+
+
+class _FakePtyCommand:
+ def __init__(self, *, exit_code: int = 0) -> None:
+ self.exit_code = exit_code
+ self.wait_calls = 0
+
+ async def wait(self) -> _FakePtyCommandFinished:
+ self.wait_calls += 1
+ return _FakePtyCommandFinished(exit_code=self.exit_code)
+
+
+class _FakeAsyncPTYSession:
+ open_calls: list[dict[str, object]] = []
+ next_session: _FakeAsyncPTYSession | None = None
+
+ def __init__(
+ self,
+ *,
+ chunks: list[bytes] | None = None,
+ exit_code: int = 0,
+ ) -> None:
+ self.command = _FakePtyCommand(exit_code=exit_code)
+ self.ready_calls = 0
+ self.write_calls: list[bytes] = []
+ self.close_calls = 0
+ self._queue: _ASYNCIO.Queue[bytes | None] = _ASYNCIO.Queue()
+ self._closed = False
+ self.stream = _FakePtyStream(self)
+ for chunk in chunks or []:
+ self.feed_output(chunk)
+ if chunks is not None:
+ self.finish()
+
+ @classmethod
+ def reset(cls) -> None:
+ cls.open_calls = []
+ cls.next_session = None
+
+ @classmethod
+ async def open(
+ cls, sandbox: object, command: list[str], **kwargs: object
+ ) -> _FakeAsyncPTYSession:
+ cls.open_calls.append({"sandbox": sandbox, "command": list(command), **kwargs})
+ if cls.next_session is None:
+ cls.next_session = _FakeAsyncPTYSession()
+ return cls.next_session
+
+ async def ready(self) -> None:
+ self.ready_calls += 1
+
+ def feed_output(self, chunk: bytes) -> None:
+ self._queue.put_nowait(chunk)
+
+ def finish(self) -> None:
+ self._queue.put_nowait(None)
+
+ async def close(self) -> None:
+ self.close_calls += 1
+ self._closed = True
+ self.finish()
+
+
+class _FakePtyStream:
+ def __init__(self, session: _FakeAsyncPTYSession) -> None:
+ self._session = session
+
+ def __aiter__(self) -> _FakePtyStream:
+ return self
+
+ async def __anext__(self) -> bytes:
+ try:
+ return await self.receive()
+ except EndOfStream:
+ raise StopAsyncIteration from None
+
+ async def send(self, data: bytes) -> None:
+ self._session.write_calls.append(data)
+
+ async def receive(self, max_bytes: int = 65536) -> bytes:
+ chunk = await self._session._queue.get()
+ if chunk is None:
+ raise EndOfStream
+ return chunk[:max_bytes]
+
+ async def aclose(self) -> None:
+ await self._session.close()
+
+
class _FakeClient:
def __init__(self) -> None:
self.closed = False
@@ -344,9 +441,11 @@ async def restore_after_snapshot(
def _load_vercel_module(monkeypatch: pytest.MonkeyPatch) -> Any:
_FakeAsyncSandbox.reset()
+ _FakeAsyncPTYSession.reset()
fake_vercel = types.ModuleType("vercel")
fake_vercel_sandbox = cast(Any, types.ModuleType("vercel.sandbox"))
+ fake_vercel_sandbox_pty = cast(Any, types.ModuleType("vercel.sandbox.pty"))
fake_vercel_sandbox.AsyncSandbox = _FakeAsyncSandbox
fake_vercel_sandbox.NetworkPolicy = NetworkPolicy
fake_vercel_sandbox.NetworkPolicyCustom = NetworkPolicyCustom
@@ -355,9 +454,11 @@ def _load_vercel_module(monkeypatch: pytest.MonkeyPatch) -> Any:
fake_vercel_sandbox.Resources = Resources
fake_vercel_sandbox.SandboxStatus = types.SimpleNamespace(RUNNING="running")
fake_vercel_sandbox.SnapshotSource = SnapshotSource
+ fake_vercel_sandbox_pty.AsyncPTYSession = _FakeAsyncPTYSession
monkeypatch.setitem(sys.modules, "vercel", fake_vercel)
monkeypatch.setitem(sys.modules, "vercel.sandbox", fake_vercel_sandbox)
+ monkeypatch.setitem(sys.modules, "vercel.sandbox.pty", fake_vercel_sandbox_pty)
sys.modules.pop("agents.extensions.sandbox.vercel.sandbox", None)
sys.modules.pop("agents.extensions.sandbox.vercel", None)
@@ -376,7 +477,7 @@ def test_vercel_package_re_exports_backend_symbols(monkeypatch: pytest.MonkeyPat
assert package_module.VercelSandboxSessionState is vercel_module.VercelSandboxSessionState
-def test_vercel_supports_pty_is_disabled_until_provider_methods_exist(
+def test_vercel_supports_pty_when_interactive_and_sdk_support_is_available(
monkeypatch: pytest.MonkeyPatch,
) -> None:
vercel_module = _load_vercel_module(monkeypatch)
@@ -397,7 +498,75 @@ def test_vercel_supports_pty_is_disabled_until_provider_methods_exist(
)
assert not vercel_module.VercelSandboxSession.from_state(noninteractive).supports_pty()
- assert not vercel_module.VercelSandboxSession.from_state(interactive).supports_pty()
+ assert vercel_module.VercelSandboxSession.from_state(interactive).supports_pty()
+
+
+@pytest.mark.asyncio
+async def test_vercel_pty_exec_start_tracks_live_session(monkeypatch: pytest.MonkeyPatch) -> None:
+ vercel_module = _load_vercel_module(monkeypatch)
+ state = vercel_module.VercelSandboxSessionState(
+ session_id="00000000-0000-0000-0000-000000000201",
+ manifest=Manifest(root="/vercel/sandbox/project"),
+ snapshot=NoopSnapshot(id="snapshot"),
+ sandbox_id="sandbox-pty-live",
+ interactive=True,
+ )
+ sandbox = _FakeAsyncSandbox(sandbox_id="sandbox-pty-live")
+ session = vercel_module.VercelSandboxSession.from_state(state, sandbox=sandbox)
+ _FakeAsyncPTYSession.next_session = _FakeAsyncPTYSession()
+
+ update = await session.pty_exec_start("python", "-i", shell=False, yield_time_s=0.25)
+
+ assert update.process_id is not None
+ assert update.exit_code is None
+ assert update.output == b""
+ assert _FakeAsyncPTYSession.open_calls == [
+ {
+ "sandbox": sandbox,
+ "command": ["python", "-i"],
+ "cwd": "/vercel/sandbox/project",
+ "_connection_timeout": 30.0,
+ }
+ ]
+ assert _FakeAsyncPTYSession.next_session.ready_calls == 1
+
+ await session.pty_terminate_all()
+ assert _FakeAsyncPTYSession.next_session.close_calls == 1
+
+
+@pytest.mark.asyncio
+async def test_vercel_pty_write_collects_output_and_exit(monkeypatch: pytest.MonkeyPatch) -> None:
+ vercel_module = _load_vercel_module(monkeypatch)
+ state = vercel_module.VercelSandboxSessionState(
+ session_id="00000000-0000-0000-0000-000000000202",
+ manifest=Manifest(root="/vercel/sandbox/project"),
+ snapshot=NoopSnapshot(id="snapshot"),
+ sandbox_id="sandbox-pty-exit",
+ interactive=True,
+ )
+ sandbox = _FakeAsyncSandbox(sandbox_id="sandbox-pty-exit")
+ session = vercel_module.VercelSandboxSession.from_state(state, sandbox=sandbox)
+ fake_pty = _FakeAsyncPTYSession()
+ _FakeAsyncPTYSession.next_session = fake_pty
+
+ started = await session.pty_exec_start("python", "-i", shell=False, yield_time_s=0.25)
+ assert started.process_id is not None
+
+ fake_pty.feed_output(b"hello from pty\n")
+ fake_pty.command.exit_code = 7
+ fake_pty.finish()
+
+ update = await session.pty_write_stdin(
+ session_id=cast(int, started.process_id),
+ chars="print('x')\n",
+ yield_time_s=0.25,
+ )
+
+ assert fake_pty.write_calls == [b"print('x')\n"]
+ assert update.process_id is None
+ assert update.exit_code == 7
+ assert update.output == b"hello from pty\n"
+ assert fake_pty.close_calls == 1
@pytest.mark.asyncio
diff --git a/uv.lock b/uv.lock
index bf401d354a..a59ac90ba2 100644
--- a/uv.lock
+++ b/uv.lock
@@ -2562,7 +2562,7 @@ requires-dist = [
{ name = "textual", marker = "extra == 'temporal'", specifier = ">=8.2.3,<8.3" },
{ name = "types-requests", specifier = ">=2.0,<3" },
{ name = "typing-extensions", specifier = ">=4.12.2,<5" },
- { name = "vercel", marker = "extra == 'vercel'", specifier = ">=0.5.6,<0.6" },
+ { name = "vercel", marker = "extra == 'vercel'", specifier = ">=0.5.7,<0.6" },
{ name = "websockets", specifier = ">=15.0,<16" },
{ name = "websockets", marker = "extra == 'realtime'", specifier = ">=15.0,<16" },
{ name = "websockets", marker = "extra == 'voice'", specifier = ">=15.0,<16" },
@@ -4203,7 +4203,7 @@ wheels = [
[[package]]
name = "vercel"
-version = "0.5.6"
+version = "0.5.7"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
@@ -4215,9 +4215,9 @@ dependencies = [
{ name = "vercel-workers", marker = "python_full_version >= '3.12'" },
{ name = "websockets" },
]
-sdist = { url = "https://files.pythonhosted.org/packages/73/2a/acf30370e110c839b198cdf08ccfbacc9e11db91fc5c0b185805b318232b/vercel-0.5.6.tar.gz", hash = "sha256:c5aacd81739ff22771f9c3bba6b764de1589e25fefce6ce5ded32261128f8710", size = 115452, upload-time = "2026-04-13T21:52:40.815Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/d7/68/a671ebc656afbb5e25fb88c681b61511cc13670ea771c87b2f711782022b/vercel-0.5.7.tar.gz", hash = "sha256:8070ea1b33962adfed98498f9273f24ea2066a20c74d38643d479d8280801c6e", size = 118597, upload-time = "2026-04-15T17:58:20.424Z" }
wheels = [
- { url = "https://files.pythonhosted.org/packages/bb/70/0bf6374905d8b7eccea8f33e67c8ec8b8ffcb5eb54c40fff52edbc976514/vercel-0.5.6-py3-none-any.whl", hash = "sha256:9f5f6c2f7bcec642809338bc1c507ea91b41b977ed3be16f4e24bd5065b8a1ee", size = 135164, upload-time = "2026-04-13T21:52:39.15Z" },
+ { url = "https://files.pythonhosted.org/packages/c7/2e/bacf1ccc0ec95464a68398e64bf5e36f859cd51f3e379623f103802f85f1/vercel-0.5.7-py3-none-any.whl", hash = "sha256:90eb2689c34e403db2170fec3eb47e1a91092c200d91baf4b4501fb3e2a44d28", size = 139698, upload-time = "2026-04-15T17:58:18.945Z" },
]
[[package]]