[lake/hudi] Introduce Hudi LakeCatalog to create table #3395
…Schema() and add isCreatingFlussTable in HudiLakeCatalog.createTable()
…talogDatabaseImpl in HudiLakeCatalog & use copied Configuration in HudiCatalogUtils
All tests have passed; please help review. Thanks! @XuQianJin-Stars
Pull request overview
This PR introduces Hudi lake catalog support so Fluss can create Hudi-backed lake tables (paralleling existing Paimon/Iceberg integrations), including schema conversion, option rewriting/validation, and Flink-side reflective loading to keep fluss-flink-common free of a compile-time Hudi dependency.
Changes:
- Add a new `fluss-lake-hudi` module implementing `LakeCatalog`/`LakeStorage` for Hudi table creation (schema + properties conversions, HMS/DFS modes, idempotent create).
- Extend Flink integration to support HUDI in `LakeFlinkCatalog` (reflective catalog creation) and `LakeTableFactory` (reflective table factory loading).
- Add unit tests for Hudi catalog behavior (properties rewriting, schema compatibility, idempotency, protected options, system-column conflicts).
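The reflective loading mentioned above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `ReflectiveLoaderSketch` and `instantiate` are hypothetical names, and `java.util.ArrayList` stands in for the Hudi factory class so the snippet runs without Hudi on the classpath.

```java
import java.lang.reflect.Constructor;

/** Hypothetical sketch: instantiate a class by name without a compile-time dependency. */
final class ReflectiveLoaderSketch {

    private ReflectiveLoaderSketch() {}

    /** Loads {@code className} through the given class loader and calls its no-arg constructor. */
    static Object instantiate(String className, ClassLoader loader) {
        try {
            Class<?> clazz = Class.forName(className, true, loader);
            Constructor<?> ctor = clazz.getDeclaredConstructor();
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            // Covers ClassNotFoundException, NoSuchMethodException, InvocationTargetException, etc.
            throw new IllegalStateException(
                    "Cannot load " + className + "; is the plugin jar available?", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in class name; a real caller would pass the lake format's factory class.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        Object instance = instantiate("java.util.ArrayList", loader);
        System.out.println(instance.getClass().getName());
    }
}
```

Because the class name is only a string, the calling module compiles without the dependency and fails at runtime with a descriptive error if the plugin is missing.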
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| fluss-lake/pom.xml | Adds Flink table API dependency management needed by the new Hudi module. |
| fluss-lake/fluss-lake-hudi/pom.xml | Introduces/adjusts Hudi + Flink/Hadoop dependencies and test utilities for the new module. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java | New Hudi LakeCatalog implementation (create table, schema compatibility for idempotency, DB creation). |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeStorage.java | Wires LakeStorage#createLakeCatalog() to return the new HudiLakeCatalog. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java | Converts Fluss types to Flink/Hudi types, with mode-specific handling for LTZ timestamps. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java | Converts Fluss table descriptors to Hudi/Flink schema + options; validates protected Hudi options; rewrites property prefixes. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java | Builds Hudi Catalog instances for HMS/DFS modes and helpers to create catalog table wrappers. |
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java | New unit test suite covering prefix rewriting, table creation, schema compatibility, idempotency, and validation. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java | Adds HUDI branch + reflection-based HudiCatalogFactory to load Hudi from plugin classloader. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java | Adds HUDI branch and reflective HoodieTableFactory instantiation for table sources. |
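The mode-specific LTZ handling listed for `FlussDataTypeToHudiDataType` can be sketched as below. The mapping itself (BIGINT under HMS, TIMESTAMP_WITH_LOCAL_TIME_ZONE under DFS) comes from the PR description; everything else is an assumption for illustration: `LtzMappingSketch`, `CatalogMode`, and `mapLocalZonedTimestamp` are hypothetical names, and plain strings stand in for the Flink type objects the real visitor would return.

```java
/** Hypothetical sketch of the mode-dependent mapping for local-zoned timestamps. */
final class LtzMappingSketch {

    /** Illustrative stand-in for the two catalog modes named in the PR. */
    enum CatalogMode { HMS, DFS }

    private LtzMappingSketch() {}

    /** Returns the name of the target type for a Fluss local-zoned timestamp. */
    static String mapLocalZonedTimestamp(CatalogMode mode) {
        switch (mode) {
            case HMS:
                // Per the PR description, HMS mode stores the value as BIGINT.
                return "BIGINT";
            case DFS:
                // DFS mode keeps the local-zoned timestamp type.
                return "TIMESTAMP_WITH_LOCAL_TIME_ZONE";
            default:
                throw new IllegalArgumentException("Unknown catalog mode: " + mode);
        }
    }

    public static void main(String[] args) {
        for (CatalogMode mode : CatalogMode.values()) {
            System.out.println(mode + " -> " + mapLocalZonedTimestamp(mode));
        }
    }
}
```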
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
… options log to debug;[flink] load Hudi table factory with context classloader
…reate-table' into Introduce-hudi-LakeCatalog-to-create-table
luoyuxia left a comment
@fhan688 Thanks for the PR. LGTM overall. Left some minor comments.
@XuQianJin-Stars Do you want to do another round of review?
} else if (lakeFormat == ICEBERG) {
catalog = IcebergCatalogFactory.create(catalogName, catalogProperties);
this.lakeFormat = ICEBERG;
} else if (lakeFormat == HUDI) {
Could you please remove these changes to LakeFlinkCatalog and LakeTableFactory from this PR? Let's focus on Hudi's own catalog instead of the Flink catalog here.
We can add them back in another PR, which requires an IT to verify that we can use $lake to read a Hudi table.
import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.FILE_SYSTEM_TYPE;
import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVE_META_STORE_TYPE;

/** Convert from Fluss's data type to Hudi's internal data type. */
nit: this is not Hudi's internal data type; it's Flink's internal data type.
Also, please add a comment explaining why we convert to Flink's internal data type instead of Hudi's internal data type.
boolean isPkTable = tableDescriptor.getSchema().getPrimaryKeyIndexes().length > 0;

// Create Hudi catalog table
Could you please add a comment explaining why we convert to a Flink catalog table and call the Flink catalog to create the Hudi table?
List<String> primaryKeys = new ArrayList<>();
primaryKeys.add("id");
TableSchema expectHudiSchema =
nit: 'org.apache.flink.table.api.TableSchema' is deprecated
List<String> primaryKeys = new ArrayList<>();
primaryKeys.add("id");
TableSchema expectHudiSchema =
// ------------------------------------------------------------------

@Test
void testIsHudiSchemaCompatibleWithSameSchema() {
nit: combine the testIsHudiSchemaCompatible-related tests into a single test method?
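One way the combined check could look is sketched below. It assumes a simplified column model (name/type string pairs) rather than the Flink schema classes the PR uses, and `SchemaCompatSketch`, `isCompatible`, and `col` are hypothetical helpers, not the method under test in the PR. The four cases mirror the existing test names: same schema, different column count, different column name, different column type.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: all schema-compatibility cases exercised from one method. */
final class SchemaCompatSketch {

    private SchemaCompatSketch() {}

    /** Builds a (name, type) pair; a stand-in for a real column object. */
    static String[] col(String name, String type) {
        return new String[] {name, type};
    }

    /** Schemas are compatible when column count, names, and types match in order. */
    static boolean isCompatible(List<String[]> existing, List<String[]> expected) {
        if (existing.size() != expected.size()) {
            return false;
        }
        for (int i = 0; i < existing.size(); i++) {
            if (!Arrays.equals(existing.get(i), expected.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String[]> base = Arrays.asList(col("id", "INT"), col("name", "STRING"));

        // Same schema -> compatible.
        check(isCompatible(base, Arrays.asList(col("id", "INT"), col("name", "STRING"))));
        // Different column count -> incompatible.
        check(!isCompatible(base, Arrays.asList(col("id", "INT"))));
        // Different column name -> incompatible.
        check(!isCompatible(base, Arrays.asList(col("id", "INT"), col("title", "STRING"))));
        // Different column type -> incompatible.
        check(!isCompatible(base, Arrays.asList(col("id", "BIGINT"), col("name", "STRING"))));
        System.out.println("all compatibility cases behaved as expected");
    }

    private static void check(boolean condition) {
        if (!condition) {
            throw new AssertionError("unexpected compatibility result");
        }
    }
}
```

In a JUnit suite, the same four cases could sit in one `@Test` method or a parameterized test, which keeps the shared base schema in one place.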
Purpose
Linked issue: #3275
This PR introduces the Hudi `LakeCatalog` implementation, enabling Fluss to create tables in Hudi data lake storage. This aligns with the existing Paimon and Iceberg lake catalog support, completing the trio of supported lake formats for table creation.
Brief change log
New modules & classes (`fluss-lake/fluss-lake-hudi`):
- `HudiLakeCatalog`: Implements the `LakeCatalog` interface, supporting both HMS (Hive Metastore) and DFS (filesystem) catalog modes. Handles table creation with a schema compatibility check for crash-recovery idempotency, automatic database creation, and appending of the system columns (`__bucket`, `__offset`, `__timestamp`).
- `FlussDataTypeToHudiDataType`: Implements `DataTypeVisitor` to convert Fluss data types to Flink types (Hudi's type system). Handles `LocalZonedTimestampType` specially: it maps to `BIGINT` under HMS mode and to `TIMESTAMP_WITH_LOCAL_TIME_ZONE` under DFS mode.
- `HudiConversions`: Core conversion utility. Converts Fluss `TablePath` → Hudi `ObjectPath`, and `TableDescriptor` → `ResolvedSchema` / Hudi table properties. Validates `HUDI_UNSETTABLE_OPTIONS` (6 protected options that Fluss auto-manages), checks system column name conflicts, and handles property prefix rewriting (`hudi.xxx` → `xxx`, others → `fluss.xxx`).
- `HudiCatalogUtils`: Factory for creating Hudi `Catalog` instances (`HoodieHiveCatalog` for HMS, `HoodieCatalog` for DFS). Uses a copied `Configuration` to avoid mutating the original.
Modifications to existing modules (`fluss-flink/fluss-flink-common`):
- `LakeFlinkCatalog`: Adds a HUDI branch in `getLakeCatalog()` with a new `HudiCatalogFactory` inner class that uses reflection to instantiate the Hudi catalog (mirroring the Iceberg pattern to avoid a compile-time dependency on hudi-flink-bundle).
- `LakeTableFactory`: Adds a HUDI branch in `getLakeTableFactory()` with `getHudiFactory()`, which reflectively loads `HoodieTableFactory`.
- `HudiLakeStorage`: Replaces the `UnsupportedOperationException` in `createLakeCatalog()` with `new HudiLakeCatalog(hudiConfig)` to wire the SPI path.
Key design decisions:
Tests
`HudiLakeCatalogTest` (14 test cases):
- `testPropertyPrefixRewriting`: verifies `hudi.xxx` → `xxx` and non-hudi → `fluss.xxx` prefix rewriting
- `testCreatePrimaryKeyTable`: PK table (MOR) creation with system columns and primary key
- `testCreateLogTable`: Log table (COW) creation with record key from customProperties
- `testIsHudiSchemaCompatibleWithSameSchema`: compatible schemas return true
- `testIsHudiSchemaCompatibleWithDifferentColumnCount`: different column count returns false
- `testIsHudiSchemaCompatibleWithDifferentColumnName`: different column name returns false
- `testIsHudiSchemaCompatibleWithDifferentColumnType`: different column type returns false
- `testCreateDuplicateTableWithCompatibleSchema`: duplicate creation with compatible schema is idempotent
- `testCreateDuplicateTableWithIncompatibleSchema`: duplicate creation with incompatible schema throws `TableAlreadyExistException`
- `testUnsettableOptionInPropertiesThrowsException`: protected option in `properties` throws `InvalidConfigException`
- `testUnsettableOptionInCustomPropertiesThrowsException`: protected option in customProperties throws `InvalidConfigException`
- `testNonProtectedHudiOptionPassesValidation`: non-protected option passes validation
- `testSystemColumnBucketConflictThrowsException`: `__bucket` conflict throws `InvalidTableException`
- `testSystemColumnOffsetConflictThrowsException`: `__offset` conflict throws `InvalidTableException`
- `testSystemColumnTimestampConflictThrowsException`: `__timestamp` conflict throws `InvalidTableException`
API and Format
No API or storage format changes. This PR only adds new implementations for the existing `LakeCatalog` and `LakeStorage` SPI interfaces.
Documentation
This is a new feature: Hudi lake catalog support for table creation. The Hudi integration guide will need documentation updates.
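For readers of the future integration guide, the prefix rewriting and system-column checks described in this PR can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in `HudiConversions`: `HudiOptionSketch`, `rewritePrefixes`, and `checkNoSystemColumnConflict` are hypothetical names, `IllegalArgumentException` stands in for `InvalidTableException`, and keys that already start with `fluss.` are not special-cased here, which may differ from the real behavior.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of property prefix rewriting and system-column conflict checks. */
final class HudiOptionSketch {

    private static final String HUDI_PREFIX = "hudi.";
    private static final String FLUSS_PREFIX = "fluss.";
    // System column names as listed in the PR description.
    private static final List<String> SYSTEM_COLUMNS =
            Arrays.asList("__bucket", "__offset", "__timestamp");

    private HudiOptionSketch() {}

    /** Rewrites hudi.xxx to xxx and prefixes every other key with fluss. */
    static Map<String, String> rewritePrefixes(Map<String, String> properties) {
        Map<String, String> rewritten = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(HUDI_PREFIX)) {
                rewritten.put(key.substring(HUDI_PREFIX.length()), entry.getValue());
            } else {
                rewritten.put(FLUSS_PREFIX + key, entry.getValue());
            }
        }
        return rewritten;
    }

    /** Rejects user columns whose names collide with the appended system columns. */
    static void checkNoSystemColumnConflict(List<String> columnNames) {
        for (String name : columnNames) {
            if (SYSTEM_COLUMNS.contains(name)) {
                // Stand-in for the InvalidTableException thrown in the PR.
                throw new IllegalArgumentException(
                        "Column '" + name + "' conflicts with a system column.");
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("hudi.some.option", "a"); // placeholder key, not a real Hudi option
        props.put("table.some.option", "b"); // placeholder key
        System.out.println(rewritePrefixes(props));
        checkNoSystemColumnConflict(Arrays.asList("id", "name"));
    }
}
```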