[lake/hudi] Introduce Hudi LakeCatalog to create table#3395

Open
fhan688 wants to merge 17 commits into apache:main from fhan688:Introduce-hudi-LakeCatalog-to-create-table

@fhan688 fhan688 commented May 28, 2026

Purpose

Linked issue: #3275

This PR introduces the Hudi LakeCatalog implementation, enabling Fluss to create tables in Hudi data lake storage. This aligns with the existing Paimon and Iceberg lake catalog support, completing the trio of supported lake formats for table creation.

Brief change log

New modules & classes (fluss-lake/fluss-lake-hudi):

  • HudiLakeCatalog: Implements LakeCatalog interface, supporting both HMS (Hive Metastore) and DFS (filesystem) catalog modes. Handles table creation with schema compatibility check for crash-recovery idempotency, automatic database creation, and system column (__bucket, __offset, __timestamp) appending.

  • FlussDataTypeToHudiDataType: Implements DataTypeVisitor to convert Fluss data types to Flink types (Hudi's type system). Handles LocalZonedTimestampType specially: maps to BIGINT under HMS mode, TIMESTAMP_WITH_LOCAL_TIME_ZONE under DFS mode.

  • HudiConversions: Core conversion utility. Converts Fluss TablePath → Hudi ObjectPath and TableDescriptor → ResolvedSchema / Hudi table properties. Validates HUDI_UNSETTABLE_OPTIONS (6 protected options that Fluss auto-manages), checks system column name conflicts, and handles property prefix rewriting (hudi.xxx → xxx, others → fluss.xxx).

  • HudiCatalogUtils: Factory for creating Hudi Catalog instances (HoodieHiveCatalog for HMS, HoodieCatalog for DFS). Uses copied Configuration to avoid mutating the original.
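
The validation steps described for HudiConversions can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the class name `HudiOptionChecks`, the method names, and the use of `IllegalArgumentException` (standing in for Fluss's InvalidConfigException and InvalidTableException) are all assumptions, and the protected option set is passed in as a parameter because the six actual keys are not listed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper; only the three system column names come from the PR description. */
final class HudiOptionChecks {
    static final List<String> SYSTEM_COLUMNS = List.of("__bucket", "__offset", "__timestamp");

    /** Rejects any user-supplied option whose key is in the protected set. */
    static void validateUnsettable(Map<String, String> props, Set<String> protectedKeys) {
        for (String key : props.keySet()) {
            if (protectedKeys.contains(key)) {
                // Stand-in for InvalidConfigException.
                throw new IllegalArgumentException(
                        "Option '" + key + "' is managed by Fluss and cannot be set.");
            }
        }
    }

    /** Fails if a user column collides with a system column, then appends the system columns. */
    static List<String> appendSystemColumns(List<String> userColumns) {
        for (String column : userColumns) {
            if (SYSTEM_COLUMNS.contains(column)) {
                // Stand-in for InvalidTableException.
                throw new IllegalArgumentException(
                        "Column '" + column + "' conflicts with a system column.");
            }
        }
        List<String> all = new ArrayList<>(userColumns);
        all.addAll(SYSTEM_COLUMNS);
        return all;
    }
}
```

Passing the protected keys in as a set keeps the sketch independent of whichever options the real HUDI_UNSETTABLE_OPTIONS constant contains.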

Modifications to existing modules (fluss-flink/fluss-flink-common):

  • LakeFlinkCatalog: Adds HUDI branch in getLakeCatalog() with a new HudiCatalogFactory inner class that uses reflection to instantiate Hudi catalog (mirroring the Iceberg pattern to avoid compile-time dependency on hudi-flink-bundle).

  • LakeTableFactory: Adds HUDI branch in getLakeTableFactory() with getHudiFactory() that reflectively loads HoodieTableFactory.

  • HudiLakeStorage: Replaces the UnsupportedOperationException in createLakeCatalog() with new HudiLakeCatalog(hudiConfig) to wire the SPI path.
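
The reflective loading described for LakeFlinkCatalog and LakeTableFactory might look roughly like the sketch below. It assumes the target class has a public no-argument constructor and uses the thread context classloader, as one of the follow-up commits mentions; `ReflectiveFactoryLoader` is a hypothetical name, and the Hudi class name appears only in a comment as an assumption.

```java
import java.lang.reflect.Constructor;

/** Hypothetical helper showing reflection-based loading without a compile-time dependency. */
final class ReflectiveFactoryLoader {

    /** Loads a class by name through the context classloader and instantiates it. */
    static Object instantiate(String className) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            // Fall back to the loader that loaded this helper.
            loader = ReflectiveFactoryLoader.class.getClassLoader();
        }
        try {
            Class<?> clazz = Class.forName(className, true, loader);
            Constructor<?> constructor = clazz.getDeclaredConstructor();
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Failed to load " + className + "; is the bundle on the classpath?", e);
        }
    }
}

// Assumed usage (class name is an assumption, not taken from the PR diff):
//   Object factory = ReflectiveFactoryLoader.instantiate("org.apache.hudi.table.HoodieTableFactory");
```

Because the class is referenced only as a string, the calling module compiles without the Hudi bundle and fails only at runtime if the bundle is missing.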

Key design decisions:

| Aspect | Decision |
| --- | --- |
| Table type mapping | PK table → MERGE_ON_READ, Log table → COPY_ON_WRITE |
| Index strategy | BUCKET index type, aligned with Fluss's bucketing model |
| Dependency isolation | Hudi bundle loaded via reflection/plugin classloader (no compile-time dependency in fluss-flink-common) |
| Catalog mode | Supports hms (Hive Metastore) and dfs (filesystem) |
| Property rewriting | hudi. prefix stripped; non-hudi properties prefixed with fluss. |
| Idempotency | Schema-compatible duplicate creation is treated as success for crash recovery |
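
Two of the decisions above, property rewriting and table type mapping, can be illustrated with a short sketch. The class and method names are hypothetical, and the sketch assumes every non-hudi key is prefixed unconditionally; the real HudiConversions may treat some keys differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper mirroring the prefix-rewriting and table-type rules stated above. */
final class HudiPropertyRewriter {
    static final String HUDI_PREFIX = "hudi.";
    static final String FLUSS_PREFIX = "fluss.";

    /** Strips "hudi." from Hudi options and prefixes all other keys with "fluss.". */
    static Map<String, String> rewrite(Map<String, String> properties) {
        Map<String, String> rewritten = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(HUDI_PREFIX)) {
                rewritten.put(key.substring(HUDI_PREFIX.length()), entry.getValue());
            } else {
                rewritten.put(FLUSS_PREFIX + key, entry.getValue());
            }
        }
        return rewritten;
    }

    /** PK table maps to MERGE_ON_READ, log table maps to COPY_ON_WRITE. */
    static String tableType(boolean hasPrimaryKey) {
        return hasPrimaryKey ? "MERGE_ON_READ" : "COPY_ON_WRITE";
    }
}
```

For example, a key written as `hudi.some.option` would reach Hudi as `some.option`, while any other key `k` would be stored as `fluss.k`.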

Tests

HudiLakeCatalogTest (15 test cases):

  • testPropertyPrefixRewriting — verifies hudi.xxx → xxx and non-hudi → fluss.xxx prefix rewriting

  • testCreatePrimaryKeyTable — PK table (MOR) creation with system columns and primary key

  • testCreateLogTable — Log table (COW) creation with record key from customProperties

  • testIsHudiSchemaCompatibleWithSameSchema — compatible schemas return true

  • testIsHudiSchemaCompatibleWithDifferentColumnCount — different column count returns false

  • testIsHudiSchemaCompatibleWithDifferentColumnName — different column name returns false

  • testIsHudiSchemaCompatibleWithDifferentColumnType — different column type returns false

  • testCreateDuplicateTableWithCompatibleSchema — duplicate creation with compatible schema is idempotent

  • testCreateDuplicateTableWithIncompatibleSchema — duplicate creation with incompatible schema throws TableAlreadyExistException

  • testUnsettableOptionInPropertiesThrowsException — protected option in properties throws InvalidConfigException

  • testUnsettableOptionInCustomPropertiesThrowsException — protected option in customProperties throws InvalidConfigException

  • testNonProtectedHudiOptionPassesValidation — non-protected option passes validation

  • testSystemColumnBucketConflictThrowsException — __bucket conflict throws InvalidTableException

  • testSystemColumnOffsetConflictThrowsException — __offset conflict throws InvalidTableException

  • testSystemColumnTimestampConflictThrowsException — __timestamp conflict throws InvalidTableException
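
The idempotent-create behavior exercised by the schema compatibility and duplicate-creation tests can be sketched with an in-memory stand-in for the catalog. Everything here is an assumption for illustration: the class name, the representation of a schema as ordered (name, type) pairs, the positional comparison, and `IllegalStateException` standing in for TableAlreadyExistException.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of create-if-absent with a schema compatibility check. */
final class IdempotentCreateSketch {
    private final Map<String, List<Map.Entry<String, String>>> tables = new HashMap<>();

    /** Compatible means: same column count, and the same name and type at each position. */
    static boolean isSchemaCompatible(
            List<Map.Entry<String, String>> existing, List<Map.Entry<String, String>> requested) {
        if (existing.size() != requested.size()) {
            return false;
        }
        for (int i = 0; i < existing.size(); i++) {
            boolean sameName = existing.get(i).getKey().equals(requested.get(i).getKey());
            boolean sameType = existing.get(i).getValue().equals(requested.get(i).getValue());
            if (!sameName || !sameType) {
                return false;
            }
        }
        return true;
    }

    /** A retry after a crash sees the existing table and succeeds if the schema matches. */
    void createTable(String tablePath, List<Map.Entry<String, String>> schema) {
        List<Map.Entry<String, String>> existing = tables.putIfAbsent(tablePath, schema);
        if (existing != null && !isSchemaCompatible(existing, schema)) {
            // Stand-in for TableAlreadyExistException.
            throw new IllegalStateException("Table already exists with a different schema: " + tablePath);
        }
    }
}
```

Treating a compatible duplicate as success lets a create request be retried safely when the first attempt finished in the lake but the caller crashed before recording it.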

API and Format

No API or storage format changes. This PR only adds new implementations for the existing LakeCatalog and LakeStorage SPI interfaces.

Documentation

A new feature — Hudi lake catalog support for table creation. Will need documentation updates for the Hudi integration guide.

fhan688 commented May 28, 2026

All tests pass. Please help review, thanks! @XuQianJin-Stars

@fhan688 fhan688 closed this Jun 1, 2026
@fhan688 fhan688 reopened this Jun 1, 2026
@fhan688 fhan688 closed this Jun 1, 2026
@fhan688 fhan688 reopened this Jun 1, 2026
@luoyuxia luoyuxia requested a review from Copilot June 1, 2026 08:25

Copilot AI left a comment

Pull request overview

This PR introduces Hudi lake catalog support so Fluss can create Hudi-backed lake tables (paralleling existing Paimon/Iceberg integrations), including schema conversion, option rewriting/validation, and Flink-side reflective loading to keep fluss-flink-common free of a compile-time Hudi dependency.

Changes:

  • Add new fluss-lake-hudi module implementing LakeCatalog/LakeStorage for Hudi table creation (schema + properties conversions, HMS/DFS modes, idempotent create).
  • Extend Flink integration to support HUDI in LakeFlinkCatalog (reflective catalog creation) and LakeTableFactory (reflective table factory loading).
  • Add unit tests for Hudi catalog behavior (properties rewriting, schema compatibility, idempotency, protected options, system-column conflicts).

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| fluss-lake/pom.xml | Adds Flink table API dependency management needed by the new Hudi module. |
| fluss-lake/fluss-lake-hudi/pom.xml | Introduces/adjusts Hudi + Flink/Hadoop dependencies and test utilities for the new module. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java | New Hudi LakeCatalog implementation (create table, schema compatibility for idempotency, DB creation). |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeStorage.java | Wires LakeStorage#createLakeCatalog() to return the new HudiLakeCatalog. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java | Converts Fluss types to Flink/Hudi types, with mode-specific handling for LTZ timestamps. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java | Converts Fluss table descriptors to Hudi/Flink schema + options; validates protected Hudi options; rewrites property prefixes. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java | Builds Hudi Catalog instances for HMS/DFS modes and helpers to create catalog table wrappers. |
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java | New unit test suite covering prefix rewriting, table creation, schema compatibility, idempotency, and validation. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java | Adds HUDI branch + reflection-based HudiCatalogFactory to load Hudi from plugin classloader. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java | Adds HUDI branch and reflective HoodieTableFactory instantiation for table sources. |

fhan688 and others added 4 commits June 1, 2026 18:49
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
… options log to debug;[flink] load Hudi table factory with context classloader
…reate-table' into Introduce-hudi-LakeCatalog-to-create-table
@fhan688 fhan688 closed this Jun 1, 2026
@fhan688 fhan688 reopened this Jun 1, 2026

@luoyuxia luoyuxia left a comment

@fhan688 Thanks for the PR. LGTM overall. Left some minor comments.
@XuQianJin-Stars Do you want to do another round of review?

} else if (lakeFormat == ICEBERG) {
catalog = IcebergCatalogFactory.create(catalogName, catalogProperties);
this.lakeFormat = ICEBERG;
} else if (lakeFormat == HUDI) {

Could you please remove the changes to LakeFlinkCatalog and LakeTableFactory from this PR? Let's focus on Hudi's own catalog rather than the Flink catalog here.
We can add them back in another PR, which will require an IT verifying that we can use $lake to read a Hudi table.

import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.FILE_SYSTEM_TYPE;
import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVE_META_STORE_TYPE;

/** Convert from Fluss's data type to Hudi's internal data type. */

nit: this is not Hudi's internal data type. It's Flink's internal data type.
Also, please add a comment explaining why we convert to Flink's internal data type instead of Hudi's internal data type.


boolean isPkTable = tableDescriptor.getSchema().getPrimaryKeyIndexes().length > 0;

// Create Hudi catalog table

Could you please add a comment explaining why we convert to a Flink catalog table and call the Flink catalog to create the Hudi table?


List<String> primaryKeys = new ArrayList<>();
primaryKeys.add("id");
TableSchema expectHudiSchema =

nit: 'org.apache.flink.table.api.TableSchema' is deprecated


List<String> primaryKeys = new ArrayList<>();
primaryKeys.add("id");
TableSchema expectHudiSchema =

ditto

// ------------------------------------------------------------------

@Test
void testIsHudiSchemaCompatibleWithSameSchema() {

nit: combine the testIsHudiSchemaCompatible-related tests into a single test method?

4 participants