diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointWriterTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointWriterTests.cs index 68cc70330..aaa000a02 100644 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointWriterTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointWriterTests.cs @@ -23,6 +23,34 @@ public async Task read_delegates_to_store() Assert.Equal(1, store.ReadCalls); } + [Fact] + public async Task read_seeds_latest_checkpoint() + { + var store = new FakeIndexCheckpointStore { Checkpoint = new IndexCheckpoint(20, 15) }; + var writer = new IndexCheckpointWriter(store); + + await writer.Read(CancellationToken.None); + + var exception = Assert.Throws(() => + writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5))); + + Assert.Contains("backwards", exception.Message, StringComparison.OrdinalIgnoreCase); + } + + [Fact] + public async Task read_discards_stale_pending_checkpoint() + { + var store = new FakeIndexCheckpointStore { Checkpoint = new IndexCheckpoint(20, 15) }; + var writer = new IndexCheckpointWriter(store); + + writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); + await writer.Read(CancellationToken.None); + await writer.Commit(CancellationToken.None); + + Assert.Equal(0, store.WriteCalls); + Assert.Equal(new IndexCheckpoint(20, 15), store.Checkpoint); + } + [Fact] public async Task commit_before_tracking_is_no_op() { @@ -114,6 +142,113 @@ public async Task commit_passes_cancellation_token_to_store() Assert.Equal(cancellation.Token, store.LastWriteToken); } + [Fact] + public void track_rejects_regressive_pending_position() + { + var writer = new IndexCheckpointWriter(new InMemoryIndexCheckpointStore()); + + writer.Track(CreateResolvedEvent(commitPosition: 20, preparePosition: 15)); + + var exception = Assert.Throws(() => + writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5))); + + Assert.Contains("backwards", exception.Message, StringComparison.OrdinalIgnoreCase); + } + + [Fact] + public async Task tracking_equal_position_is_idempotent_and_commits_once() + { + var store = new FakeIndexCheckpointStore(); + var writer = new IndexCheckpointWriter(store); + + writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); + writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); + + await writer.Commit(CancellationToken.None); + await writer.Commit(CancellationToken.None); + + Assert.Equal(1, store.WriteCalls); + Assert.Equal(new IndexCheckpoint(10, 5), store.Checkpoint); + } + + [Fact] + public async Task commit_failure_leaves_pending_for_retry() + { + var store = new FakeIndexCheckpointStore { FailWrite = true }; + var writer = new IndexCheckpointWriter(store); + + writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); + + await Assert.ThrowsAsync(() => writer.Commit(CancellationToken.None).AsTask()); + + Assert.Equal(1, store.WriteCalls); + + store.FailWrite = false; + + await writer.Commit(CancellationToken.None); + + Assert.Equal(2, store.WriteCalls); + Assert.Equal(new IndexCheckpoint(10, 5), store.Checkpoint); + } + + [Fact] + public async Task successful_commit_clears_pending_so_second_commit_is_no_op() + { + var store = new FakeIndexCheckpointStore(); + var writer = new IndexCheckpointWriter(store); + + writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); + + await writer.Commit(CancellationToken.None); + await writer.Commit(CancellationToken.None); + + Assert.Equal(1, store.WriteCalls); + Assert.Equal(new IndexCheckpoint(10, 5), store.Checkpoint); + } + + [Fact] + public async Task successful_commit_prevents_later_regression() + { + var writer = new IndexCheckpointWriter(new InMemoryIndexCheckpointStore()); + + writer.Track(CreateResolvedEvent(commitPosition: 20, preparePosition: 15)); + await writer.Commit(CancellationToken.None); + + var exception = Assert.Throws(() => + writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5))); + + Assert.Contains("backwards", exception.Message, StringComparison.OrdinalIgnoreCase); + } + + [Fact] + public async Task tracking_higher_position_during_async_commit_preserves_pending_for_next_commit() + { + var store = new BlockingIndexCheckpointStore(); + var writer = new IndexCheckpointWriter(store); + + writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); + + var commitTask = writer.Commit(CancellationToken.None).AsTask(); + await store.WaitForWriteStarted(); + + writer.Track(CreateResolvedEvent(commitPosition: 20, preparePosition: 15)); + + store.ReleaseWrite(); + await commitTask; + + Assert.Equal(1, store.WriteCalls); + Assert.Equal(new IndexCheckpoint(10, 5), store.LastWritten); + + await writer.Commit(CancellationToken.None); + + Assert.Equal(2, store.WriteCalls); + Assert.Equal(new IndexCheckpoint(20, 15), store.LastWritten); + + await writer.Commit(CancellationToken.None); + + Assert.Equal(2, store.WriteCalls); + } + private static ResolvedEvent CreateResolvedEvent(long commitPosition, long preparePosition, bool isSelfCommitted = true) { var flags = PrepareFlags.SingleWrite | PrepareFlags.Data; @@ -145,6 +280,7 @@ private sealed class FakeIndexCheckpointStore : IIndexCheckpointStore public IndexCheckpoint? Checkpoint { get; set; } public bool CancelRead { get; init; } public bool CancelWrite { get; init; } + public bool FailWrite { get; set; } public int ReadCalls { get; private set; } public int WriteCalls { get; private set; } public CancellationToken LastReadToken { get; private set; } @@ -167,6 +303,12 @@ public ValueTask Write(IndexCheckpoint checkpoint, CancellationToken token) { WriteCalls++; LastWriteToken = token; + + if (FailWrite) + { + throw new InvalidOperationException("Simulated index checkpoint write failure."); + } + Checkpoint = checkpoint; if (CancelWrite) @@ -177,4 +319,30 @@ public ValueTask Write(IndexCheckpoint checkpoint, CancellationToken token) return ValueTask.CompletedTask; } } + + private sealed class BlockingIndexCheckpointStore : IIndexCheckpointStore + { + private readonly TaskCompletionSource _writeStarted = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _releaseWrite = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public IndexCheckpoint? LastWritten { get; private set; } + public int WriteCalls { get; private set; } + + public ValueTask Read(CancellationToken token) => + ValueTask.FromResult(LastWritten); + + public async ValueTask Write(IndexCheckpoint checkpoint, CancellationToken token) + { + WriteCalls++; + _writeStarted.TrySetResult(); + + await _releaseWrite.Task; + + LastWritten = checkpoint; + } + + public Task WaitForWriteStarted() => _writeStarted.Task; + + public void ReleaseWrite() => _releaseWrite.TrySetResult(); + } } diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpointWriter.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpointWriter.cs index d48b5fcb6..4652d60f1 100644 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpointWriter.cs +++ b/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpointWriter.cs @@ -9,6 +9,8 @@ public sealed class IndexCheckpointWriter { private readonly IIndexCheckpointStore _store; private readonly object _lock = new(); + private IndexCheckpoint? _latestCheckpoint; + private IndexCheckpoint? _persistedCheckpoint; private IndexCheckpoint? _pendingCheckpoint; public IndexCheckpointWriter(IIndexCheckpointStore store) @@ -16,7 +18,34 @@ public IndexCheckpointWriter(IIndexCheckpointStore store) _store = store ?? throw new ArgumentNullException(nameof(store)); } - public ValueTask Read(CancellationToken token) => _store.Read(token); + public async ValueTask Read(CancellationToken token) + { + var checkpoint = await _store.Read(token); + if (checkpoint is not { } latest) + { + return checkpoint; + } + + lock (_lock) + { + if (IsAheadOfLatest(latest)) + { + _latestCheckpoint = latest; + } + + if (IsAheadOfPersisted(latest)) + { + _persistedCheckpoint = latest; + } + + if (_pendingCheckpoint is { } pending && !IsAheadOfPersisted(pending)) + { + _pendingCheckpoint = null; + } + } + + return checkpoint; + } public void Track(ResolvedEvent resolvedEvent) { @@ -31,6 +60,24 @@ public void Track(ResolvedEvent resolvedEvent) lock (_lock) { + if (_latestCheckpoint is { } latest) + { + var latestPosition = latest.ToPosition(); + var checkpointPosition = checkpoint.ToPosition(); + + if (checkpointPosition < latestPosition) + { + throw new InvalidOperationException( + $"Cannot track index checkpoint progress backwards from {latestPosition} to {checkpointPosition}."); + } + + if (checkpointPosition == latestPosition) + { + return; + } + } + + _latestCheckpoint = checkpoint; _pendingCheckpoint = checkpoint; } } @@ -46,6 +93,12 @@ public async ValueTask Commit(CancellationToken token) return; } + if (!IsAheadOfPersisted(pending)) + { + _pendingCheckpoint = null; + return; + } + checkpoint = pending; } @@ -53,10 +106,26 @@ public async ValueTask Commit(CancellationToken token) lock (_lock) { + if (IsAheadOfPersisted(checkpoint)) + { + _persistedCheckpoint = checkpoint; + } + + if (IsAheadOfLatest(checkpoint)) + { + _latestCheckpoint = checkpoint; + } + if (_pendingCheckpoint == checkpoint) { _pendingCheckpoint = null; } } } + + private bool IsAheadOfLatest(IndexCheckpoint checkpoint) => + _latestCheckpoint is not { } latest || checkpoint.ToPosition() > latest.ToPosition(); + + private bool IsAheadOfPersisted(IndexCheckpoint checkpoint) => + _persistedCheckpoint is not { } persisted || checkpoint.ToPosition() > persisted.ToPosition(); }