feat: add support for _partition metadata column #2668

Open
parthchandra wants to merge 5 commits into
apache:mainfrom
parthchandra:metadata-columns

@parthchandra

Which issue does this PR close?

What changes are included in this PR?

Implements the _partition metadata column for table scans. This is a struct column whose type is the union of all partition fields across all partition specs (handling partition evolution). Each row gets the partition values for its data file.

  • Adds compute_unified_partition_type() to compute the union of partition fields across all specs (equivalent to Java's Partitioning.partitionType())
  • Adds PartitionColumnConstant and build_partition_column_constant() for pre-computing the struct values per file
  • Adds ColumnSource::AddStructConstant variant to RecordBatchTransformer for materializing struct columns
  • Threads the unified partition type through scan planning and populates the constant in FileScanTask
  • Pipeline detects RESERVED_FIELD_ID_PARTITION in projected fields and injects the struct constant
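
As a rough illustration of the per-file step, the sketch below maps one data file's partition values onto the unified struct layout. The types `Value` and `UnifiedField` and the function `project_partition` are hypothetical stand-ins, not the types used in this PR, and filling fields that are absent from the file's spec with null is an assumption about the intended behavior.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a primitive partition value.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Int(i32),
    Str(String),
}

/// One field of the unified `_partition` struct type (hypothetical shape).
#[derive(Debug, Clone, PartialEq)]
pub struct UnifiedField {
    pub field_id: i32,
    pub name: String,
}

/// Build the `_partition` children for one data file.
///
/// `file_partition` maps partition field id -> value for the spec the file
/// was written with. Fields that only exist in other specs become `None`
/// (assumed here to mean a null child value).
pub fn project_partition(
    unified: &[UnifiedField],
    file_partition: &HashMap<i32, Value>,
) -> Vec<Option<Value>> {
    unified
        .iter()
        .map(|field| file_partition.get(&field.field_id).cloned())
        .collect()
}
```

Keying the lookup on the partition field ID rather than on the name keeps the mapping stable when a field is renamed between specs.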

Are these changes tested?

Because we do not have write support yet, I made the corresponding change in Comet and tested it by adding Comet tests that use iceberg-java to write files and iceberg-rust to read them back.
https://github.com/parthchandra/datafusion-comet/blob/iceberg-metadata-columns/spark/src/test/resources/sql-tests/iceberg/metadata_column_partition.sql

@parthchandra parthchandra marked this pull request as draft June 18, 2026 00:34
@parthchandra

Author

@advancedxy fyi

@parthchandra parthchandra marked this pull request as ready for review June 18, 2026 01:46
Comment thread crates/iceberg/src/metadata_columns.rs Outdated
/// # Arguments
/// * `partition_specs` - Iterator over all partition specs in the table
/// * `schema` - The current table schema (needed to determine result types of transforms)
pub fn compute_unified_partition_type<'a>(

nit: it's a bit odd that this function lives in metadata_columns.rs. Would it make sense to create partitioning.rs and move this function there?

Author

Created crates/iceberg/src/partitioning.rs, moved the function there, registered as pub mod partitioning in lib.rs

Comment thread crates/iceberg/src/metadata_columns.rs Outdated
let mut seen_field_ids = std::collections::HashSet::new();
let mut struct_fields: Vec<NestedFieldRef> = Vec::new();

for spec in partition_specs {

I noticed some inconsistencies with Java's implementation:

  1. Unknown specs are rejected in Java.
  2. Specs are sorted by spec ID first (in reverse order), so a newer partition spec's field name is picked first.
  3. A V1 table's void-transform field, i.e. a partition field that was dropped later, is also handled in Java.

Author

Good point!
Updated the implementation (partitioning.rs) to:

  • Sort specs by spec_id descending (newer field names take precedence)
  • Skip Transform::Void fields (dropped partition columns)
  • Deduplicate by field_id
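
The three rules above can be sketched roughly as follows. This is a simplified model, not the code in partitioning.rs: `Transform`, `PartitionField`, `PartitionSpec`, and `unify_partition_fields` are hypothetical, and the real function also needs the schema to derive each field's result type.

```rust
use std::collections::HashSet;

/// Hypothetical, reduced set of transforms for illustration.
#[derive(Debug, Clone, PartialEq)]
pub enum Transform {
    Identity,
    Day,
    Void,
}

#[derive(Debug, Clone, PartialEq)]
pub struct PartitionField {
    pub field_id: i32,
    pub name: String,
    pub transform: Transform,
}

#[derive(Debug, Clone)]
pub struct PartitionSpec {
    pub spec_id: i32,
    pub fields: Vec<PartitionField>,
}

/// Union of partition fields across all specs.
pub fn unify_partition_fields(specs: &[PartitionSpec]) -> Vec<PartitionField> {
    // Visit newer specs first so their field names take precedence.
    let mut ordered: Vec<&PartitionSpec> = specs.iter().collect();
    ordered.sort_by(|a, b| b.spec_id.cmp(&a.spec_id));

    let mut seen = HashSet::new();
    let mut unified = Vec::new();
    for spec in ordered {
        for field in &spec.fields {
            // A void transform marks a partition field that was dropped later.
            if field.transform == Transform::Void {
                continue;
            }
            // `insert` returns false if this field id was already emitted.
            if seen.insert(field.field_id) {
                unified.push(field.clone());
            }
        }
    }
    unified
}
```

In this model, a field that is void in the newest spec still appears in the result if an older spec carries it with a real transform, because only the void occurrence is skipped.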

Comment thread crates/iceberg/src/scan/context.rs Outdated
Comment on lines +129 to +146
let partition_column_constant =
    if let Some(ref unified_partition_type) = self.unified_partition_type {
        let partition_spec = self
            .table_metadata
            .partition_spec_by_id(self.partition_spec_id);
        if let Some(spec) = partition_spec {
            let constant = build_partition_column_constant(
                unified_partition_type,
                spec,
                &self.manifest_entry.data_file.partition,
            )?;
            Some(Arc::new(constant))
        } else {
            None
        }
    } else {
        None
    };

I'm not sure it's a good idea to calculate the partition column in the plan/scan phase. It also adds a dependency on build_partition_column_constant from the record_batch_transformer module to the scan side.

Author

Fair point. Removed build_partition_column_constant from scan/context.rs. The scan phase only passes unified_partition_type through to the task. The actual struct constant is computed lazily at read time in pipeline.rs.

Comment thread crates/iceberg/src/scan/task.rs Outdated
#[serde(serialize_with = "serialize_not_implemented")]
#[serde(deserialize_with = "deserialize_not_implemented")]
#[builder(default)]
pub partition_column_constant: Option<Arc<PartitionColumnConstant>>,

As in the comment at https://github.com/apache/iceberg-rust/pull/2668/changes#r3448373215, I think it's better to pass the unified partition type here rather than the actual unified partition value. It would be easier for Comet to pool the type than the actual value.

BTW, this would be unnecessary if we could rebuild or access the table metadata when reading on the executor side. Java achieves this with SerializableTable and Spark's broadcast. We don't have anything similar on the Rust side yet.

Author

FileScanTask now carries unified_partition_type: Option<Arc<StructType>> instead of partition_column_constant: Option<Arc<PartitionColumnConstant>>. The reader computes the value from type + spec + partition data.
Also removed the table_metadata field from ManifestEntryContext and ManifestFileContext (it was only used for the old precomputation path)
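
A minimal sketch of why carrying the type is cheap: every task can hold a clone of the same `Arc`, so the struct type is allocated once per scan. The `StructType` and `FileScanTask` definitions below are reduced, hypothetical versions of the real ones, and `plan_tasks` is an invented helper.

```rust
use std::sync::Arc;

/// Hypothetical, reduced struct type: just the unified partition field ids.
#[derive(Debug, PartialEq)]
pub struct StructType {
    pub field_ids: Vec<i32>,
}

/// Reduced task shape; the real FileScanTask has many more fields.
pub struct FileScanTask {
    pub data_file_path: String,
    pub partition_spec_id: i32,
    pub unified_partition_type: Option<Arc<StructType>>,
}

/// Plan one task per file. Each task gets a pointer to the same type,
/// not a copy; the per-file values are left to the reader.
pub fn plan_tasks(
    files: &[(&str, i32)],
    unified: Option<Arc<StructType>>,
) -> Vec<FileScanTask> {
    files
        .iter()
        .map(|(path, spec_id)| FileScanTask {
            data_file_path: path.to_string(),
            partition_spec_id: *spec_id,
            unified_partition_type: unified.clone(),
        })
        .collect()
}
```

The per-file value is then derived at read time from this shared type plus the task's own spec and partition data.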

Comment on lines 118 to 121
let mut ids: Vec<i32> = value.identifier_field_ids.into_iter().collect();
ids.sort_unstable();
Some(ids)
},

The changes in this file seem unrelated? Also, I don't think the spec requires sorting identifier field IDs.

Author

You're right, this got left over by accident. Removed.

Comment on lines +912 to +936
(DataType::LargeBinary, Some(PrimitiveLiteral::Binary(value))) => {
    Arc::new(LargeBinaryArray::from_vec(vec![value; num_rows]))
}
(DataType::LargeBinary, None) => {
    let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
    Arc::new(LargeBinaryArray::from_opt_vec(vals))
}
(DataType::FixedSizeBinary(len), Some(PrimitiveLiteral::Binary(value))) => {
    let repeated: Vec<&[u8]> = vec![value.as_slice(); num_rows];
    Arc::new(FixedSizeBinaryArray::try_from_iter(repeated.into_iter()).map_err(|e| {
        Error::new(
            ErrorKind::DataInvalid,
            format!("Failed to create FixedSizeBinary({len}) array: {e}"),
        )
    })?)
}
(DataType::FixedSizeBinary(len), None) => {
    let repeated: Vec<Option<&[u8]>> = vec![None; num_rows];
    Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(repeated.into_iter(), *len).map_err(|e| {
        Error::new(
            ErrorKind::DataInvalid,
            format!("Failed to create null FixedSizeBinary({len}) array: {e}"),
        )
    })?)
}

These seem unrelated?

Our internal integration also shows iceberg mapping Iceberg's binary to Arrow's LargeBinary though, which should also be updated.

Author

> These seem unrelated?

This is needed, I think. When a table is partitioned by a Binary, UUID, or Time column, create_primitive_array_repeated must handle those Arrow types to produce the child arrays of the _partition struct.

> Our internal integration also shows iceberg mapping Iceberg's binary to Arrow's LargeBinary though, which should also be updated.

The default type_to_arrow_type mapping for Iceberg's Binary type should produce LargeBinary (matching Java). However, that change affects all Binary column reads (not just _partition), so it should probably be a follow-up.
The LargeBinary arm in create_primitive_array_repeated ensures we handle it correctly if/when that mapping change lands.

> This is needed, I think. When a table is partitioned by a Binary, UUID, or Time column, create_primitive_array_repeated must handle those Arrow types to produce the child arrays of the _partition struct.

Thanks for the explanation.

> The default type_to_arrow_type mapping for Iceberg's Binary type should produce LargeBinary (matching Java)

Actually, Java maps Iceberg's Binary to Arrow's Binary, at least in the Java Arrow reader; see https://github.com/apache/iceberg/blob/main/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java#L134. I think parquet-rs also reads a Parquet binary column as Binary rather than LargeBinary.

> However, that change affects all Binary column reads (not just _partition), so it should probably be a follow-up.

Yes, it should be addressed in a follow-up.

> The LargeBinary arm in create_primitive_array_repeated ensures we handle it correctly if/when that mapping change lands.

That totally makes sense.


// A struct column where each child is a constant primitive value.
// Used for the _partition metadata column.
AddStructConstant {

Hmm, why isn't this added to the constant fields? I think Java's implementation simply adds the partition to the constant field map, using a struct projection to map the task's partition data into the unified type.

Author

Java's approach uses ConstantVectorReader with a StructProjection to map partition data into the unified type. This works because Java's constant map can hold arbitrary values including struct projections. In iceberg-rust, the constant_fields: HashMap<i32, Datum> infrastructure only supports primitive Datum values — Datum has no struct variant. Adding struct support to Datum would be a significant separate effort (touching the spec, serde, and value layers).

The AddStructConstant column source achieves the same functional result through a different mechanism: it builds the struct array directly from the precomputed child values. The per-batch cost is the same (materialize N primitive arrays and wrap them in a StructArray). If Datum gains struct support in the future, we can consolidate this into the existing constant field infrastructure, but for now the separate variant keeps the change self-contained without requiring changes to the Datum type system.
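
To make the per-batch cost concrete, here is a toy model of that materialization step in plain Rust, without Arrow. `Prim`, `StructColumn`, and `materialize_struct_constant` are hypothetical names; the real code builds Arrow arrays and wraps them in a StructArray.

```rust
/// Hypothetical primitive value; the real code uses `PrimitiveLiteral`.
#[derive(Debug, Clone, PartialEq)]
pub enum Prim {
    Int(i32),
    Str(String),
}

/// Toy columnar struct: one child column per field, all of length `len`.
#[derive(Debug, PartialEq)]
pub struct StructColumn {
    pub children: Vec<(String, Vec<Option<Prim>>)>,
    pub len: usize,
}

/// Expand one constant struct value into a batch of `num_rows` rows by
/// repeating each child value `num_rows` times. A `None` child value
/// produces an all-null child column.
pub fn materialize_struct_constant(
    child_values: &[(String, Option<Prim>)],
    num_rows: usize,
) -> StructColumn {
    let children = child_values
        .iter()
        .map(|(name, value)| (name.clone(), vec![value.clone(); num_rows]))
        .collect();
    StructColumn { children, len: num_rows }
}
```

The work is proportional to the number of children times the batch size, which matches the cost described above for N primitive arrays per batch.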

> In iceberg-rust, the constant_fields: HashMap<i32, Datum> infrastructure only supports primitive Datum values; Datum has no struct variant. Adding struct support to Datum would be a significant separate effort (touching the spec, serde, and value layers).

I see the trade-off now. I think it's better to go with your approach for now to avoid touching too many internals.

However, iceberg-rust already has a Struct literal in crates/iceberg/src/spec/values/literal.rs. Could we use that to replace child_values?
BTW, taking it a step further, AddStructConstant is essentially the same as

    Add {
        target_type: DataType,
        value: Option<PrimitiveLiteral>,
    },

which should also be refactored to use Option<Literal> for value.

Maybe we can create a follow-up issue now to refactor these operations, and go with your approach for now.
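
One possible shape for that follow-up, sketched with simplified stand-ins: a single recursive literal covers both the primitive and the struct case, so one repeat routine could serve both. The `Literal`, `PrimitiveLiteral`, and `Column` enums here are hypothetical models, not the definitions in literal.rs, and a null struct child is assumed to be primitive to keep the example short.

```rust
/// Simplified stand-in for a primitive literal.
#[derive(Debug, Clone, PartialEq)]
pub enum PrimitiveLiteral {
    Int(i32),
    String(String),
}

/// Simplified stand-in for a literal that can also be a struct.
#[derive(Debug, Clone, PartialEq)]
pub enum Literal {
    Primitive(PrimitiveLiteral),
    Struct(Vec<Option<Literal>>),
}

/// Toy column: either a primitive column or a struct of child columns.
#[derive(Debug, Clone, PartialEq)]
pub enum Column {
    Primitive(Vec<Option<PrimitiveLiteral>>),
    Struct(Vec<Column>),
}

/// Repeat one constant literal `num_rows` times. The struct case recurses
/// into the children, so no separate struct-only code path is needed.
pub fn repeat(value: &Literal, num_rows: usize) -> Column {
    match value {
        Literal::Primitive(p) => Column::Primitive(vec![Some(p.clone()); num_rows]),
        Literal::Struct(children) => Column::Struct(
            children
                .iter()
                .map(|child| match child {
                    Some(lit) => repeat(lit, num_rows),
                    // Simplification: a null child is treated as primitive.
                    None => Column::Primitive(vec![None; num_rows]),
                })
                .collect(),
        ),
    }
}
```

A real version would need the target type alongside the value to build correctly typed null children, which is presumably why `Add` carries `target_type` today.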

@advancedxy

@parthchandra thanks for pinging me and working on this. I'm concerned that the unified partition value is carried in the file scan task, which seems a bit odd.

schema: &Schema,
) -> Result<StructType> {
let mut seen_field_ids = std::collections::HashSet::new();
let mut struct_fields: Vec<NestedFieldRef> = Vec::new();

nit: should unknown transforms be handled too?

@advancedxy

@parthchandra I did another round and left some follow-up comments. I think it's in good shape overall.
