feat(table): add SessionContext.registerStreamingTable for push-mode batch ingest #98

Open
LantaoJin wants to merge 1 commit into apache:main from LantaoJin:feat/streaming-table-sink

LantaoJin commented May 26, 2026

Which issue does this PR close?

Rationale for this change

PR #65 shipped a Java-implemented TableProvider and SessionContext.registerTable(String, TableProvider). That covered the pull shape: DataFusion calls scan(BufferAllocator) and reads the returned ArrowReader. For Java callers who already have batches in memory, or who can produce them on demand, that's the right surface.

It does not cover the push shape that event-driven batch sources need:

  • A coordinator reducing over shard responses arriving incrementally.
  • A Flight stream feeding into a query.
  • Any in-process producer that emits batches as side-effects of other work and doesn't know in advance how many will arrive.

Bridging these into PR #65 today requires writing a BlockingArrowReader adapter that buffers pushed batches and serves them through the pull interface. That's a serialisation point: the producer blocks waiting for loadNextBatch(), or DataFusion blocks waiting for the next batch -- the two ends can never run truly concurrently. The adapter also has to invent its own backpressure, error propagation, end-of-stream signalling, and thread-safety story.
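
To make that cost concrete, here is a minimal sketch of the kind of adapter described above, written against java.util.concurrent only. Every name in it (PushToPullAdapter, push, finish, fail, next) is hypothetical, and the generic type T stands in for an Arrow batch; it is not the BlockingArrowReader from any real codebase. It assumes a single consumer thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical push-to-pull bridge: the producer pushes, the consumer pulls. */
final class PushToPullAdapter<T> {
  // Sentinel marking end-of-stream; compared by identity, never handed to callers.
  private static final Object EOS = new Object();

  // Wrapper so an error can travel through the same queue as the data.
  private static final class Failure {
    final Throwable cause;
    Failure(Throwable cause) { this.cause = cause; }
  }

  private final BlockingQueue<Object> queue;
  private boolean finished; // touched only by the single consumer thread

  PushToPullAdapter(int capacity) {
    this.queue = new ArrayBlockingQueue<>(capacity);
  }

  /** Producer side: blocks while the queue is full (hand-rolled backpressure). */
  void push(T batch) { put(batch); }

  /** Producer side: hand-rolled end-of-stream signal. */
  void finish() { put(EOS); }

  /** Producer side: hand-rolled error propagation. */
  void fail(Throwable cause) { put(new Failure(cause)); }

  /** Consumer side: blocks until an item arrives; returns null at end-of-stream. */
  T next() {
    if (finished) return null;
    Object item;
    try {
      item = queue.take();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("interrupted while waiting for a batch", e);
    }
    if (item == EOS) {
      finished = true;
      return null;
    }
    if (item instanceof Failure) {
      finished = true;
      throw new RuntimeException(((Failure) item).cause);
    }
    @SuppressWarnings("unchecked")
    T batch = (T) item;
    return batch;
  }

  private void put(Object item) {
    try {
      queue.put(item);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("interrupted while handing off a batch", e);
    }
  }
}
```

Even this small version has to pick its own sentinel, its own error wrapper, and its own interruption policy, and it has no way to tell the producer that the consumer has gone away.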

DataFusion itself solves this on the Rust side with StreamingTable + PartitionStream plus an mpsc channel. This PR surfaces that exact shape from Java.

What changes are included in this PR?

New public Java API on SessionContext returning a typed sink:

TableSink sink = ctx.registerStreamingTable("shard_results", schema, /* capacity */ 16);
// Producer thread (any thread, including outside any Tokio runtime, and including a thread
// that is itself a Tokio worker -- see "Tokio context detection" below):
try {
  while (hasMoreInput()) {
    sink.write(batch);   // backpressures when channel is full
  }
  sink.close();          // EOF: queries see end-of-stream cleanly
} catch (Throwable t) {
  sink.fail(t);          // signal error: queries see RuntimeException
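}

// API outline; method bodies omitted.
public final class TableSink implements AutoCloseable {
  public void write(VectorSchemaRoot batch);   // blocks while the channel is full
  @Override public void close();               // signals end-of-stream to the query
  public void fail(Throwable cause);           // signals an error to the query
}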

After registration the table can be queried like any other:

DataFrame df = ctx.sql("SELECT count(*) FROM shard_results");
ArrowReader r = df.collect(allocator);

Single-scan semantics. The registered table can be queried at most once; subsequent scans against the same registration throw a RuntimeException. This matches the natural semantics of an event-driven producer (the data is consumed as it arrives) and avoids buffering every batch internally. Callers who need to re-scan should use the pull-mode registerTable + SimpleTableProvider path from PR #65 instead. The restriction is documented prominently in registerStreamingTable's Javadoc and tested explicitly (secondScanThrows).
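
The at-most-once rule can be pictured as a one-shot flag that the first scan claims atomically. The sketch below is an illustration in plain Java, not the PR's implementation (which may well enforce this on the native side); SingleScanGuard, beginScan, and the error text are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical guard: the first scan wins, every later scan is rejected. */
final class SingleScanGuard {
  private final AtomicBoolean scanned = new AtomicBoolean(false);
  private final String tableName;

  SingleScanGuard(String tableName) {
    this.tableName = tableName;
  }

  /** Called at the start of each scan; throws on the second and later calls. */
  void beginScan() {
    // compareAndSet makes the claim atomic, so two concurrent scans cannot both pass.
    if (!scanned.compareAndSet(false, true)) {
      throw new RuntimeException(
          "streaming table '" + tableName + "' can be scanned at most once");
    }
  }
}
```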

Schema constraints (validated at registration). The schema passed to registerStreamingTable must:

  • Have at least one column. Zero-column streaming would have no allocator to borrow for the FFI scratch; rejected up front rather than failing at first write.
  • Have no dictionary-encoded fields at any depth (recursive walk through Field.getChildren()). Data.exportVectorSchemaRoot would otherwise NPE on the missing DictionaryProvider -- v1 cannot supply one. The error message names the dotted path (e.g. 'row.code') so nested violations are debuggable. A future overload that accepts a DictionaryProvider would lift this restriction; out of scope here.
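
The two checks above amount to an emptiness test plus a recursive walk that carries the dotted path along. The following is a sketch under stated assumptions: FieldSpec is a hypothetical stand-in for Arrow's Field (where, to my understanding, a non-null Field.getDictionary() marks a dictionary-encoded field), and the class name and error messages are invented for illustration rather than taken from the PR.

```java
import java.util.List;

/** Hypothetical stand-in for an Arrow Field: a name, a dictionary flag, and children. */
final class FieldSpec {
  final String name;
  final boolean dictionaryEncoded;
  final List<FieldSpec> children;

  FieldSpec(String name, boolean dictionaryEncoded, FieldSpec... children) {
    this.name = name;
    this.dictionaryEncoded = dictionaryEncoded;
    this.children = List.of(children);
  }
}

/** Hypothetical registration-time validation mirroring the two constraints. */
final class StreamingSchemaCheck {
  static void validate(List<FieldSpec> topLevelFields) {
    if (topLevelFields.isEmpty()) {
      throw new IllegalArgumentException(
          "streaming table schema must have at least one column");
    }
    for (FieldSpec field : topLevelFields) {
      walk(field, field.name);
    }
  }

  // Depth-first walk; 'path' accumulates the dotted name, e.g. "row.code".
  private static void walk(FieldSpec field, String path) {
    if (field.dictionaryEncoded) {
      throw new IllegalArgumentException(
          "dictionary-encoded field is not supported: '" + path + "'");
    }
    for (FieldSpec child : field.children) {
      walk(child, path + "." + child.name);
    }
  }
}
```

Failing at registration keeps the error close to the schema definition instead of surfacing it on the first write.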

Native side, in a new native/src/streaming_table.rs.

JNI handlers in native/src/lib.rs.

The Java TableSink mirrors this with an AtomicLong nativeHandle and a ReentrantReadWriteLock guarding the handle's lifetime. write() holds the read lock for the duration of the JNI call. close() and fail() first flip the handle to 0 (forbidding new writes), then call closeSinkNative / failSinkNative (waking parked writes), and finally take the write lock, which waits for all in-flight readers to drain, before calling dropHandleNative.
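
The ordering matters, so here is a minimal model of that handle lifecycle using only java.util.concurrent. It is a sketch, not the PR's TableSink: the class name HandleLifecycle is hypothetical, the counters stand in for the JNI calls so the code runs without native code, and the idempotent close() is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical model of a native-handle lifecycle guarded by a read/write lock. */
final class HandleLifecycle implements AutoCloseable {
  private final AtomicLong nativeHandle;
  private final ReentrantReadWriteLock lifetime = new ReentrantReadWriteLock();

  // Stand-ins for native side effects, so the sketch runs without JNI.
  final AtomicInteger writes = new AtomicInteger();
  final AtomicInteger drops = new AtomicInteger();

  HandleLifecycle(long handle) {
    this.nativeHandle = new AtomicLong(handle);
  }

  void write(Object batch) {
    // Many writers may hold the read lock at once; the handle cannot be dropped meanwhile.
    lifetime.readLock().lock();
    try {
      long handle = nativeHandle.get();
      if (handle == 0) {
        throw new IllegalStateException("sink is closed");
      }
      writes.incrementAndGet(); // stand-in for the JNI write call on 'handle'
    } finally {
      lifetime.readLock().unlock();
    }
  }

  @Override
  public void close() {
    // Step 1: forbid new writes. getAndSet lets exactly one caller observe a non-zero handle.
    long handle = nativeHandle.getAndSet(0);
    if (handle == 0) {
      return; // already closed (idempotence is an assumption of this sketch)
    }
    // Step 2: a real sink would call closeSinkNative(handle) here to wake parked writes.
    // Step 3: the write lock is granted only after every in-flight write has released
    // its read lock, so the handle is never freed under a running JNI call.
    lifetime.writeLock().lock();
    try {
      drops.incrementAndGet(); // stand-in for dropHandleNative(handle)
    } finally {
      lifetime.writeLock().unlock();
    }
  }
}
```

Waking parked writers before taking the write lock is what prevents a deadlock: a writer blocked on a full channel holds the read lock, so close() could otherwise wait on it forever.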

The TableSink.write Java path derives the FFI scratch allocator from the producer's batch (batch.getFieldVectors().get(0).getAllocator()) so the exported buffers share an allocator-root with the producer's vectors. Using a separate RootAllocator would make Data.exportVectorSchemaRoot reject the cross-root transfer.

Cancellation propagates automatically: when DataFusion drops the consumer (query cancelled, LIMIT N short-circuits, error in another operator), the receiver is dropped, Sender::send resolves to Err(SendError), and the JNI handler surfaces that as a RuntimeException on the producer thread. Producers that ignore the exception leak no resources because the sender's Drop runs unconditionally.
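
The producer-visible behaviour can be modelled in plain Java as a bounded channel whose send starts failing once the receiver is gone. This is only an analogy for the Rust channel described above, not the PR's code: CancellableChannel, send, poll, and dropReceiver are hypothetical names, and the exception message is invented.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical bounded channel: send blocks when full and fails after the receiver drops. */
final class CancellableChannel<T> {
  private final ArrayDeque<T> buffer = new ArrayDeque<>();
  private final int capacity;
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notFull = lock.newCondition();
  private boolean receiverDropped;

  CancellableChannel(int capacity) {
    this.capacity = capacity;
  }

  /** Producer side: parks while the buffer is full; throws once the receiver is gone. */
  void send(T item) {
    lock.lock();
    try {
      while (buffer.size() >= capacity && !receiverDropped) {
        notFull.awaitUninterruptibly();
      }
      if (receiverDropped) {
        throw new RuntimeException("receiver dropped: the query finished or was cancelled");
      }
      buffer.addLast(item);
    } finally {
      lock.unlock();
    }
  }

  /** Consumer side: non-blocking take; returns null when the buffer is empty. */
  T poll() {
    lock.lock();
    try {
      T item = buffer.pollFirst();
      if (item != null) {
        notFull.signal(); // one slot freed, wake one parked producer
      }
      return item;
    } finally {
      lock.unlock();
    }
  }

  /** Consumer side: models the query dropping its receiver. */
  void dropReceiver() {
    lock.lock();
    try {
      receiverDropped = true;
      buffer.clear();      // undelivered batches are discarded
      notFull.signalAll(); // wake every parked producer so it can observe the drop
    } finally {
      lock.unlock();
    }
  }
}
```

A producer loop therefore needs no separate cancellation check: the next send after the consumer goes away throws, and the loop unwinds through its normal error path.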

tokio-stream is added as a direct Cargo dependency (it was already pulled in transitively by datafusion-physical-plan; declaring it directly insulates us from transitive resolution drift). The tokio dependency gains the sync feature for Notify.

Out of scope (for follow-ups):

  • Multi-partition registerStreamingTable. StreamingTable::try_new accepts Vec<Arc<dyn PartitionStream>>; an overload accepting multiple sinks is a small additive follow-up.
  • try_write(batch) -> boolean -- non-blocking variant. Easy to add as a method on TableSink.
  • Re-scannable streaming. Would force buffering every batch; defeats the use case. Use pull-mode registerTable instead.
  • Dictionary-encoded fields. Would need a DictionaryProvider overload on registerStreamingTable. Tracked as a future enhancement.

Are these changes tested?

Yes -- 18 new tests in SessionContextStreamingTableTest.

Are there any user-facing changes?

Yes -- purely additive. New public API:

  • org.apache.datafusion.TableSink (final, AutoCloseable)
  • SessionContext.registerStreamingTable(String, Schema, int) -> TableSink

No API removals, no deprecations, no behavior change for existing callers. The native binary picks up tokio-stream as a direct dependency, but it was already in the Cargo dependency graph transitively, so the binary size delta is zero. The tokio dependency gains the sync feature, which is small (Notify, mpsc).

Development

Successfully merging this pull request may close these issues.

feat(table): expose registerStreamingTable for push-mode batch ingest