feat: add loop operators#5700
Draft
aglinxinyuan wants to merge 332 commits into
Draft
Conversation
Signed-off-by: Xinyuan Lin <xinyual3@uci.edu>
Signed-off-by: Xinyuan Lin <xinyual3@uci.edu>
Signed-off-by: Xinyuan Lin <xinyual3@uci.edu>
Signed-off-by: Xinyuan Lin <xinyual3@uci.edu>
… switch Mirrors the same change on state-handshake-redesign (the upstream PR candidate, apache#4560 review feedback). Functionally equivalent to the prior design that removed all three per-task finallys; this design keeps them and drops the run-loop's end-of-body _switch_context() instead, which keeps process_state / process_internal_marker / process_tuple unchanged from origin/main.
review polish Keep loop-feb consistent with the apache#5707 review changes: proto field OutputPort.reusesOutputStorage -> reuseStorage, the createOrReuseDocument scaladoc, and the RegionExecutionCoordinator comment wording. Also rename the LoopOpDesc/LoopEndOpDesc reuseStorage toggle and the spec assertions to match the new field name.
apache#5707 (reuse output storage) is now on main, so its files drop out of loop-feb's diff (proto reuseStorage field, DocumentFactory.createOrReuseDocument + spec). Conflict resolutions: - RegionExecutionCoordinator: keep loop-feb's version WITHOUT the require(!reuseStorage) production guard -- the loop operators legitimately set the flag, so the guard (correct for apache#5707 in isolation) must not fire here. - OutputPortReuseFlagSpec: keep loop-feb's LoopEnd-allowing guard (only Loop End may set reuseStorage), superseding main's all-false form from apache#5707. - test_state_materialization_e2e.py: keep loop-feb's 4-column-State version (loop_counter/loop_start_id/loop_start_state_uri as their own columns). NOTE: this reverts main's apache#5682 class-based-fixture refactor of that test; re-apply that structure on top of the 4-column semantics in the PR2 state PR.
aglinxinyuan
added a commit
to aglinxinyuan/texera
that referenced
this pull request
Jun 17, 2026
…pache#5707) ### What changes were proposed in this PR? Adds an opt-in mechanism for an output port to **reuse** its storage when the owning operator's region re-executes, instead of recreating the document each time. Dormant and behavior-preserving — no operator sets the flag in this PR. - `OutputPort` gains a `reuseStorage: Boolean` proto field (alongside `blocking` / `mode`). It marks a port whose output accumulates across region re-executions — e.g. a Loop End port whose result builds up over the iterations of its own loop. - `DocumentFactory.createOrReuseDocument(uri, schema, reuseExisting, …)` is the create-or-reuse decision: when reuse is requested and a document already exists it opens and returns that one; otherwise it creates a fresh one. It always returns the document, so the call site does not branch. - `RegionExecutionCoordinator` reads each output port's `reuseStorage` flag while provisioning that port's result/state documents and routes through `createOrReuseDocument`. | port flag | region re-run behavior | |---|---| | `false` (every operator today) | recreate output/state documents — unchanged | | `true` (set by Loop End in the loop PR) | keep and reopen the existing documents | A runtime guard in `RegionExecutionCoordinator` asserts no port sets `reuseStorage` for now: the flag activates only with the loop operators, which are not yet on `main`. The guard keeps the dormant reuse path from being silently exercised before its consumer exists, and is removed when the loop operators land. ### Any related issues, documentation, discussions? Resolves apache#5709 (sub-issue of apache#4442 "Introduce for loop"). Split out of apache#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's [review](apache#4206 (review)). ### How was this PR tested? - `DocumentFactorySpec` — pins the create-or-reuse decision (the reuse × exists matrix plus the "no-reuse never probes existence" short-circuit) with injected document stubs, no iceberg backend. - `OutputPortReuseFlagSpec` — guards that no registered operator enables `reuseStorage` on any output port. - `WorkflowCore` / `WorkflowOperator` / `WorkflowExecutionService` compile; scalafmt + scalafix clean. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
This was referenced Jun 18, 2026
…e#5783) Keep loop-feb's console_message_manager.py identical to apache#5783 so it drops cleanly from loop-feb's diff when that PR merges.
…le_message util apache#5783 moved error-message construction out of ConsoleMessageManager into core.util.console_message.error_message.create_error_console_message (per Yicong's review). Sync loop-feb's copies so they drop cleanly when apache#5783 merges: revert console_message_manager.py to message-management-only, add the util + its test, and route both callers (DataProcessor and the loop-condition path in main_loop.py) through create_error_console_message + put_message.
Contributor
Automated Reviewer SuggestionsBased on the
|
…erialization) Keep loop-feb's PhysicalOp + CostBasedScheduleGenerator identical to apache#5720 after its review cleanup (reworded requiresMaterializedExecution doc + dropped the duplicate inline comment), so they drop cleanly when apache#5720 merges.
aglinxinyuan
added a commit
to aglinxinyuan/texera
that referenced
this pull request
Jun 22, 2026
…ed by the scheduler (apache#5720) ### What changes were proposed in this PR? Lets an operator declare it can only run under a fully-materialized schedule, and has the scheduler honor it: - `PhysicalOp` gains `requiresMaterializedExecution: Boolean = false` (+ a `withRequiresMaterializedExecution` builder). It is a physical-execution property, so it lives on the physical op. - `CostBasedScheduleGenerator` consumes it: when any physical op requires materialized execution it forces a fully-materialized schedule regardless of the requested execution mode; otherwise the existing PIPELINED/MATERIALIZED logic runs unchanged. Default `false` ⇒ dormant and behavior-preserving: no operator requires it yet, so the scheduler's effective mode is unchanged today. The loop operators set the flag on their physical op. ### Any related issues, documentation, discussions? Resolves apache#5719 (sub-issue of apache#4442 "Introduce for loop"). Split out of apache#5700. Reflects the review discussion with @Yicong-Huang: the property belongs on `PhysicalOp`, and it is consumed by the scheduler. ### How was this PR tested? `WorkflowCoreTypesSpec` covers the `PhysicalOp.requiresMaterializedExecution` default + builder. `WorkflowExecutionService/Test/compile`, `scalafixAll --check`, and `scalafmtCheckAll` pass locally. The scheduler consumer is exercised end-to-end by the loop integration tests once the loop operators (which set the flag) land. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
aglinxinyuan
added a commit
to aglinxinyuan/texera
that referenced
this pull request
Jun 22, 2026
…che#5783) ### What changes were proposed in this PR? `DataProcessor` built the operator-facing ERROR console message for an uncaught UDF exception inline (`_report_exception`). This moves that construction into a small factory — `core.util.console_message.error_message.create_error_console_message(worker_id, exc_info) -> ConsoleMessage`. `DataProcessor` builds the message via the factory and queues it through the existing `ConsoleMessageManager.put_message`. Per review, `ConsoleMessageManager` stays purely about **message management** (its interfaces are all at the `msg: ConsoleMessage` level); message *construction* lives in the util. Behavior-preserving: the same ERROR `ConsoleMessage` is produced — | field | value | |---|---| | `msg_type` | `ConsoleMessageType.ERROR` | | `title` | the exception's final line (e.g. `ValueError: ...`) | | `message` | the full formatted traceback | | `source` | `module:func:line` of the raising frame | Centralizing the factory lets other uncaught-exception paths report identically (the loop operators' main-loop condition evaluation reuses it in a follow-up). ### Any related issues, documentation, discussions? Split out of apache#5700 (loop operators) to keep that PR focused; the refactor is independent and behavior-preserving on `main`. ### How was this PR tested? - New `test_error_message.py::test_builds_error_console_message_from_exc_info` pins the factory output (worker id, ERROR type, title, traceback body, `module:func:line` source) — written test-first. - The existing `test_data_processor.py` (asserts console messages after a UDF raises) still passes unchanged, confirming the delegation preserves behavior. - `cd amber && pytest -m "not integration"` on the affected files: 12 passed; `black --check` clean. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
yangzhang75
pushed a commit
to yangzhang75/texera
that referenced
this pull request
Jun 22, 2026
…ache#5706) ### What changes were proposed in this PR? Centralizes the Python worker's worker-id parsing in `core/util/virtual_identity.py`: - Adds `get_operator_id(worker_id)` — extracts the logical operator id from a worker actor name (`Worker:WF<wf>-<op>-<layer>-<idx>`), raising `ValueError` on a malformed id. - Generalizes `worker_name_pattern` to capture the workflow id and operator id explicitly. - Switches both `get_worker_index` and `get_operator_id` to `re.fullmatch`, so a malformed id with trailing junk now fails loudly instead of parsing silently — matching the Scala `VirtualIdentityUtils.getPhysicalOpId` full-match semantics the docstring already claims. | case | before | after | |---|---|---| | `get_worker_index`, well-formed id | worker index | same value | | `get_worker_index`, malformed id (trailing junk) | parsed silently | raises `ValueError` | | `get_operator_id` | — | new helper | Behavior-preserving for well-formed worker ids. `get_operator_id`'s production caller lands with the for-loop feature; the helper and its test are independent and mergeable now. ### Any related issues, documentation, discussions? Resolves apache#5708 (sub-issue of apache#4442 "Introduce for loop"). Split out of apache#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's [review](apache#4206 (review)). ### How was this PR tested? `pytest src/test/python/core/util/test_virtual_identity.py` — 23 passing, covering well-formed ids, the new `get_operator_id`, and malformed ids that now raise `ValueError`. `ruff check`/`format` clean. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
yangzhang75
pushed a commit
to yangzhang75/texera
that referenced
this pull request
Jun 22, 2026
apache#5712) ### What changes were proposed in this PR? Extracts the repeated "run a workflow and read its materialized results" boilerplate from the amber e2e specs into two reusable helpers on `TestUtils`: - `readMaterializedResults(executionId, operatorIds, extract)` — resolve + open each operator's external RESULT document and apply `extract` to the opened `VirtualDocument[Tuple]` (operators with no materialized output are skipped). - `runWorkflowAndReadResults(system, workflow, operatorIds, extract, completionTimeout)` — run a workflow to `COMPLETED` (a `FatalError` aborts and surfaces as the awaited exception), then read results via `readMaterializedResults`. `DataProcessingSpec.executeWorkflow` now calls the shared harness instead of its own inline copy. The helpers are loop/state-agnostic — they only use existing core APIs (`DocumentFactory`, `VirtualDocument[Tuple]`, `AmberClient`, `ExecutionStateUpdate`, `FatalError`), so other e2e specs can adopt them too. ### Any related issues, documentation, discussions? Resolves apache#5711 (sub-issue of apache#4442 "Introduce for loop"). Split out of apache#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's [review](apache#4206 (review)). ### How was this PR tested? Behavior-preserving refactor of existing e2e test infrastructure. `WorkflowExecutionService/Test/compile` and `WorkflowExecutionService/scalafmtCheckAll` pass locally. The `@IntegrationTest` specs that exercise the harness (e.g. `DataProcessingSpec`) run in CI — they spawn Python workers and can't run on Windows. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
yangzhang75
pushed a commit
to yangzhang75/texera
that referenced
this pull request
Jun 22, 2026
…pache#5707) ### What changes were proposed in this PR? Adds an opt-in mechanism for an output port to **reuse** its storage when the owning operator's region re-executes, instead of recreating the document each time. Dormant and behavior-preserving — no operator sets the flag in this PR. - `OutputPort` gains a `reuseStorage: Boolean` proto field (alongside `blocking` / `mode`). It marks a port whose output accumulates across region re-executions — e.g. a Loop End port whose result builds up over the iterations of its own loop. - `DocumentFactory.createOrReuseDocument(uri, schema, reuseExisting, …)` is the create-or-reuse decision: when reuse is requested and a document already exists it opens and returns that one; otherwise it creates a fresh one. It always returns the document, so the call site does not branch. - `RegionExecutionCoordinator` reads each output port's `reuseStorage` flag while provisioning that port's result/state documents and routes through `createOrReuseDocument`. | port flag | region re-run behavior | |---|---| | `false` (every operator today) | recreate output/state documents — unchanged | | `true` (set by Loop End in the loop PR) | keep and reopen the existing documents | A runtime guard in `RegionExecutionCoordinator` asserts no port sets `reuseStorage` for now: the flag activates only with the loop operators, which are not yet on `main`. The guard keeps the dormant reuse path from being silently exercised before its consumer exists, and is removed when the loop operators land. ### Any related issues, documentation, discussions? Resolves apache#5709 (sub-issue of apache#4442 "Introduce for loop"). Split out of apache#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's [review](apache#4206 (review)). ### How was this PR tested? - `DocumentFactorySpec` — pins the create-or-reuse decision (the reuse × exists matrix plus the "no-reuse never probes existence" short-circuit) with injected document stubs, no iceberg backend. - `OutputPortReuseFlagSpec` — guards that no registered operator enables `reuseStorage` on any output port. - `WorkflowCore` / `WorkflowOperator` / `WorkflowExecutionService` compile; scalafmt + scalafix clean. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
yangzhang75
pushed a commit
to yangzhang75/texera
that referenced
this pull request
Jun 24, 2026
…ed by the scheduler (apache#5720) ### What changes were proposed in this PR? Lets an operator declare it can only run under a fully-materialized schedule, and has the scheduler honor it: - `PhysicalOp` gains `requiresMaterializedExecution: Boolean = false` (+ a `withRequiresMaterializedExecution` builder). It is a physical-execution property, so it lives on the physical op. - `CostBasedScheduleGenerator` consumes it: when any physical op requires materialized execution it forces a fully-materialized schedule regardless of the requested execution mode; otherwise the existing PIPELINED/MATERIALIZED logic runs unchanged. Default `false` ⇒ dormant and behavior-preserving: no operator requires it yet, so the scheduler's effective mode is unchanged today. The loop operators set the flag on their physical op. ### Any related issues, documentation, discussions? Resolves apache#5719 (sub-issue of apache#4442 "Introduce for loop"). Split out of apache#5700. Reflects the review discussion with @Yicong-Huang: the property belongs on `PhysicalOp`, and it is consumed by the scheduler. ### How was this PR tested? `WorkflowCoreTypesSpec` covers the `PhysicalOp.requiresMaterializedExecution` default + builder. `WorkflowExecutionService/Test/compile`, `scalafixAll --check`, and `scalafmtCheckAll` pass locally. The scheduler consumer is exercised end-to-end by the loop integration tests once the loop operators (which set the flag) land. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
yangzhang75
pushed a commit
to yangzhang75/texera
that referenced
this pull request
Jun 24, 2026
…che#5783) ### What changes were proposed in this PR? `DataProcessor` built the operator-facing ERROR console message for an uncaught UDF exception inline (`_report_exception`). This moves that construction into a small factory — `core.util.console_message.error_message.create_error_console_message(worker_id, exc_info) -> ConsoleMessage`. `DataProcessor` builds the message via the factory and queues it through the existing `ConsoleMessageManager.put_message`. Per review, `ConsoleMessageManager` stays purely about **message management** (its interfaces are all at the `msg: ConsoleMessage` level); message *construction* lives in the util. Behavior-preserving: the same ERROR `ConsoleMessage` is produced — | field | value | |---|---| | `msg_type` | `ConsoleMessageType.ERROR` | | `title` | the exception's final line (e.g. `ValueError: ...`) | | `message` | the full formatted traceback | | `source` | `module:func:line` of the raising frame | Centralizing the factory lets other uncaught-exception paths report identically (the loop operators' main-loop condition evaluation reuses it in a follow-up). ### Any related issues, documentation, discussions? Split out of apache#5700 (loop operators) to keep that PR focused; the refactor is independent and behavior-preserving on `main`. ### How was this PR tested? - New `test_error_message.py::test_builds_error_console_message_from_exc_info` pins the factory output (worker id, ERROR type, title, traceback body, `module:func:line` source) — written test-first. - The existing `test_data_processor.py` (asserts console messages after a UDF raises) still passes unchanged, confirming the delegation preserves behavior. - `cd amber && pytest -m "not integration"` on the affected files: 12 passed; `black --check` clean. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
Yicong-Huang
pushed a commit
that referenced
this pull request
Jun 24, 2026
…et (#5781) ### What changes were proposed in this PR? When workflow execution initialization fails, the error was recorded into the execution metadata store but never pushed to the websocket, so connected frontend clients saw nothing — particularly for failures during `WorkflowExecutionService` construction, which happens *before* the execution is published to subscribers. `WorkflowService.initExecutionService`'s catch arm now, after `errorHandler(e)` records the fatal error, pushes a `WorkflowErrorEvent` (carrying the recorded fatal errors) to `errorSubject` — the workflow-level channel that `connect()` subscribers listen on — so init-time failures surface in the UI. | init failure | before | after | |---|---|---| | during `WorkflowExecutionService` construction (pre-publish) | logged + stored, invisible to the UI | `WorkflowErrorEvent` delivered to the frontend | | during `executeWorkflow()` | recorded; UI delivery depended on subscription timing | `WorkflowErrorEvent` delivered to the frontend | The push is extracted into a small `reportFatalErrorsToSubscribers` method so it can be unit-tested without a database (the init path itself is DB-bound). ### Any related issues, documentation, discussions? Resolves #5782. Discovered while splitting #5700 (loop operators) into smaller PRs; this fix is independent of that feature and applies to `main` on its own. ### How was this PR tested? New `WorkflowServiceSpec` (TDD, red → green): pins that `reportFatalErrorsToSubscribers` delivers a `WorkflowErrorEvent` to a `connect()` subscriber carrying exactly the fatal errors recorded in the execution state store (single error, and all errors when several are present). `sbt "WorkflowExecutionService/testOnly *WorkflowServiceSpec"` passes (2/2); scalafmt + scalafix clean. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF. (backported from commit 1c580e5)
Ma77Ball
pushed a commit
to Ma77Ball/texera
that referenced
this pull request
Jun 24, 2026
…et (apache#5781) ### What changes were proposed in this PR? When workflow execution initialization fails, the error was recorded into the execution metadata store but never pushed to the websocket, so connected frontend clients saw nothing — particularly for failures during `WorkflowExecutionService` construction, which happens *before* the execution is published to subscribers. `WorkflowService.initExecutionService`'s catch arm now, after `errorHandler(e)` records the fatal error, pushes a `WorkflowErrorEvent` (carrying the recorded fatal errors) to `errorSubject` — the workflow-level channel that `connect()` subscribers listen on — so init-time failures surface in the UI. | init failure | before | after | |---|---|---| | during `WorkflowExecutionService` construction (pre-publish) | logged + stored, invisible to the UI | `WorkflowErrorEvent` delivered to the frontend | | during `executeWorkflow()` | recorded; UI delivery depended on subscription timing | `WorkflowErrorEvent` delivered to the frontend | The push is extracted into a small `reportFatalErrorsToSubscribers` method so it can be unit-tested without a database (the init path itself is DB-bound). ### Any related issues, documentation, discussions? Resolves apache#5782. Discovered while splitting apache#5700 (loop operators) into smaller PRs; this fix is independent of that feature and applies to `main` on its own. ### How was this PR tested? New `WorkflowServiceSpec` (TDD, red → green): pins that `reportFatalErrorsToSubscribers` delivers a `WorkflowErrorEvent` to a `connect()` subscriber carrying exactly the fatal errors recorded in the execution state store (single error, and all errors when several are present). `sbt "WorkflowExecutionService/testOnly *WorkflowServiceSpec"` passes (2/2); scalafmt + scalafix clean. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
xuang7
pushed a commit
to ELin2025/texera
that referenced
this pull request
Jul 1, 2026
…ant) (apache#5900) ### What changes were proposed in this PR? Extends the cross-region **State materialization** format from a single `content` column to **3 columns** — `content`, `loop_counter`, `loop_start_id` — promoting loop bookkeeping to first-class columns (never inside the content JSON). The transport carries them end to end: the `OutputManager` state writer + `emit_state`, the Python network sender/receiver, the materialization reader, and the Scala `state.toTuple` call sites. In memory the two loop fields ride on the `StateFrame` envelope; they are materialized/serialized as their own columns (parallel to `content`), and `from_tuple` / `fromTuple` read only `content` back into the `State`. The loop-back write address (LoopStart's input-port URI) is **intentionally not** carried in State. It's constant per-execution config, not per-iteration data, so it will be delivered to Loop End at **setup** in the loop PR rather than round-tripping through every State row. (An earlier revision of this PR carried a `loop_start_state_uri` column; it was dropped after review — better than shipping a dormant column and removing it later.) On the Python side the column-name → value mapping is defined once in `State.to_columns` and reused by both `to_tuple` (iceberg) and the network sender's `StateFrame` branch, so adding a column later is a single-line change in one place rather than an edit in every serializer. **Dormant on `main`** — nothing observable changes without the loop operators: - `to_tuple()` / `toTuple()` and `OutputManager.save_state_to_storage_if_needed` / `emit_state` default the two loop columns to `0` / `""`, so every existing non-loop caller is unchanged. - `from_tuple` / `fromTuple` read only the `content` column, so round-trip identity is preserved and the extra columns are inert. No backward-compatible read of old 1-column State is needed: State materialization is **intra-execution only** — the iceberg state-document URI is execution-scoped (`…/eid/{executionId}/`) and recreated fresh each run, and State tuples are never replayed across executions or engine versions, so a 1-column tuple can never reach the 3-column reader. This is the state-format prerequisite the loop operators build on; the columns carry non-default values only once Loop Start/End set them (follow-up PR). ### Any related issues, documentation, discussions? Extracted from apache#5700 (loop operators) per @Xiao-zhen-Liu's split request; part of apache#4442 ("Introduce for loop"). ### How was this PR tested? - **Format / round-trip:** `test_state.py` (loop columns are their own columns, never in content JSON, default to `0` / `""`), Scala `StateSpec` (both loop columns round-trip through a tuple with non-default values, not just `content`), `ArrowUtilsSpec` (3-column Arrow vector round-trip), `IcebergDocumentSpec` (iceberg state-doc round-trip). - **Transport:** `test_network_receiver.py`, `test_input_port_materialization_reader_runnable.py`, and `test_state_materialization_e2e.py` — the e2e (hermetic sqlite catalog) writes non-default values for both loop columns end-to-end and asserts they replay both on the `StateFrame` and on the raw iceberg row, exercising the real Tuple/Schema/iceberg path. - **Dormancy:** `test_output_manager.py::test_defaults_loop_columns_when_omitted` pins that a no-loop caller (no `loop_counter`) still produces a valid 3-column tuple with the loop columns at `0` / `""`. - Local: `workflow-core` + `amber` compile; `StateSpec` + `ArrowUtilsSpec` pass; Python state + transport + e2e tests pass; scalafmt + scalafix + black clean. (`IcebergDocumentSpec` needs the iceberg catalog backend, so it runs in CI.) ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
Reworks the loop back-edge write address per the apache#5900 review decision: the URI is constant per-execution config, not per-iteration data, so it no longer rides State rows / the StateFrame envelope (the merged 3-column format has no loop_start_state_uri). Instead the scheduler resolves it and ships it to workers at setup: - PhysicalOp gains a compile-time `isLoopStart` marker (set by LoopStartOpDesc, mirroring requiresMaterializedExecution). - WorkflowExecutionCoordinator derives {LoopStart logical op id -> state URI of its single input port} from the final resource-allocated schedule; the two single-port/single-reader guards that lived in the Python worker's _compute_loop_start_id move controller-side. - InitializeExecutorRequest gains `map<string,string> loopStartStateUris` (field 4), sent to every worker each region (re-)run; the Python handler stores it on Context. No static LoopStart<->LoopEnd pairing is needed: a Loop End selects the entry by the loop_start_id carried on the StateFrame it consumes (nested loops work unchanged). - MainLoop: _compute_loop_start_id shrinks to the id parse; _jump_to_loop_start looks the URI up from the injected config and fails loudly (before the jump RPC) if it is missing; the inner-LoopStart nested pass-through gate keys on frame.loop_start_id instead of the removed URI field. - LoopIntegrationSpec adopts the per-suite id isolation from apache#5888 (specId 5), and WorkflowService drops a leftover errorSubject call removed on main by apache#5922. Tests: reworked test_main_loop.py to the 3-arg emit convention and the config-driven jump (plus a new missing-URI fail-loud case), added InitializeExecutorHandler coverage for the new field, pinned isLoopStart true/false in the Loop Start/End descriptor specs.
Two asserts in the pre-existing ECM-priority test were rewrapped by a different formatter version during the merge; the repo's pinned ruff (0.14.7, what CI checks) wants main's original style back.
…ative
Review-driven cleanup of the loop feature (net -235 lines), no mechanism
changes:
- _eval_loop_expr evaluates the user expression with eval() instead of
exec("output = " + expr), so "output" stops being a reserved name and
_RESERVED_STATE_KEYS shrinks to {"table"}. Two deliberate user-visible
changes: a loop variable named `output` now persists like any other,
and multi-statement expressions now raise SyntaxError instead of
silently executing.
- Drop the dead scratch-key strip in _jump_to_loop_start (run_update is
@OVERRIDES.final and always persists _strip_reserved(...) before any
jump can fire) and inline the one-line _compute_loop_start_id wrapper.
- Drop the dead .withSuggestedWorkerNum(1) (WorkerConfig forces
workerCount=1 for non-parallelizable ops) and its mixin pin.
- Deduplicate tests: merge the twin loop_counter>0 pass-through tests;
merge the two happy-path jump tests into one that also pins
RPC-before-write ordering via a shared event log; fold the
envelope-exclusion assertions into the consume test; drop the 8 pyb
tricky-input descriptor tests (PythonTemplateBuilderSpec on main pins
the base64 guarantee) and the engine-simulation single-loop test
(LoopIntegrationSpec runs it for real with identical expressions).
- Keep one canonical home per design narrative (proto field doc for the
setup-config story, table_to_ipc_bytes for Arrow-IPC-not-pickle, the
reset_output_storage call site for the inner-reset story); other
copies reduced to pointers. Fix three stale TestLoopCounterRuntime
cross-references.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this PR?
Adds two new operators — Loop Start and Loop End — that let users write a for-loop inside a visual workflow. The loop iterates over rows of a pandas table. The user supplies four small Python snippets:
initializationi = 0outputtable.iloc[i]— the row passed into the loop body each iterationupdatei += 1conditioni < len(table)— keep looping while this is trueOperators placed between Loop Start and Loop End make up the loop body and run once per iteration. When
conditionreturns true, the runtime starts another iteration; when it returns false, downstream operators run on the accumulated output.initializationandupdateare Python statements;outputandconditionare single Python expressions (evaluated witheval, so multi-statement input is rejected up front). The only reserved name in loop state istable(the runtime seeds it into the expression namespace and strips it from persisted state).How an iteration works
The arrow from Loop End back to Loop Start is not an edge in the workflow graph — the region DAG stays acyclic. The loop-back is done with two separate steps when an iteration ends:
jump_to_operator_region, asking the controller to schedule the Loop Start region one more time.i, any accumulators, and the table (Arrow IPC) — into the iceberg state table that Loop Start reads from at the start of every iteration.The newly scheduled Loop Start region then picks up that state and runs the next iteration. The "iceberg table Loop Start reads from" is the same cross-region state channel introduced in #4490; this PR reuses it as the back-edge for loops.
How Loop End knows where to write (setup-time config)
The loop-back write address is the same every iteration — same Loop Start, same input port, same execution — so it is config, not per-iteration data, and it is deliberately not carried in State (settled during the #5900 review; the materialized State format has exactly 3 columns:
content,loop_counter,loop_start_id). Instead:LoopStartOpDescmarks its physical op with a compile-timeisLoopStartflag (mirroring the existingrequiresMaterializedExecutionprecedent).WorkflowExecutionCoordinatorderives a whole-plan map{Loop Start logical op id → state URI of its single input port}from the final schedule — the same URIsAssignPortships to that Loop Start's input readers.InitializeExecutorRequestgains aloopStartStateUrisfield, re-sent identically on every region (re-)run, and the Python worker stores it on itsContext.loop_start_idstamped on the StateFrame it consumes — the same id it already uses for the jump DCM — so nested loops work unchanged. A missing entry fails loudly before the jump, so a misconfigured loop can't rewind the schedule without a back-edge write.What changed
LoopStartOpDesc.scala,LoopEndOpDesc.scalainitialization/output/update/conditionexpressions; Loop Start additionally setsisLoopStart, Loop End setsreuseStorageon its output portcore/models/operator.py—LoopStartOperator,LoopEndOperatortablename out of user loop statePhysicalOp.scala—isLoopStart+requiresMaterializedExecutionisLoopStart, and any loop op forces whole-plan MATERIALIZED scheduling viarequiresMaterializedExecution(the back-edge is a cross-region materialized state channel)WorkflowExecutionCoordinator.scala{Loop Start op id → input-port state URI}from the final resource-allocated schedule; fails at StartWorkflow (not mid-loop) if a Loop Start's input port isn't materializedcontrolcommands.proto—InitializeExecutorRequest.loopStartStateUris;RegionExecutionCoordinator.scala;initialize_executor_handler.py+ContextStartWorkerspawns the readers that replay statesMainLoop._process_state_frame,_compute_loop_start_id,_jump_to_loop_startloop_counterand the nested pass-through; on Loop End completion: look up the write address by the captured id, send the jump DCM, write the next stateOutputPort.reuseStorage(set by Loop End) +RegionExecutionCoordinatorDocumentFactory.createOrReuseDocument) instead of recreating itOutputManager.reset_output_storageloop_counter > 0pass-through), dropping and recreating that Loop End's result/state tables so each outer iteration accumulates from empty. A single / outermost Loop End never resets. Safe because loops run MATERIALIZED — downstream doesn't read until the loop region completesLoopStart.png,LoopEnd.pngNested loops
Each state carries a
loop_counterinteger marking which loop's iteration the state belongs to. This is what keeps an inner Loop End from accidentally consuming an outer loop's state.LoopStartpass-throughloop_start_idis stamped on the envelope),loop_counter += 1and pass it through — the operator is skipped.LoopEndpass-throughloop_counter > 0, decrement and pass it through (this state belongs to an outer loop); also reset this inner Loop End's output so the new outer iteration accumulates from empty.LoopEndloop_counter == 0, the state is mine: runupdate, evaluatecondition, jump back to the Loop Start whose id is stamped on the state.So when two loops are nested, the outer loop's state walks through the inner Loop Start (+1) and the inner Loop End (−1) untouched, arrives at the outer Loop End at
loop_counter == 0, and only there is it consumed. Each Loop End writes back to whichever Loop Start stamped the state it consumed — the setup-injected map has an entry per Loop Start, so no extra machinery is needed for nesting.Any related issues, documentation, discussions?
Closes #4442. Builds on #4490 (cross-region state materialization) and #5085 (
DocumentFactory.documentExists).Prerequisites split out of this PR per @Xiao-zhen-Liu's request, all now merged: #5706 (worker-id helper), #5707 (state test harness), #5900 (the 3-column State materialization format —
content/loop_counter/loop_start_id— dormant on main until this PR activates it).How was this PR tested?
test_main_loop.py::TestMainLoop(loop-runtime cases) — nested pass-through (+1 at an inner Loop Start / −1 + reset-once at an inner Loop End, operator skipped, id rides through), consume atloop_counter == 0with envelope metadata never leaking into user State, and the full_jump_to_loop_startcontract (jump DCM target, DCM-before-storage-write ordering, exact iceberg write sequence) driven off the setup-injected config — including the fail-loud case when the map lacks the captured id.test_initialize_executor_handler.py— the new proto field lands onContext.loop_start_state_uris(and resets to empty when absent, since recreated workers re-run the RPC).test_loop_operators.py— runtime base classes: flat-loop matching branch (runsupdate/condition), nested-loop pass-through, guarded expression evaluation, and a multi-iteration loop driven to completion.LoopStartOpDescSpec.scala/LoopEndOpDescSpec.scala— code-gen output, ports, JSON round-trip, and the flag pins:requiresMaterializedExecution(both),isLoopStart(true / false),reuseStorage(Loop End only).LoopIntegrationSpec.scala(@IntegrationTest, CI) — end-to-end single loop (3 iterations) and nested 3×3 loop (9 inner iterations), verified via materialized iceberg row counts; adopts the per-suite id isolation from fix(test): isolate e2e suites by unique workflow/execution id #5888.sbt scalafmtCheckAll+scalafixAll --checkand Pythonblackclean; full Python unit suite passes.Manual workflows
Input for both is a 3-row table from
TextInput("1\n2\n3"). Each loop's condition isi < len(table).TextInput → LoopStart → LoopEndTextInput → OuterLoopStart → InnerLoopStart → InnerLoopEnd → OuterLoopEndDemo:

Basic Loop:
Nested Loop:

Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude (Opus 4.7, Opus 4.8) in compliance with ASF.