豆豆友情提示:这是一个非官方 GitHub 代理镜像,主要用于网络测试或访问加速。请勿在此进行登录、注册或处理任何敏感信息。进行这些操作请务必访问官方网站 github.com。 Raw 内容也通过此代理提供。
Skip to content

Commit 531b9ea

Browse files
author
Ubuntu
committed
feat(streaming): emit ReasoningDeltaEvent for reasoning/thinking deltas (openai#825)
Add a new ReasoningDeltaEvent to StreamEvent so callers can react to reasoning/thinking tokens in real time without unpacking low-level raw response events. The event is emitted whenever a ResponseReasoningSummaryTextDeltaEvent (o-series extended thinking via the Responses API) or a ResponseReasoningTextDeltaEvent (third-party models like DeepSeek-R1 via LiteLLM) passes through the stream. The underlying RawResponsesStreamEvent is still emitted as well, so nothing breaks for consumers that already inspect raw events. Fields: delta - the incremental text fragment from this chunk snapshot - full accumulated reasoning text so far in this turn type - always 'reasoning_delta' Closes openai#825
1 parent 5c9fb2c commit 531b9ea

File tree

5 files changed

+256
-56
lines changed

5 files changed

+256
-56
lines changed

src/agents/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
from .stream_events import (
118118
AgentUpdatedStreamEvent,
119119
RawResponsesStreamEvent,
120+
ReasoningDeltaEvent,
120121
RunItemStreamEvent,
121122
StreamEvent,
122123
)
@@ -393,6 +394,7 @@ def enable_verbose_stdout_logging():
393394
"RawResponsesStreamEvent",
394395
"RunItemStreamEvent",
395396
"AgentUpdatedStreamEvent",
397+
"ReasoningDeltaEvent",
396398
"StreamEvent",
397399
"FunctionTool",
398400
"FunctionToolResult",

src/agents/run_internal/run_loop.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212
from typing import Any, TypeVar, cast
1313

1414
from openai.types.responses import Response, ResponseCompletedEvent, ResponseOutputItemDoneEvent
15+
from openai.types.responses.response_reasoning_text_delta_event import (
16+
ResponseReasoningTextDeltaEvent,
17+
)
18+
from openai.types.responses.response_reasoning_summary_text_delta_event import (
19+
ResponseReasoningSummaryTextDeltaEvent,
20+
)
1521
from openai.types.responses.response_output_item import McpCall, McpListTools
1622
from openai.types.responses.response_prompt_param import ResponsePromptParam
1723
from openai.types.responses.response_reasoning_item import ResponseReasoningItem
@@ -60,6 +66,7 @@
6066
from ..stream_events import (
6167
AgentUpdatedStreamEvent,
6268
RawResponsesStreamEvent,
69+
ReasoningDeltaEvent,
6370
RunItemStreamEvent,
6471
)
6572
from ..tool import FunctionTool, Tool, dispose_resolved_computers
@@ -1103,6 +1110,8 @@ async def run_single_turn_streamed(
11031110
emitted_tool_call_ids: set[str] = set()
11041111
emitted_reasoning_item_ids: set[str] = set()
11051112
emitted_tool_search_fingerprints: set[str] = set()
1113+
# Accumulated reasoning text for ReasoningDeltaEvent snapshot field.
1114+
_reasoning_snapshot: str = ""
11061115
# Precompute the lookup map used for streaming descriptions. Function tools use the same
11071116
# collision-free lookup keys as runtime dispatch, including deferred top-level aliases.
11081117
tool_map: dict[NamedToolLookupKey, Any] = cast(
@@ -1286,6 +1295,21 @@ async def rewind_model_request() -> None:
12861295
async for event in retry_stream:
12871296
streamed_result._event_queue.put_nowait(RawResponsesStreamEvent(data=event))
12881297

1298+
# Emit a ReasoningDeltaEvent for reasoning/thinking deltas so consumers don't have
1299+
# to unwrap the raw event themselves.
1300+
if isinstance(event, ResponseReasoningSummaryTextDeltaEvent):
1301+
delta_text: str = event.delta or ""
1302+
_reasoning_snapshot += delta_text
1303+
streamed_result._event_queue.put_nowait(
1304+
ReasoningDeltaEvent(delta=delta_text, snapshot=_reasoning_snapshot)
1305+
)
1306+
elif isinstance(event, ResponseReasoningTextDeltaEvent):
1307+
delta_text = event.delta or ""
1308+
_reasoning_snapshot += delta_text
1309+
streamed_result._event_queue.put_nowait(
1310+
ReasoningDeltaEvent(delta=delta_text, snapshot=_reasoning_snapshot)
1311+
)
1312+
12891313
terminal_response: Response | None = None
12901314
if isinstance(event, ResponseCompletedEvent):
12911315
terminal_response = event.response

src/agents/stream_events.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,30 @@ class AgentUpdatedStreamEvent:
6060
type: Literal["agent_updated_stream_event"] = "agent_updated_stream_event"
6161

6262

63-
StreamEvent: TypeAlias = Union[RawResponsesStreamEvent, RunItemStreamEvent, AgentUpdatedStreamEvent]
63+
@dataclass
64+
class ReasoningDeltaEvent:
65+
"""Emitted when a reasoning/thinking delta is received from the model during streaming.
66+
67+
This is a convenience wrapper over the low-level
68+
``response.reasoning_summary_text.delta`` and ``response.reasoning_text.delta`` raw
69+
events. Both OpenAI o-series reasoning summaries and third-party
70+
``delta.reasoning`` fields (e.g. DeepSeek-R1 via LiteLLM) are surfaced here.
71+
"""
72+
73+
delta: str
74+
"""The incremental reasoning text fragment."""
75+
76+
snapshot: str
77+
"""The full reasoning text accumulated so far in this turn."""
78+
79+
type: Literal["reasoning_delta"] = "reasoning_delta"
80+
"""The type of the event."""
81+
82+
83+
StreamEvent: TypeAlias = Union[
84+
RawResponsesStreamEvent,
85+
RunItemStreamEvent,
86+
AgentUpdatedStreamEvent,
87+
ReasoningDeltaEvent,
88+
]
6489
"""A streaming event from an agent."""
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
"""Tests for ReasoningDeltaEvent stream event (issue #825)."""
2+
3+
from __future__ import annotations
4+
5+
import pytest
6+
7+
from agents import Agent, Runner
8+
from agents.stream_events import ReasoningDeltaEvent, RawResponsesStreamEvent
9+
10+
from openai.types.responses.response_reasoning_item import ResponseReasoningItem, Summary
11+
12+
from .fake_model import FakeModel
13+
from .test_responses import get_text_message
14+
15+
16+
def _make_reasoning_item(text: str) -> ResponseReasoningItem:
17+
return ResponseReasoningItem(
18+
id="rs_test",
19+
type="reasoning",
20+
summary=[Summary(text=text, type="summary_text")],
21+
)
22+
23+
24+
@pytest.mark.asyncio
25+
async def test_reasoning_delta_event_emitted_during_streaming() -> None:
26+
"""ReasoningDeltaEvent is emitted when the model streams a reasoning summary delta."""
27+
model = FakeModel()
28+
model.set_next_output([
29+
_make_reasoning_item("Let me think..."),
30+
get_text_message("Answer"),
31+
])
32+
33+
agent = Agent(name="A", model=model)
34+
result = Runner.run_streamed(agent, input="hi")
35+
36+
reasoning_deltas: list[ReasoningDeltaEvent] = []
37+
async for event in result.stream_events():
38+
if isinstance(event, ReasoningDeltaEvent):
39+
reasoning_deltas.append(event)
40+
41+
assert len(reasoning_deltas) >= 1
42+
assert all(isinstance(e.delta, str) for e in reasoning_deltas)
43+
assert all(isinstance(e.snapshot, str) for e in reasoning_deltas)
44+
assert all(e.type == "reasoning_delta" for e in reasoning_deltas)
45+
46+
47+
@pytest.mark.asyncio
48+
async def test_reasoning_delta_snapshot_accumulates() -> None:
49+
"""The snapshot field grows monotonically across delta events."""
50+
model = FakeModel()
51+
model.set_next_output([
52+
_make_reasoning_item("Hello world"),
53+
get_text_message("done"),
54+
])
55+
56+
agent = Agent(name="A", model=model)
57+
result = Runner.run_streamed(agent, input="hi")
58+
59+
snapshots: list[str] = []
60+
async for event in result.stream_events():
61+
if isinstance(event, ReasoningDeltaEvent):
62+
snapshots.append(event.snapshot)
63+
64+
# Each snapshot must be at least as long as the previous one
65+
for i in range(1, len(snapshots)):
66+
assert len(snapshots[i]) >= len(snapshots[i - 1])
67+
68+
# Last snapshot must contain the full reasoning text
69+
if snapshots:
70+
assert "Hello world" in snapshots[-1]
71+
72+
73+
@pytest.mark.asyncio
74+
async def test_no_reasoning_delta_event_without_reasoning() -> None:
75+
"""ReasoningDeltaEvent is not emitted when there is no reasoning in the response."""
76+
model = FakeModel()
77+
model.set_next_output([get_text_message("plain text answer")])
78+
79+
agent = Agent(name="A", model=model)
80+
result = Runner.run_streamed(agent, input="hi")
81+
82+
async for event in result.stream_events():
83+
assert not isinstance(event, ReasoningDeltaEvent), (
84+
"Got unexpected ReasoningDeltaEvent for a plain text response"
85+
)
86+
87+
88+
@pytest.mark.asyncio
89+
async def test_reasoning_delta_event_type_field() -> None:
90+
"""ReasoningDeltaEvent.type is always 'reasoning_delta'."""
91+
model = FakeModel()
92+
model.set_next_output([
93+
_make_reasoning_item("some reasoning"),
94+
get_text_message("answer"),
95+
])
96+
97+
agent = Agent(name="A", model=model)
98+
result = Runner.run_streamed(agent, input="hi")
99+
100+
async for event in result.stream_events():
101+
if isinstance(event, ReasoningDeltaEvent):
102+
assert event.type == "reasoning_delta"
103+
break
104+
105+
106+
@pytest.mark.asyncio
107+
async def test_raw_response_events_still_emitted_alongside_reasoning_delta() -> None:
108+
"""RawResponsesStreamEvent is still emitted even when ReasoningDeltaEvent is also emitted."""
109+
model = FakeModel()
110+
model.set_next_output([
111+
_make_reasoning_item("thinking"),
112+
get_text_message("result"),
113+
])
114+
115+
agent = Agent(name="A", model=model)
116+
result = Runner.run_streamed(agent, input="hi")
117+
118+
raw_events: list[RawResponsesStreamEvent] = []
119+
reasoning_events: list[ReasoningDeltaEvent] = []
120+
121+
async for event in result.stream_events():
122+
if isinstance(event, RawResponsesStreamEvent):
123+
raw_events.append(event)
124+
elif isinstance(event, ReasoningDeltaEvent):
125+
reasoning_events.append(event)
126+
127+
# Both types should be present
128+
assert len(raw_events) > 0
129+
assert len(reasoning_events) > 0
130+
131+
132+
@pytest.mark.asyncio
133+
async def test_reasoning_delta_event_importable_from_agents() -> None:
134+
"""ReasoningDeltaEvent can be imported directly from the agents package."""
135+
from agents import ReasoningDeltaEvent as RDE
136+
assert RDE is ReasoningDeltaEvent
137+
138+
139+
def test_reasoning_delta_event_dataclass() -> None:
140+
"""ReasoningDeltaEvent is a proper dataclass with expected fields."""
141+
event = ReasoningDeltaEvent(delta="chunk", snapshot="full chunk")
142+
assert event.delta == "chunk"
143+
assert event.snapshot == "full chunk"
144+
assert event.type == "reasoning_delta"

0 commit comments

Comments
 (0)