feat: add support for _partition metadata column#2668
Conversation
|
@advancedxy fyi |
074f903 to
caa7fb0
Compare
| /// # Arguments | ||
| /// * `partition_specs` - Iterator over all partition specs in the table | ||
| /// * `schema` - The current table schema (needed to determine result types of transforms) | ||
| pub fn compute_unified_partition_type<'a>( |
There was a problem hiding this comment.
nit: it's a bit of odd that this function is added in metadata_column.rs. Do you think it's a good idea to create partitioning.rs and put this function into partitioning.rs?
There was a problem hiding this comment.
Created crates/iceberg/src/partitioning.rs, moved the function there, registered as pub mod partitioning in lib.rs
| let mut seen_field_ids = std::collections::HashSet::new(); | ||
| let mut struct_fields: Vec<NestedFieldRef> = Vec::new(); | ||
|
|
||
| for spec in partition_specs { |
There was a problem hiding this comment.
I notice some inconsistent with java's impl:
- unknown specs are rejected in java.
- specs are sorted by spec id first(in reverse order), which means newer partition spec's field name will be picked fist.
- V1 table's void transform field is also handled in java: the partition field that was dropped later.
There was a problem hiding this comment.
Good point!
Updated the implementation (partitioning.rs) with -
- Sort specs by spec_id descending (newer field names take precedence)
- Skips Transform::Void fields (dropped partition columns)
- Deduplicates by field_id
| let partition_column_constant = | ||
| if let Some(ref unified_partition_type) = self.unified_partition_type { | ||
| let partition_spec = self | ||
| .table_metadata | ||
| .partition_spec_by_id(self.partition_spec_id); | ||
| if let Some(spec) = partition_spec { | ||
| let constant = build_partition_column_constant( | ||
| unified_partition_type, | ||
| spec, | ||
| &self.manifest_entry.data_file.partition, | ||
| )?; | ||
| Some(Arc::new(constant)) | ||
| } else { | ||
| None | ||
| } | ||
| } else { | ||
| None | ||
| }; |
There was a problem hiding this comment.
I'm not sure this is a good idea to calculate the partition column in the plan/scan phase and it adds build_partition_column_constant dep from record_batch_transformer' mod into the scan side.
There was a problem hiding this comment.
Fair point. Removed build_partition_column_constant from scan/context.rs. The scan phase only passes unified_partition_type through to the task. The actual struct constant is computed lazily at read time in pipeline.rs.
| #[serde(serialize_with = "serialize_not_implemented")] | ||
| #[serde(deserialize_with = "deserialize_not_implemented")] | ||
| #[builder(default)] | ||
| pub partition_column_constant: Option<Arc<PartitionColumnConstant>>, |
There was a problem hiding this comment.
Like the comment in https://github.com/apache/iceberg-rust/pull/2668/changes#r3448373215, I think it's better to pass the unified partition type here rather than passing the actual unified partition value. It would be easier for comet to pooling the type rather than the actual value.
BTW, this would be unnecessary if we can rebuild/access the table/metadata when reading on the executor side. Java archives this by SerializableTable and with Spark's broadcast. We don't have similar thing on the rust yet.
There was a problem hiding this comment.
FileScanTask now carries unified_partition_type: Option<Arc<StructType>> instead of partition_column_constant: Option<Arc<PartitionColumnConstant>>. The reader computes the value from type + spec + partition data.
Also removed the table_metadata field from ManifestEntryContext and ManifestFileContext (it was only used for the old precomputation path)
| let mut ids: Vec<i32> = value.identifier_field_ids.into_iter().collect(); | ||
| ids.sort_unstable(); | ||
| Some(ids) | ||
| }, |
There was a problem hiding this comment.
the changes in this file seem unrelated? And I don't think the spec requires sorting identifier field ids.
There was a problem hiding this comment.
You're right, this got left over by accident. Removed.
| (DataType::LargeBinary, Some(PrimitiveLiteral::Binary(value))) => { | ||
| Arc::new(LargeBinaryArray::from_vec(vec![value; num_rows])) | ||
| } | ||
| (DataType::LargeBinary, None) => { | ||
| let vals: Vec<Option<&[u8]>> = vec![None; num_rows]; | ||
| Arc::new(LargeBinaryArray::from_opt_vec(vals)) | ||
| } | ||
| (DataType::FixedSizeBinary(len), Some(PrimitiveLiteral::Binary(value))) => { | ||
| let repeated: Vec<&[u8]> = vec![value.as_slice(); num_rows]; | ||
| Arc::new(FixedSizeBinaryArray::try_from_iter(repeated.into_iter()).map_err(|e| { | ||
| Error::new( | ||
| ErrorKind::DataInvalid, | ||
| format!("Failed to create FixedSizeBinary({len}) array: {e}"), | ||
| ) | ||
| })?) | ||
| } | ||
| (DataType::FixedSizeBinary(len), None) => { | ||
| let repeated: Vec<Option<&[u8]>> = vec![None; num_rows]; | ||
| Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(repeated.into_iter(), *len).map_err(|e| { | ||
| Error::new( | ||
| ErrorKind::DataInvalid, | ||
| format!("Failed to create null FixedSizeBinary({len}) array: {e}"), | ||
| ) | ||
| })?) | ||
| } |
There was a problem hiding this comment.
these seems not related?
Our internal integration also shows iceberg mapping Iceberg's binary to Arrow's LargeBinary though, which should also be updated.
There was a problem hiding this comment.
these seems not related?
This is needed, I think. When a table is partitioned by a Binary, UUID, or Time column, create_primitive_array_repeated must handle those Arrow types to produce the child arrays of the _partition struct.
Our internal integration also shows iceberg mapping Iceberg's binary to Arrow's LargeBinary though, which should also be updated.
the default type_to_arrow_type mapping for Iceberg's Binary type should produce LargeBinary (matching Java). However, that change affects all Binary column reads (not just _partition), so it should probably be a follow up.
The LargeBinary arm in create_primitive_array_repeated ensures we handle it correctly if/when that mapping change lands.
There was a problem hiding this comment.
This is needed, I think. When a table is partitioned by a Binary, UUID, or Time column, create_primitive_array_repeated must handle those Arrow types to produce the child arrays of the _partition struct.
thanks for the explanation.
the default type_to_arrow_type mapping for Iceberg's Binary type should produce LargeBinary (matching Java)
Actually, java maps Iceberg's Binary to Arrow's Binary at least in the java arrow reader, see https://github.com/apache/iceberg/blob/main/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java#L134 and I think parquet-rs reads the binary column in parquet as binary rather than large binary as well.
However, that change affects all Binary column reads (not just _partition), so it should probably be a follow up.
Yes, it should be addressed in a follow-up.
The LargeBinary arm in create_primitive_array_repeated ensures we handle it correctly if/when that mapping change lands.
That totally makes sense.
|
|
||
| // A struct column where each child is a constant primitive value. | ||
| // Used for the _partition metadata column. | ||
| AddStructConstant { |
There was a problem hiding this comment.
hmmm. why this is not added to the constant fields? I think java's impl simply add partition's field constant map with a struct projection to mapping the task's partition data into the unified type?
There was a problem hiding this comment.
Java's approach uses ConstantVectorReader with a StructProjection to map partition data into the unified type. This works because Java's constant map can hold arbitrary values including struct projections. In iceberg-rust, the constant_fields: HashMap<i32, Datum> infrastructure only supports primitive Datum values — Datum has no struct variant. Adding struct support to Datum would be a significant separate effort (touching the spec, serde, and value layers).
The AddStructConstant column source achieves the same functional result through a different mechanism - it builds the struct array directly from the precomputed child values. The per-batch cost is the same (materialize N primitive arrays + wrap in StructArray). If Datum gains struct support in the future, we can consolidated it into the existing constant field infrastructure but for now, the separate variant keeps the change self-contained without requiring changes to the Datum type system.
There was a problem hiding this comment.
In iceberg-rust, the constant_fields: HashMap<i32, Datum> infrastructure only supports primitive Datum values — Datum has no struct variant. Adding struct support to Datum would be a significant separate effort (touching the spec, serde, and value layers).
I see the trade off now. I think it's better to choose your approach now to avoid touching too much internals.
However, I think iceberg-rust already has Struct literal in crates/iceberg/src/spec/values/literal.rs, we can use that to replace the child_values?
And BTW, taking a step forward, the AddStructConstant is essentially the same as
Add {
target_type: DataType,
value: Option<PrimitiveLiteral>,
},which should also be refactor to use Option<Literal> in value.
Maybe, we can create a follow-up issue now to refactor these operations. And we can go with your approach for now.
|
@parthchandra thanks for pinging me and working on this. I think I'm concerned that the unified partition value is carried in the file scan task, which seems a bit of odd. |
caa7fb0 to
33b690a
Compare
| schema: &Schema, | ||
| ) -> Result<StructType> { | ||
| let mut seen_field_ids = std::collections::HashSet::new(); | ||
| let mut struct_fields: Vec<NestedFieldRef> = Vec::new(); |
There was a problem hiding this comment.
nit: should unknown transform should be handled too?
|
@parthchandra I did another round and left some follow-up comments. I think it's in good shape overall. |
Which issue does this PR close?
What changes are included in this PR?
Implements the _partition metadata column for table scans. This is a struct column whose type is the union of all partition fields across all partition specs (handling partition evolution). Each row gets the
partition values for its data file.
Are these changes tested?
Because we do not have write support yet, I made the corresponding change to comet and then tested by adding tests in Comet which uses iceberg-java to write files and then iceberg-rust to read them back.
https://github.com/parthchandra/datafusion-comet/blob/iceberg-metadata-columns/spark/src/test/resources/sql-tests/iceberg/metadata_column_partition.sql