From ec1bc2c976791650339e84bada19801fd77b7218 Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 13 Nov 2024 22:07:06 +0800 Subject: [PATCH 1/4] sort out codes of single column group by. --- .../src/aggregates/group_values/mod.rs | 18 +++++++++-------- .../group_values/{ => single_column}/bytes.rs | 0 .../{ => single_column}/bytes_view.rs | 0 .../group_values/single_column/mod.rs | 20 +++++++++++++++++++ .../{ => single_column}/primitive.rs | 0 .../src/aggregates/topk/hash_table.rs | 2 +- 6 files changed, 31 insertions(+), 9 deletions(-) rename datafusion/physical-plan/src/aggregates/group_values/{ => single_column}/bytes.rs (100%) rename datafusion/physical-plan/src/aggregates/group_values/{ => single_column}/bytes_view.rs (100%) create mode 100644 datafusion/physical-plan/src/aggregates/group_values/single_column/mod.rs rename datafusion/physical-plan/src/aggregates/group_values/{ => single_column}/primitive.rs (100%) diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index 12ed25a0ea349..e361f0db9dd37 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -20,24 +20,26 @@ use arrow::record_batch::RecordBatch; use arrow_array::{downcast_primitive, ArrayRef}; use arrow_schema::{DataType, SchemaRef}; -use bytes_view::GroupValuesBytesView; use datafusion_common::Result; -pub(crate) mod primitive; use datafusion_expr::EmitTo; -use primitive::GroupValuesPrimitive; mod multi_column; mod row; +mod single_column; +use datafusion_physical_expr::binary_map::OutputType; use multi_column::GroupValuesColumn; use row::GroupValuesRows; -mod bytes; -mod bytes_view; -use bytes::GroupValuesByes; -use datafusion_physical_expr::binary_map::OutputType; +pub(crate) use single_column::primitive::HashValue; -use crate::aggregates::order::GroupOrdering; +use crate::aggregates::{ + group_values::single_column::{ + 
bytes::GroupValuesByes, bytes_view::GroupValuesBytesView, + primitive::GroupValuesPrimitive, + }, + order::GroupOrdering, +}; mod null_builder; diff --git a/datafusion/physical-plan/src/aggregates/group_values/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/single_column/bytes.rs similarity index 100% rename from datafusion/physical-plan/src/aggregates/group_values/bytes.rs rename to datafusion/physical-plan/src/aggregates/group_values/single_column/bytes.rs diff --git a/datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/single_column/bytes_view.rs similarity index 100% rename from datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs rename to datafusion/physical-plan/src/aggregates/group_values/single_column/bytes_view.rs diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_column/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/single_column/mod.rs new file mode 100644 index 0000000000000..30282bdea5422 --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/group_values/single_column/mod.rs @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +pub(crate) mod bytes; +pub(crate) mod bytes_view; +pub(crate) mod primitive; diff --git a/datafusion/physical-plan/src/aggregates/group_values/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_column/primitive.rs similarity index 100% rename from datafusion/physical-plan/src/aggregates/group_values/primitive.rs rename to datafusion/physical-plan/src/aggregates/group_values/single_column/primitive.rs diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index 34df643b6cf0c..23a07ebec305f 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -17,7 +17,7 @@ //! A wrapper around `hashbrown::RawTable` that allows entries to be tracked by index -use crate::aggregates::group_values::primitive::HashValue; +use crate::aggregates::group_values::HashValue; use crate::aggregates::topk::heap::Comparable; use ahash::RandomState; use arrow::datatypes::i256; From 7862958b36c5455451e1f396a567c401c3f277ee Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 13 Nov 2024 22:15:26 +0800 Subject: [PATCH 2/4] sort out codes. 
--- .../src/aggregates/group_values/mod.rs | 15 +++++++-------- .../{multi_column => multi_group_by}/bytes.rs | 4 ++-- .../bytes_view.rs | 4 ++-- .../{multi_column => multi_group_by}/mod.rs | 9 +++++++-- .../{multi_column => multi_group_by}/primitive.rs | 4 ++-- .../group_values/{ => multi_group_by}/row.rs | 0 .../{single_column => single_group_by}/bytes.rs | 0 .../bytes_view.rs | 0 .../{single_column => single_group_by}/mod.rs | 2 ++ .../primitive.rs | 0 10 files changed, 22 insertions(+), 16 deletions(-) rename datafusion/physical-plan/src/aggregates/group_values/{multi_column => multi_group_by}/bytes.rs (99%) rename datafusion/physical-plan/src/aggregates/group_values/{multi_column => multi_group_by}/bytes_view.rs (99%) rename datafusion/physical-plan/src/aggregates/group_values/{multi_column => multi_group_by}/mod.rs (99%) rename datafusion/physical-plan/src/aggregates/group_values/{multi_column => multi_group_by}/primitive.rs (98%) rename datafusion/physical-plan/src/aggregates/group_values/{ => multi_group_by}/row.rs (100%) rename datafusion/physical-plan/src/aggregates/group_values/{single_column => single_group_by}/bytes.rs (100%) rename datafusion/physical-plan/src/aggregates/group_values/{single_column => single_group_by}/bytes_view.rs (100%) rename datafusion/physical-plan/src/aggregates/group_values/{single_column => single_group_by}/mod.rs (93%) rename datafusion/physical-plan/src/aggregates/group_values/{single_column => single_group_by}/primitive.rs (100%) diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index e361f0db9dd37..c04bf209f057d 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -24,17 +24,16 @@ use datafusion_common::Result; use datafusion_expr::EmitTo; -mod multi_column; -mod row; -mod single_column; +mod multi_group_by; + +mod single_group_by; use 
datafusion_physical_expr::binary_map::OutputType; -use multi_column::GroupValuesColumn; -use row::GroupValuesRows; +use multi_group_by::{row::GroupValuesRows, GroupValuesColumn}; -pub(crate) use single_column::primitive::HashValue; +pub(crate) use single_group_by::primitive::HashValue; use crate::aggregates::{ - group_values::single_column::{ + group_values::single_group_by::{ bytes::GroupValuesByes, bytes_view::GroupValuesBytesView, primitive::GroupValuesPrimitive, }, @@ -149,7 +148,7 @@ pub fn new_group_values( } } - if multi_column::supported_schema(schema.as_ref()) { + if multi_group_by::supported_schema(schema.as_ref()) { if matches!(group_ordering, GroupOrdering::None) { Ok(Box::new(GroupValuesColumn::<false>::try_new(schema)?)) } else { diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_column/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs similarity index 99% rename from datafusion/physical-plan/src/aggregates/group_values/multi_column/bytes.rs rename to datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs index 820d28fc58e7b..35a79cbd91edb 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_column/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use crate::aggregates::group_values::multi_column::{nulls_equal_to, GroupColumn}; +use crate::aggregates::group_values::multi_group_by::{nulls_equal_to, GroupColumn}; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow::array::{AsArray, BufferBuilder, GenericBinaryArray, GenericStringArray}; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; @@ -403,7 +403,7 @@ where mod tests { use std::sync::Arc; - use crate::aggregates::group_values::multi_column::bytes::ByteGroupValueBuilder; + use crate::aggregates::group_values::multi_group_by::bytes::ByteGroupValueBuilder; use arrow_array::{ArrayRef, StringArray}; use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; use datafusion_physical_expr::binary_map::OutputType; diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_column/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs similarity index 99% rename from datafusion/physical-plan/src/aggregates/group_values/multi_column/bytes_view.rs rename to datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index 032b4d9e2a918..811790f4e5885 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_column/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use crate::aggregates::group_values::multi_column::{nulls_equal_to, GroupColumn}; +use crate::aggregates::group_values::multi_group_by::{nulls_equal_to, GroupColumn}; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow::array::{make_view, AsArray, ByteView}; use arrow::buffer::ScalarBuffer; @@ -544,7 +544,7 @@ impl GroupColumn for ByteViewGroupValueBuilder { mod tests { use std::sync::Arc; - use crate::aggregates::group_values::multi_column::bytes_view::ByteViewGroupValueBuilder; + use crate::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder; use arrow::array::AsArray; use arrow::datatypes::StringViewType; use arrow_array::{ArrayRef, StringViewArray}; diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_column/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs similarity index 99% rename from datafusion/physical-plan/src/aggregates/group_values/multi_column/mod.rs rename to datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 191292c549f41..618f9a283d76b 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_column/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -15,13 +15,16 @@ // specific language governing permissions and limitations // under the License. +//! 
`GroupValues` implementations for multi group by cases + mod bytes; mod bytes_view; mod primitive; +pub mod row; use std::mem::{self, size_of}; -use crate::aggregates::group_values::multi_column::{ +use crate::aggregates::group_values::multi_group_by::{ bytes::ByteGroupValueBuilder, bytes_view::ByteViewGroupValueBuilder, primitive::PrimitiveGroupValueBuilder, }; @@ -1138,7 +1141,9 @@ mod tests { use datafusion_common::utils::proxy::RawTableAllocExt; use datafusion_expr::EmitTo; - use crate::aggregates::group_values::{multi_column::GroupValuesColumn, GroupValues}; + use crate::aggregates::group_values::{ + multi_group_by::GroupValuesColumn, GroupValues, + }; use super::GroupIndexView; diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_column/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs similarity index 98% rename from datafusion/physical-plan/src/aggregates/group_values/multi_column/primitive.rs rename to datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index dff85ff7eb1a4..4da4822474589 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_column/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use crate::aggregates::group_values::multi_column::{nulls_equal_to, GroupColumn}; +use crate::aggregates::group_values::multi_group_by::{nulls_equal_to, GroupColumn}; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow::buffer::ScalarBuffer; use arrow_array::cast::AsArray; @@ -208,7 +208,7 @@ impl GroupColumn mod tests { use std::sync::Arc; - use crate::aggregates::group_values::multi_column::primitive::PrimitiveGroupValueBuilder; + use crate::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder; use arrow::datatypes::Int64Type; use arrow_array::{ArrayRef, Int64Array}; use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/row.rs similarity index 100% rename from datafusion/physical-plan/src/aggregates/group_values/row.rs rename to datafusion/physical-plan/src/aggregates/group_values/multi_group_by/row.rs diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_column/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs similarity index 100% rename from datafusion/physical-plan/src/aggregates/group_values/single_column/bytes.rs rename to datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_column/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs similarity index 100% rename from datafusion/physical-plan/src/aggregates/group_values/single_column/bytes_view.rs rename to datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_column/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs similarity index 93% rename from 
datafusion/physical-plan/src/aggregates/group_values/single_column/mod.rs rename to datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs index 30282bdea5422..417618ba66af4 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_column/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! `GroupValues` implementations for single group by cases + pub(crate) mod bytes; pub(crate) mod bytes_view; pub(crate) mod primitive; diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_column/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs similarity index 100% rename from datafusion/physical-plan/src/aggregates/group_values/single_column/primitive.rs rename to datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs From 23197101c47f897b945aebd43412ce2fa2ba65f1 Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 13 Nov 2024 22:36:18 +0800 Subject: [PATCH 3/4] move row to suitable place, and improve comments. 
--- .../src/aggregates/group_values/mod.rs | 18 +++++++++++++++++- .../group_values/multi_group_by/mod.rs | 1 - .../group_values/{multi_group_by => }/row.rs | 0 3 files changed, 17 insertions(+), 2 deletions(-) rename datafusion/physical-plan/src/aggregates/group_values/{multi_group_by => }/row.rs (100%) diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index c04bf209f057d..2290342911161 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -26,9 +26,11 @@ use datafusion_expr::EmitTo; mod multi_group_by; +mod row; mod single_group_by; use datafusion_physical_expr::binary_map::OutputType; -use multi_group_by::{row::GroupValuesRows, GroupValuesColumn}; +use multi_group_by::GroupValuesColumn; +use row::GroupValuesRows; pub(crate) use single_group_by::primitive::HashValue; @@ -107,6 +109,20 @@ pub trait GroupValues: Send { } /// Return a specialized implementation of [`GroupValues`] for the given schema. +/// +/// [`GroupValues`] implementations choosing logic: +/// +/// - If group by single column, and type of this column has +/// the specific [`GroupValues`] implementation, such implementation +/// will be chosen. +/// +/// - If group by multiple columns, and all column types have the specific +/// [`GroupColumn`] implementations, [`GroupValuesColumn`] will be chosen. +/// +/// - Otherwise, the general implementation [`GroupValuesRows`] will be chosen. 
+/// +/// [`GroupColumn`]: crate::aggregates::group_values::multi_group_by::GroupColumn +/// pub fn new_group_values( schema: SchemaRef, group_ordering: &GroupOrdering, diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 618f9a283d76b..83b0f9d773693 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -20,7 +20,6 @@ mod bytes; mod bytes_view; mod primitive; -pub mod row; use std::mem::{self, size_of}; diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs similarity index 100% rename from datafusion/physical-plan/src/aggregates/group_values/multi_group_by/row.rs rename to datafusion/physical-plan/src/aggregates/group_values/row.rs From b5d2bef5e7bb0c59a7b80b4e5fe9622622b1c7aa Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 13 Nov 2024 23:02:07 +0800 Subject: [PATCH 4/4] fix doc. --- datafusion/physical-plan/src/aggregates/group_values/mod.rs | 6 +++--- datafusion/physical-plan/src/aggregates/mod.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index 2290342911161..a816203b68124 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -24,7 +24,7 @@ use datafusion_common::Result; use datafusion_expr::EmitTo; -mod multi_group_by; +pub(crate) mod multi_group_by; mod row; mod single_group_by; @@ -80,7 +80,7 @@ mod null_builder; /// Each distinct group in a hash aggregation is identified by a unique group id /// (usize) which is assigned by instances of this trait. 
Group ids are /// continuous without gaps, starting from 0. -pub trait GroupValues: Send { +pub(crate) trait GroupValues: Send { /// Calculates the group id for each input row of `cols`, assigning new /// group ids as necessary. /// @@ -123,7 +123,7 @@ pub trait GroupValues: Send { /// /// [`GroupColumn`]: crate::aggregates::group_values::multi_group_by::GroupColumn /// -pub fn new_group_values( +pub(crate) fn new_group_values( schema: SchemaRef, group_ordering: &GroupOrdering, ) -> Result<Box<dyn GroupValues>> { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 2220007fdd72c..260c3a1c48de6 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -52,7 +52,7 @@ use crate::execution_plan::CardinalityEffect; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use itertools::Itertools; -pub mod group_values; +pub(crate) mod group_values; mod no_grouping; pub mod order; mod row_hash;