Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ impl AvroRecordDecode for ManifestFileMeta {
let mut num_deleted_files: Option<i64> = None;
let mut partition_stats: Option<BinaryTableStats> = None;
let mut schema_id: Option<i64> = None;
let mut min_bucket: Option<i32> = None;
let mut max_bucket: Option<i32> = None;
let mut min_level: Option<i32> = None;
let mut max_level: Option<i32> = None;
let mut min_row_id: Option<i64> = None;
let mut max_row_id: Option<i64> = None;

Expand All @@ -52,6 +56,10 @@ impl AvroRecordDecode for ManifestFileMeta {
decode_nullable_binary_table_stats(cursor, &field.schema, field.nullable)?;
}
"_SCHEMA_ID" => schema_id = Some(read_long_field(cursor, field.nullable)?),
"_MIN_BUCKET" => min_bucket = read_optional_int(cursor, field.nullable)?,
"_MAX_BUCKET" => max_bucket = read_optional_int(cursor, field.nullable)?,
"_MIN_LEVEL" => min_level = read_optional_int(cursor, field.nullable)?,
"_MAX_LEVEL" => max_level = read_optional_int(cursor, field.nullable)?,
"_MIN_ROW_ID" => min_row_id = read_optional_long(cursor, field.nullable)?,
"_MAX_ROW_ID" => max_row_id = read_optional_long(cursor, field.nullable)?,
_ => skip_nullable_field(cursor, &field.schema, field.nullable)?,
Expand All @@ -66,12 +74,26 @@ impl AvroRecordDecode for ManifestFileMeta {
num_deleted_files.unwrap_or(0),
partition_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], vec![], vec![])),
schema_id.unwrap_or(0),
min_bucket,
max_bucket,
min_level,
max_level,
min_row_id,
max_row_id,
))
}
}

fn read_optional_int(cursor: &mut AvroCursor, nullable: bool) -> crate::Result<Option<i32>> {
if nullable {
let idx = cursor.read_union_index()?;
if idx == 0 {
return Ok(None);
}
}
Ok(Some(cursor.read_int()?))
}

fn read_optional_long(cursor: &mut AvroCursor, nullable: bool) -> crate::Result<Option<i64>> {
if nullable {
let idx = cursor.read_union_index()?;
Expand Down
99 changes: 99 additions & 0 deletions crates/paimon/src/spec/manifest_file_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,43 @@ pub struct ManifestFileMeta {
#[serde(rename = "_SCHEMA_ID")]
schema_id: i64,

/// minimum bucket covered by entries in this manifest, used by the Java reader to
/// prune manifests that do not overlap a requested bucket. Always `None` together
/// with `max_bucket` for back-compat manifests written before bucket statistics
/// were introduced (apache/paimon#5345).
#[serde(
rename = "_MIN_BUCKET",
default,
skip_serializing_if = "Option::is_none"
)]
min_bucket: Option<i32>,

/// maximum bucket covered by entries in this manifest. See `min_bucket`.
#[serde(
rename = "_MAX_BUCKET",
default,
skip_serializing_if = "Option::is_none"
)]
max_bucket: Option<i32>,

/// minimum LSM level covered by entries in this manifest, used by the Java reader
/// for level-based pruning (e.g. compaction's level filter). Same back-compat note
/// as `min_bucket`.
#[serde(
rename = "_MIN_LEVEL",
default,
skip_serializing_if = "Option::is_none"
)]
min_level: Option<i32>,

/// maximum LSM level covered by entries in this manifest. See `min_level`.
#[serde(
rename = "_MAX_LEVEL",
default,
skip_serializing_if = "Option::is_none"
)]
max_level: Option<i32>,

/// minimum row id covered by this manifest, when row tracking is enabled.
#[serde(
rename = "_MIN_ROW_ID",
Expand Down Expand Up @@ -110,6 +147,31 @@ impl ManifestFileMeta {
self.version
}

/// Get the minimum bucket covered by this manifest (None when bucket stats are absent,
/// e.g. manifests written before the field was introduced).
#[inline]
pub fn min_bucket(&self) -> Option<i32> {
self.min_bucket
}

/// Get the maximum bucket covered by this manifest (None when bucket stats are absent).
#[inline]
pub fn max_bucket(&self) -> Option<i32> {
self.max_bucket
}

/// Get the minimum LSM level covered by this manifest (None when level stats are absent).
#[inline]
pub fn min_level(&self) -> Option<i32> {
self.min_level
}

/// Get the maximum LSM level covered by this manifest (None when level stats are absent).
#[inline]
pub fn max_level(&self) -> Option<i32> {
self.max_level
}

/// Get the minimum row id covered by this manifest (None when row tracking is disabled).
#[inline]
pub fn min_row_id(&self) -> Option<i64> {
Expand All @@ -122,6 +184,27 @@ impl ManifestFileMeta {
self.max_row_id
}

/// Attach bucket / level statistics aggregated from manifest entries.
///
/// Use this in writers that have access to the entries that the manifest covers.
/// Setting all four to `None` is equivalent to leaving the stats absent (the Java
/// reader treats `null` here as "no information; do not prune").
#[inline]
#[must_use]
pub fn with_bucket_level_stats(
mut self,
min_bucket: Option<i32>,
max_bucket: Option<i32>,
min_level: Option<i32>,
max_level: Option<i32>,
) -> Self {
self.min_bucket = min_bucket;
self.max_bucket = max_bucket;
self.min_level = min_level;
self.max_level = max_level;
self
}

#[inline]
pub fn new(
file_name: String,
Expand All @@ -139,6 +222,10 @@ impl ManifestFileMeta {
num_deleted_files,
partition_stats,
schema_id,
min_bucket: None,
max_bucket: None,
min_level: None,
max_level: None,
min_row_id: None,
max_row_id: None,
}
Expand All @@ -154,6 +241,10 @@ impl ManifestFileMeta {
num_deleted_files: i64,
partition_stats: BinaryTableStats,
schema_id: i64,
min_bucket: Option<i32>,
max_bucket: Option<i32>,
min_level: Option<i32>,
max_level: Option<i32>,
min_row_id: Option<i64>,
max_row_id: Option<i64>,
) -> ManifestFileMeta {
Expand All @@ -165,6 +256,10 @@ impl ManifestFileMeta {
num_deleted_files,
partition_stats,
schema_id,
min_bucket,
max_bucket,
min_level,
max_level,
min_row_id,
max_row_id,
}
Expand Down Expand Up @@ -192,6 +287,10 @@ pub const MANIFEST_FILE_META_SCHEMA: &str = r#"["null", {
]
}], "default": null},
{"name": "_SCHEMA_ID", "type": "long"},
{"name": "_MIN_BUCKET", "type": ["null", "int"], "default": null},
{"name": "_MAX_BUCKET", "type": ["null", "int"], "default": null},
{"name": "_MIN_LEVEL", "type": ["null", "int"], "default": null},
{"name": "_MAX_LEVEL", "type": ["null", "int"], "default": null},
{"name": "_MIN_ROW_ID", "type": ["null", "long"], "default": null},
{"name": "_MAX_ROW_ID", "type": ["null", "long"], "default": null}
]
Expand Down
144 changes: 144 additions & 0 deletions crates/paimon/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,148 @@ mod tests {
let decoded = ManifestList::read(&file_io, path).await.unwrap();
assert!(decoded.is_empty());
}

/// Round-trip bucket / level statistics through Avro so future schema drift is caught
/// here, not in production. Matches the fields added in apache/paimon#5345.
#[tokio::test]
async fn test_manifest_list_roundtrip_preserves_bucket_level_stats() {
let file_io = test_file_io();
let path = "memory:/test_manifest_list_bucket_level/manifest-list-0";
file_io
.mkdirs("memory:/test_manifest_list_bucket_level/")
.await
.unwrap();

let value_bytes = vec![
0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129,
];
let original = vec![ManifestFileMeta::new(
"manifest-bucket-level".to_string(),
4096,
3,
0,
BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![Some(0)]),
0,
)
.with_bucket_level_stats(Some(-1), Some(7), Some(0), Some(5))];

ManifestList::write(&file_io, path, &original)
.await
.unwrap();
let decoded = ManifestList::read(&file_io, path).await.unwrap();
assert_eq!(decoded.len(), 1);
assert_eq!(decoded[0].min_bucket(), Some(-1));
assert_eq!(decoded[0].max_bucket(), Some(7));
assert_eq!(decoded[0].min_level(), Some(0));
assert_eq!(decoded[0].max_level(), Some(5));
// Sanity: nothing else changed.
assert_eq!(decoded[0].file_name(), "manifest-bucket-level");
assert_eq!(decoded[0].num_added_files(), 3);
}

/// Back-compat: a manifest list written without the bucket / level fields (e.g. by an
/// older Rust writer or any Java writer pre apache/paimon#5345) must decode into
/// `None` rather than failing or yielding bogus values.
#[tokio::test]
async fn test_manifest_list_decodes_legacy_without_bucket_level_fields() {
use apache_avro::{Codec, Schema, Writer};
use std::collections::HashMap;

let file_io = test_file_io();
let path = "memory:/test_manifest_list_legacy/manifest-list-0";
file_io
.mkdirs("memory:/test_manifest_list_legacy/")
.await
.unwrap();

// Avro schema with the pre-5345 shape: no _MIN/_MAX_BUCKET/LEVEL fields.
let legacy_schema = r#"["null", {
"type": "record",
"name": "record",
"namespace": "org.apache.paimon.avro.generated",
"fields": [
{"name": "_VERSION", "type": "int"},
{"name": "_FILE_NAME", "type": "string"},
{"name": "_FILE_SIZE", "type": "long"},
{"name": "_NUM_ADDED_FILES", "type": "long"},
{"name": "_NUM_DELETED_FILES", "type": "long"},
{"name": "_PARTITION_STATS", "type": ["null", {
"type": "record",
"name": "record__PARTITION_STATS",
"fields": [
{"name": "_MIN_VALUES", "type": "bytes"},
{"name": "_MAX_VALUES", "type": "bytes"},
{"name": "_NULL_COUNTS", "type": ["null", {"type": "array", "items": ["null", "long"]}], "default": null}
]
}], "default": null}
]
}]"#;
let schema = Schema::parse_str(legacy_schema).unwrap();
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);
let value_bytes = vec![0u8; 12];
let mut record: HashMap<String, apache_avro::types::Value> = HashMap::new();
record.insert("_VERSION".to_string(), apache_avro::types::Value::Int(2));
record.insert(
"_FILE_NAME".to_string(),
apache_avro::types::Value::String("manifest-legacy".to_string()),
);
record.insert(
"_FILE_SIZE".to_string(),
apache_avro::types::Value::Long(1024),
);
record.insert(
"_NUM_ADDED_FILES".to_string(),
apache_avro::types::Value::Long(2),
);
record.insert(
"_NUM_DELETED_FILES".to_string(),
apache_avro::types::Value::Long(0),
);
record.insert(
"_PARTITION_STATS".to_string(),
apache_avro::types::Value::Union(
1,
Box::new(apache_avro::types::Value::Record(vec![
(
"_MIN_VALUES".to_string(),
apache_avro::types::Value::Bytes(value_bytes.clone()),
),
(
"_MAX_VALUES".to_string(),
apache_avro::types::Value::Bytes(value_bytes.clone()),
),
(
"_NULL_COUNTS".to_string(),
apache_avro::types::Value::Union(
0,
Box::new(apache_avro::types::Value::Null),
),
),
])),
),
);
let value = apache_avro::types::Value::Union(
1,
Box::new(apache_avro::types::Value::Record(
record.into_iter().collect(),
)),
);
let resolved = value.resolve(&schema).unwrap();
writer.append(resolved).unwrap();
let bytes = writer.into_inner().unwrap();
file_io
.new_output(path)
.unwrap()
.write(bytes::Bytes::from(bytes))
.await
.unwrap();

let decoded = ManifestList::read(&file_io, path).await.unwrap();
assert_eq!(decoded.len(), 1);
assert_eq!(decoded[0].file_name(), "manifest-legacy");
assert_eq!(decoded[0].min_bucket(), None);
assert_eq!(decoded[0].max_bucket(), None);
assert_eq!(decoded[0].min_level(), None);
assert_eq!(decoded[0].max_level(), None);
}
}
4 changes: 4 additions & 0 deletions crates/paimon/src/spec/objects_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ mod tests {
0,
BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![Some(1)]),
0,
None,
None,
None,
None,
Some(100),
Some(199),
)];
Expand Down
Loading
Loading