
Substrait consumer: DuplicateUnqualifiedField when chaining two Window relations with a carried window column #23007

@songkant-aws

Description


Substrait consumer fails with DuplicateUnqualifiedField when a plan chains two Window relations and the first window's output column is carried into the second window's input

Which crate

datafusion-substrait (consumer / from_substrait_plan). Reproduced on 54.0.0 (crates.io) and the code path is unchanged on branch-54 / main.

What happens

A logical plan that contains two WindowAggr nodes in series, where a window-derived column produced by the first window survives (via the intervening projections) into the input schema of the second window, round-trips through Substrait and then fails to be consumed:

SchemaError(DuplicateUnqualifiedField {
    name: "avg(data.b) ORDER BY [UInt64(1) ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"
}, Some(""))

(With row_number() windows the duplicated name is instead row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING; the mechanism is the same.)

The producer side is fine: to_substrait_plan succeeds. The failure is purely on the consumer side, when rebuilding the second Window relation.

Why it happens (root cause)

Substrait intermediate ProjectRel/WindowRel nodes are positional — they carry an output_mapping of field indices, not names. Field names live only at ReadRel.base_schema and at the top-level Plan.relations[].root.names. So any alias a producer attached to an intermediate window column (AS seq1, AS ravg1, …) is dropped in the Substrait IR. When the consumer rebuilds the expressions, a window function gets its default schema name back, e.g. avg(data.b) ... RANGE ... or row_number() ROWS BETWEEN ....
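To make the alias loss concrete, here is a toy model. The types NamedExpr and PositionalRel and the functions produce/consume are hypothetical stand-ins, not substrait-rs or DataFusion APIs, and the default names are shortened. The sketch only assumes what the paragraph above states: intermediate relations carry expressions and an index-based output mapping, but no names.

```rust
// Toy model of alias loss across a positional IR. NamedExpr, PositionalRel,
// produce and consume are hypothetical stand-ins, not substrait-rs or
// DataFusion APIs. Default names are shortened for readability.

/// A logical expression: its default schema name plus an optional alias.
#[derive(Clone, Debug, PartialEq)]
struct NamedExpr {
    default_name: String,
    alias: Option<String>,
}

impl NamedExpr {
    /// The field name this expression contributes to a schema.
    fn output_name(&self) -> String {
        self.alias.clone().unwrap_or_else(|| self.default_name.clone())
    }
}

/// An intermediate relation: expression bodies plus an output mapping of
/// field indices. There is no slot for field names.
struct PositionalRel {
    bodies: Vec<String>,
    output_mapping: Vec<usize>,
}

/// Producer side: the alias has nowhere to go, so only the body is kept.
fn produce(exprs: &[NamedExpr]) -> PositionalRel {
    PositionalRel {
        bodies: exprs.iter().map(|e| e.default_name.clone()).collect(),
        output_mapping: (0..exprs.len()).collect(),
    }
}

/// Consumer side: expressions are rebuilt by position, without aliases,
/// so each one falls back to its default name.
fn consume(rel: &PositionalRel) -> Vec<NamedExpr> {
    rel.output_mapping
        .iter()
        .map(|&i| NamedExpr {
            default_name: rel.bodies[i].clone(),
            alias: None,
        })
        .collect()
}

fn main() {
    let exprs = vec![
        NamedExpr { default_name: "row_number()".into(), alias: Some("seq1".into()) },
        NamedExpr { default_name: "avg(data.b)".into(), alias: Some("ravg1".into()) },
    ];
    let before: Vec<String> = exprs.iter().map(NamedExpr::output_name).collect();
    let after: Vec<String> = consume(&produce(&exprs))
        .iter()
        .map(NamedExpr::output_name)
        .collect();
    // prints: ["seq1", "ravg1"] -> ["row_number()", "avg(data.b)"]
    println!("{:?} -> {:?}", before, after);
}
```

After one round trip every window column answers to its default name, which is what sets up the collision described below.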

In the consumer, from_project_rel
(src/logical_plan/consumer/rel/project_rel.rs) handles a project that
contains window expressions like this:

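// project_rel.rs (54.0.0), lines ~49-82
// (earlier in the function: `input` is the consumed child rel and
// `original_schema` is that input's schema, captured before any window is added)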
let mut explicit_exprs: Vec<Expr> = vec![];
let mut window_exprs: HashSet<Expr> = HashSet::new();
for expr in &p.expressions {
    let e = consumer.consume_expression(expr, input.clone().schema()).await?;
    if let Expr::WindowFunction(_) = &e {
        window_exprs.insert(e.clone());
    }
    explicit_exprs.push(name_tracker.get_uniquely_named_expr(e)?);   // (A)
}

let input = if !window_exprs.is_empty() {
    LogicalPlanBuilder::window_plan(input, window_exprs)?            // (B)  <-- error originates here
} else {
    input
};

let mut final_exprs: Vec<Expr> = vec![];
for index in 0..original_schema.fields().len() {
    let e = Expr::Column(Column::from(original_schema.qualified_field(index)));
    final_exprs.push(name_tracker.get_uniquely_named_expr(e)?);      // (C)
}
final_exprs.append(&mut explicit_exprs);
project(input, final_exprs)                                         // (D)

The NameTracker dedup at (A) and (C) only governs the names of the
outer projection built at (D). The collision, however, happens earlier,
at (B), inside LogicalPlanBuilder::window_plan → .window(...) →
Window::try_new (datafusion-expr plan.rs), which builds the window's output
fields as

window_fields = [ ...all input fields... ] ++ [ ...new window expr fields... ]

and passes them to DFSchema::new_with_metadata, whose check_names() raises
DuplicateUnqualifiedField when two unqualified fields share a name.

When the first window's output column has been carried into the second
window's input (here named avg(data.b) ... RANGE ...), and the second window
re-introduces an expression with the same default schema name (because its
alias was lost in the Substrait round trip), the resulting DFSchema has two
fields with identical unqualified names → DuplicateUnqualifiedField.
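A name-only sketch of that schema construction, assuming qualifiers, data types and metadata can be ignored. window_output_fields and SchemaError are hypothetical; they imitate, but are not, Window::try_new and DFSchema::check_names.

```rust
use std::collections::HashSet;

// Name-only sketch of the Window output schema check. SchemaError and
// window_output_fields are hypothetical; they imitate Window::try_new plus
// DFSchema::check_names but ignore qualifiers, data types and metadata.

#[derive(Debug, PartialEq)]
enum SchemaError {
    DuplicateUnqualifiedField { name: String },
}

/// Builds [ ...all input fields... ] ++ [ ...new window expr fields... ]
/// and rejects the result if any name appears twice.
fn window_output_fields(
    input: &[&str],
    new_window_exprs: &[&str],
) -> Result<Vec<String>, SchemaError> {
    let window_fields: Vec<String> = input
        .iter()
        .chain(new_window_exprs.iter())
        .map(|s| s.to_string())
        .collect();

    let mut seen: HashSet<String> = HashSet::new();
    for name in &window_fields {
        // insert returns false when the name was already present
        if !seen.insert(name.clone()) {
            return Err(SchemaError::DuplicateUnqualifiedField { name: name.clone() });
        }
    }
    Ok(window_fields)
}

fn main() {
    const AVG: &str = "avg(data.b) ORDER BY [UInt64(1) ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW";
    const RN: &str = "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING";
    // Second window of Repro 3: the carried running aggregate is already an input field.
    let input = ["a", "b", AVG];
    let new_exprs = [RN, AVG];
    // prints an Err(DuplicateUnqualifiedField { .. }) naming the avg(...) field
    println!("{:?}", window_output_fields(&input, &new_exprs));
}
```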

NameTracker never inspects the schema inside window_plan, so it cannot
prevent this. It also does not seed itself with the input schema's existing field
names, so even at the project level an inherited window column and a freshly-built
identical one are not deduplicated against each other.

Relationship to #15211

PR #15211 added NameTracker to dedup duplicate window functions referenced
multiple times within a single project. That fix is real but orthogonal: it
covers duplicates within one from_project_rel call's explicit expressions. It
does not cover the case here, where an inherited window column (from a previous
window relation, already in the input schema) collides with a newly-built window
expression of the same default name. The duplicate crosses the input-schema /
new-window boundary inside window_plan, which NameTracker doesn't reach.

Minimal reproductions

The reproductions below are datafusion-substrait integration tests
(datafusion/substrait/tests/cases/roundtrip_logical_plan.rs) using the existing
create_context() / roundtrip_logical_plan_with_ctx() helpers.

Repro 1 — two identical row_number() windows (fails)

#[tokio::test]
async fn chained_identical_window_functions() -> Result<()> {
    use datafusion::functions_window::expr_fn::row_number;
    let ctx = create_context().await?;
    let scan = ctx.table("data").await?.into_optimized_plan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .window(vec![row_number().alias("rn1")])?
        .window(vec![row_number().alias("rn2")])?
        .build()?;
    roundtrip_logical_plan_with_ctx(plan, ctx).await?;   // FAILS: DuplicateUnqualifiedField
    Ok(())
}

Repro 2 — seq window + sort + project-except, chained (PASSES — important contrast)

#[tokio::test]
async fn chained_seq_with_sort_and_project_except() -> Result<()> {
    use datafusion::functions_window::expr_fn::row_number;
    let ctx = create_context().await?;
    let scan = ctx.table("data").await?.into_optimized_plan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .window(vec![row_number().alias("seq1")])?
        .project(vec![col("a"), col("b"), col("seq1")])?
        .sort(vec![col("seq1").sort(true, false)])?
        .project(vec![col("a"), col("b")])?              // DROP seq1
        .window(vec![row_number().alias("seq2")])?
        .project(vec![col("a"), col("b"), col("seq2")])?
        .sort(vec![col("seq2").sort(true, false)])?
        .project(vec![col("a"), col("b")])?              // DROP seq2
        .build()?;
    roundtrip_logical_plan_with_ctx(plan, ctx).await?;   // PASSES
    Ok(())
}

This passes because dropping seq1 lets the plan flatten so the two
row_number() columns never coexist in a single Window input schema. This case
isolates the trigger: it is not sort/project-except that breaks things.

Repro 3 — faithful shape: window emits seq + running aggregate, the aggregate is carried through (fails)

This mirrors a real chained streamstats plan: each window emits a sequence
column and a running aggregate; the running aggregate is carried through the
intervening projects (only the seq is dropped). Carrying that window column is
what keeps window #1's output alive in window #2's input.

#[tokio::test]
async fn chained_window_with_carried_running_aggregate() -> Result<()> {
    use datafusion::functions_aggregate::average::avg_udaf;
    use datafusion::functions_window::expr_fn::row_number;
    use datafusion::logical_expr::expr::WindowFunction;
    use datafusion::logical_expr::{Expr, ExprFunctionExt, WindowFrame};

    let ctx = create_context().await?;
    let scan = ctx.table("data").await?.into_optimized_plan()?;

    let running1 = Expr::from(WindowFunction::new(avg_udaf(), vec![col("b")]))
        .window_frame(WindowFrame::new(Some(false))).build()?.alias("ravg1");
    let running2 = Expr::from(WindowFunction::new(avg_udaf(), vec![col("b")]))
        .window_frame(WindowFrame::new(Some(false))).build()?.alias("ravg2");

    let plan = LogicalPlanBuilder::from(scan)
        .window(vec![row_number().alias("seq1"), running1])?
        .project(vec![col("a"), col("b"), col("seq1"), col("ravg1")])?
        .sort(vec![col("seq1").sort(true, false)])?
        .project(vec![col("a"), col("b"), col("ravg1")])?            // drop seq1, KEEP ravg1
        .window(vec![row_number().alias("seq2"), running2])?
        .project(vec![col("a"), col("b"), col("ravg1"), col("seq2"), col("ravg2")])?
        .sort(vec![col("seq2").sort(true, false)])?
        .project(vec![col("a"), col("b"), col("ravg1"), col("ravg2")])?  // drop seq2
        .build()?;

    roundtrip_logical_plan_with_ctx(plan, ctx).await?;
    // FAILS: DuplicateUnqualifiedField {
    //   name: "avg(data.b) ORDER BY [UInt64(1) ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" }
    Ok(())
}
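As a cross-check of the three repros, the following name-only replay assumes the consumer sees default names everywhere (aliases lost), that a window's output is its input followed by its new fields, and that a projection keeps exactly the listed columns. Op, replay and the shortened names RN/AVG are hypothetical; this is a simplified model, not a DataFusion test.

```rust
// Name-only replay of Repros 1-3 as the consumer would rebuild them.
// Assumptions (not DataFusion code): aliases are lost, so each window adds its
// default name; a window's output is its input followed by its new fields;
// a projection keeps exactly the listed columns; Sort does not change the
// schema and is omitted. Op, replay, RN and AVG are hypothetical.

const RN: &str = "row_number()"; // shortened default name
const AVG: &str = "avg(data.b)"; // shortened default name

enum Op<'a> {
    /// New window expressions, by default name.
    Window(&'a [&'a str]),
    /// Columns kept by a projection, by name.
    Project(&'a [&'a str]),
}

/// Returns the final schema, or Err(name) for the first duplicated field.
fn replay(scan: &[&str], ops: &[Op<'_>]) -> Result<Vec<String>, String> {
    let mut schema: Vec<String> = scan.iter().map(|s| s.to_string()).collect();
    for op in ops {
        match op {
            Op::Window(new_fields) => {
                for name in new_fields.iter() {
                    if schema.iter().any(|f| f.as_str() == *name) {
                        return Err(name.to_string());
                    }
                    schema.push(name.to_string());
                }
            }
            Op::Project(keep) => {
                schema = keep.iter().map(|s| s.to_string()).collect();
            }
        }
    }
    Ok(schema)
}

fn main() {
    let scan = ["a", "b"];
    // Repro 1: two windows back to back.
    let repro1 = [Op::Window(&[RN]), Op::Window(&[RN])];
    // Repro 2: each seq column is dropped before the next window.
    let repro2 = [
        Op::Window(&[RN]),
        Op::Project(&["a", "b", RN]),
        Op::Project(&["a", "b"]),
        Op::Window(&[RN]),
        Op::Project(&["a", "b", RN]),
        Op::Project(&["a", "b"]),
    ];
    // Repro 3: seq1 is dropped but the running aggregate is carried.
    let repro3 = [
        Op::Window(&[RN, AVG]),
        Op::Project(&["a", "b", RN, AVG]),
        Op::Project(&["a", "b", AVG]),
        Op::Window(&[RN, AVG]),
    ];
    println!("repro 1: {:?}", replay(&scan, &repro1)); // Err("row_number()")
    println!("repro 2: {:?}", replay(&scan, &repro2)); // Ok(["a", "b"])
    println!("repro 3: {:?}", replay(&scan, &repro3)); // Err("avg(data.b)")
}
```

Under these assumptions the replay agrees with the observed outcomes: only the shapes that keep a window column alive into the next window's input fail.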

Suggested fix (direction, not a final patch)

Extend the existing NameTracker mechanism so it also dedups against names that
already exist in the window relation's input schema. Concretely, in
from_project_rel, before building window_exprs, seed the tracker (or apply
get_uniquely_named_expr) with the input schema's field names, and pass uniquely
re-aliased window expressions into window_plan, so an inherited window column
forces the newly-built one to receive a __temp__N suffix — the same disambiguation
#15211 already uses, just extended across the input-schema boundary. This keeps the
positional output_mapping semantics intact (the suffix only affects internal
schema names, not the emitted ordering).
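A minimal sketch of that direction on names only, assuming a tracker that can be pre-seeded with the input schema's field names. ToyNameTracker::seeded_with and uniquely_named_window_exprs are hypothetical helpers, not existing DataFusion functions, and the sketch covers only the naming step.

```rust
use std::collections::HashSet;

// Sketch of the suggested direction, on names only. ToyNameTracker,
// seeded_with and uniquely_named_window_exprs are hypothetical; the "__temp__N"
// suffix follows the issue text and the counter starting at 0 is an assumption.
struct ToyNameTracker {
    seen: HashSet<String>,
}

impl ToyNameTracker {
    /// Seeding step: pre-register every field name of the window's input.
    fn seeded_with(input: &[&str]) -> Self {
        Self { seen: input.iter().map(|s| s.to_string()).collect() }
    }

    fn unique_name(&mut self, name: &str) -> String {
        if self.seen.insert(name.to_string()) {
            return name.to_string();
        }
        let mut n = 0;
        loop {
            let candidate = format!("{name}__temp__{n}");
            if self.seen.insert(candidate.clone()) {
                return candidate;
            }
            n += 1;
        }
    }
}

/// Names for the new window expressions, deduplicated against the input
/// schema and against each other. Order is preserved.
fn uniquely_named_window_exprs(input: &[&str], new_exprs: &[&str]) -> Vec<String> {
    let mut tracker = ToyNameTracker::seeded_with(input);
    new_exprs.iter().map(|e| tracker.unique_name(e)).collect()
}

fn main() {
    const AVG: &str = "avg(data.b)"; // shortened default name
    let input = ["a", "b", AVG];
    let renamed = uniquely_named_window_exprs(&input, &["row_number()", AVG]);
    // Window output = input fields ++ renamed window fields; all names distinct.
    let output: Vec<String> = input.iter().map(|s| s.to_string()).chain(renamed).collect();
    // prints: ["a", "b", "avg(data.b)", "row_number()", "avg(data.b)__temp__0"]
    println!("{output:?}");
}
```

Because the renamed expressions keep their order, the positions seen by a positional output_mapping are unchanged; only the internal names differ.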

I'm happy to open a PR along these lines and add the tests above (the passing
one included, as a guard against "fixing" it by breaking the flatten path).

Environment

  • datafusion / datafusion-substrait 54.0.0 (also present on branch-54, HEAD 08da279, and main).
  • Discovered via OpenSearch's PPL streamstats command, which lowers chained
    streamstats into exactly this window → project(carry) → sort → project(drop seq)
    shape and ships the plan to a DataFusion backend over Substrait.
