diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs
index fdf213a3..d0766c3f 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -1152,6 +1152,135 @@ async fn test_read_schema_evolution_type_promotion() {
     );
 }
 
+/// Asserts that the data files in `plan` use exactly the extensions in `expected_formats`.
+///
+/// The extension is the text after the last `.` of each `file_name`; names without a `.` are
+/// skipped. Both sides are compared as sets, so order and repetition do not matter.
+fn assert_plan_file_formats(plan: &Plan, expected_formats: &[&str], table_name: &str) {
+    let formats: HashSet<&str> = plan
+        .splits()
+        .iter()
+        .flat_map(|split| split.data_files())
+        .filter_map(|file| file.file_name.rsplit_once('.').map(|(_, ext)| ext))
+        .collect();
+    assert_eq!(
+        formats,
+        expected_formats.iter().copied().collect(),
+        "{table_name} should scan the expected data file formats"
+    );
+}
+
+/// Asserts that the data files in `plan` carry at least two distinct `schema_id` values.
+///
+/// On failure the panic message names `table_name` and lists the ids that were found.
+fn assert_plan_has_multiple_schema_ids(plan: &Plan, table_name: &str) {
+    let schema_ids: HashSet<_> = plan
+        .splits()
+        .iter()
+        .flat_map(|split| split.data_files())
+        .map(|file| file.schema_id)
+        .collect();
+    assert!(
+        schema_ids.len() >= 2,
+        "{table_name} should scan files from multiple schema versions, got {schema_ids:?}"
+    );
+}
+
+/// Test reading mixed-format files after ALTER TABLE ADD COLUMNS.
+/// Old Parquet files lack the new column; newer ORC/Avro files contain it.
+#[tokio::test]
+async fn test_read_format_schema_evolution_add_column() {
+    let table_name = "format_schema_evolution_add_column";
+    let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await;
+    assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);
+    assert_plan_has_multiple_schema_ids(&plan, table_name);
+
+    let mut rows: Vec<(i32, String, Option<i32>)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<arrow_array::Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>())
+            .expect("name");
+        let age = batch
+            .column_by_name("age")
+            .and_then(|c| c.as_any().downcast_ref::<arrow_array::Int32Array>())
+            .expect("age");
+        for i in 0..batch.num_rows() {
+            rows.push((
+                id.value(i),
+                name.value(i).to_string(),
+                (!age.is_null(i)).then(|| age.value(i)), // NULL age becomes None
+            ));
+        }
+    }
+    rows.sort_by_key(|(id, _, _)| *id); // compare independently of batch order
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice".into(), None),
+            (2, "bob".into(), None),
+            (3, "carol".into(), Some(30)),
+            (4, "dave".into(), Some(40)),
+            (5, "eve".into(), Some(50)),
+            (6, "frank".into(), Some(60)),
+        ],
+        "Old Parquet rows should have null age and new ORC/Avro rows should keep age values"
+    );
+}
+
+/// Test reading mixed-format files after ALTER TABLE ALTER COLUMN TYPE (INT -> BIGINT).
+/// Old Parquet files have INT; newer ORC/Avro files have BIGINT.
+#[tokio::test]
+async fn test_read_format_schema_evolution_type_promotion() {
+    let table_name = "format_schema_evolution_type_promotion";
+    let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await;
+    assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);
+    assert_plan_has_multiple_schema_ids(&plan, table_name);
+
+    for batch in &batches {
+        let value_col = batch.column_by_name("value").expect("value column");
+        assert_eq!(
+            value_col.data_type(),
+            &arrow_array::types::Int64Type::DATA_TYPE,
+            "value column should be Int64 after mixed-format type promotion"
+        );
+    }
+
+    let mut rows: Vec<(i32, i64)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<arrow_array::Int32Array>())
+            .expect("id");
+        let value = batch
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<arrow_array::Int64Array>())
+            .expect("value as Int64Array");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), value.value(i)));
+        }
+    }
+    rows.sort_by_key(|(id, _)| *id); // compare independently of batch order
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, 100),
+            (2, 200),
+            (3, 3_000_000_000),
+            (4, 4_000_000_000),
+            (5, 5_000_000_000),
+            (6, 6_000_000_000),
+        ],
+        "Old Parquet INT rows should be cast to BIGINT and new ORC/Avro BIGINT rows should match"
+    );
+}
+
 /// Stats pruning should treat a newly added column as all-NULL for old files.
 #[tokio::test]
 async fn test_stats_pruning_schema_evolution_added_column_eq_prunes_old_files() {
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index c7c408d3..1a948c6e 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -370,6 +370,60 @@ def main():
         "INSERT INTO schema_evolution_type_promotion VALUES (3, 3000000000)"
     )
 
+    # ===== Mixed-format Schema Evolution: Add Column =====
+    # Old Parquet files lack age; new ORC/Avro files contain age.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS format_schema_evolution_add_column (
+            id INT,
+            name STRING
+        ) USING paimon
+        TBLPROPERTIES (
+            'file.format' = 'parquet'
+        )
+        """
+    )
+    spark.sql(
+        "INSERT INTO format_schema_evolution_add_column VALUES (1, 'alice'), (2, 'bob')"
+    )
+    spark.sql("ALTER TABLE format_schema_evolution_add_column ADD COLUMNS (age INT)")
+    spark.sql("ALTER TABLE format_schema_evolution_add_column SET TBLPROPERTIES ('file.format' = 'orc')")
+    spark.sql(
+        "INSERT INTO format_schema_evolution_add_column VALUES (3, 'carol', 30), (4, 'dave', 40)"
+    )
+    spark.sql("ALTER TABLE format_schema_evolution_add_column SET TBLPROPERTIES ('file.format' = 'avro')")
+    spark.sql(
+        "INSERT INTO format_schema_evolution_add_column VALUES (5, 'eve', 50), (6, 'frank', 60)"
+    )
+
+    # ===== Mixed-format Schema Evolution: Type Promotion (INT -> BIGINT) =====
+    # The Parquet insert runs while value is INT; the ORC/Avro inserts run after it is BIGINT.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS format_schema_evolution_type_promotion (
+            id INT,
+            value INT
+        ) USING paimon
+        TBLPROPERTIES (
+            'file.format' = 'parquet'
+        )
+        """
+    )
+    spark.sql(
+        "INSERT INTO format_schema_evolution_type_promotion VALUES (1, 100), (2, 200)"
+    )
+    spark.sql(
+        "ALTER TABLE format_schema_evolution_type_promotion ALTER COLUMN value TYPE BIGINT"
+    )
+    spark.sql("ALTER TABLE format_schema_evolution_type_promotion SET TBLPROPERTIES ('file.format' = 'orc')")
+    spark.sql(
+        "INSERT INTO format_schema_evolution_type_promotion VALUES (3, 3000000000), (4, 4000000000)"
+    )
+    spark.sql("ALTER TABLE format_schema_evolution_type_promotion SET TBLPROPERTIES ('file.format' = 'avro')")
+    spark.sql(
+        "INSERT INTO format_schema_evolution_type_promotion VALUES (5, 5000000000), (6, 6000000000)"
+    )
+
     # ===== Data Evolution + Schema Evolution: Add Column =====
     # Combines data-evolution (row-tracking + MERGE INTO) with ALTER TABLE ADD COLUMNS.
# Old files lack the new column; MERGE INTO produces partial-column files.