Substrait consumer fails with DuplicateUnqualifiedField when a plan chains two Window relations and the first window's output column is carried into the second window's input
Which crate
datafusion-substrait (consumer / from_substrait_plan). Reproduced on 54.0.0 (crates.io) and the code path is unchanged on branch-54 / main.
What happens
A logical plan contains two WindowAggr nodes in series, and a window-derived column produced by the first window survives (via the intervening projections) into the input schema of the second window. That plan converts to Substrait without error but then fails to be consumed:
```
SchemaError(DuplicateUnqualifiedField {
    name: "avg(data.b) ORDER BY [UInt64(1) ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"
}, Some(""))
```
(With row_number() windows, the duplicated name is row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING instead. The mechanism is the same.)
The producer side is fine: to_substrait_plan succeeds. The failure is purely on the consumer side, when rebuilding the second Window relation.
Why it happens (root cause)
Substrait intermediate ProjectRel/WindowRel nodes are positional — they carry an output_mapping of field indices, not names. Field names live only at ReadRel.base_schema and at the top-level Plan.relations[].root.names. So any alias a producer attached to an intermediate window column (AS seq1, AS ravg1, …) is dropped in the Substrait IR. When the consumer rebuilds the expressions, a window function gets its default schema name back, e.g. avg(data.b) ... RANGE ... or row_number() ROWS BETWEEN ....
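To make the name loss concrete, here is a std-only toy model. The Expr enum, schema_name and to_intermediate_rel are hypothetical stand-ins, not the datafusion or substrait-rs APIs. The only real detail it borrows is the default row_number() name quoted above. It models one point: an intermediate relation has nowhere to store an alias, so only the underlying expression survives, and re-deriving its name yields the default.

```rust
// Toy model only: `Expr`, `schema_name` and `to_intermediate_rel` are
// hypothetical stand-ins, not the datafusion or substrait-rs APIs.
#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    /// row_number() with its default (unordered) frame.
    RowNumber,
    /// `inner AS name`.
    Alias(Box<Expr>, String),
}

/// The field name a consumer would derive for an expression.
fn schema_name(e: &Expr) -> String {
    match e {
        Expr::Column(c) => c.clone(),
        Expr::RowNumber => {
            "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING".to_string()
        }
        Expr::Alias(_, name) => name.clone(),
    }
}

/// An intermediate ProjectRel/WindowRel is positional and has no slot for a
/// name, so serializing an expression strips any aliases around it.
fn to_intermediate_rel(e: &Expr) -> Expr {
    match e {
        Expr::Alias(inner, _) => to_intermediate_rel(inner),
        other => other.clone(),
    }
}
```

In this model, `seq1` names the field before serialization. After to_intermediate_rel, schema_name falls back to the default row_number() name. That fallback is what later collides.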
In the consumer, from_project_rel (src/logical_plan/consumer/rel/project_rel.rs) handles a project that contains window expressions like this:

```rust
// project_rel.rs (54.0.0), lines ~49-82
let mut explicit_exprs: Vec<Expr> = vec![];
let mut window_exprs: HashSet<Expr> = HashSet::new();
for expr in &p.expressions {
    let e = consumer.consume_expression(expr, input.clone().schema()).await?;
    if let Expr::WindowFunction(_) = &e {
        window_exprs.insert(e.clone());
    }
    explicit_exprs.push(name_tracker.get_uniquely_named_expr(e)?); // (A)
}
let input = if !window_exprs.is_empty() {
    LogicalPlanBuilder::window_plan(input, window_exprs)? // (B) <-- error originates here
} else {
    input
};
let mut final_exprs: Vec<Expr> = vec![];
for index in 0..original_schema.fields().len() {
    let e = Expr::Column(Column::from(original_schema.qualified_field(index)));
    final_exprs.push(name_tracker.get_uniquely_named_expr(e)?); // (C)
}
final_exprs.append(&mut explicit_exprs);
project(input, final_exprs) // (D)
```
The NameTracker dedup at (A) and (C) only governs the names of the
outer projection built at (D). The collision happens earlier, at (B), inside
LogicalPlanBuilder::window_plan → .window(...) → Window::try_new
(datafusion-expr plan.rs). Window::try_new builds

window_fields = [ ...all input fields... ] ++ [ ...new window expr fields... ]

and passes it to DFSchema::new_with_metadata, whose check_names() raises
DuplicateUnqualifiedField when two unqualified fields have the same name.
Suppose the first window's output column (here named avg(data.b) ... RANGE ...)
has been carried into the second window's input. The second window then
re-introduces an expression with the same default schema name, because its
alias was lost over Substrait. As a result, the DFSchema has two fields with
identical unqualified names, which raises DuplicateUnqualifiedField.
NameTracker never inspects the schema inside window_plan, so it cannot
prevent this. It also does not seed itself with the input schema's existing field
names, so even at the project level an inherited window column and a freshly-built
identical one are not deduplicated against each other.
Relationship to #15211
PR #15211 added NameTracker to dedup duplicate window functions referenced
multiple times within a single project. That fix is real but orthogonal: it
covers duplicates within one from_project_rel call's explicit expressions. It
does not cover the case here — an inherited window column (from a previous
window relation, already in the input schema) colliding with a newly-built window
expression of the same default name. The duplicate is across the input-schema /
new-window boundary inside window_plan, which NameTracker doesn't reach.
Minimal reproductions
All four are datafusion-substrait integration tests
(datafusion/substrait/tests/cases/roundtrip_logical_plan.rs) using the existing
create_context() / roundtrip_logical_plan_with_ctx() helpers.
Repro 1 — two identical row_number() windows (fails)
```rust
#[tokio::test]
async fn chained_identical_window_functions() -> Result<()> {
    use datafusion::functions_window::expr_fn::row_number;

    let ctx = create_context().await?;
    let scan = ctx.table("data").await?.into_optimized_plan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .window(vec![row_number().alias("rn1")])?
        .window(vec![row_number().alias("rn2")])?
        .build()?;
    roundtrip_logical_plan_with_ctx(plan, ctx).await?; // FAILS: DuplicateUnqualifiedField
    Ok(())
}
```
Repro 2 — seq window + sort + project-except, chained (PASSES — important contrast)
```rust
#[tokio::test]
async fn chained_seq_with_sort_and_project_except() -> Result<()> {
    use datafusion::functions_window::expr_fn::row_number;

    let ctx = create_context().await?;
    let scan = ctx.table("data").await?.into_optimized_plan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .window(vec![row_number().alias("seq1")])?
        .project(vec![col("a"), col("b"), col("seq1")])?
        .sort(vec![col("seq1").sort(true, false)])?
        .project(vec![col("a"), col("b")])? // DROP seq1
        .window(vec![row_number().alias("seq2")])?
        .project(vec![col("a"), col("b"), col("seq2")])?
        .sort(vec![col("seq2").sort(true, false)])?
        .project(vec![col("a"), col("b")])? // DROP seq2
        .build()?;
    roundtrip_logical_plan_with_ctx(plan, ctx).await?; // PASSES
    Ok(())
}
```
This passes because dropping seq1 lets the plan flatten so the two
row_number() columns never coexist in a single Window input schema. This case
isolates the trigger: it is not sort/project-except that breaks things.
Repro 3 — faithful shape: window emits seq + running aggregate, the aggregate is carried through (fails)
This mirrors a real chained streamstats plan: each window emits a sequence
column and a running aggregate; the running aggregate is carried through the
intervening projects (only the seq is dropped). Carrying that window column is
what keeps window #1's output alive in window #2's input.
```rust
#[tokio::test]
async fn chained_window_with_carried_running_aggregate() -> Result<()> {
    use datafusion::functions_aggregate::average::avg_udaf;
    use datafusion::functions_window::expr_fn::row_number;
    use datafusion::logical_expr::expr::WindowFunction;
    use datafusion::logical_expr::{Expr, ExprFunctionExt, WindowFrame};

    let ctx = create_context().await?;
    let scan = ctx.table("data").await?.into_optimized_plan()?;
    let running1 = Expr::from(WindowFunction::new(avg_udaf(), vec![col("b")]))
        .window_frame(WindowFrame::new(Some(false)))
        .build()?
        .alias("ravg1");
    let running2 = Expr::from(WindowFunction::new(avg_udaf(), vec![col("b")]))
        .window_frame(WindowFrame::new(Some(false)))
        .build()?
        .alias("ravg2");
    let plan = LogicalPlanBuilder::from(scan)
        .window(vec![row_number().alias("seq1"), running1])?
        .project(vec![col("a"), col("b"), col("seq1"), col("ravg1")])?
        .sort(vec![col("seq1").sort(true, false)])?
        .project(vec![col("a"), col("b"), col("ravg1")])? // drop seq1, KEEP ravg1
        .window(vec![row_number().alias("seq2"), running2])?
        .project(vec![col("a"), col("b"), col("ravg1"), col("seq2"), col("ravg2")])?
        .sort(vec![col("seq2").sort(true, false)])?
        .project(vec![col("a"), col("b"), col("ravg1"), col("ravg2")])? // drop seq2
        .build()?;
    roundtrip_logical_plan_with_ctx(plan, ctx).await?;
    // FAILS: DuplicateUnqualifiedField {
    //   name: "avg(data.b) ORDER BY [UInt64(1) ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" }
    Ok(())
}
```
Suggested fix (direction, not a final patch)
Extend the existing NameTracker mechanism so it also dedups against names that
already exist in the window relation's input schema. Concretely, in
from_project_rel, before building window_exprs, seed the tracker (or apply
get_uniquely_named_expr) with the input schema's field names, and pass uniquely
re-aliased window expressions into window_plan, so an inherited window column
forces the newly-built one to receive a __temp__N suffix — the same disambiguation
#15211 already uses, just extended across the input-schema boundary. This keeps the
positional output_mapping semantics intact (the suffix only affects internal
schema names, not the emitted ordering).
I'm happy to open a PR along these lines and add the four tests above (the passing
one included, as a guard against "fixing" it by breaking the flatten path).
Environment
- datafusion / datafusion-substrait 54.0.0 (also present on branch-54, HEAD 08da279, and main).
- Discovered via OpenSearch's PPL streamstats command, which lowers chained streamstats into exactly this window → project(carry) → sort → project(drop seq) shape and ships the plan to a DataFusion backend over Substrait.