feat(streaming): emit ReasoningDeltaEvent for reasoning/thinking deltas (#825) #3
adityasingh2400 wants to merge 2 commits into main
📝 Walkthrough
Sequence Diagram
sequenceDiagram
participant Client
participant Runner
participant Agent
participant Model
participant StreamLoop
participant EventQueue
Client->>Runner: run_streamed()
Runner->>Agent: start execution (streaming)
Agent->>Model: request reasoning + response
Model-->>Agent: ResponseReasoningTextDeltaEvent / ResponseReasoningSummaryTextDeltaEvent
Agent->>StreamLoop: deliver response events
loop for each reasoning delta
StreamLoop->>StreamLoop: append event.delta → _reasoning_snapshot
StreamLoop->>EventQueue: enqueue ReasoningDeltaEvent(delta, snapshot)
end
EventQueue-->>Client: stream_events() yields ReasoningDeltaEvent and other events
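The accumulate-and-enqueue step in the diagram can be sketched in isolation. This is a minimal illustration, not the code in `run_loop.py`: the `ReasoningDeltaEvent` dataclass below is a stand-in built from the field names in the PR description, and `forward_reasoning_deltas` and `collect` are hypothetical helpers.

```python
import asyncio
from dataclasses import dataclass
from typing import Literal


@dataclass
class ReasoningDeltaEvent:
    """Stand-in for the PR's event; field names follow the PR description."""

    delta: str
    snapshot: str
    type: Literal["reasoning_delta"] = "reasoning_delta"


async def forward_reasoning_deltas(deltas: list[str], queue: asyncio.Queue) -> None:
    """Hypothetical stream loop: append each delta to the snapshot, then enqueue."""
    reasoning_snapshot = ""
    for delta in deltas:
        reasoning_snapshot += delta
        queue.put_nowait(ReasoningDeltaEvent(delta=delta, snapshot=reasoning_snapshot))


async def collect(deltas: list[str]) -> list[ReasoningDeltaEvent]:
    """Run the loop, then drain the queue the way a client would consume it."""
    queue: asyncio.Queue = asyncio.Queue()
    await forward_reasoning_deltas(deltas, queue)
    events = []
    while not queue.empty():
        events.append(queue.get_nowait())
    return events
```

Each enqueued event carries both the new fragment and the text accumulated so far, which is why consumers would not need a buffer of their own.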
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks | ✅ Passed checks (3 passed)
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/agents/run_internal/run_loop.py`:
- Around line 1113-1114: The _reasoning_snapshot field is never cleared across
stream retries, causing ReasoningDeltaEvent.snapshot to include duplicated text;
import ResponseCreatedEvent and, inside the event loop where stream events are
handled (the loop that processes events from stream_response_with_retry /
get_stream after rewind()), detect events of type ResponseCreatedEvent and reset
_reasoning_snapshot = "" when such an event is received to mark a fresh response
attempt; ensure you reference and update the existing _reasoning_snapshot
variable (not a new local) so subsequent ReasoningDeltaEvent handling produces a
clean snapshot for the new stream.
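The reset described in this finding can be sketched without the SDK. The sketch assumes events are plain `(kind, text)` pairs, where `"created"` stands in for `ResponseCreatedEvent` and `"reasoning_delta"` for the reasoning delta events; `snapshots_across_attempts` is a hypothetical helper, not a function in the codebase.

```python
from typing import Iterable


def snapshots_across_attempts(events: Iterable[tuple[str, str]]) -> list[str]:
    """Return the snapshot produced after each reasoning delta."""
    reasoning_snapshot = ""
    snapshots: list[str] = []
    for kind, text in events:
        if kind == "created":
            # A new response attempt started (first try or a retry): start clean.
            reasoning_snapshot = ""
        elif kind == "reasoning_delta":
            reasoning_snapshot += text
            snapshots.append(reasoning_snapshot)
    return snapshots


# The first attempt fails after one delta, then the stream is retried from scratch.
events = [
    ("created", ""),
    ("reasoning_delta", "Hel"),
    ("created", ""),
    ("reasoning_delta", "Hello"),
    ("reasoning_delta", " world"),
]
print(snapshots_across_attempts(events))  # ['Hel', 'Hello', 'Hello world']
```

Without the reset branch, the last snapshot would be `"HelHello world"`, which is the duplication the finding describes.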
In `@tests/test_reasoning_delta_stream_event.py`:
- Around line 100-104: The test currently breaks out when a ReasoningDeltaEvent
is seen but does nothing if none are emitted; update the test to explicitly fail
when no ReasoningDeltaEvent is observed by either setting a flag (e.g.,
seen_reasoning = False) and asserting seen_reasoning is True after the async for
loop, or by using an else branch on the loop to raise an AssertionError;
reference the async iterator result.stream_events(), the ReasoningDeltaEvent
type check and the event.type assertion to locate where to add the post-loop
failure assertion.
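The loop-`else` option mentioned in this finding might look like the sketch below. `fake_stream` is a hypothetical stand-in for `result.stream_events()` that yields event type strings rather than real event objects.

```python
import asyncio
from typing import AsyncIterator


async def fake_stream(event_types: list[str]) -> AsyncIterator[str]:
    """Hypothetical stand-in for result.stream_events(); yields type strings."""
    for event_type in event_types:
        yield event_type


async def require_reasoning_delta(event_types: list[str]) -> None:
    """Fail loudly when the stream ends without a reasoning delta."""
    async for event_type in fake_stream(event_types):
        if event_type == "reasoning_delta":
            break
    else:
        # Runs only when the loop finished without hitting `break`.
        raise AssertionError("no ReasoningDeltaEvent was emitted")
```

The `else` branch keeps the failure next to the loop it guards; a boolean flag asserted after the loop is equivalent.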
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: da5ae3ef-04e2-4c5c-a0aa-b83bf9abdb96
📒 Files selected for processing (5)
- src/agents/__init__.py
- src/agents/run_internal/run_loop.py
- src/agents/stream_events.py
- tests/test_reasoning_delta_stream_event.py
- tests/test_stream_events.py
- Import ResponseCreatedEvent and reset _reasoning_snapshot to "" when a ResponseCreatedEvent is received inside the retry stream loop, fixing the bug where snapshot text would be duplicated across retries
- In test_reasoning_delta_event_type_field: add found=False flag and assert found after the loop so the test properly fails when no ReasoningDeltaEvent is emitted
…as (openai#825)
Add a new ReasoningDeltaEvent to StreamEvent so callers can react to reasoning/thinking tokens in real time without unpacking low-level raw response events.
The event is emitted whenever a ResponseReasoningSummaryTextDeltaEvent (o-series extended thinking via the Responses API) or a ResponseReasoningTextDeltaEvent (third-party models like DeepSeek-R1 via LiteLLM) passes through the stream. The underlying RawResponsesStreamEvent is still emitted as well, so nothing breaks for consumers that already inspect raw events.
Fields:
- delta: the incremental text fragment from this chunk
- snapshot: full accumulated reasoning text so far in this turn
- type: always 'reasoning_delta'
Closes openai#825
01d8b3d to 996be13
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/test_reasoning_delta_stream_event.py`:
- Around line 59-70: The test currently allows a vacuous pass when no
ReasoningDeltaEvent snapshots are emitted; update the test around the snapshots
collection from result.stream_events() to require at least one snapshot before
performing length-order and content checks: after collecting snapshots (variable
snapshots) add an assertion that snapshots is not empty (e.g., assert snapshots,
"no reasoning snapshots emitted") so the subsequent loop and final check that
"Hello world" appears in snapshots[-1] will fail if no ReasoningDeltaEvent
objects were produced.
- Around line 82-85: The test currently only asserts that no individual event is
a ReasoningDeltaEvent but doesn't ensure the stream produced any events; update
the test that uses result.stream_events() to also verify the stream yielded at
least one event (e.g., accumulate events or increment a counter while iterating)
and assert the collected events list length (or counter) is greater than zero,
while still asserting none of the yielded events are instances of
ReasoningDeltaEvent.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: b02a6b46-9175-4392-97c4-428c546aef1f
📒 Files selected for processing (5)
- src/agents/__init__.py
- src/agents/run_internal/run_loop.py
- src/agents/stream_events.py
- tests/test_reasoning_delta_stream_event.py
- tests/test_stream_events.py
✅ Files skipped from review due to trivial changes (1)
- src/agents/run_internal/run_loop.py
🚧 Files skipped from review as they are similar to previous changes (1)
- src/agents/__init__.py
snapshots: list[str] = []
async for event in result.stream_events():
    if isinstance(event, ReasoningDeltaEvent):
        snapshots.append(event.snapshot)

# Each snapshot must be at least as long as the previous one
for i in range(1, len(snapshots)):
    assert len(snapshots[i]) >= len(snapshots[i - 1])

# Last snapshot must contain the full reasoning text
if snapshots:
    assert "Hello world" in snapshots[-1]
Avoid vacuous pass when no reasoning snapshots are emitted.
Line 69 currently guards the final assertion with `if snapshots`, so the test passes even when zero ReasoningDeltaEvent objects are produced.
✅ Minimal hardening diff
 snapshots: list[str] = []
 async for event in result.stream_events():
     if isinstance(event, ReasoningDeltaEvent):
         snapshots.append(event.snapshot)

+assert snapshots, "Expected at least one ReasoningDeltaEvent snapshot"
+
 # Each snapshot must be at least as long as the previous one
 for i in range(1, len(snapshots)):
     assert len(snapshots[i]) >= len(snapshots[i - 1])

 # Last snapshot must contain the full reasoning text
-if snapshots:
-    assert "Hello world" in snapshots[-1]
+assert "Hello world" in snapshots[-1]
📝 Committable suggestion
snapshots: list[str] = []
async for event in result.stream_events():
    if isinstance(event, ReasoningDeltaEvent):
        snapshots.append(event.snapshot)

assert snapshots, "Expected at least one ReasoningDeltaEvent snapshot"

# Each snapshot must be at least as long as the previous one
for i in range(1, len(snapshots)):
    assert len(snapshots[i]) >= len(snapshots[i - 1])

# Last snapshot must contain the full reasoning text
assert "Hello world" in snapshots[-1]
async for event in result.stream_events():
    assert not isinstance(event, ReasoningDeltaEvent), (
        "Got unexpected ReasoningDeltaEvent for a plain text response"
    )
Assert that the stream produced events in the negative-case test.
Lines 82–85 verify the event type but not that the stream produced anything. A fully empty stream would incorrectly pass this test.
✅ Minimal hardening diff
-async for event in result.stream_events():
+saw_event = False
+async for event in result.stream_events():
+    saw_event = True
     assert not isinstance(event, ReasoningDeltaEvent), (
         "Got unexpected ReasoningDeltaEvent for a plain text response"
     )
+assert saw_event, "Expected at least one streamed event"
📝 Committable suggestion
saw_event = False
async for event in result.stream_events():
    saw_event = True
    assert not isinstance(event, ReasoningDeltaEvent), (
        "Got unexpected ReasoningDeltaEvent for a plain text response"
    )
assert saw_event, "Expected at least one streamed event"
Summary
When models like o3 or DeepSeek-R1 produce reasoning/thinking tokens during streaming, those deltas currently only surface as raw `RawResponsesStreamEvent` wrappers around low-level `response.reasoning_summary_text.delta` or `response.reasoning_text.delta` events. To consume them, callers have to inspect `.data.type` and cast the event themselves; there's no clean signal in the `StreamEvent` union.
This PR adds `ReasoningDeltaEvent` to `StreamEvent` and emits it alongside the existing raw event so reasoning deltas are as easy to consume as message deltas.
Closes openai#825
What changed
- Added `ReasoningDeltaEvent` dataclass to `stream_events.py` with `delta`, `snapshot`, and `type` fields
- Extended the `StreamEvent` type alias to include `ReasoningDeltaEvent`
- Updated `agents/__init__.py`
- In `run_internal/run_loop.py`, the `run_single_turn_streamed` loop now emits a `ReasoningDeltaEvent` after each `ResponseReasoningSummaryTextDeltaEvent` (o-series) and `ResponseReasoningTextDeltaEvent` (DeepSeek/LiteLLM)
- The `snapshot` field accumulates the full reasoning text so far in the turn, so callers don't have to maintain their own buffer
Usage example
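A minimal sketch of what consuming the new event could look like. So that it runs without the SDK or a model call, `fake_stream_events` is a hypothetical stand-in for `result.stream_events()`, and the events are `SimpleNamespace` objects carrying only the fields named in this PR (`type`, `delta`, `snapshot`).

```python
import asyncio
from types import SimpleNamespace
from typing import AsyncIterator


async def fake_stream_events() -> AsyncIterator[SimpleNamespace]:
    """Hypothetical stand-in for result.stream_events() on a streamed run."""
    yield SimpleNamespace(type="raw_response_event")
    yield SimpleNamespace(type="reasoning_delta", delta="Hello", snapshot="Hello")
    yield SimpleNamespace(type="reasoning_delta", delta=" world", snapshot="Hello world")
    yield SimpleNamespace(type="raw_response_event")


async def latest_reasoning() -> str:
    """Print reasoning fragments as they arrive; return the accumulated text."""
    reasoning_so_far = ""
    async for event in fake_stream_events():
        if event.type == "reasoning_delta":
            print(event.delta, end="", flush=True)
            # The event already carries the accumulated text, so no local buffer.
            reasoning_so_far = event.snapshot
    return reasoning_so_far
```

With the real SDK, the same loop would presumably iterate over `result.stream_events()` after `run_streamed()`, as in the sequence diagram; raw events still arrive alongside the new one and can be ignored or handled separately.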
Tests
Added `tests/test_reasoning_delta_stream_event.py` covering:
- `ReasoningDeltaEvent` is emitted for reasoning items
- … `agents`
Also updated `tests/test_stream_events.py::test_complete_streaming_events` to account for the new event in the event sequence (count goes from 27 → 28).