Skip to content

[server] fix stop replica deletion stuck when TabletServer is offline#3391

Open
gyang94 wants to merge 4 commits into
apache:mainfrom
gyang94:per-sender-retry
Open

[server] fix stop replica deletion stuck when TabletServer is offline#3391
gyang94 wants to merge 4 commits into
apache:mainfrom
gyang94:per-sender-retry

Conversation

@gyang94
Copy link
Copy Markdown
Contributor

@gyang94 gyang94 commented May 27, 2026

Purpose

Linked issue: close #3357

Brief change log

Summary

When a stopReplica RPC fails due to transient network issues or a TabletServer crash, the Coordinator has no reliable retry mechanism. This causes replicas to get stuck and table deletion to never complete, resulting in the tableCount metric never decreasing.

This PR introduces a per-TabletServer sender thread model (aligned with Kafka's ControllerChannelManager / RequestSendThread) and a new ReplicaDeletionIneligible state. These changes provide robust retry and pause/resume semantics for replica deletion.

Changes

Core: Per-TS Sender Thread (ControlRequestSendThread)

  • Dedicated Sender Thread: Each TabletServer gets a dedicated sender thread with a FIFO queue.
  • New Replica State: Introduced a state for replicas whose deletion cannot proceed (e.g., TS offline or returned a business error).
  • Resume Logic: TableManager.resumeDeletions() implements 3-step logic:
    1. Complete if all replicas succeeded.
    2. Retry previously-ineligible replicas on alive TSes.
    3. Re-fire eligible tables.
  • Auto-Resume on Reconnect: processNewTabletServer() clears ineligible marks and triggers resumeDeletions(), so paused deletions automatically resume when a TS reconnects.
  • Handle Dead TS: processDeadTabletServer() transitions in-flight deletion replicas to ineligible.

Config

  • coordinator.request.retry.backoff: Backoff between retries (default: 100ms).
  • coordinator.request.timeout: RPC timeout per attempt (default: 30s).

️ What was removed

  • retryDeleteAndSuccessDeleteReplicas(): The old "retry-N-then-force-success" mechanism.
  • failDeleteNumbers tracking and DELETE_TRY_TIMES constant.
  • Direct RPC calls from CoordinatorRequestBatch (replaced by queue-based dispatch).

Tests

API and Format

Documentation

Copy link
Copy Markdown
Contributor

@swuferhong swuferhong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @gyang94 thanks for your contributuon, it's an important feature, I left some comments:

}

@Nullable
public Set<TableBucketReplica> getDeletionReplicas() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method and this field both are not used. I think it need to be removed

* coordinator → tablet-server RPCs (e.g., {@code NOTIFY_LEADER_AND_ISR}) once they are migrated to
* the same sender-thread retry layer.
*/
public enum ApiKey {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to introduce this new enum. We already have org.apache.fluss.rpc.protocol.ApiKeys in fluss-rpc, which is the canonical enum for all wire-protocol APIs and already includes STOP_REPLICA — along with all the other control-plane RPCs (NOTIFY_LEADER_AND_ISR, UPDATE_METADATA, NOTIFY_REMOTE_LOG_OFFSETS, etc.).

The new ApiKey only has a single member today, and as we migrate more control-plane RPCs onto the sender thread, its members would just duplicate the ones already in ApiKeys. That gives us two parallel "ApiKey" concepts to keep in sync, which seems unnecessary.

There's also no dependency concern: fluss-server already depends on fluss-rpc and uses ApiKeys elsewhere.

.withDescription("The amount of time to sleep when fetch bucket error occurs.")
.withFallbackKeys("log.replica.fetch-backoff-interval");

public static final ConfigOption<Duration> COORDINATOR_REQUEST_RETRY_BACKOFF =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are the two coordinator-related config options placed together with the log-related ones? Could we move them up to the server module section instead? Also, please add documentation for these options in configuration.md.

.withFallbackKeys("log.replica.fetch-backoff-interval");

public static final ConfigOption<Duration> COORDINATOR_REQUEST_RETRY_BACKOFF =
key("coordinator.request-retry.backoff-interval")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two new options use inconsistent key structures:
coordinator.request-retry.backoff-interval — uses a request-retry segment
coordinator.request.timeout — uses a request segment

These are both about the same thing (control-plane requests from the coordinator), so they should share a common coordinator.request. prefix.

Right now request-retry introduces a separate sub-namespace that doesn't line up with request.timeout.Suggested rename to keep them under one prefix:
coordinator.request-retry.backoff-intervalcoordinator.request.retry-backoff
coordinator.request.timeout → keep as is

.withDescription(
"The backoff duration the coordinator waits before retrying a "
+ "control-plane request to a tablet server after a "
+ "transient RPC-layer failure. Mirrors Kafka's "
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mirrors Kafka's ControllerChannelManager retry backoff (hardcoded 100ms) -> Suggest removing this part — there's no need to mention it here.

}

@VisibleForTesting
Map<Integer, TabletServerChannelState> getChannelStates() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No use. pls remove it.

assertThat(getGaugeValue(metricGroup, MetricNames.SENDER_ALIVE)).isEqualTo(1);
}

@SuppressWarnings("unchecked")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant suppression, can be remove.

this.metricGroup = metricGroup;
}

public int getTabletServerId() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No user. remove it.

return tabletServerId;
}

public ServerNode getServerNode() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No user. remove it.

public void removeTabletServer(Integer serverId) {
synchronized (channelLock) {
TabletServerChannelState state = channelStates.remove(serverId);
teardownChannelState(serverId, state);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this need to remove outside of the lock. ShutdownableThread.shutdown() blocks until the thread is fully joined, and we call it inside the synchronized (channelLock) block. The same lock guards three hot paths:

  • the SENDER_TOTAL_QUEUE_SIZE gauge lambda — read by the metric-reporter thread;
  • sendStopBucketReplicaRequest when it looks up the channel state
  • getChannelStates()

So for as long as the join takes, metric reporting and control-plane sends to all other tablet servers are blocked on a single TS teardown.

@gyang94 gyang94 force-pushed the per-sender-retry branch from caaaebb to a490b4c Compare June 2, 2026 10:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[server] Table deletion stuck permanently when StopReplica request fails

2 participants