feat(scheduling): reuse output storage across region re-executions#5707
Conversation
Add a `reusesOutputStorageOnReExecution` flag to `PhysicalOp` (default false) plus a `withReusesOutputStorageOnReExecution` builder. When set, `RegionExecutionCoordinator` reuses an operator's existing iceberg output and state documents on a region re-run instead of recreating them, via a new pure `provisionOutputDocument` decision function unit-tested by `RegionOutputProvisioningSpec`. The flag is named for the behavior the scheduler checks, not the operator that sets it, so any future operator needing output accumulated across region re-executions can opt in. No operator sets it yet, so the path is dormant and behavior-preserving. Split out of the loop-operators PR (apache#5700) to keep that PR reviewable; the loop feature will set the flag on Loop End.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5707 +/- ##
============================================
- Coverage 53.18% 52.96% -0.23%
+ Complexity 2659 2647 -12
============================================
Files 1095 1094 -1
Lines 42374 42343 -31
Branches 4559 4557 -2
============================================
- Hits 22535 22425 -110
- Misses 18509 18604 +95
+ Partials 1330 1314 -16
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR introduces an opt-in mechanism for operators to preserve their output/state storage across region re-executions (e.g., loop iterations), by adding a flag on PhysicalOp and updating the region scheduler’s output-document provisioning logic to conditionally reuse existing documents. It also adds focused unit tests for the new create-vs-reuse decision function.
Changes:
- Add
reusesOutputStorageOnReExecution: Booleanand awithReusesOutputStorageOnReExecutionbuilder toPhysicalOp. - Extract output document provisioning decision into
RegionExecutionCoordinator.provisionOutputDocumentand use it when provisioning per-output-port result/state documents. - Add
RegionOutputProvisioningSpecunit tests covering the reuse×exists matrix and the non-reuse short-circuit.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala | Adds a new operator-level flag + builder to signal output-storage reuse across region re-executions. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala | Adds a pure provisioning decision function and uses it to create-or-reuse result/state documents per output port based on the owning operator’s flag. |
| amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionOutputProvisioningSpec.scala | Introduces unit tests validating the provisioning decision logic without needing an Iceberg backend. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
| config | throughput | MB/s | latency | max Δ latest / 7d | |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 423 | 0.258 | 21,278/37,235/37,235 us | 🔴 +17.7% / 🟢 -10.5% |
| ⚪ | bs=100 sw=10 sl=64 | 955 | 0.583 | 104,697/131,056/131,056 us | ⚪ within ±5% / 🟢 +7.1% |
| 🟢 | bs=1000 sw=10 sl=64 | 1,127 | 0.688 | 898,715/937,408/937,408 us | 🟢 -5.6% / 🟢 -8.4% |
Baseline details
Latest main b7fe06e from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 423 tuples/sec | 451 tuples/sec | 410.82 tuples/sec | -6.2% | +3.0% |
| bs=10 sw=10 sl=64 | MB/s | 0.258 MB/s | 0.275 MB/s | 0.251 MB/s | -6.2% | +2.9% |
| bs=10 sw=10 sl=64 | p50 | 21,278 us | 21,592 us | 23,785 us | -1.5% | -10.5% |
| bs=10 sw=10 sl=64 | p95 | 37,235 us | 31,640 us | 34,980 us | +17.7% | +6.4% |
| bs=10 sw=10 sl=64 | p99 | 37,235 us | 31,640 us | 34,980 us | +17.7% | +6.4% |
| bs=100 sw=10 sl=64 | throughput | 955 tuples/sec | 963 tuples/sec | 891.94 tuples/sec | -0.8% | +7.1% |
| bs=100 sw=10 sl=64 | MB/s | 0.583 MB/s | 0.588 MB/s | 0.544 MB/s | -0.9% | +7.1% |
| bs=100 sw=10 sl=64 | p50 | 104,697 us | 102,143 us | 112,277 us | +2.5% | -6.8% |
| bs=100 sw=10 sl=64 | p95 | 131,056 us | 126,908 us | 139,802 us | +3.3% | -6.3% |
| bs=100 sw=10 sl=64 | p99 | 131,056 us | 126,908 us | 139,802 us | +3.3% | -6.3% |
| bs=1000 sw=10 sl=64 | throughput | 1,127 tuples/sec | 1,093 tuples/sec | 1,041 tuples/sec | +3.1% | +8.3% |
| bs=1000 sw=10 sl=64 | MB/s | 0.688 MB/s | 0.667 MB/s | 0.635 MB/s | +3.1% | +8.3% |
| bs=1000 sw=10 sl=64 | p50 | 898,715 us | 912,360 us | 972,714 us | -1.5% | -7.6% |
| bs=1000 sw=10 sl=64 | p95 | 937,408 us | 992,709 us | 1,023,057 us | -5.6% | -8.4% |
| bs=1000 sw=10 sl=64 | p99 | 937,408 us | 992,709 us | 1,023,057 us | -5.6% | -8.4% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,473.01,200,128000,423,0.258,21278.28,37235.25,37235.25
1,100,10,64,20,2094.76,2000,1280000,955,0.583,104697.14,131056.37,131056.37
2,1000,10,64,20,17753.34,20000,12800000,1127,0.688,898715.39,937408.04,937408.04Add a PhysicalOp builder test alongside the existing withParallelizable case, exercising the previously-uncovered `this.copy(...)` line that Codecov flagged on apache#5707 (patch coverage 85.71%, 1 missing line). Asserts the default false, the flipped value, and immutability of the original instance.
Yicong-Huang
left a comment
There was a problem hiding this comment.
Left comments inline!
…utputPort Address @Yicong-Huang's review on apache#5707: - Move the create-or-reuse decision out of RegionExecutionCoordinator into DocumentFactory.createOrReuseDocument -- it is storage-layer logic, not scheduling. - Move the reuse flag off PhysicalOp onto the OutputPort proto, alongside the existing per-port `blocking`/`mode`; storage behavior is port-specific. The coordinator now reads it per output port and maintains no reuse state itself. - Relocate the unit test to DocumentFactorySpec. Per-port differentiation is still required (answering the "why not reuse for all?" question): the loop back-edge re-executes LoopStart and every loop-body operator on the same event as LoopEnd, but only LoopEnd accumulates -- the others must recreate a fresh document each iteration.
…ith apache#5707) apache#5707 redesigned the reuse mechanism per review: the flag moved from PhysicalOp.reusesOutputStorageOnReExecution onto OutputPort.reusesOutputStorage, and the create-or-reuse decision moved out of RegionExecutionCoordinator into DocumentFactory.createOrReuseDocument. Apply the same change here so loop-feb stays internally consistent and rebases cleanly once apache#5707 lands: - mechanism files (workflow.proto, DocumentFactory, RegionExecutionCoordinator, PhysicalOp, DocumentFactorySpec) brought in line with apache#5707; drop the old RegionOutputProvisioningSpec. - LoopOpDesc.getPhysicalOp now sets reusesOutputStorage on the operator's output port (true for Loop End) instead of the removed PhysicalOp builder. - LoopStart/EndOpDescSpec assert the port flag; comment references updated.
Moving the create-or-reuse decision out of RegionExecutionCoordinator (into DocumentFactory) removed the only use of `java.net.URI` here; scalafix RemoveUnused flagged the leftover import in CI. Drop it.
Same lint fix as apache#5707: moving the create-or-reuse decision into DocumentFactory removed the only use of `java.net.URI` in RegionExecutionCoordinator; scalafix RemoveUnused flagged the leftover import in the amber Lint CI step.
…torage Per @Yicong-Huang's review on apache#5707: add a sanity check that iterates every registered operator (OperatorMetadataGenerator.operatorTypeMap) and asserts no output port sets reusesOutputStorage. No operator needs it yet -- Loop End, the only one that will, is not on main -- so this catches an unexpected/accidental enablement. To be updated to allow Loop End's output port (and only it) when the loop operators land.
…guard Follow-up to the apache#5707 guard (@Yicong-Huang). Declare the reusesOutputStorage flag on LoopOpDesc's output port in operatorInfo (alongside where blocking/mode live) instead of mapping it in getPhysicalOp, so it is declarative and the cross-operator guard can see it. Add OutputPortReuseFlagSpec -- the LoopEnd-allowing form of the apache#5707 guard: only Loop End may enable the flag; every other operator's output ports must have it false.
Yicong-Huang
left a comment
There was a problem hiding this comment.
Left minor comments inline.
… the flag Address @Yicong-Huang's review on apache#5707: - DocumentFactory.createOrReuseDocument now returns the VirtualDocument (opened when reused, created otherwise) instead of a Boolean, so the call site need not branch on create-vs-reuse. - RegionExecutionCoordinator adds a runtime require() that no output port sets reusesOutputStorage -- a production guard, since the flag only activates with the loop operators (not on main). Remove/relax it when introducing them.
apache#5707 apache#5707 changed DocumentFactory.createOrReuseDocument to return the VirtualDocument (opened when reused, created otherwise) instead of a Boolean. Sync that here. loop-feb deliberately omits apache#5707's production require-guard on reusesOutputStorage, since the loop operators legitimately set the flag.
Yicong-Huang
left a comment
There was a problem hiding this comment.
Please also update the doc and PR description to reflect the final code: I think better to avoid mentioning alternative solutions that have been reviewed and decided to change.
…mments Address review nits on apache#5707: - rename the proto field OutputPort.reusesOutputStorage -> reuseStorage (it already lives on OutputPort, so the "Output" qualifier is redundant) - drop the "Either way the caller gets the document..." sentence from the createOrReuseDocument scaladoc - reword RegionExecutionCoordinator comments to describe the final behavior instead of contrasting alternatives ("rather than")
|
Also refreshed the PR description to match the final code ( |
review polish Keep loop-feb consistent with the apache#5707 review changes: proto field OutputPort.reusesOutputStorage -> reuseStorage, the createOrReuseDocument scaladoc, and the RegionExecutionCoordinator comment wording. Also rename the LoopOpDesc/LoopEndOpDesc reuseStorage toggle and the spec assertions to match the new field name.
apache#5707 (reuse output storage) is now on main, so its files drop out of loop-feb's diff (proto reuseStorage field, DocumentFactory.createOrReuseDocument + spec). Conflict resolutions: - RegionExecutionCoordinator: keep loop-feb's version WITHOUT the require(!reuseStorage) production guard -- the loop operators legitimately set the flag, so the guard (correct for apache#5707 in isolation) must not fire here. - OutputPortReuseFlagSpec: keep loop-feb's LoopEnd-allowing guard (only Loop End may set reuseStorage), superseding main's all-false form from apache#5707. - test_state_materialization_e2e.py: keep loop-feb's 4-column-State version (loop_counter/loop_start_id/loop_start_state_uri as their own columns). NOTE: this reverts main's apache#5682 class-based-fixture refactor of that test; re-apply that structure on top of the 4-column semantics in the PR2 state PR.
…pache#5707) ### What changes were proposed in this PR? Adds an opt-in mechanism for an output port to **reuse** its storage when the owning operator's region re-executes, instead of recreating the document each time. Dormant and behavior-preserving — no operator sets the flag in this PR. - `OutputPort` gains a `reuseStorage: Boolean` proto field (alongside `blocking` / `mode`). It marks a port whose output accumulates across region re-executions — e.g. a Loop End port whose result builds up over the iterations of its own loop. - `DocumentFactory.createOrReuseDocument(uri, schema, reuseExisting, …)` is the create-or-reuse decision: when reuse is requested and a document already exists it opens and returns that one; otherwise it creates a fresh one. It always returns the document, so the call site does not branch. - `RegionExecutionCoordinator` reads each output port's `reuseStorage` flag while provisioning that port's result/state documents and routes through `createOrReuseDocument`. | port flag | region re-run behavior | |---|---| | `false` (every operator today) | recreate output/state documents — unchanged | | `true` (set by Loop End in the loop PR) | keep and reopen the existing documents | A runtime guard in `RegionExecutionCoordinator` asserts no port sets `reuseStorage` for now: the flag activates only with the loop operators, which are not yet on `main`. The guard keeps the dormant reuse path from being silently exercised before its consumer exists, and is removed when the loop operators land. ### Any related issues, documentation, discussions? Resolves apache#5709 (sub-issue of apache#4442 "Introduce for loop"). Split out of apache#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's [review](apache#4206 (review)). ### How was this PR tested? - `DocumentFactorySpec` — pins the create-or-reuse decision (the reuse × exists matrix plus the "no-reuse never probes existence" short-circuit) with injected document stubs, no iceberg backend. - `OutputPortReuseFlagSpec` — guards that no registered operator enables `reuseStorage` on any output port. - `WorkflowCore` / `WorkflowOperator` / `WorkflowExecutionService` compile; scalafmt + scalafix clean. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
What changes were proposed in this PR?
Adds an opt-in mechanism for an output port to reuse its storage when the owning operator's region re-executes, instead of recreating the document each time. Dormant and behavior-preserving — no operator sets the flag in this PR.
OutputPortgains areuseStorage: Booleanproto field (alongsideblocking/mode). It marks a port whose output accumulates across region re-executions — e.g. a Loop End port whose result builds up over the iterations of its own loop.DocumentFactory.createOrReuseDocument(uri, schema, reuseExisting, …)is the create-or-reuse decision: when reuse is requested and a document already exists it opens and returns that one; otherwise it creates a fresh one. It always returns the document, so the call site does not branch.RegionExecutionCoordinatorreads each output port'sreuseStorageflag while provisioning that port's result/state documents and routes throughcreateOrReuseDocument.false(every operator today)true(set by Loop End in the loop PR)A runtime guard in
RegionExecutionCoordinatorasserts no port setsreuseStoragefor now: the flag activates only with the loop operators, which are not yet onmain. The guard keeps the dormant reuse path from being silently exercised before its consumer exists, and is removed when the loop operators land.Any related issues, documentation, discussions?
Resolves #5709 (sub-issue of #4442 "Introduce for loop"). Split out of #5700 to keep that PR reviewable, per @Xiao-zhen-Liu's review.
How was this PR tested?
DocumentFactorySpec— pins the create-or-reuse decision (the reuse × exists matrix plus the "no-reuse never probes existence" short-circuit) with injected document stubs, no iceberg backend.OutputPortReuseFlagSpec— guards that no registered operator enablesreuseStorageon any output port.WorkflowCore/WorkflowOperator/WorkflowExecutionServicecompile; scalafmt + scalafix clean.Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.