diff --git a/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs index 67d4f2ee..7b30c52c 100644 --- a/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs +++ b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs @@ -33,6 +33,10 @@ impl AvroRecordDecode for ManifestFileMeta { let mut num_deleted_files: Option<i64> = None; let mut partition_stats: Option<BinaryTableStats> = None; let mut schema_id: Option<i64> = None; + let mut min_bucket: Option<i32> = None; + let mut max_bucket: Option<i32> = None; + let mut min_level: Option<i32> = None; + let mut max_level: Option<i32> = None; let mut min_row_id: Option<i64> = None; let mut max_row_id: Option<i64> = None; @@ -52,6 +56,10 @@ impl AvroRecordDecode for ManifestFileMeta { decode_nullable_binary_table_stats(cursor, &field.schema, field.nullable)?; } "_SCHEMA_ID" => schema_id = Some(read_long_field(cursor, field.nullable)?), + "_MIN_BUCKET" => min_bucket = read_optional_int(cursor, field.nullable)?, + "_MAX_BUCKET" => max_bucket = read_optional_int(cursor, field.nullable)?, + "_MIN_LEVEL" => min_level = read_optional_int(cursor, field.nullable)?, + "_MAX_LEVEL" => max_level = read_optional_int(cursor, field.nullable)?, "_MIN_ROW_ID" => min_row_id = read_optional_long(cursor, field.nullable)?, "_MAX_ROW_ID" => max_row_id = read_optional_long(cursor, field.nullable)?, _ => skip_nullable_field(cursor, &field.schema, field.nullable)?, @@ -66,12 +74,26 @@ impl AvroRecordDecode for ManifestFileMeta { num_deleted_files.unwrap_or(0), partition_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], vec![], vec![])), schema_id.unwrap_or(0), + min_bucket, + max_bucket, + min_level, + max_level, min_row_id, max_row_id, )) } } +fn read_optional_int(cursor: &mut AvroCursor, nullable: bool) -> crate::Result<Option<i32>> { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(None); + } + } + Ok(Some(cursor.read_int()?)) +} + fn read_optional_long(cursor: &mut AvroCursor, nullable: bool) -> 
crate::Result<Option<i64>> { if nullable { let idx = cursor.read_union_index()?; diff --git a/crates/paimon/src/spec/manifest_file_meta.rs b/crates/paimon/src/spec/manifest_file_meta.rs index f0dddb73..51bb81cf 100644 --- a/crates/paimon/src/spec/manifest_file_meta.rs +++ b/crates/paimon/src/spec/manifest_file_meta.rs @@ -51,6 +51,43 @@ pub struct ManifestFileMeta { #[serde(rename = "_SCHEMA_ID")] schema_id: i64, + /// minimum bucket covered by entries in this manifest, used by the Java reader to + /// prune manifests that do not overlap a requested bucket. Always `None` together + /// with `max_bucket` for back-compat manifests written before bucket statistics + /// were introduced (apache/paimon#5345). + #[serde( + rename = "_MIN_BUCKET", + default, + skip_serializing_if = "Option::is_none" + )] + min_bucket: Option<i32>, + + /// maximum bucket covered by entries in this manifest. See `min_bucket`. + #[serde( + rename = "_MAX_BUCKET", + default, + skip_serializing_if = "Option::is_none" + )] + max_bucket: Option<i32>, + + /// minimum LSM level covered by entries in this manifest, used by the Java reader + /// for level-based pruning (e.g. compaction's level filter). Same back-compat note + /// as `min_bucket`. + #[serde( + rename = "_MIN_LEVEL", + default, + skip_serializing_if = "Option::is_none" + )] + min_level: Option<i32>, + + /// maximum LSM level covered by entries in this manifest. See `min_level`. + #[serde( + rename = "_MAX_LEVEL", + default, + skip_serializing_if = "Option::is_none" + )] + max_level: Option<i32>, + /// minimum row id covered by this manifest, when row tracking is enabled. #[serde( rename = "_MIN_ROW_ID", @@ -110,6 +147,31 @@ impl ManifestFileMeta { self.version } + /// Get the minimum bucket covered by this manifest (None when bucket stats are absent, + /// e.g. manifests written before the field was introduced). 
+ #[inline] + pub fn min_bucket(&self) -> Option<i32> { + self.min_bucket + } + + /// Get the maximum bucket covered by this manifest (None when bucket stats are absent). + #[inline] + pub fn max_bucket(&self) -> Option<i32> { + self.max_bucket + } + + /// Get the minimum LSM level covered by this manifest (None when level stats are absent). + #[inline] + pub fn min_level(&self) -> Option<i32> { + self.min_level + } + + /// Get the maximum LSM level covered by this manifest (None when level stats are absent). + #[inline] + pub fn max_level(&self) -> Option<i32> { + self.max_level + } + /// Get the minimum row id covered by this manifest (None when row tracking is disabled). #[inline] pub fn min_row_id(&self) -> Option<i64> { @@ -122,6 +184,27 @@ impl ManifestFileMeta { self.max_row_id } + /// Attach bucket / level statistics aggregated from manifest entries. + /// + /// Use this in writers that have access to the entries that the manifest covers. + /// Setting all four to `None` is equivalent to leaving the stats absent (the Java + /// reader treats `null` here as "no information; do not prune"). 
+ #[inline] + #[must_use] + pub fn with_bucket_level_stats( + mut self, + min_bucket: Option<i32>, + max_bucket: Option<i32>, + min_level: Option<i32>, + max_level: Option<i32>, + ) -> Self { + self.min_bucket = min_bucket; + self.max_bucket = max_bucket; + self.min_level = min_level; + self.max_level = max_level; + self + } + #[inline] pub fn new( file_name: String, @@ -139,6 +222,10 @@ impl ManifestFileMeta { num_deleted_files, partition_stats, schema_id, + min_bucket: None, + max_bucket: None, + min_level: None, + max_level: None, min_row_id: None, max_row_id: None, } @@ -154,6 +241,10 @@ impl ManifestFileMeta { num_deleted_files: i64, partition_stats: BinaryTableStats, schema_id: i64, + min_bucket: Option<i32>, + max_bucket: Option<i32>, + min_level: Option<i32>, + max_level: Option<i32>, min_row_id: Option<i64>, max_row_id: Option<i64>, ) -> ManifestFileMeta { @@ -165,6 +256,10 @@ impl ManifestFileMeta { num_deleted_files, partition_stats, schema_id, + min_bucket, + max_bucket, + min_level, + max_level, min_row_id, max_row_id, } @@ -192,6 +287,10 @@ pub const MANIFEST_FILE_META_SCHEMA: &str = r#"["null", { ] }], "default": null}, {"name": "_SCHEMA_ID", "type": "long"}, + {"name": "_MIN_BUCKET", "type": ["null", "int"], "default": null}, + {"name": "_MAX_BUCKET", "type": ["null", "int"], "default": null}, + {"name": "_MIN_LEVEL", "type": ["null", "int"], "default": null}, + {"name": "_MAX_LEVEL", "type": ["null", "int"], "default": null}, {"name": "_MIN_ROW_ID", "type": ["null", "long"], "default": null}, {"name": "_MAX_ROW_ID", "type": ["null", "long"], "default": null} ] diff --git a/crates/paimon/src/spec/manifest_list.rs b/crates/paimon/src/spec/manifest_list.rs index 16eb0499..cf3985c8 100644 --- a/crates/paimon/src/spec/manifest_list.rs +++ b/crates/paimon/src/spec/manifest_list.rs @@ -112,4 +112,148 @@ mod tests { let decoded = ManifestList::read(&file_io, path).await.unwrap(); assert!(decoded.is_empty()); } + + /// Round-trip bucket / level statistics through Avro so future schema drift is caught + /// 
here, not in production. Matches the fields added in apache/paimon#5345. + #[tokio::test] + async fn test_manifest_list_roundtrip_preserves_bucket_level_stats() { + let file_io = test_file_io(); + let path = "memory:/test_manifest_list_bucket_level/manifest-list-0"; + file_io + .mkdirs("memory:/test_manifest_list_bucket_level/") + .await + .unwrap(); + + let value_bytes = vec![ + 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, + ]; + let original = vec![ManifestFileMeta::new( + "manifest-bucket-level".to_string(), + 4096, + 3, + 0, + BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![Some(0)]), + 0, + ) + .with_bucket_level_stats(Some(-1), Some(7), Some(0), Some(5))]; + + ManifestList::write(&file_io, path, &original) + .await + .unwrap(); + let decoded = ManifestList::read(&file_io, path).await.unwrap(); + assert_eq!(decoded.len(), 1); + assert_eq!(decoded[0].min_bucket(), Some(-1)); + assert_eq!(decoded[0].max_bucket(), Some(7)); + assert_eq!(decoded[0].min_level(), Some(0)); + assert_eq!(decoded[0].max_level(), Some(5)); + // Sanity: nothing else changed. + assert_eq!(decoded[0].file_name(), "manifest-bucket-level"); + assert_eq!(decoded[0].num_added_files(), 3); + } + + /// Back-compat: a manifest list written without the bucket / level fields (e.g. by an + /// older Rust writer or any Java writer pre apache/paimon#5345) must decode into + /// `None` rather than failing or yielding bogus values. + #[tokio::test] + async fn test_manifest_list_decodes_legacy_without_bucket_level_fields() { + use apache_avro::{Codec, Schema, Writer}; + use std::collections::HashMap; + + let file_io = test_file_io(); + let path = "memory:/test_manifest_list_legacy/manifest-list-0"; + file_io + .mkdirs("memory:/test_manifest_list_legacy/") + .await + .unwrap(); + + // Avro schema with the pre-5345 shape: no _MIN/_MAX_BUCKET/LEVEL fields. 
+ let legacy_schema = r#"["null", { + "type": "record", + "name": "record", + "namespace": "org.apache.paimon.avro.generated", + "fields": [ + {"name": "_VERSION", "type": "int"}, + {"name": "_FILE_NAME", "type": "string"}, + {"name": "_FILE_SIZE", "type": "long"}, + {"name": "_NUM_ADDED_FILES", "type": "long"}, + {"name": "_NUM_DELETED_FILES", "type": "long"}, + {"name": "_PARTITION_STATS", "type": ["null", { + "type": "record", + "name": "record__PARTITION_STATS", + "fields": [ + {"name": "_MIN_VALUES", "type": "bytes"}, + {"name": "_MAX_VALUES", "type": "bytes"}, + {"name": "_NULL_COUNTS", "type": ["null", {"type": "array", "items": ["null", "long"]}], "default": null} + ] + }], "default": null} + ] + }]"#; + let schema = Schema::parse_str(legacy_schema).unwrap(); + let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null); + let value_bytes = vec![0u8; 12]; + let mut record: HashMap<String, apache_avro::types::Value> = HashMap::new(); + record.insert("_VERSION".to_string(), apache_avro::types::Value::Int(2)); + record.insert( + "_FILE_NAME".to_string(), + apache_avro::types::Value::String("manifest-legacy".to_string()), + ); + record.insert( + "_FILE_SIZE".to_string(), + apache_avro::types::Value::Long(1024), + ); + record.insert( + "_NUM_ADDED_FILES".to_string(), + apache_avro::types::Value::Long(2), + ); + record.insert( + "_NUM_DELETED_FILES".to_string(), + apache_avro::types::Value::Long(0), + ); + record.insert( + "_PARTITION_STATS".to_string(), + apache_avro::types::Value::Union( + 1, + Box::new(apache_avro::types::Value::Record(vec![ + ( + "_MIN_VALUES".to_string(), + apache_avro::types::Value::Bytes(value_bytes.clone()), + ), + ( + "_MAX_VALUES".to_string(), + apache_avro::types::Value::Bytes(value_bytes.clone()), + ), + ( + "_NULL_COUNTS".to_string(), + apache_avro::types::Value::Union( + 0, + Box::new(apache_avro::types::Value::Null), + ), + ), + ])), + ), + ); + let value = apache_avro::types::Value::Union( + 1, + Box::new(apache_avro::types::Value::Record( + 
record.into_iter().collect(), + )), + ); + let resolved = value.resolve(&schema).unwrap(); + writer.append(resolved).unwrap(); + let bytes = writer.into_inner().unwrap(); + file_io + .new_output(path) + .unwrap() + .write(bytes::Bytes::from(bytes)) + .await + .unwrap(); + + let decoded = ManifestList::read(&file_io, path).await.unwrap(); + assert_eq!(decoded.len(), 1); + assert_eq!(decoded[0].file_name(), "manifest-legacy"); + assert_eq!(decoded[0].min_bucket(), None); + assert_eq!(decoded[0].max_bucket(), None); + assert_eq!(decoded[0].min_level(), None); + assert_eq!(decoded[0].max_level(), None); + } } diff --git a/crates/paimon/src/spec/objects_file.rs b/crates/paimon/src/spec/objects_file.rs index 9056a2e3..81e43779 100644 --- a/crates/paimon/src/spec/objects_file.rs +++ b/crates/paimon/src/spec/objects_file.rs @@ -83,6 +83,10 @@ mod tests { 0, BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![Some(1)]), 0, + None, + None, + None, + None, Some(100), Some(199), )]; diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index 664b6542..274dd963 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -520,11 +520,24 @@ impl TableCommit { let mut added_file_count: i64 = 0; let mut deleted_file_count: i64 = 0; + // Bucket / level pruning stats; left as None when entries is empty so back-compat + // readers (Java < apache/paimon#5345 or older Rust writers) see the same shape + // they would for a pre-feature manifest. 
+ let mut min_bucket: Option<i32> = None; + let mut max_bucket: Option<i32> = None; + let mut min_level: Option<i32> = None; + let mut max_level: Option<i32> = None; for entry in entries { match entry.kind() { FileKind::Add => added_file_count += 1, FileKind::Delete => deleted_file_count += 1, } + let b = entry.bucket(); + min_bucket = Some(min_bucket.map_or(b, |cur| cur.min(b))); + max_bucket = Some(max_bucket.map_or(b, |cur| cur.max(b))); + let l = entry.file().level; + min_level = Some(min_level.map_or(l, |cur| cur.min(l))); + max_level = Some(max_level.map_or(l, |cur| cur.max(l))); } // Get file size @@ -539,7 +552,8 @@ impl TableCommit { deleted_file_count, partition_stats, self.table.schema().id(), - )) + ) + .with_bucket_level_stats(min_bucket, max_bucket, min_level, max_level)) } /// Check if this commit was already completed (idempotency). @@ -1822,4 +1836,44 @@ mod tests { "Expected 'Delete conflict' error, got: {err_msg}" ); } + + /// `write_manifest_file` must aggregate min/max bucket and level across entries so the + /// Java reader can prune manifests by bucket / level (see apache/paimon#5345). This + /// drives a real commit so all the call-site plumbing is exercised end to end. + #[tokio::test] + async fn test_commit_writes_bucket_and_level_stats_into_manifest_list() { + let file_io = test_file_io(); + let table_path = "memory:/test_commit_bucket_level_stats"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + + fn data_file_at_level(name: &str, level: i32) -> DataFileMeta { + let mut f = test_data_file(name, 1); + f.level = level; + f + } + + // Two commit messages on different buckets, each carrying a file at a different + // level. Expected aggregate: bucket [0, 3], level [0, 2]. 
+ let messages = vec![ + CommitMessage::new(vec![], 0, vec![data_file_at_level("data-b0.parquet", 0)]), + CommitMessage::new(vec![], 3, vec![data_file_at_level("data-b3.parquet", 2)]), + ]; + commit.commit(messages).await.unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + let delta_path = format!("{table_path}/manifest/{}", snapshot.delta_manifest_list()); + let metas = ManifestList::read(&file_io, &delta_path).await.unwrap(); + assert_eq!( + metas.len(), + 1, + "expected a single manifest covering both entries" + ); + assert_eq!(metas[0].min_bucket(), Some(0)); + assert_eq!(metas[0].max_bucket(), Some(3)); + assert_eq!(metas[0].min_level(), Some(0)); + assert_eq!(metas[0].max_level(), Some(2)); + } }