Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/content/en/docs/documentation/error-handling-retries.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ these features:

2. In case an exception is thrown, a retry is initiated. However, if an event is received
meanwhile, it will be reconciled instantly, and this execution won't count as a retry attempt.
If that event-triggered reconciliation also fails inside the current retry window, the
existing retry deadline is preserved rather than reset — the failure does not advance the
retry counter unless the original deadline is imminent.
3. If the retry limit is reached (so no more automatic retry would happen), but a new event
received, the reconciliation will still happen, but won't reset the retry, and will still be
marked as the last attempt in the retry info. The point (1) still holds - thus successful reconciliation will reset the retry - but no retry will happen in case of an error.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50;

/**
* Threshold below which an event-driven failed reconciliation that lands inside the current retry
* window is allowed to consume a retry attempt (i.e. advance the retry counter). Above this
* threshold the existing retry deadline is preserved instead.
*/
private static final long RETRY_DEADLINE_PRESERVE_THRESHOLD_MILLIS = 5_000;
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is up to debate if this should be configurable, but maybe we can for now stick KISS principle


private volatile boolean running;
private final ControllerConfiguration<?> controllerConfiguration;
private final ReconciliationDispatcher<P> reconciliationDispatcher;
Expand Down Expand Up @@ -369,6 +376,15 @@ private void handleRetryOnException(ExecutionScope<P> executionScope, Exception
submitReconciliationExecution(state);
return;
}
Optional<Duration> remaining = state.getRetry().remainingDurationUntilNextRetry();
if (remaining.isPresent()
&& remaining.get().toMillis() > RETRY_DEADLINE_PRESERVE_THRESHOLD_MILLIS) {
log.debug(
"Preserving existing retry deadline; remaining: {} ms. Not consuming a retry attempt.",
remaining.get().toMillis());
retryEventSource().scheduleOnce(resourceID, remaining.get().toMillis());
return;
}
Optional<Long> nextDelay = state.getRetry().nextDelay();
nextDelay.ifPresentOrElse(
delay -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.processing.retry;

import java.time.Duration;
import java.util.Optional;

public class GenericRetryExecution implements RetryExecution {
Expand All @@ -23,6 +24,7 @@ public class GenericRetryExecution implements RetryExecution {

private int lastAttemptIndex = 0;
private long currentInterval;
private Long lastNextDelayCallEpochMillis;

public GenericRetryExecution(GenericRetry genericRetry) {
this.genericRetry = genericRetry;
Expand All @@ -40,6 +42,7 @@ public Optional<Long> nextDelay() {
}
}
lastAttemptIndex++;
lastNextDelayCallEpochMillis = System.currentTimeMillis();
return Optional.of(currentInterval);
}

Expand All @@ -52,4 +55,16 @@ public boolean isLastAttempt() {
public int getAttemptCount() {
return lastAttemptIndex;
}

@Override
public Optional<Duration> remainingDurationUntilNextRetry() {
if (lastNextDelayCallEpochMillis == null) {
return Optional.empty();
}
long remaining = (lastNextDelayCallEpochMillis + currentInterval) - System.currentTimeMillis();
if (remaining <= 0) {
return Optional.empty();
}
return Optional.of(Duration.ofMillis(remaining));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.processing.retry;

import java.time.Duration;
import java.util.Optional;

import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
Expand All @@ -25,4 +26,15 @@ public interface RetryExecution extends RetryInfo {
* @return the time to wait until the next execution in milliseconds
*/
Optional<Long> nextDelay();

/**
* Remaining time of the currently scheduled retry interval, i.e. the time until the previously
* computed retry delay would elapse. Returns an empty {@link Optional} if no retry has been
* scheduled yet (i.e. {@link #nextDelay()} has never been called) or if the deadline has already
* passed.
*
* <p>Used to decide whether an event-driven failed reconciliation that lands well inside the
* retry window should consume a retry attempt or simply be re-scheduled on the original deadline.
*/
Optional<Duration> remainingDurationUntilNextRetry();
}
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,98 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() {
verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong());
}

@Test
void preservesRetryDeadlineWhenRemainingDurationAboveThreshold() {
RetryExecution mockRetryExecution = mock(RetryExecution.class);
when(mockRetryExecution.nextDelay()).thenReturn(Optional.of(60_000L));
when(mockRetryExecution.remainingDurationUntilNextRetry())
.thenReturn(Optional.of(Duration.ofMillis(50_000)));
Retry retry = mock(Retry.class);
when(retry.initExecution()).thenReturn(mockRetryExecution);
eventProcessorWithRetry =
spy(
new EventProcessor(
controllerConfiguration(retry, LinearRateLimiter.deactivatedRateLimiter()),
reconciliationDispatcherMock,
eventSourceManagerMock,
metricsMock));
eventProcessorWithRetry.start();
when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);

TestCustomResource customResource = testCustomResource();
ExecutionScope executionScope =
new ExecutionScope(null, null, false, false).setResource(customResource);
PostExecutionControl postExecutionControl =
PostExecutionControl.exceptionDuringExecution(new RuntimeException("test"));

eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl);

verify(mockRetryExecution, never()).nextDelay();
verify(retryTimerEventSourceMock, times(1))
.scheduleOnce(eq(ResourceID.fromResource(customResource)), eq(50_000L));
}

@Test
void consumesRetryAttemptWhenRemainingDurationAtOrBelowThreshold() {
RetryExecution mockRetryExecution = mock(RetryExecution.class);
when(mockRetryExecution.nextDelay()).thenReturn(Optional.of(60_000L));
when(mockRetryExecution.remainingDurationUntilNextRetry())
.thenReturn(Optional.of(Duration.ofMillis(2_000)));
Retry retry = mock(Retry.class);
when(retry.initExecution()).thenReturn(mockRetryExecution);
eventProcessorWithRetry =
spy(
new EventProcessor(
controllerConfiguration(retry, LinearRateLimiter.deactivatedRateLimiter()),
reconciliationDispatcherMock,
eventSourceManagerMock,
metricsMock));
eventProcessorWithRetry.start();
when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);

TestCustomResource customResource = testCustomResource();
ExecutionScope executionScope =
new ExecutionScope(null, null, false, false).setResource(customResource);
PostExecutionControl postExecutionControl =
PostExecutionControl.exceptionDuringExecution(new RuntimeException("test"));

eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl);

verify(mockRetryExecution, times(1)).nextDelay();
verify(retryTimerEventSourceMock, times(1))
.scheduleOnce(eq(ResourceID.fromResource(customResource)), eq(60_000L));
}

@Test
void firstFailureSchedulesUsingNextDelayWhenNoRemainingDuration() {
RetryExecution mockRetryExecution = mock(RetryExecution.class);
when(mockRetryExecution.nextDelay()).thenReturn(Optional.of(60_000L));
when(mockRetryExecution.remainingDurationUntilNextRetry()).thenReturn(Optional.empty());
Retry retry = mock(Retry.class);
when(retry.initExecution()).thenReturn(mockRetryExecution);
eventProcessorWithRetry =
spy(
new EventProcessor(
controllerConfiguration(retry, LinearRateLimiter.deactivatedRateLimiter()),
reconciliationDispatcherMock,
eventSourceManagerMock,
metricsMock));
eventProcessorWithRetry.start();
when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);

TestCustomResource customResource = testCustomResource();
ExecutionScope executionScope =
new ExecutionScope(null, null, false, false).setResource(customResource);
PostExecutionControl postExecutionControl =
PostExecutionControl.exceptionDuringExecution(new RuntimeException("test"));

eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl);

verify(mockRetryExecution, times(1)).nextDelay();
verify(retryTimerEventSourceMock, times(1))
.scheduleOnce(eq(ResourceID.fromResource(customResource)), eq(60_000L));
}

@Test
void executionOfReconciliationShouldNotStartIfProcessorStopped() throws InterruptedException {
when(reconciliationDispatcherMock.handleExecution(any()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

import static org.assertj.core.api.Assertions.assertThat;

public class GenericRetryExecutionTest {
class GenericRetryExecutionTest {

@Test
public void noNextDelayIfMaxAttemptLimitReached() {
void noNextDelayIfMaxAttemptLimitReached() {
RetryExecution retryExecution =
GenericRetry.defaultLimitedExponentialRetry().setMaxAttempts(3).initExecution();
Optional<Long> res = callNextDelayNTimes(retryExecution, 2);
Expand All @@ -35,7 +35,7 @@ public void noNextDelayIfMaxAttemptLimitReached() {
}

@Test
public void canLimitMaxIntervalLength() {
void canLimitMaxIntervalLength() {
RetryExecution retryExecution =
GenericRetry.defaultLimitedExponentialRetry()
.setInitialInterval(2000)
Expand All @@ -49,13 +49,13 @@ public void canLimitMaxIntervalLength() {
}

@Test
public void supportsNoRetry() {
void supportsNoRetry() {
RetryExecution retryExecution = GenericRetry.noRetry().initExecution();
assertThat(retryExecution.nextDelay()).isEmpty();
}

@Test
public void supportsIsLastExecution() {
void supportsIsLastExecution() {
GenericRetryExecution execution = new GenericRetry().setMaxAttempts(2).initExecution();
assertThat(execution.isLastAttempt()).isFalse();

Expand All @@ -65,19 +65,67 @@ public void supportsIsLastExecution() {
}

@Test
public void returnAttemptIndex() {
void returnAttemptIndex() {
RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry().initExecution();

assertThat(retryExecution.getAttemptCount()).isEqualTo(0);
retryExecution.nextDelay();
assertThat(retryExecution.getAttemptCount()).isEqualTo(1);
}

private RetryExecution getDefaultRetryExecution() {
return GenericRetry.defaultLimitedExponentialRetry().initExecution();
@Test
void remainingDurationEmptyBeforeFirstNextDelay() {
RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry().initExecution();

assertThat(retryExecution.remainingDurationUntilNextRetry()).isEmpty();
}

@Test
void remainingDurationPresentAfterNextDelay() {
long interval = 60_000L;
RetryExecution retryExecution = new GenericRetry().setInitialInterval(interval).initExecution();

retryExecution.nextDelay();

Optional<java.time.Duration> remaining = retryExecution.remainingDurationUntilNextRetry();
assertThat(remaining).isPresent();
assertThat(remaining.get().toMillis()).isPositive().isLessThanOrEqualTo(interval);
}

@Test
void remainingDurationEmptyAfterIntervalElapsed() throws InterruptedException {
RetryExecution retryExecution = new GenericRetry().setInitialInterval(20).initExecution();

retryExecution.nextDelay();
Thread.sleep(60);

assertThat(retryExecution.remainingDurationUntilNextRetry()).isEmpty();
}

@Test
void remainingDurationReflectsUpdatedIntervalAfterSubsequentNextDelay() {
long initialInterval = 1000L;
double multiplier = 2.0;
RetryExecution retryExecution =
new GenericRetry()
.setInitialInterval(initialInterval)
.setIntervalMultiplier(multiplier)
.initExecution();

// first two calls keep the initial interval (multiplier only kicks in after attempt 1)
retryExecution.nextDelay();
retryExecution.nextDelay();
// third call doubles the interval to 2000ms
retryExecution.nextDelay();

Optional<java.time.Duration> remaining = retryExecution.remainingDurationUntilNextRetry();
assertThat(remaining).isPresent();
assertThat(remaining.get().toMillis())
.isPositive()
.isLessThanOrEqualTo((long) (initialInterval * multiplier));
}

public Optional<Long> callNextDelayNTimes(RetryExecution retryExecution, int n) {
Optional<Long> callNextDelayNTimes(RetryExecution retryExecution, int n) {
for (int i = 0; i < n; i++) {
retryExecution.nextDelay();
}
Expand Down
Loading
Loading