diff --git a/cpp/src/generated/parquet_types.cpp b/cpp/src/generated/parquet_types.cpp index 0ee973f2a2d6..a0e780bb2473 100644 --- a/cpp/src/generated/parquet_types.cpp +++ b/cpp/src/generated/parquet_types.cpp @@ -2255,6 +2255,78 @@ void GeographyType::printTo(std::ostream& out) const { } + +FixedSizeListType::~FixedSizeListType() noexcept { +} + +FixedSizeListType::FixedSizeListType() noexcept + : type(static_cast(0)), + num_values(0) { +} + +void FixedSizeListType::__set_type(const Type::type val) { + this->type = val; +__isset.type = true; +} + +void FixedSizeListType::__set_num_values(const int32_t val) { + this->num_values = val; +__isset.num_values = true; +} +std::ostream& operator<<(std::ostream& out, const FixedSizeListType& obj) +{ + obj.printTo(out); + return out; +} + + +void swap(FixedSizeListType &a, FixedSizeListType &b) { + using ::std::swap; + swap(a.type, b.type); + swap(a.num_values, b.num_values); + swap(a.__isset, b.__isset); +} + +bool FixedSizeListType::operator==(const FixedSizeListType & rhs) const +{ + if (!(type == rhs.type)) + return false; + if (!(num_values == rhs.num_values)) + return false; + return true; +} + +FixedSizeListType::FixedSizeListType(const FixedSizeListType& other110) noexcept { + type = other110.type; + num_values = other110.num_values; + __isset = other110.__isset; +} +FixedSizeListType::FixedSizeListType(FixedSizeListType&& other111) noexcept { + type = other111.type; + num_values = other111.num_values; + __isset = other111.__isset; +} +FixedSizeListType& FixedSizeListType::operator=(const FixedSizeListType& other112) noexcept { + type = other112.type; + num_values = other112.num_values; + __isset = other112.__isset; + return *this; +} +FixedSizeListType& FixedSizeListType::operator=(FixedSizeListType&& other113) noexcept { + type = other113.type; + num_values = other113.num_values; + __isset = other113.__isset; + return *this; +} +void FixedSizeListType::printTo(std::ostream& out) const { + using ::apache::thrift::to_string; + out << "FixedSizeListType("; + out << "type=" << to_string(type); + out << ", " << "num_values=" << to_string(num_values); + out << ")"; +} + + LogicalType::~LogicalType() noexcept { } @@ -2345,6 +2417,11 @@ void LogicalType::__set_GEOGRAPHY(const GeographyType& val) { this->GEOGRAPHY = val; __isset.GEOGRAPHY = true; } + +void LogicalType::__set_FIXED_SIZE_LIST(const FixedSizeListType& val) { + this->FIXED_SIZE_LIST = val; +__isset.FIXED_SIZE_LIST = true; +} std::ostream& operator<<(std::ostream& out, const LogicalType& obj) { obj.printTo(out); @@ -2371,6 +2448,7 @@ void swap(LogicalType &a, LogicalType &b) { swap(a.VARIANT, b.VARIANT); swap(a.GEOMETRY, b.GEOMETRY); swap(a.GEOGRAPHY, b.GEOGRAPHY); + swap(a.FIXED_SIZE_LIST, b.FIXED_SIZE_LIST); swap(a.__isset, b.__isset); } @@ -2444,6 +2522,10 @@ bool LogicalType::operator==(const LogicalType & rhs) const return false; else if (__isset.GEOGRAPHY && !(GEOGRAPHY == rhs.GEOGRAPHY)) return false; + if (__isset.FIXED_SIZE_LIST != rhs.__isset.FIXED_SIZE_LIST) + return false; + else if (__isset.FIXED_SIZE_LIST && !(FIXED_SIZE_LIST == rhs.FIXED_SIZE_LIST)) + return false; return true; } @@ -2465,6 +2547,7 @@ LogicalType::LogicalType(const LogicalType& other119) { VARIANT = other119.VARIANT; GEOMETRY = other119.GEOMETRY; GEOGRAPHY = other119.GEOGRAPHY; + FIXED_SIZE_LIST = other119.FIXED_SIZE_LIST; __isset = other119.__isset; } LogicalType::LogicalType(LogicalType&& other120) noexcept { @@ -2485,6 +2568,7 @@ LogicalType::LogicalType(LogicalType&& other120) noexcept { VARIANT = std::move(other120.VARIANT); GEOMETRY = std::move(other120.GEOMETRY); GEOGRAPHY = std::move(other120.GEOGRAPHY); + FIXED_SIZE_LIST = std::move(other120.FIXED_SIZE_LIST); __isset = other120.__isset; } LogicalType& LogicalType::operator=(const LogicalType& other121) { @@ -2505,6 +2589,7 @@ LogicalType& LogicalType::operator=(const LogicalType& other121) { VARIANT = other121.VARIANT; GEOMETRY = other121.GEOMETRY; GEOGRAPHY = other121.GEOGRAPHY; + FIXED_SIZE_LIST = other121.FIXED_SIZE_LIST; __isset = other121.__isset; return *this; } @@ -2526,6 +2611,7 @@ LogicalType& LogicalType::operator=(LogicalType&& other122) noexcept { VARIANT = std::move(other122.VARIANT); GEOMETRY = std::move(other122.GEOMETRY); GEOGRAPHY = std::move(other122.GEOGRAPHY); + FIXED_SIZE_LIST = std::move(other122.FIXED_SIZE_LIST); __isset = other122.__isset; return *this; } @@ -2549,6 +2635,7 @@ void LogicalType::printTo(std::ostream& out) const { out << ", " << "VARIANT="; (__isset.VARIANT ? (out << to_string(VARIANT)) : (out << "")); out << ", " << "GEOMETRY="; (__isset.GEOMETRY ? (out << to_string(GEOMETRY)) : (out << "")); out << ", " << "GEOGRAPHY="; (__isset.GEOGRAPHY ? (out << to_string(GEOGRAPHY)) : (out << "")); + out << ", " << "FIXED_SIZE_LIST="; (__isset.FIXED_SIZE_LIST ? (out << to_string(FIXED_SIZE_LIST)) : (out << "")); out << ")"; } diff --git a/cpp/src/generated/parquet_types.h b/cpp/src/generated/parquet_types.h index 1f1e254f5cf2..5b7ff69ad553 100644 --- a/cpp/src/generated/parquet_types.h +++ b/cpp/src/generated/parquet_types.h @@ -1620,8 +1620,59 @@ void swap(GeographyType &a, GeographyType &b); std::ostream& operator<<(std::ostream& out, const GeographyType& obj); +typedef struct _FixedSizeListType__isset { + _FixedSizeListType__isset() : type(false), num_values(false) {} + bool type :1; + bool num_values :1; +} _FixedSizeListType__isset; + +/** + * Fixed-size list logical type annotation for FIXED_LEN_BYTE_ARRAY columns. + */ +class FixedSizeListType { + public: + + FixedSizeListType(const FixedSizeListType&) noexcept; + FixedSizeListType(FixedSizeListType&&) noexcept; + FixedSizeListType& operator=(const FixedSizeListType&) noexcept; + FixedSizeListType& operator=(FixedSizeListType&&) noexcept; + FixedSizeListType() noexcept; + + virtual ~FixedSizeListType() noexcept; + /** + * + * @see Type + */ + Type::type type; + int32_t num_values; + + _FixedSizeListType__isset __isset; + + void __set_type(const Type::type val); + + void __set_num_values(const int32_t val); + + bool operator == (const FixedSizeListType & rhs) const; + bool operator != (const FixedSizeListType &rhs) const { + return !(*this == rhs); + } + + bool operator < (const FixedSizeListType & ) const; + + template + uint32_t read(Protocol_* iprot); + template + uint32_t write(Protocol_* oprot) const; + + virtual void printTo(std::ostream& out) const; +}; + +void swap(FixedSizeListType &a, FixedSizeListType &b); + +std::ostream& operator<<(std::ostream& out, const FixedSizeListType& obj); + typedef struct _LogicalType__isset { - _LogicalType__isset() : STRING(false), MAP(false), LIST(false), ENUM(false), DECIMAL(false), DATE(false), TIME(false), TIMESTAMP(false), INTEGER(false), UNKNOWN(false), JSON(false), BSON(false), UUID(false), FLOAT16(false), VARIANT(false), GEOMETRY(false), GEOGRAPHY(false) {} + _LogicalType__isset() : STRING(false), MAP(false), LIST(false), ENUM(false), DECIMAL(false), DATE(false), TIME(false), TIMESTAMP(false), INTEGER(false), UNKNOWN(false), JSON(false), BSON(false), UUID(false), FLOAT16(false), VARIANT(false), GEOMETRY(false), GEOGRAPHY(false), FIXED_SIZE_LIST(false) {} bool STRING :1; bool MAP :1; bool LIST :1; @@ -1639,6 +1690,7 @@ typedef struct _LogicalType__isset { bool VARIANT :1; bool GEOMETRY :1; bool GEOGRAPHY :1; + bool FIXED_SIZE_LIST :1; } _LogicalType__isset; /** @@ -1675,6 +1727,7 @@ class LogicalType { VariantType VARIANT; GeometryType GEOMETRY; GeographyType GEOGRAPHY; + FixedSizeListType FIXED_SIZE_LIST; _LogicalType__isset __isset; @@ -1712,6 +1765,8 @@ class LogicalType { void __set_GEOGRAPHY(const GeographyType& val); + void __set_FIXED_SIZE_LIST(const FixedSizeListType& val); + bool operator == (const LogicalType & rhs) const; bool operator != (const LogicalType &rhs) const { return !(*this == rhs); diff --git a/cpp/src/generated/parquet_types.tcc b/cpp/src/generated/parquet_types.tcc index 78e3e2549394..5222e24167d6 100644 --- a/cpp/src/generated/parquet_types.tcc +++ b/cpp/src/generated/parquet_types.tcc @@ -1625,6 +1625,86 @@ uint32_t GeographyType::write(Protocol_* oprot) const { return xfer; } + +template +uint32_t FixedSizeListType::read(Protocol_* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + bool isset_type = false; + bool isset_num_values = false; + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_I32) { + int32_t ecast_fixed_size_list_type; + xfer += iprot->readI32(ecast_fixed_size_list_type); + this->type = static_cast(ecast_fixed_size_list_type); + this->__isset.type = true; + isset_type = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->num_values); + this->__isset.num_values = true; + isset_num_values = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + if (!isset_type) + throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_num_values) + throw TProtocolException(TProtocolException::INVALID_DATA); + return xfer; +} + +template +uint32_t FixedSizeListType::write(Protocol_* oprot) const { + uint32_t xfer = 0; + ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("FixedSizeListType"); + + xfer += oprot->writeFieldBegin("type", ::apache::thrift::protocol::T_I32, 1); + xfer += oprot->writeI32(static_cast(this->type)); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("num_values", ::apache::thrift::protocol::T_I32, 2); + xfer += oprot->writeI32(this->num_values); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + template uint32_t LogicalType::read(Protocol_* iprot) { @@ -1783,6 +1863,14 @@ uint32_t LogicalType::read(Protocol_* iprot) { xfer += iprot->skip(ftype); } break; + case 19: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->FIXED_SIZE_LIST.read(iprot); + this->__isset.FIXED_SIZE_LIST = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -1886,6 +1974,11 @@ uint32_t LogicalType::write(Protocol_* oprot) const { xfer += this->GEOGRAPHY.write(oprot); xfer += oprot->writeFieldEnd(); } + if (this->__isset.FIXED_SIZE_LIST) { + xfer += oprot->writeFieldBegin("FIXED_SIZE_LIST", ::apache::thrift::protocol::T_STRUCT, 19); + xfer += this->FIXED_SIZE_LIST.write(oprot); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index a60af69aec9f..57824d8990dd 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -545,6 +545,62 @@ class LeafReader : public ColumnReaderImpl { }; // Column reader for extension arrays +class FixedSizeListFLBAReader : public ColumnReaderImpl { + public: + FixedSizeListFLBAReader(std::shared_ptr field, + std::unique_ptr storage_reader) + : field_(std::move(field)), storage_reader_(std::move(storage_reader)) {} + + Status GetDefLevels(const int16_t** data, int64_t* length) override { + return storage_reader_->GetDefLevels(data, length); + } + + Status GetRepLevels(const int16_t** data, int64_t* length) override { + return storage_reader_->GetRepLevels(data, length); + } + + Status LoadBatch(int64_t number_of_records) final { + return storage_reader_->LoadBatch(number_of_records); + } + + Status BuildArray(int64_t length_upper_bound, + std::shared_ptr* out) override { + std::shared_ptr storage; + RETURN_NOT_OK(storage_reader_->BuildArray(length_upper_bound, &storage)); + + const auto& list_type = + checked_cast(*field_->type()); + std::vector> chunks; + chunks.reserve(storage->num_chunks()); + for (const auto& storage_chunk : storage->chunks()) { + const auto& storage_data = *storage_chunk->data(); + if (storage_data.offset != 0) { + return Status::NotImplemented( + "Reading sliced FIXED_SIZE_LIST FIXED_LEN_BYTE_ARRAY chunks is not " + "implemented"); + } + const int64_t child_length = storage_data.length * list_type.list_size(); + auto child_data = ::arrow::ArrayData::Make( + list_type.value_type(), child_length, {nullptr, storage_data.buffers[1]}, + /*null_count=*/0); + auto data = ::arrow::ArrayData::Make( + field_->type(), storage_data.length, {storage_data.buffers[0]}, + {std::move(child_data)}, storage_data.null_count); + chunks.push_back(::arrow::MakeArray(std::move(data))); + } + *out = std::make_shared(std::move(chunks), field_->type()); + return Status::OK(); + } + + bool IsOrHasRepeatedChild() const final { return false; } + + const std::shared_ptr field() override { return field_; } + + private: + std::shared_ptr field_; + std::unique_ptr storage_reader_; +}; + class ExtensionReader : public ColumnReaderImpl { public: ExtensionReader(std::shared_ptr field, @@ -884,8 +940,20 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& arrow_f } std::unique_ptr input( ctx->iterator_factory(field.column_index, ctx->reader)); - *out = std::make_unique(ctx, arrow_field, std::move(input), - field.level_info); + if (arrow_field->type()->id() == ::arrow::Type::FIXED_SIZE_LIST) { + const auto& list_type = + checked_cast(*arrow_field->type()); + const auto& value_type = + checked_cast(*list_type.value_type()); + auto storage_field = arrow_field->WithType( + ::arrow::fixed_size_binary(list_type.list_size() * value_type.bit_width() / 8)); + *out = std::make_unique( + arrow_field, std::make_unique(ctx, storage_field, std::move(input), + field.level_info)); + } else { + *out = std::make_unique(ctx, arrow_field, std::move(input), + field.level_info); + } } else if (type_id == ::arrow::Type::LIST || type_id == ::arrow::Type::MAP || type_id == ::arrow::Type::FIXED_SIZE_LIST || type_id == ::arrow::Type::LARGE_LIST) { diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 9c4c462c6b8c..ef624b50fa74 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -78,6 +78,24 @@ Repetition::type RepetitionFromNullable(bool is_nullable) { return is_nullable ? Repetition::OPTIONAL : Repetition::REQUIRED; } +Result> FixedSizeListElementStorage( + const ::arrow::DataType& type) { + switch (type.id()) { + case ArrowTypeId::INT32: + return std::make_pair(ParquetType::INT32, 4); + case ArrowTypeId::INT64: + return std::make_pair(ParquetType::INT64, 8); + case ArrowTypeId::FLOAT: + return std::make_pair(ParquetType::FLOAT, 4); + case ArrowTypeId::DOUBLE: + return std::make_pair(ParquetType::DOUBLE, 8); + default: + return Status::NotImplemented( + "FIXED_SIZE_LIST as FIXED_LEN_BYTE_ARRAY is only implemented for int32, " + "int64, float, and double elements"); + } +} + Result> MakeArrowList( std::shared_ptr field, const ArrowReaderProperties& props) { switch (props.list_type()) { @@ -453,7 +471,23 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, return StructToNode(struct_type, name, field->nullable(), field_id, properties, arrow_properties, out); } - case ArrowTypeId::FIXED_SIZE_LIST: + case ArrowTypeId::FIXED_SIZE_LIST: { + auto fixed_size_list_type = + std::static_pointer_cast<::arrow::FixedSizeListType>(field->type()); + if (arrow_properties.write_fixed_size_list_as_fixed_len_byte_array() && + !fixed_size_list_type->value_field()->nullable()) { + ARROW_ASSIGN_OR_RAISE( + auto element_storage, + FixedSizeListElementStorage(*fixed_size_list_type->value_type())); + type = ParquetType::FIXED_LEN_BYTE_ARRAY; + length = element_storage.second * fixed_size_list_type->list_size(); + logical_type = LogicalType::FixedSizeList(element_storage.first, + fixed_size_list_type->list_size()); + break; + } + return ListToNode(fixed_size_list_type, name, field->nullable(), field_id, + properties, arrow_properties, out); + } case ArrowTypeId::LARGE_LIST: case ArrowTypeId::LIST: { auto list_type = std::static_pointer_cast<::arrow::BaseListType>(field->type()); @@ -1040,8 +1074,19 @@ Result ApplyOriginalStorageMetadata(const Field& origin_field, const int num_children = inferred_type->num_fields(); - if (num_children > 0 && origin_type->num_fields() == num_children) { - DCHECK_EQ(static_cast(inferred->children.size()), num_children); + if (origin_type->id() == ::arrow::Type::FIXED_SIZE_LIST && + inferred_type->id() == ::arrow::Type::FIXED_SIZE_LIST && + inferred->children.empty()) { + // Option A FIXED_SIZE_LIST columns are primitive Parquet leaves annotated as + // FIXED_LEN_BYTE_ARRAY, so the inferred Arrow type has a child field but the + // SchemaField tree has no child SchemaField. Restore the stored Arrow type + // directly instead of recursing into absent children. + inferred->field = inferred->field->WithType(origin_type); + modified = true; + } + + if (num_children > 0 && origin_type->num_fields() == num_children && + static_cast(inferred->children.size()) == num_children) { const auto factory = GetNestedFactory(*origin_type, *inferred_type); if (factory) { // The type may be modified (e.g. LargeList) while the children stay the same diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index 2e8cf764b27f..eefd62d50db8 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -195,6 +195,30 @@ Result> FromFLBA( const LogicalType& logical_type, int32_t physical_length, const ArrowReaderProperties& reader_properties) { switch (logical_type.type()) { + case LogicalType::Type::FIXED_SIZE_LIST: { + const auto& fixed_size_list = + ::arrow::internal::checked_cast(logical_type); + std::shared_ptr value_type; + switch (fixed_size_list.element_type()) { + case ParquetType::INT32: + value_type = ::arrow::int32(); + break; + case ParquetType::INT64: + value_type = ::arrow::int64(); + break; + case ParquetType::FLOAT: + value_type = ::arrow::float32(); + break; + case ParquetType::DOUBLE: + value_type = ::arrow::float64(); + break; + default: + return Status::NotImplemented("Unsupported FIXED_SIZE_LIST element type"); + } + return ::arrow::fixed_size_list( + ::arrow::field("element", std::move(value_type), /*nullable=*/false), + fixed_size_list.num_values()); + } case LogicalType::Type::DECIMAL: return MakeArrowDecimal(logical_type, reader_properties.smallest_decimal_enabled()); case LogicalType::Type::FLOAT16: diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index 4b2b06e5e097..a2f7acea6739 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -18,6 +18,7 @@ #include "parquet/arrow/writer.h" #include +#include #include #include #include @@ -56,6 +57,7 @@ using arrow::ExtensionArray; using arrow::ExtensionType; using arrow::Field; using arrow::FixedSizeBinaryArray; +using arrow::FixedSizeListArray; using arrow::ListArray; using arrow::MemoryPool; using arrow::NumericArray; @@ -107,6 +109,43 @@ bool HasNullableRoot(const SchemaManifest& schema_manifest, return nullable; } +Result> FixedSizeListToFixedSizeBinary( + const ChunkedArray& data, int64_t offset, int64_t size, MemoryPool* pool) { + auto sliced = data.Slice(offset, size); + std::vector> chunks; + chunks.reserve(sliced->num_chunks()); + + for (const auto& chunk : sliced->chunks()) { + const auto& list_array = checked_cast(*chunk); + const auto& list_type = + checked_cast(*chunk->type()); + const auto& value_type = + checked_cast(*list_type.value_type()); + const int32_t value_byte_width = value_type.bit_width() / 8; + const int32_t list_byte_width = list_type.list_size() * value_byte_width; + const int64_t data_size = chunk->length() * list_byte_width; + + auto values = list_array.values(); + if (values->null_count() != 0) { + return Status::Invalid( + "Cannot write FIXED_SIZE_LIST as FIXED_LEN_BYTE_ARRAY when elements are null"); + } + ARROW_ASSIGN_OR_RAISE(auto data_buffer, AllocateBuffer(data_size, pool)); + const auto& values_data = values->data()->buffers[1]; + const uint8_t* source = + values_data->data() + list_array.value_offset(0) * value_byte_width; + std::memcpy(data_buffer->mutable_data(), source, data_size); + + auto storage_type = ::arrow::fixed_size_binary(list_byte_width); + auto array_data = + ::arrow::ArrayData::Make(storage_type, chunk->length(), + {chunk->data()->buffers[0], std::move(data_buffer)}, + chunk->null_count(), /*offset=*/0); + chunks.push_back(::arrow::MakeArray(std::move(array_data))); + } + return std::make_shared(std::move(chunks)); +} + Status GetSchemaMetadata(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool, const ArrowWriterProperties& properties, std::shared_ptr* out) { @@ -375,9 +414,19 @@ class FileWriterImpl : public FileWriter { if (row_group_writer_->buffered()) { return Status::Invalid("Cannot write column chunk into the buffered row group."); } + std::shared_ptr data_to_write = data; + const int column_index = row_group_writer_->current_column() + 1; + const auto* parquet_node = + writer_->schema()->Column(column_index)->schema_node().get(); + if (parquet_node->logical_type()->is_fixed_size_list()) { + ARROW_ASSIGN_OR_RAISE( + data_to_write, FixedSizeListToFixedSizeBinary( + *data, offset, size, column_write_context_.memory_pool)); + offset = 0; + } ARROW_ASSIGN_OR_RAISE( std::unique_ptr writer, - ArrowColumnWriterV2::Make(*data, offset, size, schema_manifest_, + ArrowColumnWriterV2::Make(*data_to_write, offset, size, schema_manifest_, row_group_writer_)); return writer->Write(&column_write_context_); } @@ -456,11 +505,21 @@ class FileWriterImpl : public FileWriter { int column_index_start = 0; for (int i = 0; i < batch.num_columns(); i++) { - ChunkedArray chunked_array{batch.column(i)}; - ARROW_ASSIGN_OR_RAISE( - std::unique_ptr writer, - ArrowColumnWriterV2::Make(chunked_array, offset, size, schema_manifest_, - row_group_writer_, column_index_start)); + std::shared_ptr chunked_array = + std::make_shared(batch.column(i)); + int64_t column_offset = offset; + const auto* parquet_node = + writer_->schema()->Column(column_index_start)->schema_node().get(); + if (parquet_node->logical_type()->is_fixed_size_list()) { + ARROW_ASSIGN_OR_RAISE(chunked_array, FixedSizeListToFixedSizeBinary( + *chunked_array, offset, size, + column_write_context_.memory_pool)); + column_offset = 0; + } + ARROW_ASSIGN_OR_RAISE(std::unique_ptr writer, + ArrowColumnWriterV2::Make( + *chunked_array, column_offset, size, schema_manifest_, + row_group_writer_, column_index_start)); column_index_start += writer->leaf_count(); if (arrow_properties_->use_threads()) { writers.emplace_back(std::move(writer)); diff --git a/cpp/src/parquet/parquet.thrift b/cpp/src/parquet/parquet.thrift index e3cc5adb9648..c67af974f319 100644 --- a/cpp/src/parquet/parquet.thrift +++ b/cpp/src/parquet/parquet.thrift @@ -462,6 +462,18 @@ struct GeographyType { 2: optional EdgeInterpolationAlgorithm algorithm; } +/** + * Fixed-size list logical type annotation for FIXED_LEN_BYTE_ARRAY columns. + * + * The annotated value stores num_values fixed-width primitive elements packed + * contiguously. The SchemaElement type_length must equal num_values multiplied + * by the byte width of the element physical type. + */ +struct FixedSizeListType { + 1: required Type type; + 2: required i32 num_values; +} + /** * LogicalType annotations to replace ConvertedType. * @@ -495,6 +507,7 @@ union LogicalType { 16: VariantType VARIANT // no compatible ConvertedType 17: GeometryType GEOMETRY // no compatible ConvertedType 18: GeographyType GEOGRAPHY // no compatible ConvertedType + 19: FixedSizeListType FIXED_SIZE_LIST // no compatible ConvertedType } /** diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 6634bac4f684..db7466f35d83 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -1298,6 +1298,7 @@ class PARQUET_EXPORT ArrowWriterProperties { truncated_timestamps_allowed_(false), store_schema_(false), compliant_nested_types_(true), + write_fixed_size_list_as_fixed_len_byte_array_(false), engine_version_(V2), use_threads_(kArrowDefaultUseThreads), executor_(NULLPTR), @@ -1367,6 +1368,19 @@ class PARQUET_EXPORT ArrowWriterProperties { return this; } + /// \brief EXPERIMENTAL: Write eligible Arrow FixedSizeList columns as + /// FIXED_LEN_BYTE_ARRAY annotated with the Parquet FIXED_SIZE_LIST logical type. + Builder* enable_fixed_size_list_as_fixed_len_byte_array() { + write_fixed_size_list_as_fixed_len_byte_array_ = true; + return this; + } + + /// \brief Disable writing Arrow FixedSizeList as annotated FIXED_LEN_BYTE_ARRAY. + Builder* disable_fixed_size_list_as_fixed_len_byte_array() { + write_fixed_size_list_as_fixed_len_byte_array_ = false; + return this; + } + /// Set the version of the Parquet writer engine. Builder* set_engine_version(EngineVersion version) { engine_version_ = version; @@ -1409,7 +1423,8 @@ class PARQUET_EXPORT ArrowWriterProperties { return std::shared_ptr(new ArrowWriterProperties( write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_, truncated_timestamps_allowed_, store_schema_, compliant_nested_types_, - engine_version_, use_threads_, executor_, write_time_adjusted_to_utc_)); + write_fixed_size_list_as_fixed_len_byte_array_, engine_version_, use_threads_, + executor_, write_time_adjusted_to_utc_)); } private: @@ -1421,6 +1436,7 @@ class PARQUET_EXPORT ArrowWriterProperties { bool store_schema_; bool compliant_nested_types_; + bool write_fixed_size_list_as_fixed_len_byte_array_; EngineVersion engine_version_; bool use_threads_; @@ -1447,6 +1463,10 @@ class PARQUET_EXPORT ArrowWriterProperties { /// "element". bool compliant_nested_types() const { return compliant_nested_types_; } + bool write_fixed_size_list_as_fixed_len_byte_array() const { + return write_fixed_size_list_as_fixed_len_byte_array_; + } + /// \brief The underlying engine version to use when writing Arrow data. /// /// V2 is currently the latest V1 is considered deprecated but left in @@ -1471,6 +1491,7 @@ class PARQUET_EXPORT ArrowWriterProperties { ::arrow::TimeUnit::type coerce_timestamps_unit, bool truncated_timestamps_allowed, bool store_schema, bool compliant_nested_types, + bool write_fixed_size_list_as_fixed_len_byte_array, EngineVersion engine_version, bool use_threads, ::arrow::internal::Executor* executor, bool write_time_adjusted_to_utc) @@ -1480,6 +1501,8 @@ class PARQUET_EXPORT ArrowWriterProperties { truncated_timestamps_allowed_(truncated_timestamps_allowed), store_schema_(store_schema), compliant_nested_types_(compliant_nested_types), + write_fixed_size_list_as_fixed_len_byte_array_( + write_fixed_size_list_as_fixed_len_byte_array), engine_version_(engine_version), use_threads_(use_threads), executor_(executor), @@ -1491,6 +1514,7 @@ class PARQUET_EXPORT ArrowWriterProperties { const bool truncated_timestamps_allowed_; const bool store_schema_; const bool compliant_nested_types_; + const bool write_fixed_size_list_as_fixed_len_byte_array_; const EngineVersion engine_version_; const bool use_threads_; ::arrow::internal::Executor* executor_; diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index fb4eb92a7544..d225d798b1f2 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -602,6 +602,10 @@ std::shared_ptr LogicalType::FromThrift( } return VariantLogicalType::Make(spec_version); + } else if (type.__isset.FIXED_SIZE_LIST) { + return FixedSizeListLogicalType::Make( + static_cast(type.FIXED_SIZE_LIST.type), + type.FIXED_SIZE_LIST.num_values); } else { // Sentinel type for one we do not recognize return UndefinedLogicalType::Make(); @@ -673,6 +677,11 @@ std::shared_ptr LogicalType::Variant(int8_t spec_version) { return VariantLogicalType::Make(spec_version); } +std::shared_ptr LogicalType::FixedSizeList( + parquet::Type::type element_type, int32_t num_values) { + return FixedSizeListLogicalType::Make(element_type, num_values); +} + std::shared_ptr LogicalType::None() { return NoLogicalType::Make(); } /* @@ -758,6 +767,7 @@ class LogicalType::Impl { class Geometry; class Geography; class Variant; + class FixedSizeList; class No; class Undefined; @@ -839,6 +849,9 @@ bool LogicalType::is_geography() const { bool LogicalType::is_variant() const { return impl_->type() == LogicalType::Type::VARIANT; } +bool LogicalType::is_fixed_size_list() const { + return impl_->type() == LogicalType::Type::FIXED_SIZE_LIST; +} bool LogicalType::is_none() const { return impl_->type() == LogicalType::Type::NONE; } bool LogicalType::is_valid() const { return impl_->type() != LogicalType::Type::UNDEFINED; @@ -2016,6 +2029,113 @@ std::shared_ptr VariantLogicalType::Make(const int8_t spec_ve return logical_type; } +namespace { + +int32_t FixedSizeListElementByteWidth(parquet::Type::type element_type) { + switch (element_type) { + case Type::INT32: + case Type::FLOAT: + return 4; + case Type::INT64: + case Type::DOUBLE: + return 8; + case Type::INT96: + return 12; + default: + return 0; + } +} + +} // namespace + +class LogicalType::Impl::FixedSizeList final : public LogicalType::Impl::Incompatible, + public LogicalType::Impl::Applicable { + public: + friend class FixedSizeListLogicalType; + + bool is_applicable(parquet::Type::type primitive_type, + int32_t primitive_length = -1) const override; + std::string ToString() const override; + std::string ToJSON() const override; + format::LogicalType ToThrift() const override; + bool Equals(const LogicalType& other) const override; + + parquet::Type::type element_type() const { return element_type_; } + int32_t num_values() const { return num_values_; } + + private: + FixedSizeList(parquet::Type::type element_type, int32_t num_values) + : LogicalType::Impl(LogicalType::Type::FIXED_SIZE_LIST, SortOrder::UNKNOWN), + element_type_(element_type), + num_values_(num_values) {} + + parquet::Type::type element_type_; + int32_t num_values_; +}; + +bool LogicalType::Impl::FixedSizeList::is_applicable(parquet::Type::type primitive_type, + int32_t primitive_length) const { + const int32_t element_width = FixedSizeListElementByteWidth(element_type_); + return primitive_type == parquet::Type::FIXED_LEN_BYTE_ARRAY && element_width > 0 && + primitive_length == num_values_ * element_width; +} + +parquet::Type::type FixedSizeListLogicalType::element_type() const { + return (dynamic_cast(*impl_)).element_type(); +} + +int32_t FixedSizeListLogicalType::num_values() const { + return (dynamic_cast(*impl_)).num_values(); +} + +std::string LogicalType::Impl::FixedSizeList::ToString() const { + std::stringstream type; + type << "FixedSizeList(element_type=" << TypeToString(element_type_) + << ", num_values=" << num_values_ << ")"; + return type.str(); +} + +std::string LogicalType::Impl::FixedSizeList::ToJSON() const { + std::stringstream json; + json << R"({"Type": "FixedSizeList", "element_type": ")" << TypeToString(element_type_) + << R"(", "num_values": )" << num_values_ << "}"; + return json.str(); +} + +format::LogicalType LogicalType::Impl::FixedSizeList::ToThrift() const { + format::LogicalType type; + format::FixedSizeListType fixed_size_list_type; + fixed_size_list_type.__set_type(static_cast(element_type_)); + fixed_size_list_type.__set_num_values(num_values_); + type.__set_FIXED_SIZE_LIST(fixed_size_list_type); + return type; +} + +bool LogicalType::Impl::FixedSizeList::Equals(const LogicalType& other) const { + if (other.type() != LogicalType::Type::FIXED_SIZE_LIST) { + return false; + } + const auto& fixed_size_list = dynamic_cast(other); + return fixed_size_list.element_type() == element_type_ && + fixed_size_list.num_values() == num_values_; +} + +std::shared_ptr FixedSizeListLogicalType::Make( + parquet::Type::type element_type, int32_t num_values) { + if (num_values <= 0) { + throw ParquetException("FixedSizeList logical type requires positive num_values"); + } + if (FixedSizeListElementByteWidth(element_type) == 0) { + throw ParquetException("Unsupported FixedSizeList element type: ", + TypeToString(element_type)); + } + auto logical_type = + std::shared_ptr(new FixedSizeListLogicalType()); + logical_type->impl_.reset( + new LogicalType::Impl::FixedSizeList(element_type, num_values)); + return logical_type; +} + class LogicalType::Impl::No final : public LogicalType::Impl::SimpleCompatible, public LogicalType::Impl::UniversalApplicable { public: diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index ad4df5119e75..781c252e64f8 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -162,6 +162,7 @@ class PARQUET_EXPORT LogicalType { GEOMETRY, GEOGRAPHY, VARIANT, + FIXED_SIZE_LIST, NONE // Not a real logical type; should always be last element }; }; @@ -230,6 +231,8 @@ class PARQUET_EXPORT LogicalType { static std::shared_ptr Float16(); static std::shared_ptr Variant( int8_t specVersion = kVariantSpecVersion); + static std::shared_ptr FixedSizeList( + parquet::Type::type element_type, int32_t num_values); static std::shared_ptr Geometry(std::string crs = ""); @@ -293,6 +296,7 @@ class PARQUET_EXPORT LogicalType { bool is_geometry() const; bool is_geography() const; bool is_variant() const; + bool is_fixed_size_list() const; bool is_none() const; /// \brief Return true if this logical type is of a known type. bool is_valid() const; @@ -509,6 +513,19 @@ class PARQUET_EXPORT VariantLogicalType : public LogicalType { VariantLogicalType() = default; }; +/// \brief Allowed for FIXED_LEN_BYTE_ARRAY primitive nodes only. +class PARQUET_EXPORT FixedSizeListLogicalType : public LogicalType { + public: + static std::shared_ptr Make(parquet::Type::type element_type, + int32_t num_values); + + parquet::Type::type element_type() const; + int32_t num_values() const; + + private: + FixedSizeListLogicalType() = default; +}; + /// \brief Allowed for any physical type. class PARQUET_EXPORT NoLogicalType : public LogicalType { public: diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx index 534f7790923a..fda0bcba6f49 100644 --- a/python/pyarrow/_dataset_parquet.pyx +++ b/python/pyarrow/_dataset_parquet.pyx @@ -620,6 +620,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions): "coerce_timestamps", "allow_truncated_timestamps", "use_compliant_nested_type", + "write_fixed_size_list_as_fixed_len_byte_array", } setters = set() @@ -676,6 +677,11 @@ cdef class ParquetFileWriteOptions(FileWriteOptions): writer_engine_version="V2", use_compliant_nested_type=( self._properties["use_compliant_nested_type"] + ), + store_schema=True, + write_time_adjusted_to_utc=False, + write_fixed_size_list_as_fixed_len_byte_array=( + self._properties["write_fixed_size_list_as_fixed_len_byte_array"] ) ) @@ -705,6 +711,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions): coerce_timestamps=None, allow_truncated_timestamps=False, use_compliant_nested_type=True, + write_fixed_size_list_as_fixed_len_byte_array=False, encryption_properties=None, write_batch_size=None, dictionary_pagesize_limit=None, diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 36fc2ccf2f33..63d6d55e5acb 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -69,6 +69,7 @@ cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties( use_compliant_nested_type=*, store_schema=*, write_time_adjusted_to_utc=*, + write_fixed_size_list_as_fixed_len_byte_array=*, ) except * diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 2358a961ebd9..dc7ee2a38e6f 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -1464,6 +1464,7 @@ cdef logical_type_name_from_enum(ParquetLogicalTypeId type_): ParquetLogicalType_JSON: 'JSON', ParquetLogicalType_BSON: 'BSON', ParquetLogicalType_UUID: 'UUID', + ParquetLogicalType_FIXED_SIZE_LIST: 'FIXED_SIZE_LIST', ParquetLogicalType_NONE: 'NONE', }.get(type_, 'UNKNOWN') @@ -2281,7 +2282,8 @@ cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties( writer_engine_version=None, use_compliant_nested_type=True, store_schema=True, - write_time_adjusted_to_utc=False) except *: + write_time_adjusted_to_utc=False, + write_fixed_size_list_as_fixed_len_byte_array=False) except *: """Arrow writer properties""" cdef: shared_ptr[ArrowWriterProperties] arrow_properties @@ -2322,6 +2324,13 @@ cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties( else: arrow_props.disable_compliant_nested_types() + # write_fixed_size_list_as_fixed_len_byte_array + + if write_fixed_size_list_as_fixed_len_byte_array: + arrow_props.enable_fixed_size_list_as_fixed_len_byte_array() + else: + arrow_props.disable_fixed_size_list_as_fixed_len_byte_array() + # writer_engine_version if writer_engine_version == "V1": @@ -2396,6 +2405,7 @@ cdef class ParquetWriter(_Weakrefable): store_decimal_as_integer=False, use_content_defined_chunking=False, write_time_adjusted_to_utc=False, + write_fixed_size_list_as_fixed_len_byte_array=False, bloom_filter_options=None): cdef: shared_ptr[WriterProperties] properties @@ -2443,6 +2453,8 @@ cdef class ParquetWriter(_Weakrefable): use_compliant_nested_type=use_compliant_nested_type, store_schema=store_schema, write_time_adjusted_to_utc=write_time_adjusted_to_utc, + write_fixed_size_list_as_fixed_len_byte_array=( + write_fixed_size_list_as_fixed_len_byte_array), ) pool = maybe_unbox_memory_pool(memory_pool) diff --git a/python/pyarrow/includes/libparquet.pxd b/python/pyarrow/includes/libparquet.pxd index df353cc7805f..30c6ea23dfc3 100644 --- a/python/pyarrow/includes/libparquet.pxd +++ b/python/pyarrow/includes/libparquet.pxd @@ -69,6 +69,7 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil: ParquetLogicalType_UUID" parquet::LogicalType::Type::UUID" ParquetLogicalType_GEOMETRY" parquet::LogicalType::Type::GEOMETRY" ParquetLogicalType_GEOGRAPHY" parquet::LogicalType::Type::GEOGRAPHY" + ParquetLogicalType_FIXED_SIZE_LIST" parquet::LogicalType::Type::FIXED_SIZE_LIST" ParquetLogicalType_NONE" parquet::LogicalType::Type::NONE" enum ParquetTimeUnit" parquet::LogicalType::TimeUnit::unit": @@ -529,6 +530,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil: Builder* store_schema() Builder* enable_compliant_nested_types() Builder* disable_compliant_nested_types() + Builder* enable_fixed_size_list_as_fixed_len_byte_array() + Builder* disable_fixed_size_list_as_fixed_len_byte_array() Builder* set_engine_version(ArrowWriterEngineVersion version) Builder* set_time_adjusted_to_utc(c_bool adjusted) shared_ptr[ArrowWriterProperties] build() diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index ff880fdcf52c..9a439b91f4f2 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -884,6 +884,10 @@ def _sanitize_table(table, new_schema, flavor): it will restore the timezone (Parquet only stores the UTC values without timezone), or columns with duration type will be restored from the int64 Parquet column. +write_fixed_size_list_as_fixed_len_byte_array : bool, default False + EXPERIMENTAL: If True, write eligible Arrow FixedSizeList columns as + Parquet FIXED_LEN_BYTE_ARRAY columns annotated with FIXED_SIZE_LIST. If + False, write them as regular Parquet LIST columns for compatibility. write_page_index : bool, default False Whether to write a page index in general for all columns. Writing statistics to the page index disables the old method of writing @@ -1077,6 +1081,7 @@ def __init__(self, where, schema, filesystem=None, write_batch_size=None, dictionary_pagesize_limit=None, store_schema=True, + write_fixed_size_list_as_fixed_len_byte_array=False, write_page_index=False, write_page_checksum=False, sorting_columns=None, @@ -1132,6 +1137,8 @@ def __init__(self, where, schema, filesystem=None, write_batch_size=write_batch_size, dictionary_pagesize_limit=dictionary_pagesize_limit, store_schema=store_schema, + write_fixed_size_list_as_fixed_len_byte_array=( + write_fixed_size_list_as_fixed_len_byte_array), write_page_index=write_page_index, write_page_checksum=write_page_checksum, sorting_columns=sorting_columns, @@ -2010,6 +2017,7 @@ def write_table(table, where, row_group_size=None, version='2.6', write_batch_size=None, dictionary_pagesize_limit=None, store_schema=True, + write_fixed_size_list_as_fixed_len_byte_array=False, write_page_index=False, write_page_checksum=False, sorting_columns=None, @@ -2044,6 +2052,8 @@ def write_table(table, where, row_group_size=None, version='2.6', write_batch_size=write_batch_size, dictionary_pagesize_limit=dictionary_pagesize_limit, store_schema=store_schema, + write_fixed_size_list_as_fixed_len_byte_array=( + write_fixed_size_list_as_fixed_len_byte_array), write_page_index=write_page_index, write_page_checksum=write_page_checksum, sorting_columns=sorting_columns,