[flink-action][server][client] add orphan files cleanup action for remote storage#3404
[flink-action][server][client] add orphan files cleanup action for remote storage#3404platinumhamburg wants to merge 2 commits into
Conversation
9b95b89 to
9bd732b
Compare
9bd732b to
901dc41
Compare
There was a problem hiding this comment.
Pull request overview
This PR introduces a Flink-based orphan_files_clean action to identify and delete orphaned remote-storage artifacts (log segments/manifests and KV snapshot files) by adding new coordinator read-only RPCs to enumerate the active reference set and wiring client/server support for those RPCs.
Changes:
- Add new RPCs
LIST_REMOTE_LOG_MANIFESTSandLIST_KV_SNAPSHOTS(proto + api keys), implement them inCoordinatorService, and expose them via the clientAdmin. - Add new Flink action module + SPI loader/entrypoint and implement the orphan cleanup DAG (scope enumeration → scan/clean → stats aggregation + empty-dir sweep) with rule-based file classification and audit logging.
- Extend filesystem metadata support (modification time) to enable safe age-based deletion, and add unit/integration tests for the new behavior.
Reviewed changes
Copilot reviewed 82 out of 82 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-test-coverage/pom.xml | Excludes newly introduced Flink action entry/SPI classes from coverage instrumentation. |
| fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java | Adds stub methods for the new list RPCs in the tablet gateway test implementation. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java | Adds stub methods for the new list RPCs in the coordinator gateway test implementation. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServiceOrphanRpcsITCase.java | Adds IT coverage for coordinator orphan-cleanup RPCs (manifests + snapshot listing). |
| fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java | Adds ZK helper methods to list remote log manifest handles and bucket snapshot IDs. |
| fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java | Rejects orphan-cleanup RPCs on tablet servers (coordinator-only RPCs). |
| fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java | Adds API to expose active snapshot IDs (retained ∪ still-in-use) for listing RPCs. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseManager.java | Exposes lease-pinned snapshot IDs to support “still-in-use” snapshot reporting. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java | Implements listRemoteLogManifests and listKvSnapshots coordinator RPC handlers. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java | Adjusts visibility/testing annotation around snapshot store manager accessor. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java | Adds per-bucket active snapshot ID computation with ZK fallback when store isn’t in-memory. |
| fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java | Adds stub methods for the new list RPCs in RPC test scaffolding. |
| fluss-rpc/src/main/proto/FlussApi.proto | Defines new request/response messages for manifest/snapshot listing. |
| fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java | Registers new API keys for the orphan-cleanup list RPCs. |
| fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java | Adds gateway methods for the two new read-only list RPCs. |
| fluss-flink/pom.xml | Adds the new fluss-flink-action module. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/FlinkMultipleParameterToolTest.java | Extends adapter tests for new convenience accessors. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/rule/RuleDispatcherTest.java | Adds rule dispatch coverage for orphan-cleanup file classification. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/rule/OrphanDirDetectorTest.java | Adds tests for orphan table/partition directory detection by ID guards. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/rule/LogSegmentRuleTest.java | Adds log segment rule tests (active-set + cutoff semantics). |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/rule/LogManifestRuleTest.java | Adds manifest rule tests (default conservative + opt-in deletion). |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/rule/KvSnapshotFileRuleTest.java | Adds KV snapshot file rule tests (active snap dirs + cutoff semantics). |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/rule/KvSharedSstRuleTest.java | Adds tests ensuring shared SSTs are never deleted. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/RpcErrorClassifierTest.java | Adds tests for stable RPC error categorization used in audit logs. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/job/EmptyDirSweeperTest.java | Adds tests for post-clean empty directory sweeping (dry-run + bottom-up). |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/fs/SafeDeleterTest.java | Adds tests for safe deletion behavior (dry-run, non-empty dir no-op, etc.). |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/config/OrphanCleanConfigTest.java | Adds CLI config parsing/validation tests for orphan cleanup action. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/build/MaxKnownIdsTrackerTest.java | Adds tests for max-known ID tracking used for orphan directory guard logic. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/build/ActiveRefsFetcherTest.java | Adds tests for manifest/snapshot active-set fetching (retries + per-bucket failures). |
| fluss-flink/fluss-flink-common/src/main/resources/META-INF/services/org.apache.fluss.flink.action.ActionFactory | Registers OrphanFilesCleanActionFactory via ServiceLoader SPI. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java | Adds convenience accessors (has/get/getMultiParameter) for CLI parsing. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/RuleId.java | Introduces stable rule identifiers for audit tagging. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/RuleDispatcher.java | Implements rule dispatch based on path patterns (log/kv/manifest/shared/unknown). |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/OrphanDirDetector.java | Implements orphan table/partition dir detection via parsed ID + max-known guards. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/LogSegmentRule.java | Implements log segment deletion decisions using active refs + cutoff + orphan-dir mode. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/LogManifestRule.java | Implements conservative manifest handling with opt-in deletion behavior. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/KvSnapshotFileRule.java | Implements KV snapshot file classification using active snapshot dir names. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/KvSharedSstRule.java | Implements “never delete” policy for shared SST files. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/FileRule.java | Defines the rule interface for single-file decisions. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/FileMeta.java | Adds immutable file metadata container for rule evaluation. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/Decision.java | Adds decision vocabulary for cleanup (DELETE/KEEP/DEFER/SKIP_UNKNOWN). |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/BucketActiveRefs.java | Adds immutable bucket-scoped active reference sets for log+kv+manifest paths. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/RpcErrorClassifier.java | Adds stable classification of RPC failures for audit/reporting logic. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/OrphanFilesCleanActionFactory.java | Adds factory for the orphan_files_clean action and CLI help text. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/OrphanFilesCleanAction.java | Adds the action runner that executes the Flink job and logs final stats. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/OrphanCleanUtils.java | Adds shared utilities (paths, remote dir resolution, safe listing helpers). |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/StatsAggregateOperator.java | Adds custom bounded operator to aggregate stats and run the empty-dir sweep. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/ScanAndCleanFunction.java | Implements stage-2 FS scan & cleanup with per-subtask rate limiting. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/OrphanFilesCleanJob.java | Builds and executes the 3-stage Flink batch DAG and returns final stats. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/OrphanDirCleanTask.java | Adds task type for cleaning an orphan table/partition directory. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/EmptyDirSweeper.java | Adds end-of-run empty directory reclamation logic (dry-run aware). |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/CleanTask.java | Adds marker interface for stage-1 emitted work items. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/CleanStats.java | Adds aggregatable stats object including “touched dirs” for sweeping. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/BucketCleanTask.java | Adds task type carrying bucket dirs + active refs for file-level cleanup. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/BucketCleaner.java | Adds per-bucket directory walker applying rules and safe deletion. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/fs/SafeDeleter.java | Centralizes deletion operations with dry-run + rate limiting + audit logging. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/config/OrphanCleanConfig.java | Adds CLI configuration parsing and validation for the orphan cleanup action. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/build/RpcListStatus.java | Adds shared per-target RPC list status representation. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/build/MaxKnownIdsTracker.java | Adds per-run max-known ID tracking used for orphan dir guards. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/build/LogActiveRefsFetchResult.java | Adds detailed per-target/per-bucket log manifest read results. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/build/KvActiveRefsFetchResult.java | Adds per-target KV active snapshot dir fetch result representation. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/build/ActiveRefsFetcher.java | Implements coordinator-RPC-driven active ref fetching with retries and parsing. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/audit/AuditLogger.java | Adds structured audit logger for the cleanup action. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/ActionLoader.java | Adds ServiceLoader-based action discovery and CLI dispatch. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/ActionFactory.java | Adds SPI interface for action factories. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/Action.java | Adds base action interface (build + run). |
| fluss-flink/fluss-flink-action/src/main/java/org/apache/fluss/flink/action/FlussFlinkActionEntrypoint.java | Adds the main entrypoint for the Flink action shaded jar. |
| fluss-flink/fluss-flink-action/pom.xml | Introduces new shaded Flink action jar module. |
| fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/action/orphan/Flink22OrphanFilesCleanITCase.java | Adds Flink 2.2 orphan cleanup IT case class. |
| fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java | Mirrors CLI adapter enhancements for Flink 2.2 variant. |
| fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/action/orphan/Flink20OrphanFilesCleanITCase.java | Adds Flink 1.20 orphan cleanup IT case class. |
| fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/action/orphan/Flink19OrphanFilesCleanITCase.java | Adds Flink 1.19 orphan cleanup IT case class. |
| fluss-filesystems/fluss-fs-hadoop/src/main/java/org/apache/fluss/fs/hdfs/HadoopFileStatus.java | Exposes HDFS modification time via FileStatus. |
| fluss-common/src/test/java/org/apache/fluss/fs/FileStatusTest.java | Adds test locking down fail-safe default for modification time. |
| fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java | Makes remote log metadata dir name constant public for reuse. |
| fluss-common/src/main/java/org/apache/fluss/fs/local/LocalFileStatus.java | Exposes local FS modification time via FileStatus. |
| fluss-common/src/main/java/org/apache/fluss/fs/FileStatus.java | Adds default getModificationTime() (fail-safe MAX_VALUE) to FileStatus interface. |
| fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java | Adds helper to convert TableBucket to PbTableBucket. |
| fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java | Implements client-side admin methods for the new list RPCs. |
| fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java | Extends Admin API with internal default methods for the new list RPCs. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| throw new ApiException( | ||
| "Failed to resolve table " + tableId + ": " + e.getMessage()); | ||
| } |
| import java.util.List; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.Executors; |
| return new CompletedSnapshotStore( | ||
| MAX_SNAPSHOTS_TO_RETAIN, | ||
| new SharedKvFileRegistry(Executors.newSingleThreadExecutor()), | ||
| Collections.emptyList(), | ||
| new NoOpSnapshotHandleStore(), | ||
| Executors.newSingleThreadExecutor(), | ||
| inUseChecker); |
| throw new ApiException( | ||
| "Failed to resolve table " + tableId + ": " + e.getMessage()); | ||
| } |
| import java.util.List; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.Executors; |
| return new CompletedSnapshotStore( | ||
| MAX_SNAPSHOTS_TO_RETAIN, | ||
| new SharedKvFileRegistry(Executors.newSingleThreadExecutor()), | ||
| Collections.emptyList(), | ||
| new NoOpSnapshotHandleStore(), | ||
| Executors.newSingleThreadExecutor(), | ||
| inUseChecker); |
901dc41 to
0a34c82
Compare
luoyuxia
left a comment
There was a problem hiding this comment.
@platinumhamburg Thanks for the pr. Left minor comment. PTAL
| /** | ||
| * Parses a CLI cutoff value into an absolute epoch-ms timestamp. Empty input falls back to | ||
| * {@code now - defaultGap}. Explicit input must parse as {@code yyyy-MM-dd HH:mm:ss} in the | ||
| * server's local time zone and must be at least {@link #HARD_LOWER_BOUND} earlier than {@code |
There was a problem hiding this comment.
nit:
server make me feel like it's fluss server timezone, may clarify this as Flink action JVM local timezone
| FsPath dir = stack.pop(); | ||
| FileStatus[] children; | ||
| try { | ||
| children = fs.listStatus(dir); |
There was a problem hiding this comment.
nit: since shared sst will never be deleted, can we skill the list for shared sst path?
| stats.transform( | ||
| "StatsAggregate", | ||
| TypeInformation.of(new TypeHint<CleanStats>() {}), | ||
| new StatsAggregateOperator(config.dryRun())) |
There was a problem hiding this comment.
Should pass config.extraConfigs() to StatsAggregateOperator since seems it will also delete empty dir which may require may ak/sk
| SingleOutputStreamOperator<CleanTask> tasks = | ||
| trigger.process(new ScopeEnumeratorFunction(config)) | ||
| .returns(TypeInformation.of(new TypeHint<CleanTask>() {})) | ||
| .setParallelism(1) |
There was a problem hiding this comment.
also set maxParallelism(1) to enfore the parallelism to be 1 since user may overwrite the parallelism
| "StatsAggregate", | ||
| TypeInformation.of(new TypeHint<CleanStats>() {}), | ||
| new StatsAggregateOperator(config.dryRun())) | ||
| .setParallelism(1); |
There was a problem hiding this comment.
dito: also setMaxParallelism
| public static String resolveRemoteDataDir( | ||
| TableInfo tableInfo, | ||
| @Nullable PartitionInfo partitionInfo, | ||
| @Nullable String clusterRemoteDataDir) { |
There was a problem hiding this comment.
nit: just curious about when the clusterRemoteDataDir will be null
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<ListRemoteLogManifestsResponse> listRemoteLogManifests( |
There was a problem hiding this comment.
can we just move listRemoteLogManifests and listKvSnapshots to AdminGateway?
| // Resolve table metadata: prefer in-memory context, fallback to ZK assignment | ||
| // (single getData) to handle the race between createTable and context sync. | ||
| CoordinatorContext context = coordinatorContextSupplier.get(); | ||
| TablePath tablePath = context.getTablePathById(tableId); |
There was a problem hiding this comment.
not sure concurrent access in here should be considered as a problem, one thread(event thread) may put table to the map, but this thread get table from map.
| List<PartitionInfo> partitions = admin.listPartitionInfos(tablePath).get(); | ||
| TableInfo confirm = admin.getTableInfo(tablePath).get(); | ||
| if (confirm.getTableId() != tableInfo.getTableId()) { | ||
| audit.logSkipTable(dbState.dbName, tableName, "ABA"); |
There was a problem hiding this comment.
nit: ABA is pretty terse for an audit reason. This log is likely to be read by operators, and ABA does not explain what happened unless the reader knows the concurrency term.
Could we use something more explicit, e.g. table-recreated-during-enumeration or include the old/new table ids?
| /** | ||
| * Rule for shared SST files under the {@code shared/} KV directory. | ||
| * | ||
| * <p>Always returns {@link Decision#KEEP_ACTIVE}. The true active set for shared SSTs lives inside |
There was a problem hiding this comment.
I think we could theoretically clean shared/*.sst more precisely: after getting the active snapshots, read each snapshot's metadata file and build the set of live SST files referenced by those snapshots, then delete only shared SSTs that are not in that live set.
But I’m also fine with keeping all shared SSTs in this first version for simplicity and safety, especially if truly orphaned shared SSTs are expected to be relatively rare compared to log segment
swuferhong
left a comment
There was a problem hiding this comment.
Hi, @platinumhamburg Thanks for your great work, I left some comments.
| this.snapshotStoreManagerSupplier = | ||
| () -> coordinatorEventProcessorSupplier.get().completedSnapshotStoreManager(); | ||
| this.coordinatorContextSupplier = | ||
| () -> coordinatorEventProcessorSupplier.get().getCoordinatorContext(); |
There was a problem hiding this comment.
CoordinatorContext is annotated @NotThreadSafe. Every other method in CoordinatorService accesses it through an AccessContextEvent queued to the coordinator event thread. This code bypasses that safety mechanism, introducing a data race under concurrent table creation/deletion.
| * relying on FS LIST + mtime selection. | ||
| */ | ||
| @Override | ||
| public CompletableFuture<ListRemoteLogManifestsResponse> listRemoteLogManifests( |
There was a problem hiding this comment.
Why we directly response the class ListRemoteLogManifestsResponse to user?
| * partition. Used by the orphan cleanup action to construct the complete KV active set. | ||
| */ | ||
| @Override | ||
| public CompletableFuture<ListKvSnapshotsResponse> listKvSnapshots( |
| * @param partitionId optional partition id (null for non-partitioned tables) | ||
| * @return per-bucket manifest paths and end offsets | ||
| */ | ||
| @Internal |
There was a problem hiding this comment.
This @Internal annotation seems wired — the API key is marked as PUBLIC, but the method itself is @Internal. Shouldn't one of them be changed to make them consistent?
| */ | ||
| @Internal | ||
| default CompletableFuture<ListRemoteLogManifestsResponse> listRemoteLogManifests( | ||
| long tableId, @Nullable Long partitionId) { |
There was a problem hiding this comment.
Both listRemoteLogManifests and listKvSnapshots are scoped to a single (tableId, partitionId) target, so a partitioned table with P partitions forces P sequential RPCs from the Stage-1 enumerator (parallelism=1) per table.
For tables with hundreds or thousands of partitions this concentrates a large RPC burst on a single coordinator-side hot path, and Stage-1 becomes the cleanup job's bottleneck.
| return snapshotId; | ||
| } | ||
|
|
||
| public BucketSnapshot getBucketSnapshot() { |
There was a problem hiding this comment.
This method is never used.
| <name>Fluss : Flink : Action</name> | ||
|
|
||
| <properties> | ||
| <flink.minor.version>1.20.3</flink.minor.version> |
There was a problem hiding this comment.
Change to ${flink.version} ?
| import java.util.Optional; | ||
|
|
||
| /** Main entrypoint for the Fluss Flink action jar. Delegates to {@link ActionLoader}. */ | ||
| public class FlussFlinkActionEntrypoint { |
There was a problem hiding this comment.
Maybe we can named as Paimon, just call it FlinkActions? — it's shorter and stays product-neutral. If a Fluss prefix is desirable, at least drop the Entrypoint suffix; either FlussActions or FlussFlinkActions reads much cleaner than the current FlussFlinkActionEntrypoint.
| * (deletion silently failed — e.g. permissions, transient remote-store error). Callers | ||
| * should track {@code false} returns as delete failures in their run summary. | ||
| */ | ||
| public boolean deleteFile(FsPath file, Decision decision, RuleId ruleId) throws IOException { |
There was a problem hiding this comment.
IOException from fs.delete propagates all the way up and fails the entire batch job.
fs.delete(...) throws IOException
→ SafeDeleter.deleteFile / deleteEmptyDir (throws IOException, no try/catch around fs.delete)
→ BucketCleaner.clean / walkAndCleanDir (throws IOException, no try/catch around safeDeleter.deleteFile)
→ ScanAndCleanFunction.processBucketTask /
processOrphanDirTask (throws IOException)
→ ScanAndCleanFunction.processElement (throws Exception → Flink runtime)
→ EmptyDirSweeper.sweep / sweepOne (Stage-3 driver, same pattern)
Every layer just re-declares throws IOException — none of them actually catches it. The OrphanFilesCleanJob runs in RuntimeExecutionMode.BATCH, so a single uncaught IOException fails the subtask, and BATCH mode has no task-local restart — the entire job is rescheduled from scratch.
| * {@link FileSystem#delete} returned {@code false}. Callers should not increment a "deleted | ||
| * directory" counter when this returns {@code false}. | ||
| */ | ||
| public boolean deleteEmptyDir(FsPath dir) throws IOException { |
There was a problem hiding this comment.
ditto. IOException from fs.delete propagates all the way up and fails the entire batch job.
Purpose
Linked issue: close #3403 3403
Brief change log
Tests
API and Format
Documentation