Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions crates/paimon/src/spec/avro/manifest_entry_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ fn decode_data_file_meta(
row_count: row_count.unwrap_or(0),
min_key: min_key.unwrap_or_default(),
max_key: max_key.unwrap_or_default(),
key_stats: key_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], vec![], vec![])),
value_stats: value_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], vec![], vec![])),
key_stats: key_stats.unwrap_or_else(BinaryTableStats::empty),
value_stats: value_stats.unwrap_or_else(BinaryTableStats::empty),
min_sequence_number: min_sequence_number.unwrap_or(0),
max_sequence_number: max_sequence_number.unwrap_or(0),
schema_id: schema_id.unwrap_or(0),
Expand Down Expand Up @@ -391,8 +391,8 @@ fn default_data_file_meta() -> DataFileMeta {
row_count: 0,
min_key: vec![],
max_key: vec![],
key_stats: BinaryTableStats::new(vec![], vec![], vec![]),
value_stats: BinaryTableStats::new(vec![], vec![], vec![]),
key_stats: BinaryTableStats::empty(),
value_stats: BinaryTableStats::empty(),
min_sequence_number: 0,
max_sequence_number: 0,
schema_id: 0,
Expand Down
2 changes: 1 addition & 1 deletion crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl AvroRecordDecode for ManifestFileMeta {
file_size.unwrap_or(0),
num_added_files.unwrap_or(0),
num_deleted_files.unwrap_or(0),
partition_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], vec![], vec![])),
partition_stats.unwrap_or_else(BinaryTableStats::empty),
schema_id.unwrap_or(0),
min_bucket,
max_bucket,
Expand Down
2 changes: 1 addition & 1 deletion crates/paimon/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ mod tests {
use crate::spec::ManifestEntry;

fn entry(kind: FileKind, file_name: &str, level: i32) -> ManifestEntry {
let stats = BinaryTableStats::new(vec![], vec![], vec![]);
let stats = BinaryTableStats::empty();
let file = DataFileMeta {
file_name: file_name.to_string(),
file_size: 100,
Expand Down
52 changes: 51 additions & 1 deletion crates/paimon/src/spec/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt::{Display, Formatter};

use super::{extract_datum_from_arrow, BinaryRowBuilder, DataType, Datum};
use super::{extract_datum_from_arrow, BinaryRowBuilder, DataType, Datum, EMPTY_SERIALIZED_ROW};
use arrow_array::RecordBatch;

/// Deserialize `_NULL_COUNTS` which in Avro is `["null", {"type":"array","items":["null","long"]}]`.
Expand Down Expand Up @@ -94,6 +94,21 @@ impl BinaryTableStats {
null_counts,
}
}

/// Stats with empty (arity=0) BinaryRow bytes for min/max and no null counts.
///
/// Use this whenever there are no columns to collect stats for (e.g. a non-partitioned
/// table's `partition_stats`, or a writer producing no key/value stats columns). Writing
/// `Vec::new()` here breaks the Java reader: `SerializationUtils.deserializeBinaryRow`
/// requires at least the 4-byte BE arity prefix and throws `BufferUnderflowException` on
/// zero-length input.
pub fn empty() -> BinaryTableStats {
Self {
min_values: EMPTY_SERIALIZED_ROW.clone(),
max_values: EMPTY_SERIALIZED_ROW.clone(),
null_counts: Vec::new(),
}
}
}

impl Display for BinaryTableStats {
Expand Down Expand Up @@ -154,3 +169,38 @@ pub fn compute_column_stats(
null_counts,
))
}

#[cfg(test)]
mod tests {
use super::*;
use crate::spec::BinaryRow;

/// Empty stats must produce min/max bytes that the Java side's
/// `SerializationUtils.deserializeBinaryRow` accepts: at minimum a 4-byte BE
/// arity prefix. A bare `Vec::new()` would trigger `BufferUnderflowException`
/// when Spark/Flink read manifests written for a non-partitioned table.
#[test]
fn empty_stats_carries_arity_prefix_parseable_by_reader() {
let stats = BinaryTableStats::empty();
assert!(
stats.min_values().len() >= 4,
"min_values must contain at least the 4-byte arity prefix"
);
assert!(
stats.max_values().len() >= 4,
"max_values must contain at least the 4-byte arity prefix"
);
assert!(
stats.null_counts().is_empty(),
"null_counts stays empty so the Java reader short-circuits to EMPTY_STATS"
);

// Round-trip through the same parser the Java reader uses (4-byte BE arity).
let min_row = BinaryRow::from_serialized_bytes(stats.min_values())
.expect("min_values must decode as a BinaryRow");
let max_row = BinaryRow::from_serialized_bytes(stats.max_values())
.expect("max_values must decode as a BinaryRow");
assert_eq!(min_row.arity(), 0);
assert_eq!(max_row.arity(), 0);
}
}
2 changes: 1 addition & 1 deletion crates/paimon/src/table/data_evolution_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ mod tests {
write_cols: Option<Vec<String>>,
) -> DataFileMeta {
use crate::spec::stats::BinaryTableStats;
let empty_stats = BinaryTableStats::new(vec![], vec![], vec![]);
let empty_stats = BinaryTableStats::empty();
DataFileMeta {
file_name: file_name.to_string(),
file_size: 0,
Expand Down
4 changes: 2 additions & 2 deletions crates/paimon/src/table/referenced_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1063,8 +1063,8 @@ mod tests {
row_count: 100,
min_key: vec![],
max_key: vec![],
key_stats: BinaryTableStats::new(vec![], vec![], vec![]),
value_stats: BinaryTableStats::new(vec![], vec![], vec![]),
key_stats: BinaryTableStats::empty(),
value_stats: BinaryTableStats::empty(),
min_sequence_number: 0,
max_sequence_number: 0,
schema_id: 0,
Expand Down
102 changes: 91 additions & 11 deletions crates/paimon/src/table/table_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use crate::io::FileIO;
use crate::spec::stats::BinaryTableStats;
use crate::spec::FileKind;
use crate::spec::{
datums_to_binary_row, extract_datum, BinaryRow, CommitKind, CoreOptions, DataType, Datum,
IndexManifest, IndexManifestEntry, Manifest, ManifestEntry, ManifestFileMeta, ManifestList,
PartitionStatistics, Snapshot,
datums_to_binary_row, extract_datum, BinaryRow, BinaryRowBuilder, CommitKind, CoreOptions,
DataType, Datum, IndexManifest, IndexManifestEntry, Manifest, ManifestEntry, ManifestFileMeta,
ManifestList, PartitionStatistics, Snapshot,
};
use crate::table::commit_message::CommitMessage;
use crate::table::partition_filter::PartitionFilter;
Expand Down Expand Up @@ -932,7 +932,7 @@ impl TableCommit {
let num_fields = partition_fields.len();

if num_fields == 0 || entries.is_empty() {
return Ok(BinaryTableStats::new(vec![], vec![], vec![]));
return Ok(BinaryTableStats::empty());
}

let data_types: Vec<_> = partition_fields
Expand Down Expand Up @@ -970,11 +970,8 @@ impl TableCommit {
}
}

let min_datums: Vec<_> = mins.iter().zip(data_types.iter()).collect();
let max_datums: Vec<_> = maxs.iter().zip(data_types.iter()).collect();

let min_bytes = datums_to_binary_row(&min_datums);
let max_bytes = datums_to_binary_row(&max_datums);
let min_bytes = build_partition_stats_row(&mins, &data_types);
let max_bytes = build_partition_stats_row(&maxs, &data_types);
let null_counts = null_counts.into_iter().map(Some).collect();

Ok(BinaryTableStats::new(min_bytes, max_bytes, null_counts))
Expand Down Expand Up @@ -1127,6 +1124,20 @@ impl TableCommit {
}
}

/// Serialized BinaryRow for partition stats; unlike `datums_to_binary_row`, returns a
/// valid arity-N row even when every datum is `None` (the all-null case must still
/// decode on the Java side).
fn build_partition_stats_row(datums: &[Option<Datum>], data_types: &[DataType]) -> Vec<u8> {
let mut builder = BinaryRowBuilder::new(datums.len() as i32);
for (pos, (datum_opt, data_type)) in datums.iter().zip(data_types.iter()).enumerate() {
match datum_opt {
Some(d) => builder.write_datum(pos, d, data_type),
None => builder.set_null_at(pos),
}
}
builder.build_serialized()
}

/// Plan for resolving commit entries.
enum CommitEntriesPlan {
/// Caller-provided entries. May contain `FileKind::Delete` entries from CoW
Expand Down Expand Up @@ -1234,8 +1245,8 @@ mod tests {
row_count,
min_key: vec![],
max_key: vec![],
key_stats: BinaryTableStats::new(vec![], vec![], vec![]),
value_stats: BinaryTableStats::new(vec![], vec![], vec![]),
key_stats: BinaryTableStats::empty(),
value_stats: BinaryTableStats::empty(),
min_sequence_number: 0,
max_sequence_number: 0,
schema_id: 0,
Expand Down Expand Up @@ -1837,6 +1848,75 @@ mod tests {
);
}

/// Regression: a non-partitioned table (e.g. `CREATE TABLE test_pk (... PRIMARY KEY ...)`)
/// must still emit `_PARTITION_STATS._MIN_VALUES`/`_MAX_VALUES` carrying the 4-byte BE
/// arity prefix; otherwise Java readers like Spark/Flink hit
/// `BufferUnderflowException` inside `SerializationUtils.deserializeBinaryRow`.
#[test]
fn compute_partition_stats_no_partition_fields_returns_decodable_empty() {
let file_io = test_file_io();
let commit = setup_commit(&file_io, "memory:/test_no_partition_stats");

let entry = ManifestEntry::new(
FileKind::Add,
vec![],
0,
1,
test_data_file("data-0.parquet", 1),
2,
);

let stats = commit.compute_partition_stats(&[entry]).unwrap();
BinaryRow::from_serialized_bytes(stats.min_values())
.expect("min_values must decode via the same protocol as Java's deserializeBinaryRow");
BinaryRow::from_serialized_bytes(stats.max_values())
.expect("max_values must decode via the same protocol as Java's deserializeBinaryRow");
assert!(stats.null_counts().is_empty());
}

/// Regression: when there are no entries at all, the empty stats we return must also
/// satisfy the protocol — same Java reader path runs on it.
#[test]
fn compute_partition_stats_empty_entries_returns_decodable_empty() {
let file_io = test_file_io();
let commit = setup_partitioned_commit(&file_io, "memory:/test_no_entries_stats");

let stats = commit.compute_partition_stats(&[]).unwrap();
BinaryRow::from_serialized_bytes(stats.min_values()).unwrap();
BinaryRow::from_serialized_bytes(stats.max_values()).unwrap();
assert!(stats.null_counts().is_empty());
}

/// Regression: partitioned table with an all-null partition row must still emit
/// decodable min/max bytes (otherwise Java hits `BufferUnderflowException`).
#[test]
fn compute_partition_stats_all_null_partition_values_returns_decodable_bytes() {
let file_io = test_file_io();
let commit = setup_partitioned_commit(&file_io, "memory:/test_all_null_partition_stats");

let mut builder = BinaryRowBuilder::new(1);
builder.set_null_at(0);
let null_partition = builder.build_serialized();

let entry = ManifestEntry::new(
FileKind::Add,
null_partition,
0,
1,
test_data_file("data-null-pt.parquet", 1),
2,
);

let stats = commit.compute_partition_stats(&[entry]).unwrap();
let min_row = BinaryRow::from_serialized_bytes(stats.min_values()).unwrap();
let max_row = BinaryRow::from_serialized_bytes(stats.max_values()).unwrap();
assert_eq!(min_row.arity(), 1);
assert_eq!(max_row.arity(), 1);
assert!(min_row.is_null_at(0));
assert!(max_row.is_null_at(0));
assert_eq!(stats.null_counts(), &vec![Some(1)]);
}

/// `write_manifest_file` must aggregate min/max bucket and level across entries so the
/// Java reader can prune manifests by bucket / level (see apache/paimon#5345). This
/// drives a real commit so all the call-site plumbing is exercised end to end.
Expand Down
Loading