豆豆友情提示:这是一个非官方 GitHub 代理镜像,主要用于网络测试或访问加速。请勿在此进行登录、注册或处理任何敏感信息。进行这些操作请务必访问官方网站 github.com。 Raw 内容也通过此代理提供。
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
from .stream_events import (
AgentUpdatedStreamEvent,
RawResponsesStreamEvent,
ReasoningDeltaEvent,
RunItemStreamEvent,
StreamEvent,
)
Expand Down Expand Up @@ -393,6 +394,7 @@ def enable_verbose_stdout_logging():
"RawResponsesStreamEvent",
"RunItemStreamEvent",
"AgentUpdatedStreamEvent",
"ReasoningDeltaEvent",
"StreamEvent",
"FunctionTool",
"FunctionToolResult",
Expand Down
32 changes: 31 additions & 1 deletion src/agents/run_internal/run_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@
from collections.abc import Awaitable, Callable, Mapping
from typing import Any, TypeVar, cast

from openai.types.responses import Response, ResponseCompletedEvent, ResponseOutputItemDoneEvent
from openai.types.responses import Response, ResponseCompletedEvent, ResponseCreatedEvent, ResponseOutputItemDoneEvent
from openai.types.responses.response_reasoning_text_delta_event import (
ResponseReasoningTextDeltaEvent,
)
from openai.types.responses.response_reasoning_summary_text_delta_event import (
ResponseReasoningSummaryTextDeltaEvent,
)
from openai.types.responses.response_output_item import McpCall, McpListTools
from openai.types.responses.response_prompt_param import ResponsePromptParam
from openai.types.responses.response_reasoning_item import ResponseReasoningItem
Expand Down Expand Up @@ -60,6 +66,7 @@
from ..stream_events import (
AgentUpdatedStreamEvent,
RawResponsesStreamEvent,
ReasoningDeltaEvent,
RunItemStreamEvent,
)
from ..tool import FunctionTool, Tool, dispose_resolved_computers
Expand Down Expand Up @@ -1103,6 +1110,8 @@ async def run_single_turn_streamed(
emitted_tool_call_ids: set[str] = set()
emitted_reasoning_item_ids: set[str] = set()
emitted_tool_search_fingerprints: set[str] = set()
# Accumulated reasoning text for ReasoningDeltaEvent snapshot field.
_reasoning_snapshot: str = ""
Comment thread
coderabbitai[bot] marked this conversation as resolved.
# Precompute the lookup map used for streaming descriptions. Function tools use the same
# collision-free lookup keys as runtime dispatch, including deferred top-level aliases.
tool_map: dict[NamedToolLookupKey, Any] = cast(
Expand Down Expand Up @@ -1286,6 +1295,27 @@ async def rewind_model_request() -> None:
async for event in retry_stream:
streamed_result._event_queue.put_nowait(RawResponsesStreamEvent(data=event))

# Reset the reasoning snapshot at the start of each new response attempt
# (e.g., after a retry) so the snapshot field never contains stale text
# from a failed previous attempt.
if isinstance(event, ResponseCreatedEvent):
_reasoning_snapshot = ""

# Emit a ReasoningDeltaEvent for reasoning/thinking deltas so consumers don't have
# to unwrap the raw event themselves.
if isinstance(event, ResponseReasoningSummaryTextDeltaEvent):
delta_text: str = event.delta or ""
_reasoning_snapshot += delta_text
streamed_result._event_queue.put_nowait(
ReasoningDeltaEvent(delta=delta_text, snapshot=_reasoning_snapshot)
)
elif isinstance(event, ResponseReasoningTextDeltaEvent):
delta_text = event.delta or ""
_reasoning_snapshot += delta_text
streamed_result._event_queue.put_nowait(
ReasoningDeltaEvent(delta=delta_text, snapshot=_reasoning_snapshot)
)

terminal_response: Response | None = None
if isinstance(event, ResponseCompletedEvent):
terminal_response = event.response
Expand Down
27 changes: 26 additions & 1 deletion src/agents/stream_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,30 @@ class AgentUpdatedStreamEvent:
type: Literal["agent_updated_stream_event"] = "agent_updated_stream_event"


StreamEvent: TypeAlias = Union[RawResponsesStreamEvent, RunItemStreamEvent, AgentUpdatedStreamEvent]
@dataclass
class ReasoningDeltaEvent:
"""Emitted when a reasoning/thinking delta is received from the model during streaming.

This is a convenience wrapper over the low-level
``response.reasoning_summary_text.delta`` and ``response.reasoning_text.delta`` raw
events. Both OpenAI o-series reasoning summaries and third-party
``delta.reasoning`` fields (e.g. DeepSeek-R1 via LiteLLM) are surfaced here.
"""

delta: str
"""The incremental reasoning text fragment."""

snapshot: str
"""The full reasoning text accumulated so far in this turn."""

type: Literal["reasoning_delta"] = "reasoning_delta"
"""The type of the event."""


StreamEvent: TypeAlias = Union[
RawResponsesStreamEvent,
RunItemStreamEvent,
AgentUpdatedStreamEvent,
ReasoningDeltaEvent,
]
"""A streaming event from an agent."""
147 changes: 147 additions & 0 deletions tests/test_reasoning_delta_stream_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""Tests for ReasoningDeltaEvent stream event (issue #825)."""

from __future__ import annotations

import pytest

from agents import Agent, Runner
from agents.stream_events import ReasoningDeltaEvent, RawResponsesStreamEvent

from openai.types.responses.response_reasoning_item import ResponseReasoningItem, Summary

from .fake_model import FakeModel
from .test_responses import get_text_message


def _make_reasoning_item(text: str) -> ResponseReasoningItem:
return ResponseReasoningItem(
id="rs_test",
type="reasoning",
summary=[Summary(text=text, type="summary_text")],
)


@pytest.mark.asyncio
async def test_reasoning_delta_event_emitted_during_streaming() -> None:
"""ReasoningDeltaEvent is emitted when the model streams a reasoning summary delta."""
model = FakeModel()
model.set_next_output([
_make_reasoning_item("Let me think..."),
get_text_message("Answer"),
])

agent = Agent(name="A", model=model)
result = Runner.run_streamed(agent, input="hi")

reasoning_deltas: list[ReasoningDeltaEvent] = []
async for event in result.stream_events():
if isinstance(event, ReasoningDeltaEvent):
reasoning_deltas.append(event)

assert len(reasoning_deltas) >= 1
assert all(isinstance(e.delta, str) for e in reasoning_deltas)
assert all(isinstance(e.snapshot, str) for e in reasoning_deltas)
assert all(e.type == "reasoning_delta" for e in reasoning_deltas)


@pytest.mark.asyncio
async def test_reasoning_delta_snapshot_accumulates() -> None:
"""The snapshot field grows monotonically across delta events."""
model = FakeModel()
model.set_next_output([
_make_reasoning_item("Hello world"),
get_text_message("done"),
])

agent = Agent(name="A", model=model)
result = Runner.run_streamed(agent, input="hi")

snapshots: list[str] = []
async for event in result.stream_events():
if isinstance(event, ReasoningDeltaEvent):
snapshots.append(event.snapshot)

# Each snapshot must be at least as long as the previous one
for i in range(1, len(snapshots)):
assert len(snapshots[i]) >= len(snapshots[i - 1])

# Last snapshot must contain the full reasoning text
if snapshots:
assert "Hello world" in snapshots[-1]
Comment on lines +59 to +70
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Avoid vacuous pass when no reasoning snapshots are emitted.

Line 69 currently guards the final assertion with if snapshots, so the test passes even when zero ReasoningDeltaEvent objects are produced.

✅ Minimal hardening diff
     snapshots: list[str] = []
     async for event in result.stream_events():
         if isinstance(event, ReasoningDeltaEvent):
             snapshots.append(event.snapshot)

+    assert snapshots, "Expected at least one ReasoningDeltaEvent snapshot"
+
     # Each snapshot must be at least as long as the previous one
     for i in range(1, len(snapshots)):
         assert len(snapshots[i]) >= len(snapshots[i - 1])

     # Last snapshot must contain the full reasoning text
-    if snapshots:
-        assert "Hello world" in snapshots[-1]
+    assert "Hello world" in snapshots[-1]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
snapshots: list[str] = []
async for event in result.stream_events():
if isinstance(event, ReasoningDeltaEvent):
snapshots.append(event.snapshot)
# Each snapshot must be at least as long as the previous one
for i in range(1, len(snapshots)):
assert len(snapshots[i]) >= len(snapshots[i - 1])
# Last snapshot must contain the full reasoning text
if snapshots:
assert "Hello world" in snapshots[-1]
snapshots: list[str] = []
async for event in result.stream_events():
if isinstance(event, ReasoningDeltaEvent):
snapshots.append(event.snapshot)
assert snapshots, "Expected at least one ReasoningDeltaEvent snapshot"
# Each snapshot must be at least as long as the previous one
for i in range(1, len(snapshots)):
assert len(snapshots[i]) >= len(snapshots[i - 1])
# Last snapshot must contain the full reasoning text
assert "Hello world" in snapshots[-1]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_reasoning_delta_stream_event.py` around lines 59 - 70, The test
currently allows a vacuous pass when no ReasoningDeltaEvent snapshots are
emitted; update the test around the snapshots collection from
result.stream_events() to require at least one snapshot before performing
length-order and content checks: after collecting snapshots (variable snapshots)
add an assertion that snapshots is not empty (e.g., assert snapshots, "no
reasoning snapshots emitted") so the subsequent loop and final check that "Hello
world" appears in snapshots[-1] will fail if no ReasoningDeltaEvent objects were
produced.



@pytest.mark.asyncio
async def test_no_reasoning_delta_event_without_reasoning() -> None:
"""ReasoningDeltaEvent is not emitted when there is no reasoning in the response."""
model = FakeModel()
model.set_next_output([get_text_message("plain text answer")])

agent = Agent(name="A", model=model)
result = Runner.run_streamed(agent, input="hi")

async for event in result.stream_events():
assert not isinstance(event, ReasoningDeltaEvent), (
"Got unexpected ReasoningDeltaEvent for a plain text response"
)
Comment on lines +82 to +85
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Assert that the stream produced events in the negative-case test.

Line 82–85 verifies event type but not stream liveness. A fully empty stream would incorrectly pass this test.

✅ Minimal hardening diff
-    async for event in result.stream_events():
+    saw_event = False
+    async for event in result.stream_events():
+        saw_event = True
         assert not isinstance(event, ReasoningDeltaEvent), (
             "Got unexpected ReasoningDeltaEvent for a plain text response"
         )
+    assert saw_event, "Expected at least one streamed event"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async for event in result.stream_events():
assert not isinstance(event, ReasoningDeltaEvent), (
"Got unexpected ReasoningDeltaEvent for a plain text response"
)
saw_event = False
async for event in result.stream_events():
saw_event = True
assert not isinstance(event, ReasoningDeltaEvent), (
"Got unexpected ReasoningDeltaEvent for a plain text response"
)
assert saw_event, "Expected at least one streamed event"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_reasoning_delta_stream_event.py` around lines 82 - 85, The test
currently only asserts that no individual event is a ReasoningDeltaEvent but
doesn't ensure the stream produced any events; update the test that uses
result.stream_events() to also verify the stream yielded at least one event
(e.g., accumulate events or increment a counter while iterating) and assert the
collected events list length (or counter) is greater than zero, while still
asserting none of the yielded events are instances of ReasoningDeltaEvent.



@pytest.mark.asyncio
async def test_reasoning_delta_event_type_field() -> None:
"""ReasoningDeltaEvent.type is always 'reasoning_delta'."""
model = FakeModel()
model.set_next_output([
_make_reasoning_item("some reasoning"),
get_text_message("answer"),
])

agent = Agent(name="A", model=model)
result = Runner.run_streamed(agent, input="hi")

found = False
async for event in result.stream_events():
if isinstance(event, ReasoningDeltaEvent):
assert event.type == "reasoning_delta"
found = True
break
assert found, "Expected at least one ReasoningDeltaEvent but none were emitted"

Comment thread
coderabbitai[bot] marked this conversation as resolved.

@pytest.mark.asyncio
async def test_raw_response_events_still_emitted_alongside_reasoning_delta() -> None:
"""RawResponsesStreamEvent is still emitted even when ReasoningDeltaEvent is also emitted."""
model = FakeModel()
model.set_next_output([
_make_reasoning_item("thinking"),
get_text_message("result"),
])

agent = Agent(name="A", model=model)
result = Runner.run_streamed(agent, input="hi")

raw_events: list[RawResponsesStreamEvent] = []
reasoning_events: list[ReasoningDeltaEvent] = []

async for event in result.stream_events():
if isinstance(event, RawResponsesStreamEvent):
raw_events.append(event)
elif isinstance(event, ReasoningDeltaEvent):
reasoning_events.append(event)

# Both types should be present
assert len(raw_events) > 0
assert len(reasoning_events) > 0


@pytest.mark.asyncio
async def test_reasoning_delta_event_importable_from_agents() -> None:
"""ReasoningDeltaEvent can be imported directly from the agents package."""
from agents import ReasoningDeltaEvent as RDE
assert RDE is ReasoningDeltaEvent


def test_reasoning_delta_event_dataclass() -> None:
"""ReasoningDeltaEvent is a proper dataclass with expected fields."""
event = ReasoningDeltaEvent(delta="chunk", snapshot="full chunk")
assert event.delta == "chunk"
assert event.snapshot == "full chunk"
assert event.type == "reasoning_delta"
Loading