Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,11 @@ public enum TSStatusCode {
SHOW_SUBSCRIPTION_ERROR(1910),
SUBSCRIPTION_PIPE_TIMEOUT_ERROR(1911),
SUBSCRIPTION_NOT_ENABLED_ERROR(1912),
SUBSCRIPTION_OWNER_FENCED(1913),
SUBSCRIPTION_OWNER_REQUIRED(1914),
SUBSCRIPTION_OWNER_EPOCH_REQUIRED(1915),
SUBSCRIPTION_OWNER_LEASE_EXPIRED(1916),
SUBSCRIPTION_OWNER_EPOCH_CONFLICT(1917),

// Topic
CREATE_TOPIC_ERROR(2000),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public String getConsumerGroupId() {
return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY);
}

public String getOwnerId() {
return getString(ConsumerConstant.OWNER_ID_KEY);
}

public Long getOwnerEpoch() {
return hasAttribute(ConsumerConstant.OWNER_EPOCH_KEY)
? getLong(ConsumerConstant.OWNER_EPOCH_KEY)
: null;
}

public String getUsername() {
return getString(ConsumerConstant.USERNAME_KEY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class ConsumerConstant {

public static final String CONSUMER_ID_KEY = "consumer-id";
public static final String CONSUMER_GROUP_ID_KEY = "group-id";
public static final String OWNER_ID_KEY = "owner-id";
public static final String OWNER_EPOCH_KEY = "owner-epoch";

public static final String HEARTBEAT_INTERVAL_MS_KEY = "heartbeat-interval-ms";
public static final long HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE = 30_000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public class TopicConstant {
public static final String STRICT_KEY = "strict";
public static final String STRICT_DEFAULT_VALUE = "true";

public static final String OWNER_ID_KEY = "owner-id";
public static final String OWNER_EPOCH_KEY = "owner-epoch";
public static final String OWNER_LEASE_EXPIRE_TIME_MS_KEY = "owner-lease-expire-time-ms";

private TopicConstant() {
throw new IllegalStateException(SubscriptionMessages.UTILITY_CLASS);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.rpc.subscription.exception;

import java.util.Objects;

public class SubscriptionOwnerFencedException extends SubscriptionRuntimeNonCriticalException {

Check warning on line 24 in iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionOwnerFencedException.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This class has 8 parents which is greater than 5 authorized.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5uLPRoYgb0IV-Vu4_D&open=AZ5uLPRoYgb0IV-Vu4_D&pullRequest=17780

public SubscriptionOwnerFencedException(final String message) {
super(message);
}

public SubscriptionOwnerFencedException(final String message, final Throwable cause) {
super(message, cause);
}

@Override
public boolean equals(final Object obj) {
return obj instanceof SubscriptionOwnerFencedException
&& Objects.equals(getMessage(), ((SubscriptionOwnerFencedException) obj).getMessage())
&& Objects.equals(getTimeStamp(), ((SubscriptionOwnerFencedException) obj).getTimeStamp());
}

@Override
public int hashCode() {
return super.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable {

protected String consumerId;
protected String consumerGroupId;
protected String ownerId;
protected Long ownerEpoch;

private final long heartbeatIntervalMs;
private final long endpointsSyncIntervalMs;
Expand Down Expand Up @@ -154,6 +156,14 @@ public String getConsumerGroupId() {
return consumerGroupId;
}

public String getOwnerId() {
return ownerId;
}

public Long getOwnerEpoch() {
return ownerEpoch;
}

/////////////////////////////// ctor ///////////////////////////////

protected AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) {
Expand Down Expand Up @@ -183,6 +193,8 @@ protected AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder

this.consumerId = builder.consumerId;
this.consumerGroupId = builder.consumerGroupId;
this.ownerId = builder.ownerId;
this.ownerEpoch = builder.ownerEpoch;

this.heartbeatIntervalMs = builder.heartbeatIntervalMs;
this.endpointsSyncIntervalMs = builder.endpointsSyncIntervalMs;
Expand Down Expand Up @@ -213,6 +225,8 @@ protected AbstractSubscriptionConsumer(
.encryptedPassword((String) properties.get(ConsumerConstant.ENCRYPTED_PASSWORD_KEY))
.consumerId((String) properties.get(ConsumerConstant.CONSUMER_ID_KEY))
.consumerGroupId((String) properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY))
.ownerId((String) properties.get(ConsumerConstant.OWNER_ID_KEY))
.ownerEpoch((Long) properties.get(ConsumerConstant.OWNER_EPOCH_KEY))
.heartbeatIntervalMs(
(Long)
properties.getOrDefault(
Expand Down Expand Up @@ -394,6 +408,8 @@ protected abstract AbstractSubscriptionProvider constructSubscriptionProvider(
final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final String ownerId,
final Long ownerEpoch,
final int thriftMaxFrameSize,
final long heartbeatIntervalMs,
final int connectionTimeoutInMs);
Expand All @@ -408,6 +424,8 @@ AbstractSubscriptionProvider constructProviderAndHandshake(final TEndPoint endPo
this.encryptedPassword,
this.consumerId,
this.consumerGroupId,
this.ownerId,
this.ownerEpoch,
this.thriftMaxFrameSize,
this.heartbeatIntervalMs,
this.connectionTimeoutInMs);
Expand Down Expand Up @@ -1429,6 +1447,8 @@ protected Map<String, String> coreReportMessage() {
final Map<String, String> result = new HashMap<>();
result.put("consumerId", consumerId);
result.put("consumerGroupId", consumerGroupId);
result.put("ownerId", ownerId);
result.put("ownerEpoch", String.valueOf(ownerEpoch));
result.put("isClosed", isClosed.toString());
result.put("fileSaveDir", fileSaveDir);
result.put(
Expand All @@ -1443,6 +1463,8 @@ protected Map<String, String> allReportMessage() {
final Map<String, String> result = new HashMap<>();
result.put("consumerId", consumerId);
result.put("consumerGroupId", consumerGroupId);
result.put("ownerId", ownerId);
result.put("ownerEpoch", String.valueOf(ownerEpoch));
result.put("heartbeatIntervalMs", String.valueOf(heartbeatIntervalMs));
result.put("endpointsSyncIntervalMs", String.valueOf(endpointsSyncIntervalMs));
result.put("providers", providers.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class AbstractSubscriptionConsumerBuilder {

protected String consumerId;
protected String consumerGroupId;
protected String ownerId;
protected Long ownerEpoch;

protected long heartbeatIntervalMs = ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE;
protected long endpointsSyncIntervalMs =
Expand Down Expand Up @@ -111,6 +113,27 @@ public AbstractSubscriptionConsumerBuilder consumerGroupId(
return this;
}

public AbstractSubscriptionConsumerBuilder ownerId(@Nullable final String ownerId) {
if (Objects.isNull(ownerId)) {
return this;
}
this.ownerId = ownerId;
return this;
}

public AbstractSubscriptionConsumerBuilder ownerEpoch(final long ownerEpoch) {
this.ownerEpoch = ownerEpoch;
return this;
}

public AbstractSubscriptionConsumerBuilder ownerEpoch(@Nullable final Long ownerEpoch) {
if (Objects.isNull(ownerEpoch)) {
return this;
}
this.ownerEpoch = ownerEpoch;
return this;
}

public AbstractSubscriptionConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) {
this.heartbeatIntervalMs =
Math.max(heartbeatIntervalMs, ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.rpc.subscription.config.TopicConfig;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionOwnerFencedException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionPipeTimeoutException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeCriticalException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeNonCriticalException;
Expand Down Expand Up @@ -81,6 +82,8 @@

private String consumerId;
private String consumerGroupId;
private final String ownerId;
private final Long ownerEpoch;

private final AtomicBoolean isClosed = new AtomicBoolean(true);
private final AtomicBoolean isAvailable = new AtomicBoolean(false);
Expand Down Expand Up @@ -109,6 +112,8 @@
final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final String ownerId,
final Long ownerEpoch,
final int thriftMaxFrameSize,
final long heartbeatIntervalMs,
final int connectionTimeoutInMs) {
Expand All @@ -124,6 +129,8 @@
this.endPoint = endPoint;
this.consumerId = consumerId;
this.consumerGroupId = consumerGroupId;
this.ownerId = ownerId;
this.ownerEpoch = ownerEpoch;
this.username = username;
this.password = password;
this.encryptedPassword = encryptedPassword;
Expand Down Expand Up @@ -176,6 +183,12 @@
final Map<String, String> consumerAttributes = new HashMap<>();
consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, consumerGroupId);
consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
if (ownerId != null) {
consumerAttributes.put(ConsumerConstant.OWNER_ID_KEY, ownerId);
}
if (ownerEpoch != null) {
consumerAttributes.put(ConsumerConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch));
}
consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
if (encryptedPassword != null) {
Expand Down Expand Up @@ -451,6 +464,17 @@
case 1911: // SUBSCRIPTION_PIPE_TIMEOUT_ERROR
throw new SubscriptionPipeTimeoutException(
String.format(SUBSCRIPTION_PIPE_TIMEOUT_FORMATTER, status.code, status.message));
case 1913: // SUBSCRIPTION_OWNER_FENCED
case 1914: // SUBSCRIPTION_OWNER_REQUIRED
case 1915: // SUBSCRIPTION_OWNER_EPOCH_REQUIRED
case 1916: // SUBSCRIPTION_OWNER_LEASE_EXPIRED
case 1917: // SUBSCRIPTION_OWNER_EPOCH_CONFLICT
{

Check warning on line 472 in iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

'block lcurly' has incorrect indentation level 8, expected level should be 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5uLPLMYgb0IV-Vu4-9&open=AZ5uLPLMYgb0IV-Vu4-9&pullRequest=17780

Check warning on line 472 in iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

'{' at column 9 should be on the previous line.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5uLPLMYgb0IV-Vu4--&open=AZ5uLPLMYgb0IV-Vu4--&pullRequest=17780
final String errorMessage =

Check warning on line 473 in iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

'block' child has incorrect indentation level 10, expected level should be 8.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5uLPLMYgb0IV-Vu4-_&open=AZ5uLPLMYgb0IV-Vu4-_&pullRequest=17780
String.format(INTERNAL_ERROR_FORMATTER, status.code, status.message);
LOGGER.warn(errorMessage);

Check warning on line 475 in iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

'block' child has incorrect indentation level 10, expected level should be 8.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5uLPLMYgb0IV-Vu4_A&open=AZ5uLPLMYgb0IV-Vu4_A&pullRequest=17780
throw new SubscriptionOwnerFencedException(errorMessage);

Check warning on line 476 in iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

'block' child has incorrect indentation level 10, expected level should be 8.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5uLPLMYgb0IV-Vu4_B&open=AZ5uLPLMYgb0IV-Vu4_B&pullRequest=17780
}

Check warning on line 477 in iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

'block rcurly' has incorrect indentation level 8, expected level should be 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5uLPLMYgb0IV-Vu4_C&open=AZ5uLPLMYgb0IV-Vu4_C&pullRequest=17780
case 1900: // SUBSCRIPTION_VERSION_ERROR
case 1901: // SUBSCRIPTION_TYPE_ERROR
case 1909: // SUBSCRIPTION_MISSING_CUSTOMER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ final class SubscriptionTableProvider extends AbstractSubscriptionProvider {
final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final String ownerId,
final Long ownerEpoch,
final int thriftMaxFrameSize,
final long heartbeatIntervalMs,
final int connectionTimeoutInMs) {
Expand All @@ -43,6 +45,8 @@ final class SubscriptionTableProvider extends AbstractSubscriptionProvider {
encryptedPassword,
consumerId,
consumerGroupId,
ownerId,
ownerEpoch,
thriftMaxFrameSize,
heartbeatIntervalMs,
connectionTimeoutInMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider(
final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final String ownerId,
final Long ownerEpoch,
final int thriftMaxFrameSize,
final long heartbeatIntervalMs,
final int connectionTimeoutInMs) {
Expand All @@ -55,6 +57,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider(
encryptedPassword,
consumerId,
consumerGroupId,
ownerId,
ownerEpoch,
thriftMaxFrameSize,
heartbeatIntervalMs,
connectionTimeoutInMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ public SubscriptionTablePullConsumerBuilder consumerGroupId(final String consume
return this;
}

@Override
public SubscriptionTablePullConsumerBuilder ownerId(final String ownerId) {
super.ownerId(ownerId);
return this;
}

@Override
public SubscriptionTablePullConsumerBuilder ownerEpoch(final long ownerEpoch) {
super.ownerEpoch(ownerEpoch);
return this;
}

@Override
public SubscriptionTablePullConsumerBuilder ownerEpoch(final Long ownerEpoch) {
super.ownerEpoch(ownerEpoch);
return this;
}

@Override
public SubscriptionTablePullConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) {
super.heartbeatIntervalMs(heartbeatIntervalMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider(
final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final String ownerId,
final Long ownerEpoch,
final int thriftMaxFrameSize,
final long heartbeatIntervalMs,
final int connectionTimeoutInMs) {
Expand All @@ -51,6 +53,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider(
encryptedPassword,
consumerId,
consumerGroupId,
ownerId,
ownerEpoch,
thriftMaxFrameSize,
heartbeatIntervalMs,
connectionTimeoutInMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,24 @@ public SubscriptionTablePushConsumerBuilder consumerGroupId(final String consume
return this;
}

@Override
public SubscriptionTablePushConsumerBuilder ownerId(final String ownerId) {
super.ownerId(ownerId);
return this;
}

@Override
public SubscriptionTablePushConsumerBuilder ownerEpoch(final long ownerEpoch) {
super.ownerEpoch(ownerEpoch);
return this;
}

@Override
public SubscriptionTablePushConsumerBuilder ownerEpoch(final Long ownerEpoch) {
super.ownerEpoch(ownerEpoch);
return this;
}

@Override
public SubscriptionTablePushConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) {
super.heartbeatIntervalMs(heartbeatIntervalMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ final class SubscriptionTreeProvider extends AbstractSubscriptionProvider {
final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final String ownerId,
final Long ownerEpoch,
final int thriftMaxFrameSize,
final long heartbeatIntervalMs,
final int connectionTimeoutInMs) {
Expand All @@ -43,6 +45,8 @@ final class SubscriptionTreeProvider extends AbstractSubscriptionProvider {
encryptedPassword,
consumerId,
consumerGroupId,
ownerId,
ownerEpoch,
thriftMaxFrameSize,
heartbeatIntervalMs,
connectionTimeoutInMs);
Expand Down
Loading
Loading