diff --git a/core/src/main/java/org/apache/datafusion/DataFrame.java b/core/src/main/java/org/apache/datafusion/DataFrame.java index 38ec2f5..d4e0226 100644 --- a/core/src/main/java/org/apache/datafusion/DataFrame.java +++ b/core/src/main/java/org/apache/datafusion/DataFrame.java @@ -334,6 +334,155 @@ public DataFrame unnestColumns(UnnestOptions options, String... columns) { return new DataFrame(unnestColumns(nativeHandle, columns, options.preserveNulls())); } + // -- Set operations ------------------------------------------------------ + // + // The Java method names mirror DataFusion's Rust API verbatim. SQL semantics: + // + // union = UNION ALL (positional, keeps duplicates) + // unionDistinct = UNION (positional, deduplicated) + // unionByName = UNION ALL by column name; missing columns become NULL + // unionByNameDistinct = UNION by column name; missing columns become NULL + // intersect = INTERSECT ALL (keeps duplicates) + // intersectDistinct = INTERSECT (deduplicated) + // except = EXCEPT ALL (keeps duplicates) + // exceptDistinct = EXCEPT (deduplicated) + // + // Note: the *_distinct variants deduplicate, while the unsuffixed methods keep + // duplicates. This is the inverse of Spark's convention, where `intersect` + // deduplicates and `intersectAll` keeps duplicates -- consult the Javadoc on + // each method to confirm semantics before porting Spark code. + // + // None of these methods consume the receiver or {@code other}; both DataFrames + // remain usable after the call. The native side clones the LogicalPlan on + // each side, which is cheap (LogicalPlan is Arc-backed in DataFusion). + + /** + * Concatenate this DataFrame with {@code other} by column position, keeping all duplicates (SQL + * {@code UNION ALL}). The two schemas must match positionally. Both this DataFrame and {@code + * other} remain usable after the call and must still be closed independently. + * + * @throws IllegalArgumentException if {@code other} is {@code null}. 
+ * @throws RuntimeException if the schemas are incompatible. + */ + public DataFrame union(DataFrame other) { + return new DataFrame(unionRows(nativeHandle, otherHandle("union", other))); + } + + /** + * Concatenate this DataFrame with {@code other} by column position, removing duplicates (SQL + * {@code UNION DISTINCT} -- equivalent to plain {@code UNION} in standard SQL). Both DataFrames + * remain usable. + * + * @throws IllegalArgumentException if {@code other} is {@code null}. + * @throws RuntimeException if the schemas are incompatible. + */ + public DataFrame unionDistinct(DataFrame other) { + return new DataFrame(unionDistinctRows(nativeHandle, otherHandle("unionDistinct", other))); + } + + /** + * Concatenate this DataFrame with {@code other} by column name, keeping all duplicates. Columns + * present in only one side are filled with NULL on the other. Both DataFrames remain usable. + * + * @throws IllegalArgumentException if {@code other} is {@code null}. + * @throws RuntimeException if column types disagree on a shared name. + */ + public DataFrame unionByName(DataFrame other) { + return new DataFrame(unionByNameRows(nativeHandle, otherHandle("unionByName", other))); + } + + /** + * Concatenate this DataFrame with {@code other} by column name, removing duplicates. Columns + * present in only one side are filled with NULL on the other. Both DataFrames remain usable. + * + * @throws IllegalArgumentException if {@code other} is {@code null}. + * @throws RuntimeException if column types disagree on a shared name. + */ + public DataFrame unionByNameDistinct(DataFrame other) { + return new DataFrame( + unionByNameDistinctRows(nativeHandle, otherHandle("unionByNameDistinct", other))); + } + + /** + * Rows present in both this DataFrame and {@code other}, keeping duplicates from the receiver + * (SQL {@code INTERSECT ALL}). Both schemas must match positionally. Both DataFrames remain + * usable. + * + *
Implementation note: DataFusion implements {@code INTERSECT ALL} as a + * left-semi join on equality, not as standard SQL bag intersection. A left row is kept iff any + * matching row exists in {@code other}. With {@code left = (1, 2, 2, 3)} and {@code right = (2, + * 3)}, the result is {@code (2, 2, 3)} -- both copies of {@code 2} survive because each finds a + * match in {@code right}. PostgreSQL / Spark {@code INTERSECT ALL} would instead yield {@code (2, + * 3)}: bag intersection keeps min(left count, right count) copies, so the engines diverge + * whenever {@code other} has fewer copies than {@code this} of a row that appears in both. + * + * @throws IllegalArgumentException if {@code other} is {@code null}. + * @throws RuntimeException if the schemas are incompatible. + */ + public DataFrame intersect(DataFrame other) { + return new DataFrame(intersectRows(nativeHandle, otherHandle("intersect", other))); + } + + /** + * Rows present in both this DataFrame and {@code other}, deduplicated (SQL {@code INTERSECT}). + * Both schemas must match positionally. Both DataFrames remain usable. + * + * @throws IllegalArgumentException if {@code other} is {@code null}. + * @throws RuntimeException if the schemas are incompatible. + */ + public DataFrame intersectDistinct(DataFrame other) { + return new DataFrame( + intersectDistinctRows(nativeHandle, otherHandle("intersectDistinct", other))); + } + + /** + * Rows present in this DataFrame but not in {@code other}, keeping duplicates from the receiver + * (SQL {@code EXCEPT ALL}). Both schemas must match positionally. Both DataFrames remain usable. + * + *
Implementation note: DataFusion implements {@code EXCEPT ALL} as a
+ * left-anti join on equality, not as standard SQL bag difference. A left row is kept iff
+ * no matching row exists in {@code other} -- the multiplicity of matches is irrelevant.
+ * With {@code left = (1, 1, 2, 2, 3)} and {@code right = (1, 3)}, the result is {@code (2, 2)}:
+ * both copies of {@code 2} survive (no match in {@code right}); both copies of {@code 1} and the
+ * {@code 3} drop. PostgreSQL / Spark {@code EXCEPT ALL} would yield the same answer here, but the
+ * two engines diverge when {@code right} contains fewer copies than {@code left} of a row that
+ * appears in both.
+ *
+ * @throws IllegalArgumentException if {@code other} is {@code null}.
+ * @throws RuntimeException if the schemas are incompatible.
+ */
+ public DataFrame except(DataFrame other) {
+ return new DataFrame(exceptRows(nativeHandle, otherHandle("except", other)));
+ }
+
+ /**
+ * Rows present in this DataFrame but not in {@code other}, deduplicated (SQL {@code EXCEPT}).
+ * Both schemas must match positionally. Both DataFrames remain usable.
+ *
+ * @throws IllegalArgumentException if {@code other} is {@code null}.
+ * @throws RuntimeException if the schemas are incompatible.
+ */
+ public DataFrame exceptDistinct(DataFrame other) {
+ return new DataFrame(exceptDistinctRows(nativeHandle, otherHandle("exceptDistinct", other)));
+ }
+
+ /**
+ * Validate the receiver and the other DataFrame and return {@code other.nativeHandle}. Common
+ * preamble for the eight set-operation methods so the validation logic stays in one place.
+ */
+ private long otherHandle(String op, DataFrame other) {
+ if (nativeHandle == 0) {
+ throw new IllegalStateException("DataFrame is closed or already collected");
+ }
+ if (other == null) {
+ throw new IllegalArgumentException(op + " other must be non-null");
+ }
+ if (other.nativeHandle == 0) {
+ throw new IllegalStateException(op + " other DataFrame is closed or already collected");
+ }
+ return other.nativeHandle;
+ }
+
/**
* Order the rows by the supplied sort keys. Each {@link SortExpr} names a column and a direction
* ({@link SortExpr#asc(String)} / {@link SortExpr#desc(String)}); call {@link
@@ -668,6 +817,22 @@ public void close() {
private static native long unnestColumns(long handle, String[] columns, boolean preserveNulls);
+  // Native entry points for the eight set operations. Each receives the receiver's native handle
+  // and the other DataFrame's native handle; the Java callers wrap the returned long in a new
+  // DataFrame.
+  private static native long unionRows(long handle, long otherHandle);
+
+  private static native long unionDistinctRows(long handle, long otherHandle);
+
+  private static native long unionByNameRows(long handle, long otherHandle);
+
+  private static native long unionByNameDistinctRows(long handle, long otherHandle);
+
+  private static native long intersectRows(long handle, long otherHandle);
+
+  private static native long intersectDistinctRows(long handle, long otherHandle);
+
+  private static native long exceptRows(long handle, long otherHandle);
+
+  private static native long exceptDistinctRows(long handle, long otherHandle);
+
private static native long sortRows(
long handle, String[] columns, boolean[] ascending, boolean[] nullsFirst);
diff --git a/core/src/test/java/org/apache/datafusion/DataFrameTransformationsTest.java b/core/src/test/java/org/apache/datafusion/DataFrameTransformationsTest.java
index ab977a9..8a5edde 100644
--- a/core/src/test/java/org/apache/datafusion/DataFrameTransformationsTest.java
+++ b/core/src/test/java/org/apache/datafusion/DataFrameTransformationsTest.java
@@ -123,7 +123,8 @@ void chainFilterSelectCount() {
@Test
void methodsThrowAfterClose() {
- try (SessionContext ctx = new SessionContext()) {
+ try (SessionContext ctx = new SessionContext();
+ DataFrame other = ctx.sql("SELECT 1 AS x")) {
DataFrame df = ctx.sql("SELECT 1 AS x");
df.close();
assertThrows(IllegalStateException.class, () -> df.select("x"));
@@ -135,6 +136,14 @@ void methodsThrowAfterClose() {
assertThrows(IllegalStateException.class, () -> df.withColumnRenamed("x", "y"));
assertThrows(IllegalStateException.class, () -> df.withColumn("y", "x + 1"));
assertThrows(IllegalStateException.class, () -> df.unnestColumns("x"));
+ assertThrows(IllegalStateException.class, () -> df.union(other));
+ assertThrows(IllegalStateException.class, () -> df.unionDistinct(other));
+ assertThrows(IllegalStateException.class, () -> df.unionByName(other));
+ assertThrows(IllegalStateException.class, () -> df.unionByNameDistinct(other));
+ assertThrows(IllegalStateException.class, () -> df.intersect(other));
+ assertThrows(IllegalStateException.class, () -> df.intersectDistinct(other));
+ assertThrows(IllegalStateException.class, () -> df.except(other));
+ assertThrows(IllegalStateException.class, () -> df.exceptDistinct(other));
assertThrows(IllegalStateException.class, () -> df.sort(SortExpr.asc("x")));
assertThrows(IllegalStateException.class, () -> df.repartitionRoundRobin(1));
assertThrows(IllegalStateException.class, () -> df.repartitionHash(1, "x"));
@@ -148,7 +157,8 @@ void methodsThrowAfterClose() {
void methodsThrowAfterCollect() throws Exception {
try (BufferAllocator allocator = new RootAllocator();
SessionContext ctx = new SessionContext();
- DataFrame df = ctx.sql("SELECT 1 AS x")) {
+ DataFrame df = ctx.sql("SELECT 1 AS x");
+ DataFrame other = ctx.sql("SELECT 1 AS x")) {
try (ArrowReader reader = df.collect(allocator)) {
assertTrue(reader.loadNextBatch());
}
@@ -161,6 +171,14 @@ void methodsThrowAfterCollect() throws Exception {
assertThrows(IllegalStateException.class, () -> df.withColumnRenamed("x", "y"));
assertThrows(IllegalStateException.class, () -> df.withColumn("y", "x + 1"));
assertThrows(IllegalStateException.class, () -> df.unnestColumns("x"));
+ assertThrows(IllegalStateException.class, () -> df.union(other));
+ assertThrows(IllegalStateException.class, () -> df.unionDistinct(other));
+ assertThrows(IllegalStateException.class, () -> df.unionByName(other));
+ assertThrows(IllegalStateException.class, () -> df.unionByNameDistinct(other));
+ assertThrows(IllegalStateException.class, () -> df.intersect(other));
+ assertThrows(IllegalStateException.class, () -> df.intersectDistinct(other));
+ assertThrows(IllegalStateException.class, () -> df.except(other));
+ assertThrows(IllegalStateException.class, () -> df.exceptDistinct(other));
assertThrows(IllegalStateException.class, () -> df.sort(SortExpr.asc("x")));
assertThrows(IllegalStateException.class, () -> df.repartitionRoundRobin(1));
assertThrows(IllegalStateException.class, () -> df.repartitionHash(1, "x"));
@@ -511,6 +529,152 @@ void unnestColumnsRejectsNullArgs() {
}
}
+ // -- Set operations -------------------------------------------------------
+
+ @Test
+ void unionKeepsDuplicates() {
+ try (SessionContext ctx = new SessionContext();
+ DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (2), (2)) AS t(x)");
+ DataFrame right = ctx.sql("SELECT * FROM (VALUES (2), (3)) AS t(x)");
+ DataFrame combined = left.union(right)) {
+ // union = UNION ALL: 3 + 2 = 5 rows, duplicates preserved.
+ assertEquals(5L, combined.count());
+ }
+ }
+
+ @Test
+ void unionDistinctRemovesDuplicates() {
+ try (SessionContext ctx = new SessionContext();
+ DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (2), (2)) AS t(x)");
+ DataFrame right = ctx.sql("SELECT * FROM (VALUES (2), (3)) AS t(x)");
+ DataFrame combined = left.unionDistinct(right)) {
+ // unionDistinct = UNION (deduplicated): {1, 2, 3} = 3 rows.
+ assertEquals(3L, combined.count());
+ }
+ }
+
+ @Test
+ void unionByNameAlignsColumnsByName() {
+ try (SessionContext ctx = new SessionContext();
+ // Left: (a, b). Right: (b, a) -- same names, swapped column positions.
+ DataFrame left = ctx.sql("SELECT 1 AS a, 2 AS b");
+ DataFrame right = ctx.sql("SELECT 4 AS b, 3 AS a");
+ DataFrame combined = left.unionByName(right)) {
+ // 2 rows: positional union would mix columns, but unionByName aligns them.
+ assertEquals(2L, combined.count());
+ }
+ }
+
+ @Test
+ void unionByNameDistinctRemovesDuplicates() {
+ try (SessionContext ctx = new SessionContext();
+ DataFrame left = ctx.sql("SELECT 1 AS a, 2 AS b");
+ DataFrame right = ctx.sql("SELECT 2 AS b, 1 AS a");
+ DataFrame combined = left.unionByNameDistinct(right)) {
+ // After name-aligning, both rows are (a=1, b=2). Distinct collapses to 1.
+ assertEquals(1L, combined.count());
+ }
+ }
+
+ @Test
+ void intersectKeepsCommonRowsWithDuplicates() {
+ try (SessionContext ctx = new SessionContext();
+ DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (2), (2), (3)) AS t(x)");
+ DataFrame right = ctx.sql("SELECT * FROM (VALUES (2), (2), (3)) AS t(x)");
+ DataFrame inter = left.intersect(right)) {
+ // INTERSECT ALL: row r appears min(count(r) in left, count(r) in right) times.
+ // 2 appears 2 times in both => 2 rows; 3 appears 1 time in both => 1 row. Total 3.
+ assertEquals(3L, inter.count());
+ }
+ }
+
+ @Test
+ void intersectDistinctReturnsUniqueCommonRows() {
+ try (SessionContext ctx = new SessionContext();
+ DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (2), (2), (3)) AS t(x)");
+ DataFrame right = ctx.sql("SELECT * FROM (VALUES (2), (2), (3)) AS t(x)");
+ DataFrame inter = left.intersectDistinct(right)) {
+ // INTERSECT (deduplicated): {2, 3} = 2 rows.
+ assertEquals(2L, inter.count());
+ }
+ }
+
+ @Test
+ void exceptKeepsLeftMinusRightWithDuplicates() {
+ try (SessionContext ctx = new SessionContext();
+ DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (1), (2), (2), (3)) AS t(x)");
+ DataFrame right = ctx.sql("SELECT * FROM (VALUES (1), (3)) AS t(x)");
+ DataFrame diff = left.except(right)) {
+ // DataFusion's EXCEPT ALL is implemented as a LeftAnti join, so a left row is kept iff
+ // its key has no match in right. With left {1, 1, 2, 2, 3} and right {1, 3}: both 1s and
+ // the 3 are dropped, both 2s are kept. Total 2 rows -- duplicates of 2 preserved.
+ assertEquals(2L, diff.count());
+ }
+ }
+
+ @Test
+ void exceptDistinctReturnsUniqueLeftOnlyRows() {
+ try (SessionContext ctx = new SessionContext();
+ DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (1), (2), (2), (3)) AS t(x)");
+ DataFrame right = ctx.sql("SELECT * FROM (VALUES (1), (3)) AS t(x)");
+ DataFrame diff = left.exceptDistinct(right)) {
+ // EXCEPT (DISTINCT): left is deduplicated to {1, 2, 3}, then anti-joined against
+ // right {1, 3}. Result: {2} = 1 row.
+ assertEquals(1L, diff.count());
+ }
+ }
+
+ @Test
+ void setOpsAreNonDestructive() {
+ try (SessionContext ctx = new SessionContext();
+ DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (2)) AS t(x)");
+ DataFrame right = ctx.sql("SELECT * FROM (VALUES (2), (3)) AS t(x)")) {
+ try (DataFrame combined = left.union(right)) {
+ assertEquals(4L, combined.count());
+ }
+ // Both originals still usable after the set-op call.
+ assertEquals(2L, left.count());
+ assertEquals(2L, right.count());
+ }
+ }
+
+  @Test
+  void setOpsRejectNullOther() {
+    // Every set operation must reject a null right-hand side with IllegalArgumentException.
+    try (SessionContext ctx = new SessionContext();
+        DataFrame frame = ctx.sql("SELECT 1 AS x")) {
+      assertThrows(IllegalArgumentException.class, () -> frame.union(null));
+      assertThrows(IllegalArgumentException.class, () -> frame.unionDistinct(null));
+      assertThrows(IllegalArgumentException.class, () -> frame.unionByName(null));
+      assertThrows(IllegalArgumentException.class, () -> frame.unionByNameDistinct(null));
+      assertThrows(IllegalArgumentException.class, () -> frame.intersect(null));
+      assertThrows(IllegalArgumentException.class, () -> frame.intersectDistinct(null));
+      assertThrows(IllegalArgumentException.class, () -> frame.except(null));
+      assertThrows(IllegalArgumentException.class, () -> frame.exceptDistinct(null));
+    }
+  }
+
+  @Test
+  void setOpsRejectClosedOther() {
+    try (SessionContext ctx = new SessionContext();
+        DataFrame frame = ctx.sql("SELECT 1 AS x")) {
+      // Close the right-hand side up front; the open receiver must then refuse to use it.
+      DataFrame closedOther = ctx.sql("SELECT 1 AS x");
+      closedOther.close();
+      assertThrows(IllegalStateException.class, () -> frame.union(closedOther));
+      assertThrows(IllegalStateException.class, () -> frame.intersectDistinct(closedOther));
+      assertThrows(IllegalStateException.class, () -> frame.exceptDistinct(closedOther));
+    }
+  }
+
+  @Test
+  void unionWithIncompatibleSchemaThrows() {
+    try (SessionContext ctx = new SessionContext();
+        DataFrame twoColumns = ctx.sql("SELECT 1 AS x, 2 AS y");
+        DataFrame oneColumn = ctx.sql("SELECT 1 AS x")) {
+      // The column counts differ (2 vs 1), so the union is expected to fail when it is built.
+      assertThrows(RuntimeException.class, () -> twoColumns.union(oneColumn));
+    }
+  }
+
// -- sort -----------------------------------------------------------------
@Test
diff --git a/native/src/lib.rs b/native/src/lib.rs
index 9bd82bd..ce69549 100644
--- a/native/src/lib.rs
+++ b/native/src/lib.rs
@@ -649,6 +649,65 @@ pub extern "system" fn Java_org_apache_datafusion_DataFrame_unnestColumns<'local
})
}
+// -- Set operations -------------------------------------------------------
+//
+// Each handler clones both DataFrames -- DataFusion's set-op methods consume
+// `self` and the argument by value, but `DataFrame: Clone` is cheap (the
+// underlying LogicalPlan is Arc-backed), so cloning lets us keep both Java
+// receivers usable. The Java side already validates that both handles are
+// non-zero (neither DataFrame is closed or already collected) before reaching here.
+macro_rules! set_op_handler {
+ ($fn_name:ident, $df_method:ident) => {
+ #[no_mangle]
+ pub extern "system" fn $fn_name<'local>(
+ mut env: JNIEnv<'local>,
+ _class: JClass<'local>,
+ handle: jlong,
+ other_handle: jlong,
+ ) -> jlong {
+ try_unwrap_or_throw(&mut env, 0, |_env| -> JniResult