Skip to content

contrib/google_adk_agents: stream LlmResponse chunks via Workflow Streams#1498

Open
jssmith wants to merge 4 commits intomainfrom
contrib/google-adk-streaming
Open

contrib/google_adk_agents: stream LlmResponse chunks via Workflow Streams#1498
jssmith wants to merge 4 commits intomainfrom
contrib/google-adk-streaming

Conversation

@jssmith
Copy link
Copy Markdown
Contributor

@jssmith jssmith commented Apr 30, 2026

Summary

Adds a streaming integration to temporalio.contrib.google_adk_agents. The bridge honors stream=True and publishes raw LlmResponse chunks through a typed Workflow Streams topic handle, so a workflow can deliver token-level model output to clients while preserving durability.

  • Opt in via the plugin's streaming_topic parameter. With no topic set, behavior is unchanged.
  • Activity input is wrapped in StreamingInvokeInput (single dataclass per Temporal activity convention); the topic is required when streaming, since invoking streaming without publishing delivers no real-time value (the workflow only sees chunks batched at activity completion).

This was originally split out of PR #1423 (Workflow Streams library) and is now rebuilt as a single commit on top of merged main.

Test plan

  • poe lint clean (ruff, pyright, mypy, basedpyright, pydocstyle)
  • tests/contrib/google_adk_agents/test_adk_streaming.py — covers streaming chunk publishing and the typed topic contract
  • Manual exercise via streaming-comparisons demos

🤖 Generated with Claude Code

Re-applies the google_adk_agents streaming integration originally split
out of PR #1423 on commit 59c7582. The bridge honors `stream=True`
and publishes raw `LlmResponse` chunks through a typed topic handle.

Opt in via the plugin's `streaming_event_topic` parameter.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@jssmith jssmith requested review from a team as code owners April 30, 2026 23:52
Comment thread tests/contrib/google_adk_agents/test_adk_streaming.py Outdated
Comment thread tests/contrib/google_adk_agents/test_adk_streaming.py Outdated
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds opt-in streaming support for temporalio.contrib.google_adk_agents by invoking ADK models with stream=True and publishing raw LlmResponse chunks to a Workflow Streams topic so external consumers can observe incremental output.

Changes:

  • Introduces a new streaming activity (invoke_model_streaming) and an activity input wrapper (StreamingInvokeInput) to publish LlmResponse chunks via WorkflowStreamClient.
  • Extends TemporalModel with streaming_topic / streaming_batch_interval and routes generate_content_async(stream=True) through the streaming activity.
  • Adds integration tests validating chunk publishing and the “topic required for streaming” contract.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
temporalio/contrib/google_adk_agents/_model.py Adds streaming activity + model options; publishes LlmResponse chunks to Workflow Streams when stream=True.
temporalio/contrib/google_adk_agents/_plugin.py Registers the new streaming activity with the worker plugin.
tests/contrib/google_adk_agents/test_adk_streaming.py New integration tests covering streaming publish behavior and required-topic failure mode.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +86 to +89
):
activity.heartbeat()
responses.append(response)
events.publish(response)
Copy link

Copilot AI May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heartbeating on every streamed chunk can create a very high heartbeat rate for token-level streaming, adding significant overhead (each call schedules heartbeat work and can trigger RPCs). Consider heartbeating on a timer based on activity.info().heartbeat_timeout (similar to temporalio.contrib.openai_agents._heartbeat_decorator) or at a coarser interval (e.g., once per N chunks / per second).

Copilot uses AI. Check for mistakes.
Comment on lines +78 to +80

stream = WorkflowStreamClient.from_within_activity(
batch_interval=input.streaming_batch_interval,
Copy link

Copilot AI May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WorkflowStreamClient generates a random publisher_id per instance. If this activity is retried (e.g., transient failure after some chunks were published), the retry will use a new publisher_id and the workflow-side dedup will not suppress already-published chunks, resulting in duplicated stream items. Consider making the publisher id stable across activity retries (e.g., derived from activity.info().workflow_id + activity.info().activity_id, or extending WorkflowStreamClient.from_within_activity/create to accept an explicit publisher_id).

Suggested change
stream = WorkflowStreamClient.from_within_activity(
batch_interval=input.streaming_batch_interval,
activity_info = activity.info()
publisher_id = f"{activity_info.workflow_id}:{activity_info.activity_id}"
stream = WorkflowStreamClient.from_within_activity(
batch_interval=input.streaming_batch_interval,
publisher_id=publisher_id,

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants