From 01bb9e02bd03172af0f3c46135cd4fe0b507755d Mon Sep 17 00:00:00 2001 From: QuakeWang Date: Tue, 2 Jun 2026 15:45:15 +0800 Subject: [PATCH] test: add full types boundary consistency coverage --- crates/integration_tests/tests/read_tables.rs | 420 +++++++++++++++++- dev/spark/provision.py | 116 +++++ 2 files changed, 534 insertions(+), 2 deletions(-) diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index b89268c1..fdf213a3 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -18,8 +18,8 @@ //! Integration tests for reading Paimon tables provisioned by Spark. use arrow_array::{ - Array, ArrowPrimitiveType, Int32Array, Int64Array, ListArray, MapArray, RecordBatch, - StringArray, StructArray, + Array, ArrowPrimitiveType, Int32Array, Int64Array, ListArray, MapArray, PrimitiveArray, + RecordBatch, StringArray, StructArray, }; use futures::TryStreamExt; use paimon::api::ConfigResponse; @@ -2638,3 +2638,419 @@ async fn test_read_full_types_table() { assert_eq!(r.17, vec![("d".into(), 40), ("e".into(), 50)]); // map assert_eq!(r.18, ("carol".into(), 300)); // struct } + +#[tokio::test] +async fn test_read_full_types_boundary_table() { + use arrow_array::{ + BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, + Int16Array, Int64Array, Int8Array, ListArray, MapArray, StructArray, + TimestampMicrosecondArray, + }; + + #[derive(Debug, PartialEq)] + struct BoundaryRow { + id: i32, + col_boolean: Option, + col_tinyint: Option, + col_smallint: Option, + col_int: Option, + col_bigint: Option, + col_float: Option, + col_double: Option, + col_decimal: Option, + col_decimal5: Option, + col_decimal38: Option, + col_string: Option, + col_binary: Option>, + col_date: Option, + col_timestamp: Option, + col_timestamp_ltz: Option, + col_array: Option>>, + col_map: Option)>>, + col_struct: Option<(Option, Option)>, + } + + fn primitive_value( + array: &PrimitiveArray, + row: usize, + ) -> Option { + (!array.is_null(row)).then(|| array.value(row)) + } + + fn bool_value(array: &BooleanArray, row: usize) -> Option { + (!array.is_null(row)).then(|| array.value(row)) + } + + fn string_value(array: &StringArray, row: usize) -> Option { + (!array.is_null(row)).then(|| array.value(row).to_string()) + } + + fn binary_value(array: &BinaryArray, row: usize) -> Option> { + (!array.is_null(row)).then(|| array.value(row).to_vec()) + } + + fn list_i32_value(array: &ListArray, row: usize) -> Option>> { + if array.is_null(row) { + return None; + } + let values = array.value(row); + let values = values + .as_any() + .downcast_ref::() + .expect("list element as Int32Array"); + Some( + (0..values.len()) + .map(|i| (!values.is_null(i)).then(|| values.value(i))) + .collect(), + ) + } + + fn map_string_i32_value(array: &MapArray, row: usize) -> Option)>> { + if array.is_null(row) { + return None; + } + let entries = array.value(row); + let entries = entries + .as_any() + .downcast_ref::() + .expect("map entries as StructArray"); + let keys = entries + .column(0) + .as_any() + .downcast_ref::() + .expect("map keys"); + let values = entries + .column(1) + .as_any() + .downcast_ref::() + .expect("map values"); + let mut result: Vec<(String, Option)> = (0..keys.len()) + .map(|i| { + ( + keys.value(i).to_string(), + (!values.is_null(i)).then(|| values.value(i)), + ) + }) + .collect(); + result.sort_by(|left, right| left.0.cmp(&right.0)); + Some(result) + } + + fn struct_string_i32_value( + array: &StructArray, + row: usize, + ) -> Option<(Option, Option)> { + if array.is_null(row) { + return None; + } + let names = array + .column_by_name("name") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("struct name"); + let values = array + .column_by_name("value") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("struct value"); + Some(( + (!names.is_null(row)).then(|| names.value(row).to_string()), + (!values.is_null(row)).then(|| values.value(row)), + )) + } + + let (plan, batches) = scan_and_read_with_fs_catalog("full_types_boundary_table", None).await; + let formats: HashSet<&str> = plan + .splits() + .iter() + .flat_map(|split| split.data_files()) + .filter_map(|file| file.file_name.rsplit_once('.').map(|(_, ext)| ext)) + .collect(); + assert_eq!( + formats, + HashSet::from(["avro", "orc", "parquet"]), + "full_types_boundary_table should scan all provisioned file formats" + ); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total_rows, 6, + "full_types_boundary_table should have 6 rows" + ); + + let mut rows = Vec::new(); + for batch in &batches { + let id = batch + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_boolean = batch + .column_by_name("col_boolean") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_tinyint = batch + .column_by_name("col_tinyint") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_smallint = batch + .column_by_name("col_smallint") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_int = batch + .column_by_name("col_int") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_bigint = batch + .column_by_name("col_bigint") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_float = batch + .column_by_name("col_float") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_double = batch + .column_by_name("col_double") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_decimal = batch + .column_by_name("col_decimal") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_decimal5 = batch + .column_by_name("col_decimal5") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_decimal38 = batch + .column_by_name("col_decimal38") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_string = batch + .column_by_name("col_string") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_binary = batch + .column_by_name("col_binary") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_date = batch + .column_by_name("col_date") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_timestamp = batch + .column_by_name("col_timestamp") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_timestamp_ltz = batch + .column_by_name("col_timestamp_ltz") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_array = batch + .column_by_name("col_array") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_map = batch + .column_by_name("col_map") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let col_struct = batch + .column_by_name("col_struct") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + for i in 0..batch.num_rows() { + rows.push(BoundaryRow { + id: id.value(i), + col_boolean: bool_value(col_boolean, i), + col_tinyint: primitive_value(col_tinyint, i), + col_smallint: primitive_value(col_smallint, i), + col_int: primitive_value(col_int, i), + col_bigint: primitive_value(col_bigint, i), + col_float: primitive_value(col_float, i), + col_double: primitive_value(col_double, i), + col_decimal: primitive_value(col_decimal, i), + col_decimal5: primitive_value(col_decimal5, i), + col_decimal38: primitive_value(col_decimal38, i), + col_string: string_value(col_string, i), + col_binary: binary_value(col_binary, i), + col_date: primitive_value(col_date, i), + col_timestamp: primitive_value(col_timestamp, i), + col_timestamp_ltz: primitive_value(col_timestamp_ltz, i), + col_array: list_i32_value(col_array, i), + col_map: map_string_i32_value(col_map, i), + col_struct: struct_string_i32_value(col_struct, i), + }); + } + } + rows.sort_by_key(|row| row.id); + + assert_eq!( + rows, + vec![ + BoundaryRow { + id: 1, + col_boolean: Some(false), + col_tinyint: Some(i8::MIN), + col_smallint: Some(i16::MIN), + col_int: Some(i32::MIN), + col_bigint: Some(i64::MIN), + col_float: Some(-0.5), + col_double: Some(-1.25), + col_decimal: Some(-9_999_999_999), + col_decimal5: Some(-99999), + col_decimal38: Some(-99_999_999_999_999_999_999_999_999_999_999_999_999), + col_string: Some(String::new()), + col_binary: Some(Vec::new()), + col_date: Some(-1), + col_timestamp: Some(1), + col_timestamp_ltz: Some(1), + col_array: Some(vec![None, Some(i32::MIN), Some(0)]), + col_map: Some(vec![ + ("negative".into(), Some(i32::MIN)), + ("zero".into(), None) + ]), + col_struct: Some((None, Some(-1))), + }, + BoundaryRow { + id: 2, + col_boolean: None, + col_tinyint: None, + col_smallint: None, + col_int: None, + col_bigint: None, + col_float: None, + col_double: None, + col_decimal: None, + col_decimal5: None, + col_decimal38: None, + col_string: None, + col_binary: None, + col_date: None, + col_timestamp: None, + col_timestamp_ltz: None, + col_array: None, + col_map: None, + col_struct: None, + }, + BoundaryRow { + id: 3, + col_boolean: Some(true), + col_tinyint: Some(i8::MAX), + col_smallint: Some(i16::MAX), + col_int: Some(i32::MAX), + col_bigint: Some(i64::MAX), + col_float: Some(0.25), + col_double: Some(0.5), + col_decimal: Some(9_999_999_999), + col_decimal5: Some(99999), + col_decimal38: Some(99_999_999_999_999_999_999_999_999_999_999_999_999), + col_string: Some("orc-boundary".into()), + col_binary: Some(vec![0x00, 0xFF]), + col_date: Some(0), + col_timestamp: Some(0), + col_timestamp_ltz: Some(0), + col_array: Some(vec![]), + col_map: Some(vec![]), + col_struct: Some((Some("orc".into()), None)), + }, + BoundaryRow { + id: 4, + col_boolean: None, + col_tinyint: None, + col_smallint: None, + col_int: None, + col_bigint: None, + col_float: None, + col_double: None, + col_decimal: None, + col_decimal5: None, + col_decimal38: None, + col_string: None, + col_binary: None, + col_date: None, + col_timestamp: None, + col_timestamp_ltz: None, + col_array: None, + col_map: None, + col_struct: None, + }, + BoundaryRow { + id: 5, + col_boolean: Some(false), + col_tinyint: Some(0), + col_smallint: Some(0), + col_int: Some(0), + col_bigint: Some(0), + col_float: Some(0.0), + col_double: Some(0.0), + col_decimal: Some(0), + col_decimal5: Some(0), + col_decimal38: Some(0), + col_string: Some("avro-boundary".into()), + col_binary: Some(vec![0x01, 0x02]), + col_date: Some(1), + col_timestamp: Some(999_999), + col_timestamp_ltz: Some(999_999), + col_array: Some(vec![Some(7)]), + col_map: Some(vec![("seven".into(), Some(7))]), + col_struct: Some((Some("avro".into()), Some(7))), + }, + BoundaryRow { + id: 6, + col_boolean: None, + col_tinyint: None, + col_smallint: None, + col_int: None, + col_bigint: None, + col_float: None, + col_double: None, + col_decimal: None, + col_decimal5: None, + col_decimal38: None, + col_string: None, + col_binary: None, + col_date: None, + col_timestamp: None, + col_timestamp_ltz: None, + col_array: None, + col_map: None, + col_struct: None, + }, + ] + ); +} diff --git a/dev/spark/provision.py b/dev/spark/provision.py index 53bda45d..c7c408d3 100644 --- a/dev/spark/provision.py +++ b/dev/spark/provision.py @@ -822,6 +822,122 @@ def main(): """ ) + # ===== Full types boundary table: parquet, orc, avro ===== + # Each format writes one boundary row and one all-null row for nullable fields. + spark.sql( + """ + CREATE TABLE IF NOT EXISTS full_types_boundary_table ( + id INT, + col_boolean BOOLEAN, + col_tinyint TINYINT, + col_smallint SMALLINT, + col_int INT, + col_bigint BIGINT, + col_float FLOAT, + col_double DOUBLE, + col_decimal DECIMAL(10, 2), + col_decimal5 DECIMAL(5), + col_decimal38 DECIMAL(38, 18), + col_string STRING, + col_binary BINARY, + col_date DATE, + col_timestamp TIMESTAMP_NTZ, + col_timestamp_ltz TIMESTAMP, + col_array ARRAY, + col_map MAP, + col_struct STRUCT + ) USING paimon + TBLPROPERTIES ( + 'file.format' = 'parquet' + ) + """ + ) + spark.sql( + """ + INSERT INTO full_types_boundary_table VALUES + (1, false, CAST('-128' AS TINYINT), CAST('-32768' AS SMALLINT), + CAST('-2147483648' AS INT), CAST('-9223372036854775808' AS BIGINT), + CAST(-0.5 AS FLOAT), -1.25, CAST('-99999999.99' AS DECIMAL(10,2)), + CAST('-99999' AS DECIMAL(5)), + CAST('-99999999999999999999.999999999999999999' AS DECIMAL(38,18)), + '', X'', + DATE '1969-12-31', + TIMESTAMP_NTZ '1970-01-01 00:00:00.000001', + TIMESTAMP '1970-01-01 00:00:00.000001', + array(CAST(NULL AS INT), CAST('-2147483648' AS INT), CAST(0 AS INT)), + map('negative', CAST('-2147483648' AS INT), 'zero', CAST(NULL AS INT)), + named_struct('name', CAST(NULL AS STRING), 'value', CAST(-1 AS INT))), + (2, CAST(NULL AS BOOLEAN), CAST(NULL AS TINYINT), CAST(NULL AS SMALLINT), + CAST(NULL AS INT), CAST(NULL AS BIGINT), + CAST(NULL AS FLOAT), CAST(NULL AS DOUBLE), CAST(NULL AS DECIMAL(10,2)), + CAST(NULL AS DECIMAL(5)), CAST(NULL AS DECIMAL(38,18)), + CAST(NULL AS STRING), CAST(NULL AS BINARY), + CAST(NULL AS DATE), + CAST(NULL AS TIMESTAMP_NTZ), + CAST(NULL AS TIMESTAMP), + CAST(NULL AS ARRAY), + CAST(NULL AS MAP), + CAST(NULL AS STRUCT)) + """ + ) + spark.sql("ALTER TABLE full_types_boundary_table SET TBLPROPERTIES ('file.format' = 'orc')") + spark.sql( + """ + INSERT INTO full_types_boundary_table VALUES + (3, true, CAST('127' AS TINYINT), CAST('32767' AS SMALLINT), + CAST('2147483647' AS INT), CAST('9223372036854775807' AS BIGINT), + CAST(0.25 AS FLOAT), 0.5, CAST('99999999.99' AS DECIMAL(10,2)), + CAST('99999' AS DECIMAL(5)), + CAST('99999999999999999999.999999999999999999' AS DECIMAL(38,18)), + 'orc-boundary', X'00FF', + DATE '1970-01-01', + TIMESTAMP_NTZ '1970-01-01 00:00:00', + TIMESTAMP '1970-01-01 00:00:00', + CAST(array() AS ARRAY), + CAST(map() AS MAP), + named_struct('name', 'orc', 'value', CAST(NULL AS INT))), + (4, CAST(NULL AS BOOLEAN), CAST(NULL AS TINYINT), CAST(NULL AS SMALLINT), + CAST(NULL AS INT), CAST(NULL AS BIGINT), + CAST(NULL AS FLOAT), CAST(NULL AS DOUBLE), CAST(NULL AS DECIMAL(10,2)), + CAST(NULL AS DECIMAL(5)), CAST(NULL AS DECIMAL(38,18)), + CAST(NULL AS STRING), CAST(NULL AS BINARY), + CAST(NULL AS DATE), + CAST(NULL AS TIMESTAMP_NTZ), + CAST(NULL AS TIMESTAMP), + CAST(NULL AS ARRAY), + CAST(NULL AS MAP), + CAST(NULL AS STRUCT)) + """ + ) + spark.sql("ALTER TABLE full_types_boundary_table SET TBLPROPERTIES ('file.format' = 'avro')") + spark.sql( + """ + INSERT INTO full_types_boundary_table VALUES + (5, false, CAST(0 AS TINYINT), CAST(0 AS SMALLINT), + 0, 0, + CAST(0.0 AS FLOAT), 0.0, CAST(0.00 AS DECIMAL(10,2)), + CAST(0 AS DECIMAL(5)), CAST(0 AS DECIMAL(38,18)), + 'avro-boundary', X'0102', + DATE '1970-01-02', + TIMESTAMP_NTZ '1970-01-01 00:00:00.999999', + TIMESTAMP '1970-01-01 00:00:00.999999', + array(CAST(7 AS INT)), + map('seven', 7), + named_struct('name', 'avro', 'value', 7)), + (6, CAST(NULL AS BOOLEAN), CAST(NULL AS TINYINT), CAST(NULL AS SMALLINT), + CAST(NULL AS INT), CAST(NULL AS BIGINT), + CAST(NULL AS FLOAT), CAST(NULL AS DOUBLE), CAST(NULL AS DECIMAL(10,2)), + CAST(NULL AS DECIMAL(5)), CAST(NULL AS DECIMAL(38,18)), + CAST(NULL AS STRING), CAST(NULL AS BINARY), + CAST(NULL AS DATE), + CAST(NULL AS TIMESTAMP_NTZ), + CAST(NULL AS TIMESTAMP), + CAST(NULL AS ARRAY), + CAST(NULL AS MAP), + CAST(NULL AS STRUCT)) + """ + ) + # ===== First-Row merge engine PK table ===== # first-row keeps the earliest inserted row per key; later duplicates are ignored.