[client] Support log scanner scan to arrow record batch#2995
Conversation
bfa46ef to
e29e8c5
Compare
There was a problem hiding this comment.
Pull request overview
This PR adds an Arrow-native scan path so Fluss clients can poll log data as Arrow VectorSchemaRoot batches (instead of iterating row-by-row), including support for schema evolution (missing columns filled with nulls) and projection reordering.
Changes:
- Introduces
ArrowLogScanner/ArrowScanRecordsand theArrowBatchDatacontainer API for polling Arrow record batches from the client. - Adds unshaded-Arrow batch loading + client-side projection utilities (
UnshadedArrowReadUtils,UnshadedFlussVectorLoader, unshaded compression codecs/factory). - Refactors existing log scanner/collector code to share implementation via new abstract base classes and adds tests covering Arrow-batch loading and schema evolution.
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java | Adds loadArrowBatch(...) API and Arrow-specific methods to ReadContext. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java | Adds selected row type + Arrow column projection calculation; makes Arrow resources lazily created. |
| fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java | Implements loadArrowBatch(...) for Arrow log batches, including optional client-side projection. |
| fluss-common/src/main/java/org/apache/fluss/record/ArrowBatchData.java | New public container holding VectorSchemaRoot + log metadata + change types; closeable. |
| fluss-common/src/main/java/org/apache/fluss/utils/UnshadedArrowReadUtils.java | New utilities to deserialize unshaded Arrow batches and project/reorder vectors. |
| fluss-common/src/main/java/org/apache/fluss/record/UnshadedFlussVectorLoader.java | New unshaded variant of FlussVectorLoader for scanner/read path. |
| fluss-common/src/main/java/org/apache/fluss/compression/Unshaded*ArrowCompressionCodec.java | New unshaded Arrow compression codecs for LZ4/Zstd. |
| fluss-common/src/main/java/org/apache/fluss/compression/UnshadedArrowCompressionFactory.java | Factory providing unshaded compression codecs for read path. |
| fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java | Exposes index mapping via new accessor. |
| fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java | Delegates loadArrowBatch() for file-backed batches. |
| fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java | Adds schema-evolution test for missing projected columns in Arrow batches. |
| fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java | Adds test to load Arrow batch from file log input stream. |
| fluss-common/pom.xml | Adds unshaded Arrow dependencies (currently provided). |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java | Adds createArrowLogScanner() API. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java | Implements createArrowLogScanner() with log-format/limit validation. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowLogScanner*.java | New Arrow scanner interfaces + implementation. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowScanRecords.java | New container for scanned Arrow batches per bucket. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java | Refactors to share logic via AbstractLogScanner. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/AbstractLogScanner.java | New shared scanner implementation (poll/subscribe/unsubscribe/wakeup/close). |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java | Adds Arrow fetch collection path. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowLogFetchCollector.java | New fetch collector producing ArrowScanRecords. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/AbstractLogFetchCollector.java | New shared fetch-collection implementation. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java | Refactors to extend AbstractLogFetchCollector. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java | Adds fetchArrowBatches(...) path that loads batches via loadArrowBatch(...). |
| fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java | Adds IT coverage for Arrow scanning, schema evolution, and projection reorder. |
| fluss-client/pom.xml | Adds unshaded Arrow dependencies (currently provided). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
b865197 to
1432afb
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 24 out of 24 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
1432afb to
e897604
Compare
1daf809 to
433fd85
Compare
1c9cf70 to
2f0f9ba
Compare
2f0f9ba to
8ece150
Compare
8ece150 to
15031fe
Compare
There was a problem hiding this comment.
@luoyuxia Ty for the PR, LGTM overall, one minor comment
Also shall we file followups based on TODOs?
…batches Replace IOUtils.closeQuietly with an explicit close that surfaces the exception when Arrow batches have not been released before the scanner is closed.
aa1b9ec to
f1bb47f
Compare
|
@fresh-borzoni Thanks for your comments. Comments addressed. |
42d1ed9 to
f1bb47f
Compare
c672dd0 to
5793384
Compare
- Fix fetchArrowBatches Javadoc: maxRecords is a soft upper bound, not a hard limit, since batches are returned whole. - Remove unused selectedFields parameter from LogRecordReadContext constructor. - Fix mid-loop failure in fetchArrowBatches: deliver partial results instead of discarding all batches, mirroring fetchRecords() semantics. The batch iterator is single-pass so rolling back offsets is not viable. - Remove dead closeArrowBatches method. - Add off-heap resource cleanup in AbstractLogFetchCollector for non-FetchException failures to prevent Arrow memory leaks. - Fix race in LogRecordReadContext.close(): synchronize on unshadedArrowResourceLock to prevent returning an already-closed allocator via getOrCreateUnshadedBufferAllocator() fast path.
5793384 to
4270b74
Compare
Purpose
Linked issue: close #2965
Support
LogScannerto scan log data as Arrow record batches (ArrowBatchData) instead of row-by-rowScanRecord, enabling consumers like the tiering service to process columnar Arrow batches directly.Brief change log
LogScannerImpl.pollRecordBatch(Duration)returningArrowScanRecordsfor ARROW-format append-only log tables.ArrowBatchDatato hold scanned ArrowVectorSchemaRootwith log metadata and memory ownership.LogRecordBatch.loadArrowBatch(ReadContext)to load Arrow batch directly from batch memory.ArrowRecordBatchContext/UnshadedArrowBatchAccessto bridge shaded/unshaded Arrow types with batch-scoped child allocators.UnshadedArrowReadUtilsfor the unshaded read path with schema evolution support.AbstractLogFetchCollector<T, R>to share fetch-collection logic between row-based and Arrow batch paths.CompletedFetch.nextFetchedBatch()/finishFetchedBatches()and generalizeLogScannerImpl.doPoll()for code reuse.Tests
LogScannerITCase.testPollArrowBatchesWithSchemaEvolutionFileLogInputStreamTest.testLoadArrowBatchFromFileLogInputStreamAPI and Format
@InternalAPI:LogScannerImpl.pollRecordBatch(Duration),ArrowBatchData,ArrowScanRecords.Documentation
N/A