
contrib/openai_agents: stream model events via Workflow Streams #1497

Open

jssmith wants to merge 3 commits into main from contrib/openai-agents-streaming

Conversation

@jssmith
Contributor

@jssmith jssmith commented Apr 30, 2026

Summary

Adds a streaming integration to temporalio.contrib.openai_agents that publishes raw TResponseStreamEvent values from the model activity to a Workflow Streams topic, so a workflow can fan model output out to clients in real time without losing durability.

  • Opt in via OpenAIAgentsPlugin(model_params=ModelActivityParameters(streaming_event_topic="...")). With no topic set, behavior is unchanged. (Minimal sketch after this list.)
  • Activity calls Runner.run_streamed and publishes each TResponseStreamEvent chunk through a typed topic handle. Final response and tool-call events are still returned to the workflow as before.
  • TResponseStreamEvent is a typing special form rather than a class, so the topic stays untyped (default Any); subscribers pass result_type=TResponseStreamEvent on their own subscribe call.
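
A minimal sketch of the opt-in, using the parameter spelling from the diff (`streaming_topic`; this summary says `streaming_event_topic`, see the review note below). The subscriber shape is assumed from the Workflow Streams PR #1423, which is not part of this diff, and is shown only as comments:

```python
from datetime import timedelta

from temporalio.contrib.openai_agents import (
    ModelActivityParameters,
    OpenAIAgentsPlugin,
)

# Opt in by naming a stream topic; omitting it keeps non-streaming behavior.
plugin = OpenAIAgentsPlugin(
    model_params=ModelActivityParameters(
        start_to_close_timeout=timedelta(seconds=120),
        streaming_topic="model-events",
    )
)

# Subscriber side (API assumed from PR #1423, not shown in this diff):
# the topic stays untyped, so pass result_type on the subscribe call.
#
#   async for event in stream.topic("model-events").subscribe(
#       result_type=TResponseStreamEvent,
#   ):
#       ...
```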

This was originally split out of PR #1423 (Workflow Streams library) and is now rebuilt as a single commit on top of merged main.

Test plan

  • poe lint clean (ruff, pyright, mypy, basedpyright, pydocstyle)
  • tests/contrib/openai_agents/test_openai_streaming.py — covers streaming opt-in, no-topic fallback, MCP path, tool-call interleaving
  • Manual exercise via streaming-comparisons demos

🤖 Generated with Claude Code

Re-applies the openai_agents streaming integration originally split out
of PR #1423 on commit 59c7582, updated for the post-PR API:
TResponseStreamEvent is a typing-special form, not a class, so the
topic stays untyped (default Any) and subscribers pass
result_type=TResponseStreamEvent on their own subscribe call.

Opt in via `OpenAIAgentsPlugin(model_params=ModelActivityParameters(
streaming_event_topic="..."))`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@jssmith jssmith requested review from a team as code owners April 30, 2026 23:52

Copilot AI left a comment


Pull request overview

Adds an opt-in streaming integration for temporalio.contrib.openai_agents that publishes raw model stream events to Workflow Streams so workflows can fan out model output to external clients in real time while retaining durability.

Changes:

  • Add a new streaming model activity (invoke_model_activity_streaming) that consumes Model.stream_response() and publishes each raw TResponseStreamEvent to a Workflow Streams topic.
  • Enable workflow-side Runner.run_streamed() by wiring Model.stream_response() through the Temporal model stub and adding fail-fast validations for required streaming config. (Rough workflow-side sketch after this list.)
  • Add docs + integration tests covering opt-in streaming, missing-topic failure, and local-activity incompatibility.
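
For orientation, a rough sketch of the workflow-side path this enables. Runner.run_streamed and stream_events are standard openai-agents APIs; the agent definition is illustrative, and whether the call looks exactly like this inside a workflow is an assumption based on the overview above, not something this diff confirms:

```python
from agents import Agent, Runner
from temporalio import workflow


@workflow.defn
class StreamingAgentWorkflow:
    @workflow.run
    async def run(self, prompt: str) -> str:
        # Illustrative agent; the model configured via the plugin applies.
        agent = Agent(name="Assistant", instructions="Answer concisely.")
        # Per the overview, the model stub routes this through the streaming
        # activity; raw chunks are also published to the configured topic.
        result = Runner.run_streamed(agent, input=prompt)
        async for _event in result.stream_events():
            pass  # wrapper events around the raw TResponseStreamEvent chunks
        return str(result.final_output)
```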

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| tests/contrib/openai_agents/test_openai_streaming.py | New integration tests validating streaming behavior and failure modes. |
| temporalio/contrib/openai_agents/_temporal_openai_agents.py | Registers the new streaming activity with the worker. |
| temporalio/contrib/openai_agents/_temporal_model_stub.py | Implements Model.stream_response() by invoking the streaming activity and yielding the returned events. |
| temporalio/contrib/openai_agents/_openai_runner.py | Enables workflow-safe run_streamed() with fail-fast validation and exception rewrapping. |
| temporalio/contrib/openai_agents/_model_parameters.py | Adds streaming_topic and streaming_batch_interval configuration. |
| temporalio/contrib/openai_agents/_mcp.py | Removes unused logging. |
| temporalio/contrib/openai_agents/_invoke_model_activity.py | Adds the streaming activity with Workflow Streams publishing; refactors tool/handoff reconstruction and OpenAI status handling. |
| temporalio/contrib/openai_agents/README.md | Documents streaming usage and updates feature-support tables. |


Comment on lines +350 to +401
        Calls ``model.stream_response()`` and returns the collected list
        of native OpenAI stream events. The workflow's
        ``Model.stream_response`` stub yields these to the agents
        framework, which builds the final ``ModelResponse`` from the
        terminal ``ResponseCompletedEvent``.

        Each event is also published to the workflow's stream on
        ``streaming_topic`` so external consumers (UIs, tracing,
        etc.) can observe events as they arrive.

        Heartbeats run on a background task via ``_auto_heartbeater`` so
        long initial-token latency or long pauses between chunks do not
        trip ``heartbeat_timeout``.
        """
        model = self._model_provider.get_model(input.get("model_name"))
        tools, handoffs = _build_tools_and_handoffs(input)

        topic = input["streaming_topic"]
        batch_interval = input.get(
            "streaming_batch_interval", timedelta(milliseconds=100)
        )
        events: list[TResponseStreamEvent] = []

        stream = WorkflowStreamClient.from_within_activity(
            batch_interval=batch_interval
        )
        # TResponseStreamEvent is a typing.Annotated[Union[...]] — a typing
        # special form, not a class — so it cannot be passed as type[T].
        # Leave the topic untyped (default Any); subscribers that want
        # typed decode can pass result_type=TResponseStreamEvent on
        # their own subscribe call.
        events_topic = stream.topic(topic)
        async with stream:
            try:
                async for event in model.stream_response(
                    system_instructions=input.get("system_instructions"),
                    input=input["input"],
                    model_settings=input["model_settings"],
                    tools=tools,
                    output_schema=input.get("output_schema"),
                    handoffs=handoffs,
                    tracing=ModelTracing(input["tracing"]),
                    previous_response_id=input.get("previous_response_id"),
                    conversation_id=input.get("conversation_id"),
                    prompt=input.get("prompt"),
                ):
                    events.append(event)
                    events_topic.publish(event)
            except APIStatusError as e:
                _raise_for_openai_status(e)

        return events

Copilot AI May 1, 2026


invoke_model_activity_streaming collects every stream event and returns the full list as the activity result. For long responses this can easily exceed Temporal payload limits, and it builds a large in-memory list per activity attempt. Consider returning only the minimal terminal event(s) needed to build the final ModelResponse, adding a configurable cap or truncation strategy for the returned events (while still publishing all events to the workflow stream), or explicitly documenting the size limitation for workflow-side stream_events() consumers.
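
One hypothetical reading of the cap suggestion; the helper name, signature, and default are illustrative, not part of this PR:

```python
from collections import deque
from typing import AsyncIterator, Callable, TypeVar

T = TypeVar("T")


async def publish_and_bound(
    events: AsyncIterator[T],
    publish: Callable[[T], None],
    max_returned: int = 256,
) -> list[T]:
    # Publish every chunk so stream subscribers miss nothing, but keep only
    # a bounded tail (which includes the terminal ResponseCompletedEvent)
    # as the activity result, so the returned payload stays small.
    tail: deque[T] = deque(maxlen=max_returned)
    async for event in events:
        publish(event)
        tail.append(event)
    return list(tail)
```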

Comment on lines +281 to +282
            heartbeat_timeout=self.model_params.heartbeat_timeout
            or timedelta(seconds=30),

Copilot AI May 1, 2026


The streaming path forces a default heartbeat_timeout of 30s when ModelActivityParameters.heartbeat_timeout is unset. This changes behavior vs non-streaming calls (which leave heartbeat_timeout=None) and may introduce unexpected timeouts for users who intentionally rely only on start/schedule timeouts. Consider either requiring callers to set heartbeat_timeout explicitly when enabling streaming, or keeping None semantics and documenting that heartbeats are optional unless configured.

Suggested change:
-            heartbeat_timeout=self.model_params.heartbeat_timeout
-            or timedelta(seconds=30),
+            heartbeat_timeout=self.model_params.heartbeat_timeout,

Comment on lines +622 to +625
To publish raw model events to external subscribers, host a
`WorkflowStream` in the workflow and configure
`OpenAIAgentsPlugin(model_params=ModelActivityParameters(streaming_topic="events"))`. See [`temporalio.contrib.workflow_streams`](../workflow_streams/README.md) for the
publisher and subscriber API.

Copilot AI May 1, 2026


The PR description mentions opting in via ModelActivityParameters(streaming_event_topic=...), but the implementation/docs use ModelActivityParameters.streaming_topic. Please align the PR description (or rename in code if the intended public API is streaming_event_topic) to avoid confusion for users copying examples.

