Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,128 @@ async fn test_read_schema_evolution_type_promotion() {
);
}

/// Asserts that the distinct data-file extensions scanned by `plan` equal `expected_formats`.
///
/// The extension is the text after the last `.` of each data file's `file_name`;
/// file names that contain no `.` are skipped. Both sides are compared as sets, so
/// duplicates and ordering do not matter. `table_name` only appears in the failure
/// message.
fn assert_plan_file_formats(plan: &Plan, expected_formats: &[&str], table_name: &str) {
    // Walk every data file of every split and remember which extensions show up.
    let mut seen: HashSet<&str> = HashSet::new();
    for split in plan.splits().iter() {
        for file in split.data_files() {
            if let Some((_, extension)) = file.file_name.rsplit_once('.') {
                seen.insert(extension);
            }
        }
    }

    let wanted: HashSet<&str> = expected_formats.iter().copied().collect();
    assert_eq!(
        seen, wanted,
        "{table_name} should scan the expected data file formats"
    );
}

/// Asserts that the data files scanned by `plan` carry at least two distinct `schema_id`s.
///
/// `table_name` only appears in the failure message, which also prints the set of
/// schema ids that were actually found.
fn assert_plan_has_multiple_schema_ids(plan: &Plan, table_name: &str) {
    // Gather the distinct schema ids across all data files of all splits.
    let mut schema_ids: HashSet<i64> = HashSet::new();
    for split in plan.splits().iter() {
        for file in split.data_files() {
            schema_ids.insert(file.schema_id);
        }
    }

    let distinct = schema_ids.len();
    assert!(
        distinct >= 2,
        "{table_name} should scan files from multiple schema versions, got {schema_ids:?}"
    );
}

/// Reads a table whose data files span several formats and an `ADD COLUMNS` change.
///
/// The plan must reference Parquet, ORC and Avro files written under more than one
/// schema id. Rows 1-2 are expected to surface a null `age`, while rows 3-6 are
/// expected to keep their `age` values.
#[tokio::test]
async fn test_read_format_schema_evolution_add_column() {
    let table_name = "format_schema_evolution_add_column";
    let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await;
    assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);
    assert_plan_has_multiple_schema_ids(&plan, table_name);

    // Flatten every batch into (id, name, age) tuples; a null `age` becomes `None`.
    let mut rows: Vec<(i32, String, Option<i32>)> = Vec::new();
    for batch in batches.iter() {
        let ids = batch
            .column_by_name("id")
            .and_then(|col| col.as_any().downcast_ref::<Int32Array>())
            .expect("id");
        let names = batch
            .column_by_name("name")
            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
            .expect("name");
        let ages = batch
            .column_by_name("age")
            .and_then(|col| col.as_any().downcast_ref::<Int32Array>())
            .expect("age");

        rows.extend((0..batch.num_rows()).map(|row| {
            let age = if ages.is_null(row) {
                None
            } else {
                Some(ages.value(row))
            };
            (ids.value(row), names.value(row).to_owned(), age)
        }));
    }
    // Batch order is not asserted, so compare in id order.
    rows.sort_by_key(|row| row.0);

    let expected: Vec<(i32, String, Option<i32>)> = vec![
        (1, String::from("alice"), None),
        (2, String::from("bob"), None),
        (3, String::from("carol"), Some(30)),
        (4, String::from("dave"), Some(40)),
        (5, String::from("eve"), Some(50)),
        (6, String::from("frank"), Some(60)),
    ];
    assert_eq!(
        rows, expected,
        "Old Parquet rows should have null age and new ORC/Avro rows should keep age values"
    );
}

/// Reads a table whose data files span several formats and an INT -> BIGINT column change.
///
/// The plan must reference Parquet, ORC and Avro files written under more than one
/// schema id. Every returned batch must expose `value` as Int64, and the rows must
/// match the expected (id, value) pairs, including values beyond the 32-bit range.
#[tokio::test]
async fn test_read_format_schema_evolution_type_promotion() {
    let table_name = "format_schema_evolution_type_promotion";
    let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await;
    assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);
    assert_plan_has_multiple_schema_ids(&plan, table_name);

    // First pass: every batch must already report the promoted type.
    for batch in batches.iter() {
        let promoted = batch.column_by_name("value").expect("value column");
        assert_eq!(
            promoted.data_type(),
            &arrow_array::types::Int64Type::DATA_TYPE,
            "value column should be Int64 after mixed-format type promotion"
        );
    }

    // Second pass: flatten every batch into (id, value) tuples.
    let mut rows: Vec<(i32, i64)> = Vec::new();
    for batch in batches.iter() {
        let ids = batch
            .column_by_name("id")
            .and_then(|col| col.as_any().downcast_ref::<Int32Array>())
            .expect("id");
        let values = batch
            .column_by_name("value")
            .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
            .expect("value as Int64Array");

        rows.extend((0..batch.num_rows()).map(|row| (ids.value(row), values.value(row))));
    }
    // Batch order is not asserted, so compare in id order.
    rows.sort_by_key(|row| row.0);

    let expected: Vec<(i32, i64)> = vec![
        (1, 100),
        (2, 200),
        (3, 3_000_000_000),
        (4, 4_000_000_000),
        (5, 5_000_000_000),
        (6, 6_000_000_000),
    ];
    assert_eq!(
        rows, expected,
        "Old Parquet INT rows should be cast to BIGINT and new ORC/Avro BIGINT rows should match"
    );
}

/// Stats pruning should treat a newly added column as all-NULL for old files.
#[tokio::test]
async fn test_stats_pruning_schema_evolution_added_column_eq_prunes_old_files() {
Expand Down
54 changes: 54 additions & 0 deletions dev/spark/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,60 @@ def main():
"INSERT INTO schema_evolution_type_promotion VALUES (3, 3000000000)"
)

# ===== Mixed-format Schema Evolution: Add Column =====
# Old Parquet files lack age; new ORC/Avro files contain age.
spark.sql(
"""
CREATE TABLE IF NOT EXISTS format_schema_evolution_add_column (
id INT,
name STRING
) USING paimon
TBLPROPERTIES (
'file.format' = 'parquet'
)
"""
)
spark.sql(
"INSERT INTO format_schema_evolution_add_column VALUES (1, 'alice'), (2, 'bob')"
)
spark.sql("ALTER TABLE format_schema_evolution_add_column ADD COLUMNS (age INT)")
spark.sql("ALTER TABLE format_schema_evolution_add_column SET TBLPROPERTIES ('file.format' = 'orc')")
spark.sql(
"INSERT INTO format_schema_evolution_add_column VALUES (3, 'carol', 30), (4, 'dave', 40)"
)
spark.sql("ALTER TABLE format_schema_evolution_add_column SET TBLPROPERTIES ('file.format' = 'avro')")
spark.sql(
"INSERT INTO format_schema_evolution_add_column VALUES (5, 'eve', 50), (6, 'frank', 60)"
)

# ===== Mixed-format Schema Evolution: Type Promotion (INT -> BIGINT) =====
# Old Parquet files have value as INT; new ORC/Avro files have value as BIGINT.
spark.sql(
"""
CREATE TABLE IF NOT EXISTS format_schema_evolution_type_promotion (
id INT,
value INT
) USING paimon
TBLPROPERTIES (
'file.format' = 'parquet'
)
"""
)
spark.sql(
"INSERT INTO format_schema_evolution_type_promotion VALUES (1, 100), (2, 200)"
)
spark.sql(
"ALTER TABLE format_schema_evolution_type_promotion ALTER COLUMN value TYPE BIGINT"
)
spark.sql("ALTER TABLE format_schema_evolution_type_promotion SET TBLPROPERTIES ('file.format' = 'orc')")
spark.sql(
"INSERT INTO format_schema_evolution_type_promotion VALUES (3, 3000000000), (4, 4000000000)"
)
spark.sql("ALTER TABLE format_schema_evolution_type_promotion SET TBLPROPERTIES ('file.format' = 'avro')")
spark.sql(
"INSERT INTO format_schema_evolution_type_promotion VALUES (5, 5000000000), (6, 6000000000)"
)

# ===== Data Evolution + Schema Evolution: Add Column =====
# Combines data-evolution (row-tracking + MERGE INTO) with ALTER TABLE ADD COLUMNS.
# Old files lack the new column; MERGE INTO produces partial-column files.
Expand Down
Loading