[server] fix stop replica deletion stuck when TabletServer is offline#3391
[server] fix stop replica deletion stuck when TabletServer is offline#3391gyang94 wants to merge 4 commits into
Conversation
swuferhong
left a comment
There was a problem hiding this comment.
Hi, @gyang94 thanks for your contributuon, it's an important feature, I left some comments:
| } | ||
|
|
||
| @Nullable | ||
| public Set<TableBucketReplica> getDeletionReplicas() { |
There was a problem hiding this comment.
This method and this field both are not used. I think it need to be removed
| * coordinator → tablet-server RPCs (e.g., {@code NOTIFY_LEADER_AND_ISR}) once they are migrated to | ||
| * the same sender-thread retry layer. | ||
| */ | ||
| public enum ApiKey { |
There was a problem hiding this comment.
I don't think we need to introduce this new enum. We already have org.apache.fluss.rpc.protocol.ApiKeys in fluss-rpc, which is the canonical enum for all wire-protocol APIs and already includes STOP_REPLICA — along with all the other control-plane RPCs (NOTIFY_LEADER_AND_ISR, UPDATE_METADATA, NOTIFY_REMOTE_LOG_OFFSETS, etc.).
The new ApiKey only has a single member today, and as we migrate more control-plane RPCs onto the sender thread, its members would just duplicate the ones already in ApiKeys. That gives us two parallel "ApiKey" concepts to keep in sync, which seems unnecessary.
There's also no dependency concern: fluss-server already depends on fluss-rpc and uses ApiKeys elsewhere.
| .withDescription("The amount of time to sleep when fetch bucket error occurs.") | ||
| .withFallbackKeys("log.replica.fetch-backoff-interval"); | ||
|
|
||
| public static final ConfigOption<Duration> COORDINATOR_REQUEST_RETRY_BACKOFF = |
There was a problem hiding this comment.
Why are the two coordinator-related config options placed together with the log-related ones? Could we move them up to the server module section instead? Also, please add documentation for these options in configuration.md.
| .withFallbackKeys("log.replica.fetch-backoff-interval"); | ||
|
|
||
| public static final ConfigOption<Duration> COORDINATOR_REQUEST_RETRY_BACKOFF = | ||
| key("coordinator.request-retry.backoff-interval") |
There was a problem hiding this comment.
The two new options use inconsistent key structures:
coordinator.request-retry.backoff-interval — uses a request-retry segment
coordinator.request.timeout — uses a request segment
These are both about the same thing (control-plane requests from the coordinator), so they should share a common coordinator.request. prefix.
Right now request-retry introduces a separate sub-namespace that doesn't line up with request.timeout.Suggested rename to keep them under one prefix:
coordinator.request-retry.backoff-interval→ coordinator.request.retry-backoff
coordinator.request.timeout → keep as is
| .withDescription( | ||
| "The backoff duration the coordinator waits before retrying a " | ||
| + "control-plane request to a tablet server after a " | ||
| + "transient RPC-layer failure. Mirrors Kafka's " |
There was a problem hiding this comment.
Mirrors Kafka's ControllerChannelManager retry backoff (hardcoded 100ms) -> Suggest removing this part — there's no need to mention it here.
| } | ||
|
|
||
| @VisibleForTesting | ||
| Map<Integer, TabletServerChannelState> getChannelStates() { |
| assertThat(getGaugeValue(metricGroup, MetricNames.SENDER_ALIVE)).isEqualTo(1); | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") |
There was a problem hiding this comment.
Redundant suppression, can be remove.
| this.metricGroup = metricGroup; | ||
| } | ||
|
|
||
| public int getTabletServerId() { |
| return tabletServerId; | ||
| } | ||
|
|
||
| public ServerNode getServerNode() { |
| public void removeTabletServer(Integer serverId) { | ||
| synchronized (channelLock) { | ||
| TabletServerChannelState state = channelStates.remove(serverId); | ||
| teardownChannelState(serverId, state); |
There was a problem hiding this comment.
Maybe this need to remove outside of the lock. ShutdownableThread.shutdown() blocks until the thread is fully joined, and we call it inside the synchronized (channelLock) block. The same lock guards three hot paths:
- the SENDER_TOTAL_QUEUE_SIZE gauge lambda — read by the metric-reporter thread;
- sendStopBucketReplicaRequest when it looks up the channel state
- getChannelStates()
So for as long as the join takes, metric reporting and control-plane sends to all other tablet servers are blocked on a single TS teardown.
Purpose
Linked issue: close #3357
Brief change log
Summary
When a
stopReplicaRPC fails due to transient network issues or a TabletServer crash, the Coordinator has no reliable retry mechanism. This causes replicas to get stuck and table deletion to never complete, resulting in thetableCountmetric never decreasing.This PR introduces a per-TabletServer sender thread model (aligned with Kafka's
ControllerChannelManager/RequestSendThread) and a newReplicaDeletionIneligiblestate. These changes provide robust retry and pause/resume semantics for replica deletion.Changes
Core: Per-TS Sender Thread (
ControlRequestSendThread)TableManager.resumeDeletions()implements 3-step logic:processNewTabletServer()clears ineligible marks and triggersresumeDeletions(), so paused deletions automatically resume when a TS reconnects.processDeadTabletServer()transitions in-flight deletion replicas toineligible.Config
coordinator.request.retry.backoff: Backoff between retries (default:100ms).coordinator.request.timeout: RPC timeout per attempt (default:30s).️ What was removed
retryDeleteAndSuccessDeleteReplicas(): The old "retry-N-then-force-success" mechanism.failDeleteNumberstracking andDELETE_TRY_TIMESconstant.CoordinatorRequestBatch(replaced by queue-based dispatch).Tests
API and Format
Documentation