contrib/openai_agents: stream model events via Workflow Streams #1497
Re-applies the openai_agents streaming integration originally split out of PR #1423 at commit 59c7582, updated for the post-PR API: `TResponseStreamEvent` is a typing special form, not a class, so the topic stays untyped (default `Any`) and subscribers pass `result_type=TResponseStreamEvent` on their own `subscribe` call. Opt in via `OpenAIAgentsPlugin(model_params=ModelActivityParameters(streaming_event_topic="..."))`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
Adds an opt-in streaming integration for temporalio.contrib.openai_agents that publishes raw model stream events to Workflow Streams so workflows can fan out model output to external clients in real time while retaining durability.
Changes:
- Add a new streaming model activity (`invoke_model_activity_streaming`) that consumes `Model.stream_response()` and publishes each raw `TResponseStreamEvent` to a Workflow Streams topic.
- Enable workflow-side `Runner.run_streamed()` by wiring `Model.stream_response()` through the Temporal model stub and adding fail-fast validations for required streaming config.
- Add docs and integration tests covering opt-in streaming, missing-topic failure, and local-activity incompatibility.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| tests/contrib/openai_agents/test_openai_streaming.py | New integration tests validating streaming behavior and failure modes. |
| temporalio/contrib/openai_agents/_temporal_openai_agents.py | Registers the new streaming activity with the worker. |
| temporalio/contrib/openai_agents/_temporal_model_stub.py | Implements Model.stream_response() by invoking the streaming activity and yielding returned events. |
| temporalio/contrib/openai_agents/_openai_runner.py | Enables workflow-safe run_streamed() with fail-fast validation and exception rewrapping. |
| temporalio/contrib/openai_agents/_model_parameters.py | Adds streaming_topic and streaming_batch_interval configuration. |
| temporalio/contrib/openai_agents/_mcp.py | Removes unused logging. |
| temporalio/contrib/openai_agents/_invoke_model_activity.py | Adds streaming activity + Workflow Streams publishing and refactors tool/handoff reconstruction + OpenAI status handling. |
| temporalio/contrib/openai_agents/README.md | Documents streaming usage and updates feature-support tables. |
```python
        Calls ``model.stream_response()`` and returns the collected list
        of native OpenAI stream events. The workflow's
        ``Model.stream_response`` stub yields these to the agents
        framework, which builds the final ``ModelResponse`` from the
        terminal ``ResponseCompletedEvent``.

        Each event is also published to the workflow's stream on
        ``streaming_topic`` so external consumers (UIs, tracing,
        etc.) can observe events as they arrive.

        Heartbeats run on a background task via ``_auto_heartbeater`` so
        long initial-token latency or long pauses between chunks do not
        trip ``heartbeat_timeout``.
        """
        model = self._model_provider.get_model(input.get("model_name"))
        tools, handoffs = _build_tools_and_handoffs(input)

        topic = input["streaming_topic"]
        batch_interval = input.get(
            "streaming_batch_interval", timedelta(milliseconds=100)
        )
        events: list[TResponseStreamEvent] = []

        stream = WorkflowStreamClient.from_within_activity(
            batch_interval=batch_interval
        )
        # TResponseStreamEvent is a typing.Annotated[Union[...]] — a typing
        # special form, not a class — so it cannot be passed as type[T].
        # Leave the topic untyped (default Any); subscribers that want
        # typed decode can pass result_type=TResponseStreamEvent on
        # their own subscribe call.
        events_topic = stream.topic(topic)
        async with stream:
            try:
                async for event in model.stream_response(
                    system_instructions=input.get("system_instructions"),
                    input=input["input"],
                    model_settings=input["model_settings"],
                    tools=tools,
                    output_schema=input.get("output_schema"),
                    handoffs=handoffs,
                    tracing=ModelTracing(input["tracing"]),
                    previous_response_id=input.get("previous_response_id"),
                    conversation_id=input.get("conversation_id"),
                    prompt=input.get("prompt"),
                ):
                    events.append(event)
                    events_topic.publish(event)
            except APIStatusError as e:
                _raise_for_openai_status(e)

        return events
```
invoke_model_activity_streaming collects and returns all stream events as the activity result. For long responses this can easily exceed Temporal payload limits and/or cause large in-memory lists (one per activity attempt). Consider returning only the minimal terminal event(s) needed to build the final ModelResponse, adding a configurable cap/truncation strategy for returned events (while still publishing all events to the workflow stream), and/or explicitly documenting the size limitation for workflow-side stream_events() consumers.
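One stdlib-only way to address this would be to publish every event but return only a trailing window, since the terminal `ResponseCompletedEvent` arrives last. This is a minimal sketch, not the PR's implementation; `publish` and `max_returned` are hypothetical stand-ins:

```python
from collections import deque
from typing import Any, Callable, Iterable


def collect_with_cap(
    events: Iterable[Any],
    publish: Callable[[Any], None],
    max_returned: int = 64,
) -> list[Any]:
    """Publish every event, but return only the trailing window.

    The terminal event arrives last, so a bounded deque keeps it while
    dropping early delta chunks that would otherwise bloat the activity
    result payload.
    """
    tail: deque[Any] = deque(maxlen=max_returned)
    for event in events:
        publish(event)      # external subscribers still see everything
        tail.append(event)  # only the last max_returned survive
    return list(tail)
```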
```python
heartbeat_timeout=self.model_params.heartbeat_timeout
or timedelta(seconds=30),
```
The streaming path forces a default heartbeat_timeout of 30s when ModelActivityParameters.heartbeat_timeout is unset. This changes behavior vs non-streaming calls (which leave heartbeat_timeout=None) and may introduce unexpected timeouts for users who intentionally rely only on start/schedule timeouts. Consider either requiring callers to set heartbeat_timeout explicitly when enabling streaming, or keeping None semantics and documenting that heartbeats are optional unless configured.
Suggested change:

```diff
-heartbeat_timeout=self.model_params.heartbeat_timeout
-or timedelta(seconds=30),
+heartbeat_timeout=self.model_params.heartbeat_timeout,
```
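For context, the background-heartbeat pattern the docstring attributes to `_auto_heartbeater` can be sketched with plain asyncio. This is a generic illustration, not the decorator from the PR; `heartbeat` is a hypothetical stand-in for `temporalio.activity.heartbeat`:

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Callable


@asynccontextmanager
async def auto_heartbeat(
    heartbeat: Callable[[], None],
    interval: float = 10.0,
) -> AsyncIterator[None]:
    """Fire heartbeat() every `interval` seconds until the body exits.

    Keeps a streaming activity alive across long first-token latency or
    long gaps between chunks, independent of the main streaming loop.
    """

    async def beat() -> None:
        while True:
            heartbeat()
            await asyncio.sleep(interval)

    task = asyncio.create_task(beat())
    try:
        yield
    finally:
        task.cancel()
        try:
            await task  # swallow the expected cancellation
        except asyncio.CancelledError:
            pass
```

Whether a default `heartbeat_timeout` should accompany such a heartbeater, or stay `None` as the comment above suggests, is the open question for this review thread.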
```markdown
To publish raw model events to external subscribers, host a
`WorkflowStream` in the workflow and configure
`OpenAIAgentsPlugin(model_params=ModelActivityParameters(streaming_topic="events"))`.
See [`temporalio.contrib.workflow_streams`](../workflow_streams/README.md)
for the publisher and subscriber API.
```
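The `streaming_batch_interval` parameter implies a time-based flush: events accumulate and are sent as a batch at most once per interval. A stdlib-only sketch of that idea follows; the real `WorkflowStreamClient` batching may differ, and `send` is a hypothetical transport callable:

```python
import asyncio
from typing import Any, Callable


class BatchingPublisher:
    """Buffer events and flush them in batches on a fixed interval."""

    def __init__(self, send: Callable[[list[Any]], None], interval: float) -> None:
        self._send = send
        self._interval = interval
        self._buffer: list[Any] = []

    def publish(self, event: Any) -> None:
        # Publishing is synchronous and cheap; delivery happens on flush.
        self._buffer.append(event)

    async def run(self, stop: asyncio.Event) -> None:
        # Flush on a timer until asked to stop, then drain the remainder.
        while not stop.is_set():
            await asyncio.sleep(self._interval)
            self._flush()
        self._flush()

    def _flush(self) -> None:
        if self._buffer:
            self._send(self._buffer[:])  # send a copy of the batch
            self._buffer.clear()
```

Batching trades a small delivery delay (at most one interval) for fewer round trips to the stream backend, which matters when a model emits many small delta events.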
The PR description mentions opting in via ModelActivityParameters(streaming_event_topic=...), but the implementation/docs use ModelActivityParameters.streaming_topic. Please align the PR description (or rename in code if the intended public API is streaming_event_topic) to avoid confusion for users copying examples.
Summary

Adds a streaming integration to `temporalio.contrib.openai_agents` that publishes raw `TResponseStreamEvent` values from the model activity to a Workflow Streams topic, so a workflow can fan model output out to clients in real time without losing durability.

- Opt in via `OpenAIAgentsPlugin(model_params=ModelActivityParameters(streaming_event_topic="..."))`. With no topic set, behavior is unchanged.
- Enables `Runner.run_streamed` and publishes each `TResponseStreamEvent` chunk through a topic handle. Final response and tool-call events are still returned to the workflow as before.
- `TResponseStreamEvent` is a typing special form rather than a class, so the topic stays untyped (default `Any`); subscribers pass `result_type=TResponseStreamEvent` on their own `subscribe` call.

This was originally split out of PR #1423 (Workflow Streams library) and is now rebuilt as a single commit on top of merged main.
Test plan

- `poe lint` clean (ruff, pyright, mypy, basedpyright, pydocstyle)
- `tests/contrib/openai_agents/test_openai_streaming.py` — covers streaming opt-in, no-topic fallback, MCP path, tool-call interleaving

🤖 Generated with Claude Code