Skip to content

[spark] Fix empty projection causing Invalid metadata length for COUNT(*)/COUNT(1)#3227

Open
Kaixuan-Duan wants to merge 3 commits into
apache:mainfrom
Kaixuan-Duan:issue-2724-empty-projection
Open

[spark] Fix empty projection causing Invalid metadata length for COUNT(*)/COUNT(1)#3227
Kaixuan-Duan wants to merge 3 commits into
apache:mainfrom
Kaixuan-Duan:issue-2724-empty-projection

Conversation

@Kaixuan-Duan
Copy link
Copy Markdown
Contributor

@Kaixuan-Duan Kaixuan-Duan commented Apr 28, 2026

Purpose

Linked issue: close #2724

When Spark pushes down an empty column projection for COUNT(*)/COUNT(1) queries, the Fluss server fails with IllegalStateException("Invalid metadata length") in FileLogProjection.project(), causing the client to retry indefinitely and the query to hang.

This PR fixes the issue from two sides:

Server side: reject empty projection early with a clear InvalidColumnProjectionException instead of crashing with an internal error.
Spark connector side: fall back to projecting the first column when Spark pushes down an empty projection, so the row count is preserved without fetching unnecessary data.

Brief change log

  • FileLogProjection#setCurrentProjection: add a guard that throws InvalidColumnProjectionException when selectedFieldPositions is empty.

  • FileLogProjectionTest: add testEmptyProjectionRejectsWithClearError to verify the server-side guard.

  • FlussBatch#projection / FlussMicroBatchStream#projection: when readSchema yields an empty projection, fall back to Array(0) (first column).

  • SparkLogTableReadTest: add COUNT(*) and COUNT(1) end-to-end tests for log tables.

  • SparkPrimaryKeyTableReadTest: add COUNT(*) end-to-end test for primary key tables.

Tests

./mvnw -pl fluss-common -DskipTests=false -Dtest=FileLogProjectionTest#testEmptyProjectionRejectsWithClearError test

./mvnw -pl fluss-spark/fluss-spark-ut -am install -DskipTests
./mvnw -pl fluss-spark/fluss-spark-ut -Dsuites='org.apache.fluss.spark.SparkLogTableReadTest' test
./mvnw -pl fluss-spark/fluss-spark-ut -Dsuites='org.apache.fluss.spark.SparkPrimaryKeyTableReadTest' test

API and Format

Documentation

Comment thread fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java Outdated
@luoyuxia
Copy link
Copy Markdown
Contributor

luoyuxia commented Apr 30, 2026

@Kaixuan-Duan Hi, seems is it same with #2725. cc @beryllw

@Kaixuan-Duan
Copy link
Copy Markdown
Contributor Author

@luoyuxia I’m sorry. It was my fault. I didn't notice that there was already a PR pointing to issue#2724

@beryllw
Copy link
Copy Markdown
Contributor

beryllw commented May 8, 2026

I've closed PR #2725.Please go ahead.
Also, is it possible to use the pre-computed table statistics directly when dealing with PK tables, instead of scanning the data and counting afterward?

@Kaixuan-Duan
Copy link
Copy Markdown
Contributor Author

Kaixuan-Duan commented May 9, 2026

@beryllw Thanks for the suggestion!
Done. The Spark connector now implements SupportsPushDownAggregates and answers COUNT(*) / COUNT(1) / COUNT(non-null col) (no GROUP BY, no DISTINCT) on PK non-lake tables directly via Admin.getTableStats().getRowCount() — no data scan.
Falls back gracefully on UnsupportedVersionException for older clusters. PTAL.

@Kaixuan-Duan Kaixuan-Duan force-pushed the issue-2724-empty-projection branch from 5f4d23a to 91624a4 Compare May 9, 2026 19:29
@Yohahaha
Copy link
Copy Markdown
Contributor

I've closed PR #2725.Please go ahead. Also, is it possible to use the pre-computed table statistics directly when dealing with PK tables, instead of scanning the data and counting afterward?

@beryllw curious could we assume table statistics always accurate in fluss?

@Kaixuan-Duan Kaixuan-Duan requested a review from beryllw May 18, 2026 01:45
@beryllw
Copy link
Copy Markdown
Contributor

beryllw commented May 19, 2026

@beryllw curious could we assume table statistics always accurate in fluss?

I think we could assume they're always accurate in Fluss — for KV tables it's backed by rowCount, and for log tables it's derived from getHighWatermark() - logStartOffset().

public long getRowCount() {
return getHighWatermark() - logStartOffset();
}

public long getRowCount() {
if (rowCount == ROW_COUNT_DISABLED) {
throw new InvalidTableException(
String.format(
"Row count is disabled for this table '%s'. This usually happens when the table is"
+ "created before v0.9 or the changelog image is set to WAL, "
+ "as maintaining row count in WAL mode is costly and not necessary for most use cases. "
+ "If you want to enable row count, please set changelog image to FULL.",
getTablePath()));
}
return rowCount;
}

@Kaixuan-Duan Kaixuan-Duan force-pushed the issue-2724-empty-projection branch from 91624a4 to 1fb013b Compare May 26, 2026 14:34
@Kaixuan-Duan Kaixuan-Duan force-pushed the issue-2724-empty-projection branch from c61edcc to 036af32 Compare June 2, 2026 09:06
@Kaixuan-Duan Kaixuan-Duan force-pushed the issue-2724-empty-projection branch from 036af32 to 8602e38 Compare June 2, 2026 10:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[spark] Empty projection in batch read throws Invalid metadata length with COUNT(*)/COUNT(1)

4 participants