From b6086b2f4e183c10a7d288aad11dd1c7a8669512 Mon Sep 17 00:00:00 2001 From: OpenAI Codex Date: Thu, 28 May 2026 05:53:30 +0000 Subject: [PATCH] Subscription: add topic owner epoch fencing --- .../org/apache/iotdb/rpc/TSStatusCode.java | 5 + .../subscription/config/ConsumerConfig.java | 10 ++ .../subscription/config/ConsumerConstant.java | 2 + .../subscription/config/TopicConstant.java | 4 + .../SubscriptionOwnerFencedException.java | 45 +++++ .../base/AbstractSubscriptionConsumer.java | 22 +++ .../AbstractSubscriptionConsumerBuilder.java | 23 +++ .../base/AbstractSubscriptionProvider.java | 24 +++ .../table/SubscriptionTableProvider.java | 4 + .../table/SubscriptionTablePullConsumer.java | 4 + .../SubscriptionTablePullConsumerBuilder.java | 18 ++ .../table/SubscriptionTablePushConsumer.java | 4 + .../SubscriptionTablePushConsumerBuilder.java | 18 ++ .../tree/SubscriptionTreeProvider.java | 4 + .../tree/SubscriptionTreePullConsumer.java | 21 +++ .../SubscriptionTreePullConsumerBuilder.java | 18 ++ .../tree/SubscriptionTreePushConsumer.java | 21 +++ .../SubscriptionTreePushConsumerBuilder.java | 18 ++ .../subscription/SubscriptionInfo.java | 8 + .../subscription/SubscriptionInfoTest.java | 70 ++++++++ .../agent/SubscriptionTopicAgent.java | 76 ++++++++ .../receiver/SubscriptionReceiverV1.java | 57 ++++++ .../receiver/SubscriptionReceiverV1Test.java | 94 ++++++++++ .../subscription/meta/topic/TopicMeta.java | 167 +++++++++++++++++- .../subscription/topic/TopicDeSerTest.java | 61 +++++++ 25 files changed, 796 insertions(+), 2 deletions(-) create mode 100644 iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionOwnerFencedException.java create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index e5014681fa724..97a70963d720c 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -316,6 +316,11 @@ public enum TSStatusCode { SHOW_SUBSCRIPTION_ERROR(1910), SUBSCRIPTION_PIPE_TIMEOUT_ERROR(1911), SUBSCRIPTION_NOT_ENABLED_ERROR(1912), + SUBSCRIPTION_OWNER_FENCED(1913), + SUBSCRIPTION_OWNER_REQUIRED(1914), + SUBSCRIPTION_OWNER_EPOCH_REQUIRED(1915), + SUBSCRIPTION_OWNER_LEASE_EXPIRED(1916), + SUBSCRIPTION_OWNER_EPOCH_CONFLICT(1917), // Topic CREATE_TOPIC_ERROR(2000), diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java index 3cb0087d6827e..13f2a9ee3fb03 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java @@ -68,6 +68,16 @@ public String getConsumerGroupId() { return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY); } + public String getOwnerId() { + return getString(ConsumerConstant.OWNER_ID_KEY); + } + + public Long getOwnerEpoch() { + return hasAttribute(ConsumerConstant.OWNER_EPOCH_KEY) + ? getLong(ConsumerConstant.OWNER_EPOCH_KEY) + : null; + } + public String getUsername() { return getString(ConsumerConstant.USERNAME_KEY); } diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java index 90d2ea7a01fb0..3df95facf367c 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java @@ -40,6 +40,8 @@ public class ConsumerConstant { public static final String CONSUMER_ID_KEY = "consumer-id"; public static final String CONSUMER_GROUP_ID_KEY = "group-id"; + public static final String OWNER_ID_KEY = "owner-id"; + public static final String OWNER_EPOCH_KEY = "owner-epoch"; public static final String HEARTBEAT_INTERVAL_MS_KEY = "heartbeat-interval-ms"; public static final long HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE = 30_000L; diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java index 52c8e4de75221..73036684ccde9 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java @@ -61,6 +61,10 @@ public class TopicConstant { public static final String STRICT_KEY = "strict"; public static final String STRICT_DEFAULT_VALUE = "true"; + public static final String OWNER_ID_KEY = "owner-id"; + public static final String OWNER_EPOCH_KEY = "owner-epoch"; + public static final String OWNER_LEASE_EXPIRE_TIME_MS_KEY = "owner-lease-expire-time-ms"; + private TopicConstant() { throw new IllegalStateException(SubscriptionMessages.UTILITY_CLASS); } diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionOwnerFencedException.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionOwnerFencedException.java new file mode 100644 index 0000000000000..f00c3ba1dcb0c --- /dev/null +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionOwnerFencedException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rpc.subscription.exception; + +import java.util.Objects; + +public class SubscriptionOwnerFencedException extends SubscriptionRuntimeNonCriticalException { + + public SubscriptionOwnerFencedException(final String message) { + super(message); + } + + public SubscriptionOwnerFencedException(final String message, final Throwable cause) { + super(message, cause); + } + + @Override + public boolean equals(final Object obj) { + return obj instanceof SubscriptionOwnerFencedException + && Objects.equals(getMessage(), ((SubscriptionOwnerFencedException) obj).getMessage()) + && Objects.equals(getTimeStamp(), ((SubscriptionOwnerFencedException) obj).getTimeStamp()); + } + + @Override + public int hashCode() { + return super.hashCode(); + } +} diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java index 63e5a263f22b3..57c78dbb4d0c7 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java @@ -105,6 +105,8 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { protected String consumerId; protected String consumerGroupId; + protected String ownerId; + protected Long ownerEpoch; private final long heartbeatIntervalMs; private final long endpointsSyncIntervalMs; @@ -154,6 +156,14 @@ public String getConsumerGroupId() { return consumerGroupId; } + public String getOwnerId() { + return ownerId; + } + + public Long getOwnerEpoch() { + return ownerEpoch; + } + /////////////////////////////// ctor /////////////////////////////// protected AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) { @@ -183,6 +193,8 @@ protected AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder this.consumerId = builder.consumerId; this.consumerGroupId = builder.consumerGroupId; + this.ownerId = builder.ownerId; + this.ownerEpoch = builder.ownerEpoch; this.heartbeatIntervalMs = builder.heartbeatIntervalMs; this.endpointsSyncIntervalMs = builder.endpointsSyncIntervalMs; @@ -213,6 +225,8 @@ protected AbstractSubscriptionConsumer( .encryptedPassword((String) properties.get(ConsumerConstant.ENCRYPTED_PASSWORD_KEY)) .consumerId((String) properties.get(ConsumerConstant.CONSUMER_ID_KEY)) .consumerGroupId((String) properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY)) + .ownerId((String) properties.get(ConsumerConstant.OWNER_ID_KEY)) + .ownerEpoch((Long) properties.get(ConsumerConstant.OWNER_EPOCH_KEY)) .heartbeatIntervalMs( (Long) properties.getOrDefault( @@ -394,6 +408,8 @@ protected abstract AbstractSubscriptionProvider constructSubscriptionProvider( final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs); @@ -408,6 +424,8 @@ AbstractSubscriptionProvider constructProviderAndHandshake(final TEndPoint endPo this.encryptedPassword, this.consumerId, this.consumerGroupId, + this.ownerId, + this.ownerEpoch, this.thriftMaxFrameSize, this.heartbeatIntervalMs, this.connectionTimeoutInMs); @@ -1429,6 +1447,8 @@ protected Map coreReportMessage() { final Map result = new HashMap<>(); result.put("consumerId", consumerId); result.put("consumerGroupId", consumerGroupId); + result.put("ownerId", ownerId); + result.put("ownerEpoch", String.valueOf(ownerEpoch)); result.put("isClosed", isClosed.toString()); result.put("fileSaveDir", fileSaveDir); result.put( @@ -1443,6 +1463,8 @@ protected Map allReportMessage() { final Map result = new HashMap<>(); result.put("consumerId", consumerId); result.put("consumerGroupId", consumerGroupId); + result.put("ownerId", ownerId); + result.put("ownerEpoch", String.valueOf(ownerEpoch)); result.put("heartbeatIntervalMs", String.valueOf(heartbeatIntervalMs)); result.put("endpointsSyncIntervalMs", String.valueOf(endpointsSyncIntervalMs)); result.put("providers", providers.toString()); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java index 991d237ed2b89..a0c4b421ed18b 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java @@ -40,6 +40,8 @@ public class AbstractSubscriptionConsumerBuilder { protected String consumerId; protected String consumerGroupId; + protected String ownerId; + protected Long ownerEpoch; protected long heartbeatIntervalMs = ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE; protected long endpointsSyncIntervalMs = @@ -111,6 +113,27 @@ public AbstractSubscriptionConsumerBuilder consumerGroupId( return this; } + public AbstractSubscriptionConsumerBuilder ownerId(@Nullable final String ownerId) { + if (Objects.isNull(ownerId)) { + return this; + } + this.ownerId = ownerId; + return this; + } + + public AbstractSubscriptionConsumerBuilder ownerEpoch(final long ownerEpoch) { + this.ownerEpoch = ownerEpoch; + return this; + } + + public AbstractSubscriptionConsumerBuilder ownerEpoch(@Nullable final Long ownerEpoch) { + if (Objects.isNull(ownerEpoch)) { + return this; + } + this.ownerEpoch = ownerEpoch; + return this; + } + public AbstractSubscriptionConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { this.heartbeatIntervalMs = Math.max(heartbeatIntervalMs, ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java index 413c609abbff3..3751d019fe779 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java @@ -27,6 +27,7 @@ import org.apache.iotdb.rpc.subscription.config.TopicConfig; import org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; +import org.apache.iotdb.rpc.subscription.exception.SubscriptionOwnerFencedException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionPipeTimeoutException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeCriticalException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeNonCriticalException; @@ -81,6 +82,8 @@ public abstract class AbstractSubscriptionProvider { private String consumerId; private String consumerGroupId; + private final String ownerId; + private final Long ownerEpoch; private final AtomicBoolean isClosed = new AtomicBoolean(true); private final AtomicBoolean isAvailable = new AtomicBoolean(false); @@ -109,6 +112,8 @@ protected AbstractSubscriptionProvider( final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -124,6 +129,8 @@ protected AbstractSubscriptionProvider( this.endPoint = endPoint; this.consumerId = consumerId; this.consumerGroupId = consumerGroupId; + this.ownerId = ownerId; + this.ownerEpoch = ownerEpoch; this.username = username; this.password = password; this.encryptedPassword = encryptedPassword; @@ -176,6 +183,12 @@ synchronized void handshake() throws SubscriptionException, IoTDBConnectionExcep final Map consumerAttributes = new HashMap<>(); consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, consumerGroupId); consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId); + if (ownerId != null) { + consumerAttributes.put(ConsumerConstant.OWNER_ID_KEY, ownerId); + } + if (ownerEpoch != null) { + consumerAttributes.put(ConsumerConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + } consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username); consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password); if (encryptedPassword != null) { @@ -451,6 +464,17 @@ private static void verifyPipeSubscribeSuccess(final TSStatus status) case 1911: // SUBSCRIPTION_PIPE_TIMEOUT_ERROR throw new SubscriptionPipeTimeoutException( String.format(SUBSCRIPTION_PIPE_TIMEOUT_FORMATTER, status.code, status.message)); + case 1913: // SUBSCRIPTION_OWNER_FENCED + case 1914: // SUBSCRIPTION_OWNER_REQUIRED + case 1915: // SUBSCRIPTION_OWNER_EPOCH_REQUIRED + case 1916: // SUBSCRIPTION_OWNER_LEASE_EXPIRED + case 1917: // SUBSCRIPTION_OWNER_EPOCH_CONFLICT + { + final String errorMessage = + String.format(INTERNAL_ERROR_FORMATTER, status.code, status.message); + LOGGER.warn(errorMessage); + throw new SubscriptionOwnerFencedException(errorMessage); + } case 1900: // SUBSCRIPTION_VERSION_ERROR case 1901: // SUBSCRIPTION_TYPE_ERROR case 1909: // SUBSCRIPTION_MISSING_CUSTOMER diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java index 84470d283c21b..ff67e3e532c3d 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java @@ -33,6 +33,8 @@ final class SubscriptionTableProvider extends AbstractSubscriptionProvider { final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -43,6 +45,8 @@ final class SubscriptionTableProvider extends AbstractSubscriptionProvider { encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java index 8f712782fb5f0..4c390c96420e3 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java @@ -45,6 +45,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -55,6 +57,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java index 939228a7f49e9..efc99debb4977 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java @@ -74,6 +74,24 @@ public SubscriptionTablePullConsumerBuilder consumerGroupId(final String consume return this; } + @Override + public SubscriptionTablePullConsumerBuilder ownerId(final String ownerId) { + super.ownerId(ownerId); + return this; + } + + @Override + public SubscriptionTablePullConsumerBuilder ownerEpoch(final long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + + @Override + public SubscriptionTablePullConsumerBuilder ownerEpoch(final Long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + @Override public SubscriptionTablePullConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { super.heartbeatIntervalMs(heartbeatIntervalMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java index e90afc1d8d175..4c85993a933dc 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java @@ -41,6 +41,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -51,6 +53,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java index 27bf328fea9e6..143b34056af9e 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java @@ -76,6 +76,24 @@ public SubscriptionTablePushConsumerBuilder consumerGroupId(final String consume return this; } + @Override + public SubscriptionTablePushConsumerBuilder ownerId(final String ownerId) { + super.ownerId(ownerId); + return this; + } + + @Override + public SubscriptionTablePushConsumerBuilder ownerEpoch(final long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + + @Override + public SubscriptionTablePushConsumerBuilder ownerEpoch(final Long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + @Override public SubscriptionTablePushConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { super.heartbeatIntervalMs(heartbeatIntervalMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java index 3589fbbcf749a..8720c577d4dfa 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java @@ -33,6 +33,8 @@ final class SubscriptionTreeProvider extends AbstractSubscriptionProvider { final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -43,6 +45,8 @@ final class SubscriptionTreeProvider extends AbstractSubscriptionProvider { encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java index 7225036aaa4cf..d4d615130625b 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java @@ -52,6 +52,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -62,6 +64,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); @@ -85,6 +89,8 @@ private SubscriptionTreePullConsumer(final SubscriptionTreePullConsumer.Builder .encryptedPassword(builder.encryptedPassword) .consumerId(builder.consumerId) .consumerGroupId(builder.consumerGroupId) + .ownerId(builder.ownerId) + .ownerEpoch(builder.ownerEpoch) .heartbeatIntervalMs(builder.heartbeatIntervalMs) .endpointsSyncIntervalMs(builder.endpointsSyncIntervalMs) .fileSaveDir(builder.fileSaveDir) @@ -238,6 +244,8 @@ public static class Builder { private String consumerId; private String consumerGroupId; + private String ownerId; + private Long ownerEpoch; private long heartbeatIntervalMs = ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE; private long endpointsSyncIntervalMs = @@ -299,6 +307,19 @@ public Builder consumerGroupId(@Nullable final String consumerGroupId) { return this; } + public Builder ownerId(@Nullable final String ownerId) { + if (Objects.isNull(ownerId)) { + return this; + } + this.ownerId = ownerId; + return this; + } + + public Builder ownerEpoch(final long ownerEpoch) { + this.ownerEpoch = ownerEpoch; + return this; + } + public Builder heartbeatIntervalMs(final long heartbeatIntervalMs) { this.heartbeatIntervalMs = Math.max(heartbeatIntervalMs, ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java index cbceb95d77f90..2d057b7bfbd80 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java @@ -74,6 +74,24 @@ public SubscriptionTreePullConsumerBuilder consumerGroupId(final String consumer return this; } + @Override + public SubscriptionTreePullConsumerBuilder ownerId(final String ownerId) { + super.ownerId(ownerId); + return this; + } + + @Override + public SubscriptionTreePullConsumerBuilder ownerEpoch(final long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + + @Override + public SubscriptionTreePullConsumerBuilder ownerEpoch(final Long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + @Override public SubscriptionTreePullConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { super.heartbeatIntervalMs(heartbeatIntervalMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java index 4d8a5ef3e169f..8fbc33d14ce8a 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java @@ -51,6 +51,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -61,6 +63,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); @@ -84,6 +88,8 @@ private SubscriptionTreePushConsumer(final Builder builder) { .encryptedPassword(builder.encryptedPassword) .consumerId(builder.consumerId) .consumerGroupId(builder.consumerGroupId) + .ownerId(builder.ownerId) + .ownerEpoch(builder.ownerEpoch) .heartbeatIntervalMs(builder.heartbeatIntervalMs) .endpointsSyncIntervalMs(builder.endpointsSyncIntervalMs) .fileSaveDir(builder.fileSaveDir) @@ -192,6 +198,8 @@ public static class Builder { private String consumerId; private String consumerGroupId; + private String ownerId; + private Long ownerEpoch; private long heartbeatIntervalMs = ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE; private long endpointsSyncIntervalMs = @@ -256,6 +264,19 @@ public Builder consumerGroupId(@Nullable final String consumerGroupId) { return this; } + public Builder ownerId(@Nullable final String ownerId) { + if (Objects.isNull(ownerId)) { + return this; + } + this.ownerId = ownerId; + return this; + } + + public Builder ownerEpoch(final long ownerEpoch) { + this.ownerEpoch = ownerEpoch; + return this; + } + public Builder heartbeatIntervalMs(final long heartbeatIntervalMs) { this.heartbeatIntervalMs = Math.max(heartbeatIntervalMs, ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java index 86594433e77e0..9067f3e7ae564 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java @@ -76,6 +76,24 @@ public SubscriptionTreePushConsumerBuilder consumerGroupId(final String consumer return this; } + @Override + public SubscriptionTreePushConsumerBuilder ownerId(final String ownerId) { + super.ownerId(ownerId); + return this; + } + + @Override + public SubscriptionTreePushConsumerBuilder ownerEpoch(final long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + + @Override + public SubscriptionTreePushConsumerBuilder ownerEpoch(final Long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + @Override public SubscriptionTreePushConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { super.heartbeatIntervalMs(heartbeatIntervalMs); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java index 6fcdcf28ebd0f..1849823888f0a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java @@ -338,6 +338,14 @@ public TSStatus alterTopic(AlterTopicPlan plan) { } private TSStatus alterTopicInternal(final AlterTopicPlan plan) { + try { + TopicMeta.validateOwnerProgression( + topicMetaKeeper.getTopicMeta(plan.getTopicMeta().getTopicName()), plan.getTopicMeta()); + } catch (final IllegalArgumentException e) { + return new TSStatus(TSStatusCode.SUBSCRIPTION_OWNER_EPOCH_CONFLICT.getStatusCode()) + .setMessage(e.getMessage()); + } + topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName()); topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(), plan.getTopicMeta()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java new file mode 100644 index 0000000000000..768eab6ea50d8 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.persistence.subscription; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; +import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.AlterTopicPlan; +import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class SubscriptionInfoTest { + + @Test + public void testAlterTopicRejectsOwnerEpochRollback() { + final String topicName = "topic-" + UUID.randomUUID(); + final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(); + + final TopicMeta initialTopicMeta = createTopicMeta(topicName, "sn1", 5L); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + subscriptionInfo.createTopic(new CreateTopicPlan(initialTopicMeta)).getCode()); + + final TopicMeta transferredTopicMeta = initialTopicMeta.deepCopy(); + transferredTopicMeta.transferOwner("sn2", 6L); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + subscriptionInfo.alterTopic(new AlterTopicPlan(transferredTopicMeta)).getCode()); + + final TSStatus rollbackStatus = + subscriptionInfo.alterTopic(new AlterTopicPlan(createTopicMeta(topicName, "sn1", 5L))); + + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_EPOCH_CONFLICT.getStatusCode(), rollbackStatus.getCode()); + Assert.assertEquals("sn2", subscriptionInfo.getTopicMeta(topicName).getOwnerId()); + Assert.assertEquals(6L, subscriptionInfo.getTopicMeta(topicName).getOwnerEpoch()); + } + + private TopicMeta createTopicMeta( + final String topicName, final String ownerId, final long ownerEpoch) { + final Map topicAttributes = new HashMap<>(); + topicAttributes.put(TopicConstant.OWNER_ID_KEY, ownerId); + topicAttributes.put(TopicConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + return new TopicMeta(topicName, 1, topicAttributes); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java index 9b629ab9c8dde..129ba3cb2a34f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java @@ -19,10 +19,14 @@ package org.apache.iotdb.db.subscription.agent; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; import org.apache.iotdb.commons.subscription.meta.topic.TopicMetaKeeper; import org.apache.iotdb.db.i18n.DataNodeMiscMessages; import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaRespExceptionMessage; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.subscription.config.ConsumerConfig; import org.apache.iotdb.rpc.subscription.config.TopicConfig; import org.apache.iotdb.rpc.subscription.config.TopicConstant; @@ -31,6 +35,7 @@ import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -87,6 +92,8 @@ public TPushTopicMetaRespExceptionMessage handleSingleTopicMetaChanges( private void handleSingleTopicMetaChangesInternal(final TopicMeta metaFromCoordinator) { final String topicName = metaFromCoordinator.getTopicName(); + TopicMeta.validateOwnerProgression( + topicMetaKeeper.getTopicMeta(topicName), metaFromCoordinator); topicMetaKeeper.removeTopicMeta(topicName); topicMetaKeeper.addTopicMeta(topicName, metaFromCoordinator); } @@ -188,4 +195,73 @@ public Map getTopicConfigs(final Set topicNames) { releaseReadLock(); } } + + public TSStatus checkTopicOwner(final ConsumerConfig consumerConfig, final String topicName) { + acquireReadLock(); + try { + if (!topicMetaKeeper.containsTopicMeta(topicName)) { + return RpcUtils.SUCCESS_STATUS; + } + + final TopicMeta topicMeta = topicMetaKeeper.getTopicMeta(topicName); + if (!topicMeta.isOwnerFencingEnabled()) { + return RpcUtils.SUCCESS_STATUS; + } + + final String requestOwnerId = consumerConfig.getOwnerId(); + if (Objects.isNull(requestOwnerId)) { + return RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_OWNER_REQUIRED, + String.format( + "Subscription: topic %s enables owner fencing, but consumer %s does not carry owner-id.", + topicName, consumerConfig)); + } + + final Long requestOwnerEpoch = consumerConfig.getOwnerEpoch(); + if (Objects.isNull(requestOwnerEpoch)) { + return RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_OWNER_EPOCH_REQUIRED, + String.format( + "Subscription: topic %s enables owner fencing, but consumer %s does not carry owner-epoch.", + topicName, consumerConfig)); + } + + if (Objects.nonNull(topicMeta.getOwnerLeaseExpireTimeMs()) + && System.currentTimeMillis() > topicMeta.getOwnerLeaseExpireTimeMs()) { + return RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_OWNER_LEASE_EXPIRED, + String.format( + "Subscription: owner lease for topic %s has expired, owner-id: %s, owner-epoch: %s.", + topicName, topicMeta.getOwnerId(), topicMeta.getOwnerEpoch())); + } + + if (!topicMeta.matchesOwner(requestOwnerId, requestOwnerEpoch)) { + return RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED, + String.format( + "Subscription: consumer owner is fenced for topic %s, request owner-id: %s," + + " request owner-epoch: %s, current owner-id: %s, current owner-epoch: %s.", + topicName, + requestOwnerId, + requestOwnerEpoch, + topicMeta.getOwnerId(), + topicMeta.getOwnerEpoch())); + } + + return RpcUtils.SUCCESS_STATUS; + } finally { + releaseReadLock(); + } + } + + public TSStatus checkTopicOwners( + final ConsumerConfig consumerConfig, final Iterable topicNames) { + for (final String topicName : topicNames) { + final TSStatus status = checkTopicOwner(consumerConfig, topicName); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + } + return RpcUtils.SUCCESS_STATUS; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java index 2c4accecf45f0..fe08ffc43ca39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java @@ -326,6 +326,16 @@ private TPipeSubscribeResp handlePipeSubscribeHeartbeatInternal( } // TODO: do something + final TSStatus ownerStatus = + SubscriptionAgent.topic() + .checkTopicOwners( + consumerConfig, + SubscriptionAgent.consumer() + .getTopicNamesSubscribedByConsumer( + consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId())); + if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(ownerStatus); + } LOGGER.info(DataNodeMiscMessages.SUBSCRIPTION_CONSUMER_HEARTBEAT_SUCCESS, consumerConfig); @@ -406,6 +416,11 @@ private TPipeSubscribeResp handlePipeSubscribeSubscribeInternal( // subscribe topics final Set topicNames = req.getTopicNames(); + final TSStatus ownerStatus = + SubscriptionAgent.topic().checkTopicOwners(consumerConfig, topicNames); + if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(ownerStatus); + } subscribe(consumerConfig, topicNames); LOGGER.info( @@ -498,16 +513,48 @@ private TPipeSubscribeResp handlePipeSubscribePollInternal(final PipeSubscribePo if (SubscriptionPollRequestType.isValidatedRequestType(requestType)) { switch (SubscriptionPollRequestType.valueOf(requestType)) { case POLL: + final Set pollTopicNames = ((PollPayload) request.getPayload()).getTopicNames(); + final Set subscribedTopicNames = + SubscriptionAgent.consumer() + .getTopicNamesSubscribedByConsumer( + consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId()); + final Set topicNamesToCheck = new HashSet<>(pollTopicNames); + topicNamesToCheck.removeIf(topicName -> !subscribedTopicNames.contains(topicName)); + final TSStatus ownerStatus = + SubscriptionAgent.topic().checkTopicOwners(consumerConfig, topicNamesToCheck); + if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribePollResp.toTPipeSubscribeResp(ownerStatus, Collections.emptyList()); + } events = handlePipeSubscribePollRequest( consumerConfig, (PollPayload) request.getPayload(), maxBytes); break; case POLL_FILE: + final TSStatus tsFileOwnerStatus = + SubscriptionAgent.topic() + .checkTopicOwner( + consumerConfig, + ((PollFilePayload) request.getPayload()).getCommitContext().getTopicName()); + if (tsFileOwnerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribePollResp.toTPipeSubscribeResp( + tsFileOwnerStatus, Collections.emptyList()); + } events = handlePipeSubscribePollTsFileRequest( consumerConfig, (PollFilePayload) request.getPayload()); break; case POLL_TABLETS: + final TSStatus tabletsOwnerStatus = + SubscriptionAgent.topic() + .checkTopicOwner( + consumerConfig, + ((PollTabletsPayload) request.getPayload()) + .getCommitContext() + .getTopicName()); + if (tabletsOwnerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribePollResp.toTPipeSubscribeResp( + tabletsOwnerStatus, Collections.emptyList()); + } events = handlePipeSubscribePollTabletsRequest( consumerConfig, (PollTabletsPayload) request.getPayload()); @@ -666,6 +713,16 @@ private TPipeSubscribeResp handlePipeSubscribeCommitInternal(final PipeSubscribe // commit (ack or nack) final List commitContexts = req.getCommitContexts(); + final TSStatus ownerStatus = + SubscriptionAgent.topic() + .checkTopicOwners( + consumerConfig, + commitContexts.stream() + .map(SubscriptionCommitContext::getTopicName) + .collect(Collectors.toSet())); + if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribeCommitResp.toTPipeSubscribeResp(ownerStatus); + } final boolean nack = req.isNack(); final List successfulCommitContexts = SubscriptionAgent.broker().commit(consumerConfig, commitContexts, nack); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java index ba0070187e368..c28111ec9d110 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java @@ -20,14 +20,19 @@ package org.apache.iotdb.db.subscription.receiver; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; +import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; +import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.rpc.subscription.config.ConsumerConfig; import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.junit.Assert; import org.junit.Test; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -82,6 +87,76 @@ public void testCalculateConsumerInactivityTimeoutUsesHeartbeatMultiple() throws invokeCalculateConsumerInactivityTimeoutMs(receiver, createConsumerConfig(5_000L))); } + @Test + public void testTopicOwnerFencingStatus() { + final String topicName = "topic-" + UUID.randomUUID(); + + SubscriptionAgent.topic().handleSingleTopicMetaChanges(createTopicMeta(topicName, "sn1", 7L)); + try { + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwner(createConsumerConfig(1_000L, "sn1", 7L), topicName) + .getCode()); + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwner(createConsumerConfig(1_000L, "sn2", 7L), topicName) + .getCode()); + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_REQUIRED.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwner(createConsumerConfig(1_000L), topicName) + .getCode()); + } finally { + SubscriptionAgent.topic().handleDropTopic(topicName); + } + } + + @Test + public void testOldOwnerFencedAfterNetworkPartitionAndTopicOwnerTransfer() { + final String topicName = "topic-" + UUID.randomUUID(); + final TopicMeta topicMeta = createTopicMeta(topicName, "sn1", 5L); + final ConsumerConfig oldSnConsumer = createConsumerConfig(1_000L, "sn1", 5L); + final ConsumerConfig newSnConsumer = createConsumerConfig(1_000L, "sn2", 6L); + + SubscriptionAgent.topic().handleSingleTopicMetaChanges(topicMeta); + try { + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + SubscriptionAgent.topic().checkTopicOwner(oldSnConsumer, topicName).getCode()); + + final TopicMeta transferredTopicMeta = topicMeta.deepCopy(); + transferredTopicMeta.transferOwner("sn2", 6L); + SubscriptionAgent.topic().handleSingleTopicMetaChanges(transferredTopicMeta); + + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), + SubscriptionAgent.topic().checkTopicOwner(oldSnConsumer, topicName).getCode()); + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwners(oldSnConsumer, Collections.singleton(topicName)) + .getCode()); + Assert.assertNotNull( + SubscriptionAgent.topic() + .handleSingleTopicMetaChanges(createTopicMeta(topicName, "sn1", 5L))); + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), + SubscriptionAgent.topic().checkTopicOwner(oldSnConsumer, topicName).getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + SubscriptionAgent.topic().checkTopicOwner(newSnConsumer, topicName).getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwners(newSnConsumer, Collections.singleton(topicName)) + .getCode()); + } finally { + SubscriptionAgent.topic().handleDropTopic(topicName); + } + } + private long invokeCalculateConsumerInactivityTimeoutMs( final SubscriptionReceiverV1 receiver, final ConsumerConfig consumerConfig) throws Exception { final Method method = @@ -91,11 +166,30 @@ private long invokeCalculateConsumerInactivityTimeoutMs( return (long) method.invoke(receiver, consumerConfig); } + private TopicMeta createTopicMeta( + final String topicName, final String ownerId, final long ownerEpoch) { + final Map topicAttributes = new HashMap<>(); + topicAttributes.put(TopicConstant.OWNER_ID_KEY, ownerId); + topicAttributes.put(TopicConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + return new TopicMeta(topicName, 1, topicAttributes); + } + private ConsumerConfig createConsumerConfig(final long heartbeatIntervalMs) { + return createConsumerConfig(heartbeatIntervalMs, null, null); + } + + private ConsumerConfig createConsumerConfig( + final long heartbeatIntervalMs, final String ownerId, final Long ownerEpoch) { final Map attributes = new HashMap<>(); attributes.put(ConsumerConstant.CONSUMER_ID_KEY, "consumer-" + UUID.randomUUID()); attributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "group-" + UUID.randomUUID()); attributes.put(ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs)); + if (ownerId != null) { + attributes.put(ConsumerConstant.OWNER_ID_KEY, ownerId); + } + if (ownerEpoch != null) { + attributes.put(ConsumerConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + } return new ConsumerConfig(attributes); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java index badb77d6f486d..44222a5a697fc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.rpc.subscription.config.TopicConfig; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -45,11 +46,18 @@ public class TopicMeta { private long creationTime; // unit in ms private TopicConfig config; + private String ownerId; + private long ownerEpoch; + private long ownerLastTransferTimeMs; + private Long ownerLeaseExpireTimeMs; + // TODO: remove this variable later private Set subscribedConsumerGroupIds; // unused now private TopicMeta() { this.config = new TopicConfig(new HashMap<>()); + this.ownerEpoch = -1L; + this.ownerLastTransferTimeMs = -1L; this.subscribedConsumerGroupIds = new HashSet<>(); } @@ -59,6 +67,9 @@ public TopicMeta( this.topicName = topicName; this.creationTime = creationTime; this.config = new TopicConfig(topicAttributes); + this.ownerEpoch = -1L; + this.ownerLastTransferTimeMs = -1L; + initOwnerFromTopicAttributes(topicAttributes); this.subscribedConsumerGroupIds = new HashSet<>(); } @@ -68,6 +79,10 @@ public TopicMeta deepCopy() { copied.topicName = topicName; copied.creationTime = creationTime; copied.config = new TopicConfig(new HashMap<>(config.getAttribute())); + copied.ownerId = ownerId; + copied.ownerEpoch = ownerEpoch; + copied.ownerLastTransferTimeMs = ownerLastTransferTimeMs; + copied.ownerLeaseExpireTimeMs = ownerLeaseExpireTimeMs; copied.subscribedConsumerGroupIds = new HashSet<>(subscribedConsumerGroupIds); return copied; @@ -85,6 +100,81 @@ public TopicConfig getConfig() { return config; } + public boolean isOwnerFencingEnabled() { + return Objects.nonNull(ownerId) && ownerEpoch >= 0; + } + + public String getOwnerId() { + return ownerId; + } + + public long getOwnerEpoch() { + return ownerEpoch; + } + + public long getOwnerLastTransferTimeMs() { + return ownerLastTransferTimeMs; + } + + public Long getOwnerLeaseExpireTimeMs() { + return ownerLeaseExpireTimeMs; + } + + public void transferOwner(final String ownerId, final long ownerEpoch) { + transferOwner(ownerId, ownerEpoch, null); + } + + public void transferOwner( + final String ownerId, final long ownerEpoch, final Long ownerLeaseExpireTimeMs) { + if (Objects.isNull(ownerId) || ownerId.isEmpty()) { + throw new IllegalArgumentException("Subscription topic owner id should not be empty"); + } + if (ownerEpoch < 0) { + throw new IllegalArgumentException("Subscription topic owner epoch should not be negative"); + } + if (isOwnerFencingEnabled() && ownerEpoch <= this.ownerEpoch) { + throw new IllegalArgumentException( + String.format( + "Subscription topic owner epoch should increase monotonically, current epoch is %s," + + " incoming epoch is %s", + this.ownerEpoch, ownerEpoch)); + } + + this.ownerId = ownerId; + this.ownerEpoch = ownerEpoch; + this.ownerLeaseExpireTimeMs = ownerLeaseExpireTimeMs; + this.ownerLastTransferTimeMs = System.currentTimeMillis(); + + config.getAttribute().put(TopicConstant.OWNER_ID_KEY, ownerId); + config.getAttribute().put(TopicConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + if (Objects.nonNull(ownerLeaseExpireTimeMs)) { + config + .getAttribute() + .put( + TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY, String.valueOf(ownerLeaseExpireTimeMs)); + } else { + config.getAttribute().remove(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY); + } + } + + public void clearOwner() { + ownerId = null; + ownerEpoch = -1L; + ownerLastTransferTimeMs = -1L; + ownerLeaseExpireTimeMs = null; + config.getAttribute().remove(TopicConstant.OWNER_ID_KEY); + config.getAttribute().remove(TopicConstant.OWNER_EPOCH_KEY); + config.getAttribute().remove(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY); + } + + public boolean matchesOwner(final String requestOwnerId, final Long requestOwnerEpoch) { + return !isOwnerFencingEnabled() + || (Objects.equals(ownerId, requestOwnerId) + && Objects.equals(ownerEpoch, requestOwnerEpoch) + && (Objects.isNull(ownerLeaseExpireTimeMs) + || System.currentTimeMillis() <= ownerLeaseExpireTimeMs)); + } + /** * @return true if the consumer group did not already subscribe this topic */ @@ -156,6 +246,8 @@ public static TopicMeta deserialize(final InputStream inputStream) throws IOExce topicMeta.subscribedConsumerGroupIds.add(ReadWriteIOUtils.readString(inputStream)); } + topicMeta.initOwnerFromTopicAttributes(topicMeta.config.getAttribute()); + return topicMeta; } @@ -177,9 +269,68 @@ public static TopicMeta deserialize(final ByteBuffer byteBuffer) { topicMeta.subscribedConsumerGroupIds.add(ReadWriteIOUtils.readString(byteBuffer)); } + topicMeta.initOwnerFromTopicAttributes(topicMeta.config.getAttribute()); + return topicMeta; } + public static void validateOwnerProgression( + final TopicMeta currentTopicMeta, final TopicMeta updatedTopicMeta) { + if (Objects.isNull(currentTopicMeta) + || Objects.isNull(updatedTopicMeta) + || !currentTopicMeta.isOwnerFencingEnabled()) { + return; + } + + if (!updatedTopicMeta.isOwnerFencingEnabled()) { + throw new IllegalArgumentException( + String.format( + "Subscription topic owner should not be cleared by stale topic meta, topic: %s," + + " current owner-id: %s, current owner-epoch: %s", + currentTopicMeta.getTopicName(), + currentTopicMeta.getOwnerId(), + currentTopicMeta.getOwnerEpoch())); + } + + final boolean epochRollback = + updatedTopicMeta.getOwnerEpoch() < currentTopicMeta.getOwnerEpoch(); + final boolean sameEpochOwnerChanged = + updatedTopicMeta.getOwnerEpoch() == currentTopicMeta.getOwnerEpoch() + && !Objects.equals(updatedTopicMeta.getOwnerId(), currentTopicMeta.getOwnerId()); + if (epochRollback || sameEpochOwnerChanged) { + throw new IllegalArgumentException( + String.format( + "Subscription topic owner epoch should not roll back, topic: %s, current owner-id:" + + " %s, current owner-epoch: %s, incoming owner-id: %s, incoming owner-epoch:" + + " %s", + currentTopicMeta.getTopicName(), + currentTopicMeta.getOwnerId(), + currentTopicMeta.getOwnerEpoch(), + updatedTopicMeta.getOwnerId(), + updatedTopicMeta.getOwnerEpoch())); + } + } + + private void initOwnerFromTopicAttributes(final Map topicAttributes) { + final TopicConfig topicConfig = new TopicConfig(topicAttributes); + final String configuredOwnerId = topicConfig.getString(TopicConstant.OWNER_ID_KEY); + if (Objects.isNull(configuredOwnerId)) { + return; + } + + final Long configuredOwnerEpoch = topicConfig.getLong(TopicConstant.OWNER_EPOCH_KEY); + if (Objects.isNull(configuredOwnerEpoch)) { + throw new IllegalArgumentException( + String.format( + "Subscription topic owner epoch should be set when %s is set", + TopicConstant.OWNER_ID_KEY)); + } + transferOwner( + configuredOwnerId, + configuredOwnerEpoch, + topicConfig.getLong(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY)); + } + /////////////////////////////// utilities /////////////////////////////// public Map generateExtractorAttributes(final String username) { @@ -257,12 +408,16 @@ public boolean equals(final Object obj) { final TopicMeta that = (TopicMeta) obj; return creationTime == that.creationTime && Objects.equals(topicName, that.topicName) - && Objects.equals(config, that.config); + && Objects.equals(config, that.config) + && Objects.equals(ownerId, that.ownerId) + && ownerEpoch == that.ownerEpoch + && Objects.equals(ownerLeaseExpireTimeMs, that.ownerLeaseExpireTimeMs); } @Override public int hashCode() { - return Objects.hash(topicName, creationTime, config); + return Objects.hash( + topicName, creationTime, config, ownerId, ownerEpoch, ownerLeaseExpireTimeMs); } @Override @@ -274,6 +429,14 @@ public String toString() { + creationTime + ", config=" + config + + ", ownerId='" + + ownerId + + "', ownerEpoch=" + + ownerEpoch + + ", ownerLastTransferTimeMs=" + + ownerLastTransferTimeMs + + ", ownerLeaseExpireTimeMs=" + + ownerLeaseExpireTimeMs + '}'; } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java index d9c280e14938c..0511edff6203e 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java @@ -20,10 +20,13 @@ package org.apache.iotdb.commons.subscription.topic; import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; @@ -55,6 +58,64 @@ public void test() throws IOException { topicMeta.getSubscribedConsumerGroupIds(), topicMeta2.getSubscribedConsumerGroupIds()); } + @Test + public void testTopicOwnerDeSer() throws IOException { + Map topicAttributes = new HashMap<>(); + topicAttributes.put(TopicConstant.OWNER_ID_KEY, "sn1"); + topicAttributes.put(TopicConstant.OWNER_EPOCH_KEY, "5"); + + TopicMeta topicMeta = new TopicMeta("test_topic", 1, topicAttributes); + + Assert.assertTrue(topicMeta.isOwnerFencingEnabled()); + Assert.assertEquals("sn1", topicMeta.getOwnerId()); + Assert.assertEquals(5L, topicMeta.getOwnerEpoch()); + Assert.assertTrue(topicMeta.matchesOwner("sn1", 5L)); + Assert.assertFalse(topicMeta.matchesOwner("sn2", 5L)); + Assert.assertFalse(topicMeta.matchesOwner("sn1", 4L)); + + TopicMeta topicMeta1 = TopicMeta.deserialize(topicMeta.serialize()); + TopicMeta topicMeta2 = topicMeta1.deepCopy(); + + Assert.assertEquals(topicMeta, topicMeta1); + Assert.assertEquals(topicMeta, topicMeta2); + Assert.assertEquals(topicMeta.getOwnerId(), topicMeta2.getOwnerId()); + Assert.assertEquals(topicMeta.getOwnerEpoch(), topicMeta2.getOwnerEpoch()); + Assert.assertEquals( + topicMeta.getOwnerLeaseExpireTimeMs(), topicMeta2.getOwnerLeaseExpireTimeMs()); + + topicMeta.transferOwner("sn2", 6L, 100L); + Assert.assertEquals("sn2", topicMeta.getOwnerId()); + Assert.assertEquals(6L, topicMeta.getOwnerEpoch()); + Assert.assertEquals("sn2", topicMeta.getConfig().getString(TopicConstant.OWNER_ID_KEY)); + Assert.assertEquals( + 6L, topicMeta.getConfig().getLong(TopicConstant.OWNER_EPOCH_KEY).longValue()); + Assert.assertEquals( + 100L, + topicMeta.getConfig().getLong(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY).longValue()); + + topicMeta.clearOwner(); + Assert.assertFalse(topicMeta.isOwnerFencingEnabled()); + Assert.assertFalse(topicMeta.getConfig().hasAttribute(TopicConstant.OWNER_ID_KEY)); + Assert.assertFalse(topicMeta.getConfig().hasAttribute(TopicConstant.OWNER_EPOCH_KEY)); + Assert.assertFalse( + topicMeta.getConfig().hasAttribute(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY)); + } + + @Test + public void testSequentialTopicMetaDeserializeDoesNotConsumeNextTopic() throws IOException { + final TopicMeta firstTopicMeta = new TopicMeta("first_topic", 1, new HashMap<>()); + final TopicMeta secondTopicMeta = new TopicMeta("second_topic", 2, new HashMap<>()); + + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + firstTopicMeta.serialize(outputStream); + secondTopicMeta.serialize(outputStream); + + final ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + Assert.assertEquals(firstTopicMeta, TopicMeta.deserialize(inputStream)); + Assert.assertEquals(secondTopicMeta, TopicMeta.deserialize(inputStream)); + Assert.assertEquals(0, inputStream.available()); + } + @Test public void testGenerateExtractorAttributesWithEncryptedPassword() { final TopicMeta topicMeta = new TopicMeta("test_topic", 1, new HashMap<>());