Skip to content

feat: add loop operators#5700

Draft
aglinxinyuan wants to merge 332 commits into
apache:mainfrom
aglinxinyuan:loop-feb
Draft

feat: add loop operators#5700
aglinxinyuan wants to merge 332 commits into
apache:mainfrom
aglinxinyuan:loop-feb

Conversation

@aglinxinyuan

@aglinxinyuan aglinxinyuan commented Jun 13, 2026

Copy link
Copy Markdown
Contributor

Re-opened from my fork to satisfy the requirement that contributions come from a fork rather than a branch on the main repository (the prior PR #4206 was on an apache/texera branch). The full review history — Copilot's and @Xiao-zhen-Liu's review threads and my replies — is preserved on #4206 for reference.


What changes were proposed in this PR?

Adds two new operators — Loop Start and Loop End — that let users write a for-loop inside a visual workflow. The loop iterates over rows of a pandas table. The user supplies four small Python snippets:

Field Where Example
initialization Loop Start i = 0
output Loop Start table.iloc[i] — the row passed into the loop body each iteration
update Loop End i += 1
condition Loop End i < len(table) — keep looping while this is true

Operators placed between Loop Start and Loop End make up the loop body and run once per iteration. When condition returns true, the runtime starts another iteration; when it returns false, downstream operators run on the accumulated output.

initialization and update are Python statements; output and condition are single Python expressions (evaluated with eval, so multi-statement input is rejected up front). The only reserved name in loop state is table (the runtime seeds it into the expression namespace and strips it from persisted state).

How an iteration works

   Upstream Table
        │
        ▼
   ┌──────────┐   loop variables: row i,   ┌──────────┐               ┌─────────┐
   │ Loop     ├───counters, accumulators ─►│   loop   ├──────────────►│  Loop   │
   │ Start    │   (the loop's "state")     │   body   │               │   End   │
   └──────────┘                            └──────────┘               └────┬────┘
        ▲                                                                  │
        │  (1) DCM: "schedule the Loop Start region again"                 │
        │  (2) write the next iteration's state (i, accumulators, table)   │
        │      to the iceberg table that Loop Start reads its input from   │
        └──────────────────────────────────────────────────────────────────┘
                                when condition() == True

The arrow from Loop End back to Loop Start is not an edge in the workflow graph — the region DAG stays acyclic. The loop-back is done with two separate steps when an iteration ends:

  1. Loop End sends a DCM (Direct Control Message — Texera's worker→controller control-channel message; it does not flow along data edges, so it doesn't break the acyclic DAG) named jump_to_operator_region, asking the controller to schedule the Loop Start region one more time.
  2. Loop End writes the updated state — a dict with i, any accumulators, and the table (Arrow IPC) — into the iceberg state table that Loop Start reads from at the start of every iteration.

The newly scheduled Loop Start region then picks up that state and runs the next iteration. The "iceberg table Loop Start reads from" is the same cross-region state channel introduced in #4490; this PR reuses it as the back-edge for loops.

How Loop End knows where to write (setup-time config)

The loop-back write address is the same every iteration — same Loop Start, same input port, same execution — so it is config, not per-iteration data, and it is deliberately not carried in State (settled during the #5900 review; the materialized State format has exactly 3 columns: content, loop_counter, loop_start_id). Instead:

  • LoopStartOpDesc marks its physical op with a compile-time isLoopStart flag (mirroring the existing requiresMaterializedExecution precedent).
  • After resource allocation, WorkflowExecutionCoordinator derives a whole-plan map {Loop Start logical op id → state URI of its single input port} from the final schedule — the same URIs AssignPort ships to that Loop Start's input readers.
  • The map rides the existing per-operator setup RPC: InitializeExecutorRequest gains a loopStartStateUris field, re-sent identically on every region (re-)run, and the Python worker stores it on its Context.
  • No static Loop Start ↔ Loop End pairing is needed: a Loop End selects the map entry by the loop_start_id stamped on the StateFrame it consumes — the same id it already uses for the jump DCM — so nested loops work unchanged. A missing entry fails loudly before the jump, so a misconfigured loop can't rewind the schedule without a back-edge write.

What changed

Area File Purpose
Operator definitions LoopStartOpDesc.scala, LoopEndOpDesc.scala Code-gen the Python operator from the user's initialization / output / update / condition expressions; Loop Start additionally sets isLoopStart, Loop End sets reuseStorage on its output port
Operator runtime base core/models/operator.pyLoopStartOperator, LoopEndOperator Python superclasses the generated code extends; guarded expression evaluation keeps the reserved table name out of user loop state
Physical plan PhysicalOp.scalaisLoopStart + requiresMaterializedExecution Compile-time markers: the scheduler resolves each Loop Start's loop-back write address from isLoopStart, and any loop op forces whole-plan MATERIALIZED scheduling via requiresMaterializedExecution (the back-edge is a cross-region materialized state channel)
Loop-back address resolution WorkflowExecutionCoordinator.scala Derives {Loop Start op id → input-port state URI} from the final resource-allocated schedule; fails at StartWorkflow (not mid-loop) if a Loop Start's input port isn't materialized
Setup delivery controlcommands.protoInitializeExecutorRequest.loopStartStateUris; RegionExecutionCoordinator.scala; initialize_executor_handler.py + Context Ships the map to every worker at executor initialization, each region (re-)run — always before StartWorker spawns the readers that replay states
Worker runtime MainLoop._process_state_frame, _compute_loop_start_id, _jump_to_loop_start Loop Start stamps its op id on emitted state; the runtime owns loop_counter and the nested pass-through; on Loop End completion: look up the write address by the captured id, send the jump DCM, write the next state
Storage reuse OutputPort.reuseStorage (set by Loop End) + RegionExecutionCoordinator A Loop End's materialized output accumulates across its own iterations: the scheduler reuses its output doc on region re-runs (DocumentFactory.createOrReuseDocument) instead of recreating it
Worker output OutputManager.reset_output_storage Only for the inner Loop End of a nested loop: fires once per outer iteration (on the loop_counter > 0 pass-through), dropping and recreating that Loop End's result/state tables so each outer iteration accumulates from empty. A single / outermost Loop End never resets. Safe because loops run MATERIALIZED — downstream doesn't read until the loop region completes
Frontend LoopStart.png, LoopEnd.png Operator icons

Nested loops

Each state carries a loop_counter integer marking which loop's iteration the state belongs to. This is what keeps an inner Loop End from accidentally consuming an outer loop's state.

Operator Rule
Inner LoopStart pass-through If the state already came from a Loop Start (loop_start_id is stamped on the envelope), loop_counter += 1 and pass it through — the operator is skipped.
Inner LoopEnd pass-through If loop_counter > 0, decrement and pass it through (this state belongs to an outer loop); also reset this inner Loop End's output so the new outer iteration accumulates from empty.
Matching LoopEnd If loop_counter == 0, the state is mine: run update, evaluate condition, jump back to the Loop Start whose id is stamped on the state.

So when two loops are nested, the outer loop's state walks through the inner Loop Start (+1) and the inner Loop End (−1) untouched, arrives at the outer Loop End at loop_counter == 0, and only there is it consumed. Each Loop End writes back to whichever Loop Start stamped the state it consumed — the setup-injected map has an entry per Loop Start, so no extra machinery is needed for nesting.

Any related issues, documentation, discussions?

Closes #4442. Builds on #4490 (cross-region state materialization) and #5085 (DocumentFactory.documentExists).

Prerequisites split out of this PR per @Xiao-zhen-Liu's request, all now merged: #5706 (worker-id helper), #5707 (state test harness), #5900 (the 3-column State materialization format — content / loop_counter / loop_start_id — dormant on main until this PR activates it).

How was this PR tested?

  • test_main_loop.py::TestMainLoop (loop-runtime cases) — nested pass-through (+1 at an inner Loop Start / −1 + reset-once at an inner Loop End, operator skipped, id rides through), consume at loop_counter == 0 with envelope metadata never leaking into user State, and the full _jump_to_loop_start contract (jump DCM target, DCM-before-storage-write ordering, exact iceberg write sequence) driven off the setup-injected config — including the fail-loud case when the map lacks the captured id.
  • test_initialize_executor_handler.py — the new proto field lands on Context.loop_start_state_uris (and resets to empty when absent, since recreated workers re-run the RPC).
  • test_loop_operators.py — runtime base classes: flat-loop matching branch (runs update / condition), nested-loop pass-through, guarded expression evaluation, and a multi-iteration loop driven to completion.
  • LoopStartOpDescSpec.scala / LoopEndOpDescSpec.scala — code-gen output, ports, JSON round-trip, and the flag pins: requiresMaterializedExecution (both), isLoopStart (true / false), reuseStorage (Loop End only).
  • LoopIntegrationSpec.scala (@IntegrationTest, CI) — end-to-end single loop (3 iterations) and nested 3×3 loop (9 inner iterations), verified via materialized iceberg row counts; adopts the per-suite id isolation from fix(test): isolate e2e suites by unique workflow/execution id #5888.
  • sbt scalafmtCheckAll + scalafixAll --check and Python black clean; full Python unit suite passes.

Manual workflows

Input for both is a 3-row table from TextInput("1\n2\n3"). Each loop's condition is i < len(table).

Workflow Topology Expected
Loop.json TextInput → LoopStart → LoopEnd 3 iterations, workflow terminates.
Nested Loop.json TextInput → OuterLoopStart → InnerLoopStart → InnerLoopEnd → OuterLoopEnd Outer runs 3 times, inner runs 3 times per outer iteration = 9 total inner iterations. Workflow terminates.

Demo:
Basic Loop:
loop

Nested Loop:
nested

Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude (Opus 4.7, Opus 4.8) in compliance with ASF.

Signed-off-by: Xinyuan Lin <xinyual3@uci.edu>
Signed-off-by: Xinyuan Lin <xinyual3@uci.edu>
Signed-off-by: Xinyuan Lin <xinyual3@uci.edu>
Signed-off-by: Xinyuan Lin <xinyual3@uci.edu>
… switch

Mirrors the same change on state-handshake-redesign (the upstream PR
candidate, apache#4560 review feedback). Functionally equivalent to the prior
design that removed all three per-task finallys; this design keeps them
and drops the run-loop's end-of-body _switch_context() instead, which
keeps process_state / process_internal_marker / process_tuple unchanged
from origin/main.
 review polish

Keep loop-feb consistent with the apache#5707 review changes: proto field
OutputPort.reusesOutputStorage -> reuseStorage, the createOrReuseDocument
scaladoc, and the RegionExecutionCoordinator comment wording. Also rename
the LoopOpDesc/LoopEndOpDesc reuseStorage toggle and the spec assertions
to match the new field name.
apache#5707 (reuse output storage) is now on main, so its files drop out of
loop-feb's diff (proto reuseStorage field, DocumentFactory.createOrReuseDocument
+ spec). Conflict resolutions:

- RegionExecutionCoordinator: keep loop-feb's version WITHOUT the
  require(!reuseStorage) production guard -- the loop operators legitimately
  set the flag, so the guard (correct for apache#5707 in isolation) must not fire here.
- OutputPortReuseFlagSpec: keep loop-feb's LoopEnd-allowing guard (only Loop
  End may set reuseStorage), superseding main's all-false form from apache#5707.
- test_state_materialization_e2e.py: keep loop-feb's 4-column-State version
  (loop_counter/loop_start_id/loop_start_state_uri as their own columns).
  NOTE: this reverts main's apache#5682 class-based-fixture refactor of that test;
  re-apply that structure on top of the 4-column semantics in the PR2 state PR.
aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 17, 2026
…pache#5707)

### What changes were proposed in this PR?

Adds an opt-in mechanism for an output port to **reuse** its storage
when the owning operator's region re-executes, instead of recreating the
document each time. Dormant and behavior-preserving — no operator sets
the flag in this PR.

- `OutputPort` gains a `reuseStorage: Boolean` proto field (alongside
`blocking` / `mode`). It marks a port whose output accumulates across
region re-executions — e.g. a Loop End port whose result builds up over
the iterations of its own loop.
- `DocumentFactory.createOrReuseDocument(uri, schema, reuseExisting, …)`
is the create-or-reuse decision: when reuse is requested and a document
already exists it opens and returns that one; otherwise it creates a
fresh one. It always returns the document, so the call site does not
branch.
- `RegionExecutionCoordinator` reads each output port's `reuseStorage`
flag while provisioning that port's result/state documents and routes
through `createOrReuseDocument`.

| port flag | region re-run behavior |
|---|---|
| `false` (every operator today) | recreate output/state documents —
unchanged |
| `true` (set by Loop End in the loop PR) | keep and reopen the existing
documents |

A runtime guard in `RegionExecutionCoordinator` asserts no port sets
`reuseStorage` for now: the flag activates only with the loop operators,
which are not yet on `main`. The guard keeps the dormant reuse path from
being silently exercised before its consumer exists, and is removed when
the loop operators land.

### Any related issues, documentation, discussions?

Resolves apache#5709 (sub-issue of apache#4442 "Introduce for loop"). Split out of
apache#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's
[review](apache#4206 (review)).

### How was this PR tested?

- `DocumentFactorySpec` — pins the create-or-reuse decision (the reuse ×
exists matrix plus the "no-reuse never probes existence" short-circuit)
with injected document stubs, no iceberg backend.
- `OutputPortReuseFlagSpec` — guards that no registered operator enables
`reuseStorage` on any output port.
- `WorkflowCore` / `WorkflowOperator` / `WorkflowExecutionService`
compile; scalafmt + scalafix clean.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.
aglinxinyuan and others added 2 commits June 18, 2026 16:39
…e#5783)

Keep loop-feb's console_message_manager.py identical to apache#5783 so it drops
cleanly from loop-feb's diff when that PR merges.
…le_message util

apache#5783 moved error-message construction out of ConsoleMessageManager into
core.util.console_message.error_message.create_error_console_message (per Yicong's
review). Sync loop-feb's copies so they drop cleanly when apache#5783 merges: revert
console_message_manager.py to message-management-only, add the util + its test,
and route both callers (DataProcessor and the loop-condition path in main_loop.py)
through create_error_console_message + put_message.
@github-actions

github-actions Bot commented Jun 21, 2026

Copy link
Copy Markdown
Contributor

Automated Reviewer Suggestions

Based on the git blame history of the changed files, we recommend the following reviewers:

  • Contributors with relevant context: @Ma77Ball, @Yicong-Huang, @mengw15
    You can notify them by mentioning @Ma77Ball, @Yicong-Huang, @mengw15 in a comment.

…erialization)

Keep loop-feb's PhysicalOp + CostBasedScheduleGenerator identical to apache#5720 after
its review cleanup (reworded requiresMaterializedExecution doc + dropped the
duplicate inline comment), so they drop cleanly when apache#5720 merges.
aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 22, 2026
…ed by the scheduler (apache#5720)

### What changes were proposed in this PR?

Lets an operator declare it can only run under a fully-materialized
schedule, and has the scheduler honor it:

- `PhysicalOp` gains `requiresMaterializedExecution: Boolean = false` (+
a `withRequiresMaterializedExecution` builder). It is a
physical-execution property, so it lives on the physical op.
- `CostBasedScheduleGenerator` consumes it: when any physical op
requires materialized execution it forces a fully-materialized schedule
regardless of the requested execution mode; otherwise the existing
PIPELINED/MATERIALIZED logic runs unchanged.

Default `false` ⇒ dormant and behavior-preserving: no operator requires
it yet, so the scheduler's effective mode is unchanged today. The loop
operators set the flag on their physical op.

### Any related issues, documentation, discussions?

Resolves apache#5719 (sub-issue of apache#4442 "Introduce for loop"). Split out of
apache#5700. Reflects the review discussion with @Yicong-Huang: the property
belongs on `PhysicalOp`, and it is consumed by the scheduler.

### How was this PR tested?

`WorkflowCoreTypesSpec` covers the
`PhysicalOp.requiresMaterializedExecution` default + builder.
`WorkflowExecutionService/Test/compile`, `scalafixAll --check`, and
`scalafmtCheckAll` pass locally. The scheduler consumer is exercised
end-to-end by the loop integration tests once the loop operators (which
set the flag) land.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.
aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 22, 2026
…che#5783)

### What changes were proposed in this PR?

`DataProcessor` built the operator-facing ERROR console message for an
uncaught UDF exception inline (`_report_exception`). This moves that
construction into a small factory —
`core.util.console_message.error_message.create_error_console_message(worker_id,
exc_info) -> ConsoleMessage`. `DataProcessor` builds the message via the
factory and queues it through the existing
`ConsoleMessageManager.put_message`.

Per review, `ConsoleMessageManager` stays purely about **message
management** (its interfaces are all at the `msg: ConsoleMessage`
level); message *construction* lives in the util.

Behavior-preserving: the same ERROR `ConsoleMessage` is produced —

| field | value |
|---|---|
| `msg_type` | `ConsoleMessageType.ERROR` |
| `title` | the exception's final line (e.g. `ValueError: ...`) |
| `message` | the full formatted traceback |
| `source` | `module:func:line` of the raising frame |

Centralizing the factory lets other uncaught-exception paths report
identically (the loop operators' main-loop condition evaluation reuses
it in a follow-up).

### Any related issues, documentation, discussions?

Split out of apache#5700 (loop operators) to keep that PR focused; the
refactor is independent and behavior-preserving on `main`.

### How was this PR tested?

- New
`test_error_message.py::test_builds_error_console_message_from_exc_info`
pins the factory output (worker id, ERROR type, title, traceback body,
`module:func:line` source) — written test-first.
- The existing `test_data_processor.py` (asserts console messages after
a UDF raises) still passes unchanged, confirming the delegation
preserves behavior.
- `cd amber && pytest -m "not integration"` on the affected files: 12
passed; `black --check` clean.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.
yangzhang75 pushed a commit to yangzhang75/texera that referenced this pull request Jun 22, 2026
…ache#5706)

### What changes were proposed in this PR?

Centralizes the Python worker's worker-id parsing in
`core/util/virtual_identity.py`:

- Adds `get_operator_id(worker_id)` — extracts the logical operator id
from a worker actor name (`Worker:WF<wf>-<op>-<layer>-<idx>`), raising
`ValueError` on a malformed id.
- Generalizes `worker_name_pattern` to capture the workflow id and
operator id explicitly.
- Switches both `get_worker_index` and `get_operator_id` to
`re.fullmatch`, so a malformed id with trailing junk now fails loudly
instead of parsing silently — matching the Scala
`VirtualIdentityUtils.getPhysicalOpId` full-match semantics the
docstring already claims.

| case | before | after |
|---|---|---|
| `get_worker_index`, well-formed id | worker index | same value |
| `get_worker_index`, malformed id (trailing junk) | parsed silently |
raises `ValueError` |
| `get_operator_id` | — | new helper |

Behavior-preserving for well-formed worker ids. `get_operator_id`'s
production caller lands with the for-loop feature; the helper and its
test are independent and mergeable now.

### Any related issues, documentation, discussions?

Resolves apache#5708 (sub-issue of apache#4442 "Introduce for loop"). Split out of
apache#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's
[review](apache#4206 (review)).

### How was this PR tested?

`pytest src/test/python/core/util/test_virtual_identity.py` — 23
passing, covering well-formed ids, the new `get_operator_id`, and
malformed ids that now raise `ValueError`. `ruff check`/`format` clean.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.
yangzhang75 pushed a commit to yangzhang75/texera that referenced this pull request Jun 22, 2026
apache#5712)

### What changes were proposed in this PR?

Extracts the repeated "run a workflow and read its materialized results"
boilerplate from the amber e2e specs into two reusable helpers on
`TestUtils`:

- `readMaterializedResults(executionId, operatorIds, extract)` — resolve
+ open each operator's external RESULT document and apply `extract` to
the opened `VirtualDocument[Tuple]` (operators with no materialized
output are skipped).
- `runWorkflowAndReadResults(system, workflow, operatorIds, extract,
completionTimeout)` — run a workflow to `COMPLETED` (a `FatalError`
aborts and surfaces as the awaited exception), then read results via
`readMaterializedResults`.

`DataProcessingSpec.executeWorkflow` now calls the shared harness
instead of its own inline copy. The helpers are loop/state-agnostic —
they only use existing core APIs (`DocumentFactory`,
`VirtualDocument[Tuple]`, `AmberClient`, `ExecutionStateUpdate`,
`FatalError`), so other e2e specs can adopt them too.

### Any related issues, documentation, discussions?

Resolves apache#5711 (sub-issue of apache#4442 "Introduce for loop"). Split out of
apache#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's
[review](apache#4206 (review)).

### How was this PR tested?

Behavior-preserving refactor of existing e2e test infrastructure.
`WorkflowExecutionService/Test/compile` and
`WorkflowExecutionService/scalafmtCheckAll` pass locally. The
`@IntegrationTest` specs that exercise the harness (e.g.
`DataProcessingSpec`) run in CI — they spawn Python workers and can't
run on Windows.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.
yangzhang75 pushed a commit to yangzhang75/texera that referenced this pull request Jun 22, 2026
…pache#5707)

### What changes were proposed in this PR?

Adds an opt-in mechanism for an output port to **reuse** its storage
when the owning operator's region re-executes, instead of recreating the
document each time. Dormant and behavior-preserving — no operator sets
the flag in this PR.

- `OutputPort` gains a `reuseStorage: Boolean` proto field (alongside
`blocking` / `mode`). It marks a port whose output accumulates across
region re-executions — e.g. a Loop End port whose result builds up over
the iterations of its own loop.
- `DocumentFactory.createOrReuseDocument(uri, schema, reuseExisting, …)`
is the create-or-reuse decision: when reuse is requested and a document
already exists it opens and returns that one; otherwise it creates a
fresh one. It always returns the document, so the call site does not
branch.
- `RegionExecutionCoordinator` reads each output port's `reuseStorage`
flag while provisioning that port's result/state documents and routes
through `createOrReuseDocument`.

| port flag | region re-run behavior |
|---|---|
| `false` (every operator today) | recreate output/state documents —
unchanged |
| `true` (set by Loop End in the loop PR) | keep and reopen the existing
documents |

A runtime guard in `RegionExecutionCoordinator` asserts no port sets
`reuseStorage` for now: the flag activates only with the loop operators,
which are not yet on `main`. The guard keeps the dormant reuse path from
being silently exercised before its consumer exists, and is removed when
the loop operators land.

### Any related issues, documentation, discussions?

Resolves apache#5709 (sub-issue of apache#4442 "Introduce for loop"). Split out of
apache#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's
[review](apache#4206 (review)).

### How was this PR tested?

- `DocumentFactorySpec` — pins the create-or-reuse decision (the reuse ×
exists matrix plus the "no-reuse never probes existence" short-circuit)
with injected document stubs, no iceberg backend.
- `OutputPortReuseFlagSpec` — guards that no registered operator enables
`reuseStorage` on any output port.
- `WorkflowCore` / `WorkflowOperator` / `WorkflowExecutionService`
compile; scalafmt + scalafix clean.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.
yangzhang75 pushed a commit to yangzhang75/texera that referenced this pull request Jun 24, 2026
…ed by the scheduler (apache#5720)

### What changes were proposed in this PR?

Lets an operator declare it can only run under a fully-materialized
schedule, and has the scheduler honor it:

- `PhysicalOp` gains `requiresMaterializedExecution: Boolean = false` (+
a `withRequiresMaterializedExecution` builder). It is a
physical-execution property, so it lives on the physical op.
- `CostBasedScheduleGenerator` consumes it: when any physical op
requires materialized execution it forces a fully-materialized schedule
regardless of the requested execution mode; otherwise the existing
PIPELINED/MATERIALIZED logic runs unchanged.

Default `false` ⇒ dormant and behavior-preserving: no operator requires
it yet, so the scheduler's effective mode is unchanged today. The loop
operators set the flag on their physical op.

### Any related issues, documentation, discussions?

Resolves apache#5719 (sub-issue of apache#4442 "Introduce for loop"). Split out of
apache#5700. Reflects the review discussion with @Yicong-Huang: the property
belongs on `PhysicalOp`, and it is consumed by the scheduler.

### How was this PR tested?

`WorkflowCoreTypesSpec` covers the
`PhysicalOp.requiresMaterializedExecution` default + builder.
`WorkflowExecutionService/Test/compile`, `scalafixAll --check`, and
`scalafmtCheckAll` pass locally. The scheduler consumer is exercised
end-to-end by the loop integration tests once the loop operators (which
set the flag) land.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.
yangzhang75 pushed a commit to yangzhang75/texera that referenced this pull request Jun 24, 2026
…che#5783)

### What changes were proposed in this PR?

`DataProcessor` built the operator-facing ERROR console message for an
uncaught UDF exception inline (`_report_exception`). This moves that
construction into a small factory —
`core.util.console_message.error_message.create_error_console_message(worker_id,
exc_info) -> ConsoleMessage`. `DataProcessor` builds the message via the
factory and queues it through the existing
`ConsoleMessageManager.put_message`.

Per review, `ConsoleMessageManager` stays purely about **message
management** (its interfaces are all at the `msg: ConsoleMessage`
level); message *construction* lives in the util.

Behavior-preserving: the same ERROR `ConsoleMessage` is produced —

| field | value |
|---|---|
| `msg_type` | `ConsoleMessageType.ERROR` |
| `title` | the exception's final line (e.g. `ValueError: ...`) |
| `message` | the full formatted traceback |
| `source` | `module:func:line` of the raising frame |

Centralizing the factory lets other uncaught-exception paths report
identically (the loop operators' main-loop condition evaluation reuses
it in a follow-up).

### Any related issues, documentation, discussions?

Split out of apache#5700 (loop operators) to keep that PR focused; the
refactor is independent and behavior-preserving on `main`.

### How was this PR tested?

- New
`test_error_message.py::test_builds_error_console_message_from_exc_info`
pins the factory output (worker id, ERROR type, title, traceback body,
`module:func:line` source) — written test-first.
- The existing `test_data_processor.py` (asserts console messages after
a UDF raises) still passes unchanged, confirming the delegation
preserves behavior.
- `cd amber && pytest -m "not integration"` on the affected files: 12
passed; `black --check` clean.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.
Yicong-Huang pushed a commit that referenced this pull request Jun 24, 2026
…et (#5781)

### What changes were proposed in this PR?

When workflow execution initialization fails, the error was recorded
into the execution metadata store but never pushed to the websocket, so
connected frontend clients saw nothing — particularly for failures
during `WorkflowExecutionService` construction, which happens *before*
the execution is published to subscribers.

`WorkflowService.initExecutionService`'s catch arm now, after
`errorHandler(e)` records the fatal error, pushes a `WorkflowErrorEvent`
(carrying the recorded fatal errors) to `errorSubject` — the
workflow-level channel that `connect()` subscribers listen on — so
init-time failures surface in the UI.

| init failure | before | after |
|---|---|---|
| during `WorkflowExecutionService` construction (pre-publish) | logged
+ stored, invisible to the UI | `WorkflowErrorEvent` delivered to the
frontend |
| during `executeWorkflow()` | recorded; UI delivery depended on
subscription timing | `WorkflowErrorEvent` delivered to the frontend |

The push is extracted into a small `reportFatalErrorsToSubscribers`
method so it can be unit-tested without a database (the init path itself
is DB-bound).

### Any related issues, documentation, discussions?

Resolves #5782. Discovered while splitting #5700 (loop operators) into
smaller PRs; this fix is independent of that feature and applies to
`main` on its own.

### How was this PR tested?

New `WorkflowServiceSpec` (TDD, red → green): pins that
`reportFatalErrorsToSubscribers` delivers a `WorkflowErrorEvent` to a
`connect()` subscriber carrying exactly the fatal errors recorded in the
execution state store (single error, and all errors when several are
present). `sbt "WorkflowExecutionService/testOnly *WorkflowServiceSpec"`
passes (2/2); scalafmt + scalafix clean.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.

(backported from commit 1c580e5)
Ma77Ball pushed a commit to Ma77Ball/texera that referenced this pull request Jun 24, 2026
…et (apache#5781)

### What changes were proposed in this PR?

When workflow execution initialization fails, the error was recorded
into the execution metadata store but never pushed to the websocket, so
connected frontend clients saw nothing — particularly for failures
during `WorkflowExecutionService` construction, which happens *before*
the execution is published to subscribers.

`WorkflowService.initExecutionService`'s catch arm now, after
`errorHandler(e)` records the fatal error, pushes a `WorkflowErrorEvent`
(carrying the recorded fatal errors) to `errorSubject` — the
workflow-level channel that `connect()` subscribers listen on — so
init-time failures surface in the UI.

| init failure | before | after |
|---|---|---|
| during `WorkflowExecutionService` construction (pre-publish) | logged
+ stored, invisible to the UI | `WorkflowErrorEvent` delivered to the
frontend |
| during `executeWorkflow()` | recorded; UI delivery depended on
subscription timing | `WorkflowErrorEvent` delivered to the frontend |

The push is extracted into a small `reportFatalErrorsToSubscribers`
method so it can be unit-tested without a database (the init path itself
is DB-bound).

### Any related issues, documentation, discussions?

Resolves apache#5782. Discovered while splitting apache#5700 (loop operators) into
smaller PRs; this fix is independent of that feature and applies to
`main` on its own.

### How was this PR tested?

New `WorkflowServiceSpec` (TDD, red → green): pins that
`reportFatalErrorsToSubscribers` delivers a `WorkflowErrorEvent` to a
`connect()` subscriber carrying exactly the fatal errors recorded in the
execution state store (single error, and all errors when several are
present). `sbt "WorkflowExecutionService/testOnly *WorkflowServiceSpec"`
passes (2/2); scalafmt + scalafix clean.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.
xuang7 pushed a commit to ELin2025/texera that referenced this pull request Jul 1, 2026
…ant) (apache#5900)

### What changes were proposed in this PR?

Extends the cross-region **State materialization** format from a single
`content` column to **3 columns** — `content`, `loop_counter`,
`loop_start_id` — promoting loop bookkeeping to first-class columns
(never inside the content JSON). The transport carries them end to end:
the `OutputManager` state writer + `emit_state`, the Python network
sender/receiver, the materialization reader, and the Scala
`state.toTuple` call sites. In memory the two loop fields ride on the
`StateFrame` envelope; they are materialized/serialized as their own
columns (parallel to `content`), and `from_tuple` / `fromTuple` read
only `content` back into the `State`.

The loop-back write address (LoopStart's input-port URI) is
**intentionally not** carried in State. It's constant per-execution
config, not per-iteration data, so it will be delivered to Loop End at
**setup** in the loop PR rather than round-tripping through every State
row. (An earlier revision of this PR carried a `loop_start_state_uri`
column; it was dropped after review — better than shipping a dormant
column and removing it later.)

On the Python side the column-name → value mapping is defined once in
`State.to_columns` and reused by both `to_tuple` (iceberg) and the
network sender's `StateFrame` branch, so adding a column later is a
single-line change in one place rather than an edit in every serializer.

**Dormant on `main`** — nothing observable changes without the loop
operators:

- `to_tuple()` / `toTuple()` and
`OutputManager.save_state_to_storage_if_needed` / `emit_state` default
the two loop columns to `0` / `""`, so every existing non-loop caller is
unchanged.
- `from_tuple` / `fromTuple` read only the `content` column, so
round-trip identity is preserved and the extra columns are inert.

No backward-compatible read of old 1-column State is needed: State
materialization is **intra-execution only** — the iceberg state-document
URI is execution-scoped (`…/eid/{executionId}/`) and recreated fresh
each run, and State tuples are never replayed across executions or
engine versions, so a 1-column tuple can never reach the 3-column
reader.

This is the state-format prerequisite the loop operators build on; the
columns carry non-default values only once Loop Start/End set them
(follow-up PR).

### Any related issues, documentation, discussions?

Extracted from apache#5700 (loop operators) per @Xiao-zhen-Liu's split
request; part of apache#4442 ("Introduce for loop").

### How was this PR tested?

- **Format / round-trip:** `test_state.py` (loop columns are their own
columns, never in content JSON, default to `0` / `""`), Scala
`StateSpec` (both loop columns round-trip through a tuple with
non-default values, not just `content`), `ArrowUtilsSpec` (3-column
Arrow vector round-trip), `IcebergDocumentSpec` (iceberg state-doc
round-trip).
- **Transport:** `test_network_receiver.py`,
`test_input_port_materialization_reader_runnable.py`, and
`test_state_materialization_e2e.py` — the e2e (hermetic sqlite catalog)
writes non-default values for both loop columns end-to-end and asserts
they replay both on the `StateFrame` and on the raw iceberg row,
exercising the real Tuple/Schema/iceberg path.
- **Dormancy:**
`test_output_manager.py::test_defaults_loop_columns_when_omitted` pins
that a no-loop caller (no `loop_counter`) still produces a valid
3-column tuple with the loop columns at `0` / `""`.
- Local: `workflow-core` + `amber` compile; `StateSpec` +
`ArrowUtilsSpec` pass; Python state + transport + e2e tests pass;
scalafmt + scalafix + black clean. (`IcebergDocumentSpec` needs the
iceberg catalog backend, so it runs in CI.)

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.
aglinxinyuan and others added 5 commits July 1, 2026 21:54
Reworks the loop back-edge write address per the apache#5900 review decision:
the URI is constant per-execution config, not per-iteration data, so it
no longer rides State rows / the StateFrame envelope (the merged 3-column
format has no loop_start_state_uri). Instead the scheduler resolves it and
ships it to workers at setup:

- PhysicalOp gains a compile-time `isLoopStart` marker (set by
  LoopStartOpDesc, mirroring requiresMaterializedExecution).
- WorkflowExecutionCoordinator derives {LoopStart logical op id -> state
  URI of its single input port} from the final resource-allocated
  schedule; the two single-port/single-reader guards that lived in the
  Python worker's _compute_loop_start_id move controller-side.
- InitializeExecutorRequest gains `map<string,string> loopStartStateUris`
  (field 4), sent to every worker each region (re-)run; the Python
  handler stores it on Context. No static LoopStart<->LoopEnd pairing is
  needed: a Loop End selects the entry by the loop_start_id carried on
  the StateFrame it consumes (nested loops work unchanged).
- MainLoop: _compute_loop_start_id shrinks to the id parse;
  _jump_to_loop_start looks the URI up from the injected config and fails
  loudly (before the jump RPC) if it is missing; the inner-LoopStart
  nested pass-through gate keys on frame.loop_start_id instead of the
  removed URI field.
- LoopIntegrationSpec adopts the per-suite id isolation from apache#5888
  (specId 5), and WorkflowService drops a leftover errorSubject call
  removed on main by apache#5922.

Tests: reworked test_main_loop.py to the 3-arg emit convention and the
config-driven jump (plus a new missing-URI fail-loud case), added
InitializeExecutorHandler coverage for the new field, pinned
isLoopStart true/false in the Loop Start/End descriptor specs.
Two asserts in the pre-existing ECM-priority test were rewrapped by a
different formatter version during the merge; the repo's pinned ruff
(0.14.7, what CI checks) wants main's original style back.
…ative

Review-driven cleanup of the loop feature (net -235 lines), no mechanism
changes:

- _eval_loop_expr evaluates the user expression with eval() instead of
  exec("output = " + expr), so "output" stops being a reserved name and
  _RESERVED_STATE_KEYS shrinks to {"table"}. Two deliberate user-visible
  changes: a loop variable named `output` now persists like any other,
  and multi-statement expressions now raise SyntaxError instead of
  silently executing.
- Drop the dead scratch-key strip in _jump_to_loop_start (run_update is
  @OVERRIDES.final and always persists _strip_reserved(...) before any
  jump can fire) and inline the one-line _compute_loop_start_id wrapper.
- Drop the dead .withSuggestedWorkerNum(1) (WorkerConfig forces
  workerCount=1 for non-parallelizable ops) and its mixin pin.
- Deduplicate tests: merge the twin loop_counter>0 pass-through tests;
  merge the two happy-path jump tests into one that also pins
  RPC-before-write ordering via a shared event log; fold the
  envelope-exclusion assertions into the consume test; drop the 8 pyb
  tricky-input descriptor tests (PythonTemplateBuilderSpec on main pins
  the base64 guarantee) and the engine-simulation single-loop test
  (LoopIntegrationSpec runs it for real with identical expressions).
- Keep one canonical home per design narrative (proto field doc for the
  setup-config story, table_to_ipc_bytes for Arrow-IPC-not-pickle, the
  reset_output_storage call site for the inner-reset story); other
  copies reduced to pointers. Fix three stale TestLoopCounterRuntime
  cross-references.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Introduce for loop

5 participants