diff --git a/docs/content/en/docs/documentation/error-handling-retries.md b/docs/content/en/docs/documentation/error-handling-retries.md index eeecf54751..7bd4ad2e22 100644 --- a/docs/content/en/docs/documentation/error-handling-retries.md +++ b/docs/content/en/docs/documentation/error-handling-retries.md @@ -135,6 +135,9 @@ these features: 2. In case an exception is thrown, a retry is initiated. However, if an event is received meanwhile, it will be reconciled instantly, and this execution won't count as a retry attempt. + If that event-triggered reconciliation also fails inside the current retry window, the + existing retry deadline is preserved rather than reset — the failure does not advance the + retry counter unless the original deadline is at most 5 seconds away or has already passed. 3. If the retry limit is reached (so no more automatic retry would happen), but a new event received, the reconciliation will still happen, but won't reset the retry, and will still be marked as the last attempt in the retry info. The point (1) still holds - thus successful reconciliation will reset the retry - but no retry will happen in case of an error. diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 1aecac6c9a..83e42687bc 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -49,6 +49,13 @@ public class EventProcessor

implements EventHandler, Life private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50; + /** + * Threshold, in milliseconds, on the time remaining until the currently scheduled retry. When an + * event-driven reconciliation fails with strictly more than this remaining, the existing retry + * deadline is preserved; otherwise the regular retry path runs and may consume a retry attempt. + */ + private static final long RETRY_DEADLINE_PRESERVE_THRESHOLD_MILLIS = 5_000; + private volatile boolean running; private final ControllerConfiguration controllerConfiguration; private final ReconciliationDispatcher

reconciliationDispatcher; @@ -369,6 +376,15 @@ private void handleRetryOnException(ExecutionScope

executionScope, Exception submitReconciliationExecution(state); return; } + Optional<Duration> remaining = state.getRetry().remainingDurationUntilNextRetry(); + if (remaining.isPresent() + && remaining.get().toMillis() > RETRY_DEADLINE_PRESERVE_THRESHOLD_MILLIS) { + log.debug( + "Preserving existing retry deadline; remaining: {} ms. Not consuming a retry attempt.", + remaining.get().toMillis()); + retryEventSource().scheduleOnce(resourceID, remaining.get().toMillis()); + return; + } Optional<Long> nextDelay = state.getRetry().nextDelay(); nextDelay.ifPresentOrElse( delay -> { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java index 4bdce57a77..fadc022de7 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java @@ -15,6 +15,7 @@ */ package io.javaoperatorsdk.operator.processing.retry; +import java.time.Duration; import java.util.Optional; public class GenericRetryExecution implements RetryExecution { @@ -23,6 +24,8 @@ public class GenericRetryExecution implements RetryExecution { private int lastAttemptIndex = 0; private long currentInterval; + // System.nanoTime() reading taken when nextDelay() last handed out a delay; null until then. + private Long lastNextDelayNanoTime; public GenericRetryExecution(GenericRetry genericRetry) { this.genericRetry = genericRetry; @@ -40,6 +43,7 @@ public Optional<Long> nextDelay() { } } lastAttemptIndex++; + lastNextDelayNanoTime = System.nanoTime(); return Optional.of(currentInterval); } @@ -52,4 +56,23 @@ public boolean isLastAttempt() { public int getAttemptCount() { return lastAttemptIndex; } + + /** + * {@inheritDoc} + * + * <p>Elapsed time is measured with {@link System#nanoTime()}, so the result is unaffected by + * wall-clock adjustments and never exceeds the interval last returned by {@link #nextDelay()}. + */ + @Override + public Optional<Duration> remainingDurationUntilNextRetry() { + if (lastNextDelayNanoTime == null) { + return Optional.empty(); + } + long elapsedMillis = (System.nanoTime() - lastNextDelayNanoTime) / 1_000_000L; + long remaining = currentInterval - elapsedMillis; + if (remaining <= 0) { + return Optional.empty(); + } + return Optional.of(Duration.ofMillis(remaining)); + } }
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java index caf71d7a33..a644a274ba 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java @@ -15,6 +15,7 @@ */ package io.javaoperatorsdk.operator.processing.retry; +import java.time.Duration; import java.util.Optional; import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; @@ -25,4 +26,15 @@ public interface RetryExecution extends RetryInfo { * @return the time to wait until the next execution in milliseconds */ Optional<Long> nextDelay(); + + /** + * Remaining time of the currently scheduled retry interval, i.e. the time until the previously + * computed retry delay would elapse. Returns an empty {@link Optional} if no retry has been + * scheduled yet (i.e. {@link #nextDelay()} has never been called) or if the deadline has already + * passed. + * + *
<p>
Used to decide whether an event-driven failed reconciliation that lands well inside the + * retry window should consume a retry attempt or simply be re-scheduled on the original deadline. + */ + Optional remainingDurationUntilNextRetry(); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index fb8f7c0805..f7864f2f16 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -465,6 +465,98 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() { verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong()); } + @Test + void preservesRetryDeadlineWhenRemainingDurationAboveThreshold() { + RetryExecution mockRetryExecution = mock(RetryExecution.class); + when(mockRetryExecution.nextDelay()).thenReturn(Optional.of(60_000L)); + when(mockRetryExecution.remainingDurationUntilNextRetry()) + .thenReturn(Optional.of(Duration.ofMillis(50_000))); + Retry retry = mock(Retry.class); + when(retry.initExecution()).thenReturn(mockRetryExecution); + eventProcessorWithRetry = + spy( + new EventProcessor( + controllerConfiguration(retry, LinearRateLimiter.deactivatedRateLimiter()), + reconciliationDispatcherMock, + eventSourceManagerMock, + metricsMock)); + eventProcessorWithRetry.start(); + when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); + + TestCustomResource customResource = testCustomResource(); + ExecutionScope executionScope = + new ExecutionScope(null, null, false, false).setResource(customResource); + PostExecutionControl postExecutionControl = + PostExecutionControl.exceptionDuringExecution(new RuntimeException("test")); + + 
eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl); + + verify(mockRetryExecution, never()).nextDelay(); + verify(retryTimerEventSourceMock, times(1)) + .scheduleOnce(eq(ResourceID.fromResource(customResource)), eq(50_000L)); + } + + @Test + void consumesRetryAttemptWhenRemainingDurationAtOrBelowThreshold() { + RetryExecution mockRetryExecution = mock(RetryExecution.class); + when(mockRetryExecution.nextDelay()).thenReturn(Optional.of(60_000L)); + when(mockRetryExecution.remainingDurationUntilNextRetry()) + .thenReturn(Optional.of(Duration.ofMillis(2_000))); + Retry retry = mock(Retry.class); + when(retry.initExecution()).thenReturn(mockRetryExecution); + eventProcessorWithRetry = + spy( + new EventProcessor( + controllerConfiguration(retry, LinearRateLimiter.deactivatedRateLimiter()), + reconciliationDispatcherMock, + eventSourceManagerMock, + metricsMock)); + eventProcessorWithRetry.start(); + when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); + + TestCustomResource customResource = testCustomResource(); + ExecutionScope executionScope = + new ExecutionScope(null, null, false, false).setResource(customResource); + PostExecutionControl postExecutionControl = + PostExecutionControl.exceptionDuringExecution(new RuntimeException("test")); + + eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl); + + verify(mockRetryExecution, times(1)).nextDelay(); + verify(retryTimerEventSourceMock, times(1)) + .scheduleOnce(eq(ResourceID.fromResource(customResource)), eq(60_000L)); + } + + @Test + void firstFailureSchedulesUsingNextDelayWhenNoRemainingDuration() { + RetryExecution mockRetryExecution = mock(RetryExecution.class); + when(mockRetryExecution.nextDelay()).thenReturn(Optional.of(60_000L)); + when(mockRetryExecution.remainingDurationUntilNextRetry()).thenReturn(Optional.empty()); + Retry retry = mock(Retry.class); + 
when(retry.initExecution()).thenReturn(mockRetryExecution); + eventProcessorWithRetry = + spy( + new EventProcessor( + controllerConfiguration(retry, LinearRateLimiter.deactivatedRateLimiter()), + reconciliationDispatcherMock, + eventSourceManagerMock, + metricsMock)); + eventProcessorWithRetry.start(); + when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); + + TestCustomResource customResource = testCustomResource(); + ExecutionScope executionScope = + new ExecutionScope(null, null, false, false).setResource(customResource); + PostExecutionControl postExecutionControl = + PostExecutionControl.exceptionDuringExecution(new RuntimeException("test")); + + eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl); + + verify(mockRetryExecution, times(1)).nextDelay(); + verify(retryTimerEventSourceMock, times(1)) + .scheduleOnce(eq(ResourceID.fromResource(customResource)), eq(60_000L)); + } + @Test void executionOfReconciliationShouldNotStartIfProcessorStopped() throws InterruptedException { when(reconciliationDispatcherMock.handleExecution(any())) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java index 8f5a446788..8d7ec55e37 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java @@ -21,10 +21,10 @@ import static org.assertj.core.api.Assertions.assertThat; -public class GenericRetryExecutionTest { +class GenericRetryExecutionTest { @Test - public void noNextDelayIfMaxAttemptLimitReached() { + void noNextDelayIfMaxAttemptLimitReached() { RetryExecution retryExecution = 
GenericRetry.defaultLimitedExponentialRetry().setMaxAttempts(3).initExecution(); Optional res = callNextDelayNTimes(retryExecution, 2); @@ -35,7 +35,7 @@ public void noNextDelayIfMaxAttemptLimitReached() { } @Test - public void canLimitMaxIntervalLength() { + void canLimitMaxIntervalLength() { RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry() .setInitialInterval(2000) @@ -49,13 +49,13 @@ public void canLimitMaxIntervalLength() { } @Test - public void supportsNoRetry() { + void supportsNoRetry() { RetryExecution retryExecution = GenericRetry.noRetry().initExecution(); assertThat(retryExecution.nextDelay()).isEmpty(); } @Test - public void supportsIsLastExecution() { + void supportsIsLastExecution() { GenericRetryExecution execution = new GenericRetry().setMaxAttempts(2).initExecution(); assertThat(execution.isLastAttempt()).isFalse(); @@ -65,7 +65,7 @@ public void supportsIsLastExecution() { } @Test - public void returnAttemptIndex() { + void returnAttemptIndex() { RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry().initExecution(); assertThat(retryExecution.getAttemptCount()).isEqualTo(0); @@ -73,11 +73,59 @@ public void returnAttemptIndex() { assertThat(retryExecution.getAttemptCount()).isEqualTo(1); } - private RetryExecution getDefaultRetryExecution() { - return GenericRetry.defaultLimitedExponentialRetry().initExecution(); + @Test + void remainingDurationEmptyBeforeFirstNextDelay() { + RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry().initExecution(); + + assertThat(retryExecution.remainingDurationUntilNextRetry()).isEmpty(); + } + + @Test + void remainingDurationPresentAfterNextDelay() { + long interval = 60_000L; + RetryExecution retryExecution = new GenericRetry().setInitialInterval(interval).initExecution(); + + retryExecution.nextDelay(); + + Optional remaining = retryExecution.remainingDurationUntilNextRetry(); + assertThat(remaining).isPresent(); + 
assertThat(remaining.get().toMillis()).isPositive().isLessThanOrEqualTo(interval); + } + + @Test + void remainingDurationEmptyAfterIntervalElapsed() throws InterruptedException { + RetryExecution retryExecution = new GenericRetry().setInitialInterval(20).initExecution(); + + retryExecution.nextDelay(); + Thread.sleep(60); + + assertThat(retryExecution.remainingDurationUntilNextRetry()).isEmpty(); + } + + @Test + void remainingDurationReflectsUpdatedIntervalAfterSubsequentNextDelay() { + long initialInterval = 1000L; + double multiplier = 2.0; + RetryExecution retryExecution = + new GenericRetry() + .setInitialInterval(initialInterval) + .setIntervalMultiplier(multiplier) + .initExecution(); + + // first two calls keep the initial interval (multiplier only kicks in after attempt 1) + retryExecution.nextDelay(); + retryExecution.nextDelay(); + // third call doubles the interval to 2000ms + retryExecution.nextDelay(); + + Optional remaining = retryExecution.remainingDurationUntilNextRetry(); + assertThat(remaining).isPresent(); + assertThat(remaining.get().toMillis()) + .isPositive() + .isLessThanOrEqualTo((long) (initialInterval * multiplier)); } - public Optional callNextDelayNTimes(RetryExecution retryExecution, int n) { + Optional callNextDelayNTimes(RetryExecution retryExecution, int n) { for (int i = 0; i < n; i++) { retryExecution.nextDelay(); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryIntervalHonoredOnFrequentEventsIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryIntervalHonoredOnFrequentEventsIT.java new file mode 100644 index 0000000000..df525e8056 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryIntervalHonoredOnFrequentEventsIT.java @@ -0,0 +1,107 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance 
with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.retry; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.javaoperatorsdk.annotation.Sample; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.processing.retry.GenericRetry; + +import static io.javaoperatorsdk.operator.baseapi.retry.RetryIT.createTestCustomResource; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +@Sample( + tldr = "Retry Interval Honored Despite Frequent Reconciliation Triggers", + description = + """ + Verifies that with a low max attempts (3) and a high retry interval (1 minute), \ + reconciliations triggered by external events (e.g. resource updates) during the retry \ + window do not consume retry attempts. The retry counter should only advance when the \ + scheduled retry deadline is approached, so the configured interval is honored. 
+ """) +class RetryIntervalHonoredOnFrequentEventsIT { + + private static final Logger log = + LoggerFactory.getLogger(RetryIntervalHonoredOnFrequentEventsIT.class); + + public static final int MAX_RETRY_ATTEMPTS = 3; + public static final int RETRY_INTERVAL_MILLIS = 60_000; + public static final int ALL_EXECUTIONS_TO_FAIL = 99; + public static final int NUMBER_OF_UPDATES = 5; + + RetryTestCustomReconciler reconciler = new RetryTestCustomReconciler(ALL_EXECUTIONS_TO_FAIL); + + @RegisterExtension + LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension.builder() + .withReconciler( + reconciler, + new GenericRetry() + .setInitialInterval(RETRY_INTERVAL_MILLIS) + .withLinearRetry() + .setMaxAttempts(MAX_RETRY_ATTEMPTS)) + .build(); + + @Test + void frequentEventsDuringRetryWindowDoNotExhaustRetryCounter() { + RetryTestCustomResource resource = createTestCustomResource("frequent-events"); + var created = operator.create(resource); + + // Wait until the initial reconciliation has been executed and failed; the retry timer is now + // armed for RETRY_INTERVAL_MILLIS in the future, retry counter is at 1. + await() + .pollInterval(Duration.ofMillis(50)) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted( + () -> assertThat(reconciler.getNumberOfExecutions()).isGreaterThanOrEqualTo(1)); + + // Trigger several updates spaced apart so each results in its own reconciliation cycle. Each + // failed reconciliation lands well inside the 1 minute retry window, so the retry counter + // must NOT advance — only the original retry deadline matters. 
+ IntStream.rangeClosed(1, NUMBER_OF_UPDATES) + .forEach( + i -> { + log.debug("replacing resource, iteration: {}", i); + var latest = + operator.get(RetryTestCustomResource.class, created.getMetadata().getName()); + latest.getSpec().setValue("update-" + i); + operator.replace(latest); + int expectedExecutions = i + 1; + await() + .pollInterval(Duration.ofMillis(50)) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted( + () -> + assertThat(reconciler.getNumberOfExecutions()) + .isGreaterThanOrEqualTo(expectedExecutions)); + }); + + // Reconciliations did happen for every event (so events are not lost) but the retry counter + // observed inside the reconciler never went past 1: the configured 1 minute interval is + // honored even under a steady stream of external events. + assertThat(reconciler.getNumberOfExecutions()).isGreaterThanOrEqualTo(NUMBER_OF_UPDATES + 1); + assertThat(reconciler.getMaxObservedRetryAttempt()).isEqualTo(1); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryTestCustomReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryTestCustomReconciler.java index 30a339fc4d..f981b9e1cb 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryTestCustomReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryTestCustomReconciler.java @@ -32,6 +32,7 @@ public class RetryTestCustomReconciler private static final Logger log = LoggerFactory.getLogger(RetryTestCustomReconciler.class); private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + private final AtomicInteger maxObservedRetryAttempt = new AtomicInteger(0); private final AtomicInteger numberOfExecutionFails; @@ -43,6 +44,12 @@ public RetryTestCustomReconciler(int numberOfExecutionFails) { public UpdateControl reconcile( RetryTestCustomResource resource, Context context) { numberOfExecutions.addAndGet(1); + context + 
.getRetryInfo() + .ifPresent( + info -> + maxObservedRetryAttempt.updateAndGet( + prev -> Math.max(prev, info.getAttemptCount()))); log.info("Value: " + resource.getSpec().getValue()); @@ -70,4 +77,8 @@ private void ensureStatusExists(RetryTestCustomResource resource) { public int getNumberOfExecutions() { return numberOfExecutions.get(); } + + public int getMaxObservedRetryAttempt() { + return maxObservedRetryAttempt.get(); + } }