[spark] Fix empty projection causing Invalid metadata length for COUNT(*)/COUNT(1) #3227
Kaixuan-Duan wants to merge 3 commits into
Conversation
@Kaixuan-Duan Hi, it seems this is the same as #2725. cc @beryllw
@luoyuxia I’m sorry. It was my fault. I didn't notice that there was already a PR pointing to issue #2724.
I've closed PR #2725. Please go ahead.
@beryllw Thanks for the suggestion!
5f4d23a to 91624a4
I think we could assume they're always accurate in Fluss: for KV tables it's backed by rowCount, and for log tables it's derived from getHighWatermark() - logStartOffset().
fluss/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java Lines 248 to 250 in d4cd1a2
fluss/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java Lines 307 to 318 in c6a1c2d
91624a4 to 1fb013b
c61edcc to 036af32
036af32 to 8602e38
Purpose
Linked issue: close #2724
When Spark pushes down an empty column projection for COUNT(*)/COUNT(1) queries, the Fluss server fails with IllegalStateException("Invalid metadata length") in FileLogProjection.project(), causing the client to retry indefinitely and the query to hang.
This PR fixes the issue from two sides:
Server side: reject empty projection early with a clear InvalidColumnProjectionException instead of crashing with an internal error.
Spark connector side: fall back to projecting the first column when Spark pushes down an empty projection, so the row count is preserved without fetching unnecessary data.
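For illustration only, the two-sided idea can be sketched in a few lines of standalone Java. This is not the code in this PR: the class ProjectionSketch, the methods checkProjection and projectionOrFirstColumn, and the error message are hypothetical, and the nested exception is a local stand-in for Fluss's InvalidColumnProjectionException so that the sketch compiles on its own. The actual connector change lives in the Scala classes FlussBatch and FlussMicroBatchStream.

```java
import java.util.Arrays;

public class ProjectionSketch {

    // Local stand-in for Fluss's InvalidColumnProjectionException (assumption:
    // defined here only to keep the sketch self-contained).
    static class InvalidColumnProjectionException extends RuntimeException {
        InvalidColumnProjectionException(String message) {
            super(message);
        }
    }

    // Server-side idea: reject an empty projection up front with a clear error
    // instead of failing later with an internal "Invalid metadata length".
    static void checkProjection(int[] selectedFieldPositions) {
        if (selectedFieldPositions.length == 0) {
            throw new InvalidColumnProjectionException(
                    "Column projection must select at least one field.");
        }
    }

    // Connector-side idea: when the engine pushes down no columns (as for
    // COUNT(*)), read only the first column so each row is still returned.
    static int[] projectionOrFirstColumn(int[] pushedDown) {
        return pushedDown.length == 0 ? new int[] {0} : pushedDown;
    }

    public static void main(String[] args) {
        int[] effective = projectionOrFirstColumn(new int[0]);
        checkProjection(effective); // passes: the fallback selected column 0
        System.out.println(Arrays.toString(effective)); // prints [0]
    }
}
```

With the fallback in place the guard is never hit from the Spark path; it remains a safety net for any other client that sends an empty projection.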
Brief change log
FileLogProjection#setCurrentProjection: add a guard that throws InvalidColumnProjectionException when selectedFieldPositions is empty.
FileLogProjectionTest: add testEmptyProjectionRejectsWithClearError to verify the server-side guard.
FlussBatch#projection/FlussMicroBatchStream#projection: when readSchema yields an empty projection, fall back to Array(0) (first column).
SparkLogTableReadTest: add COUNT(*) and COUNT(1) end-to-end tests for log tables.
SparkPrimaryKeyTableReadTest: add COUNT(*) end-to-end test for primary key tables.
Tests
./mvnw -pl fluss-common -DskipTests=false -Dtest=FileLogProjectionTest#testEmptyProjectionRejectsWithClearError test
./mvnw -pl fluss-spark/fluss-spark-ut -am install -DskipTests
./mvnw -pl fluss-spark/fluss-spark-ut -Dsuites='org.apache.fluss.spark.SparkLogTableReadTest' test
./mvnw -pl fluss-spark/fluss-spark-ut -Dsuites='org.apache.fluss.spark.SparkPrimaryKeyTableReadTest' test
API and Format
Documentation