Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,34 @@ public async Task read_delegates_to_store()
Assert.Equal(1, store.ReadCalls);
}

[Fact]
public async Task read_seeds_latest_checkpoint()
{
var store = new FakeIndexCheckpointStore { Checkpoint = new IndexCheckpoint(20, 15) };
var writer = new IndexCheckpointWriter(store);

await writer.Read(CancellationToken.None);

var exception = Assert.Throws<InvalidOperationException>(() =>
writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)));

Assert.Contains("backwards", exception.Message, StringComparison.OrdinalIgnoreCase);
}

[Fact]
public async Task read_discards_stale_pending_checkpoint()
{
var store = new FakeIndexCheckpointStore { Checkpoint = new IndexCheckpoint(20, 15) };
var writer = new IndexCheckpointWriter(store);

writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5));
await writer.Read(CancellationToken.None);
await writer.Commit(CancellationToken.None);

Assert.Equal(0, store.WriteCalls);
Assert.Equal(new IndexCheckpoint(20, 15), store.Checkpoint);
}

[Fact]
public async Task commit_before_tracking_is_no_op()
{
Expand Down Expand Up @@ -114,6 +142,113 @@ public async Task commit_passes_cancellation_token_to_store()
Assert.Equal(cancellation.Token, store.LastWriteToken);
}

[Fact]
public void track_rejects_regressive_pending_position()
{
var writer = new IndexCheckpointWriter(new InMemoryIndexCheckpointStore());

writer.Track(CreateResolvedEvent(commitPosition: 20, preparePosition: 15));

var exception = Assert.Throws<InvalidOperationException>(() =>
writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)));

Assert.Contains("backwards", exception.Message, StringComparison.OrdinalIgnoreCase);
}

[Fact]
public async Task tracking_equal_position_is_idempotent_and_commits_once()
{
var store = new FakeIndexCheckpointStore();
var writer = new IndexCheckpointWriter(store);

writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5));
writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5));

await writer.Commit(CancellationToken.None);
await writer.Commit(CancellationToken.None);

Assert.Equal(1, store.WriteCalls);
Assert.Equal(new IndexCheckpoint(10, 5), store.Checkpoint);
}

[Fact]
public async Task commit_failure_leaves_pending_for_retry()
{
var store = new FakeIndexCheckpointStore { FailWrite = true };
var writer = new IndexCheckpointWriter(store);

writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5));

await Assert.ThrowsAsync<InvalidOperationException>(() => writer.Commit(CancellationToken.None).AsTask());

Assert.Equal(1, store.WriteCalls);

store.FailWrite = false;

await writer.Commit(CancellationToken.None);

Assert.Equal(2, store.WriteCalls);
Assert.Equal(new IndexCheckpoint(10, 5), store.Checkpoint);
}

[Fact]
public async Task successful_commit_clears_pending_so_second_commit_is_no_op()
{
var store = new FakeIndexCheckpointStore();
var writer = new IndexCheckpointWriter(store);

writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5));

await writer.Commit(CancellationToken.None);
await writer.Commit(CancellationToken.None);

Assert.Equal(1, store.WriteCalls);
Assert.Equal(new IndexCheckpoint(10, 5), store.Checkpoint);
}

[Fact]
public async Task successful_commit_prevents_later_regression()
{
var writer = new IndexCheckpointWriter(new InMemoryIndexCheckpointStore());

writer.Track(CreateResolvedEvent(commitPosition: 20, preparePosition: 15));
await writer.Commit(CancellationToken.None);

var exception = Assert.Throws<InvalidOperationException>(() =>
writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)));

Assert.Contains("backwards", exception.Message, StringComparison.OrdinalIgnoreCase);
}

[Fact]
public async Task tracking_higher_position_during_async_commit_preserves_pending_for_next_commit()
{
var store = new BlockingIndexCheckpointStore();
var writer = new IndexCheckpointWriter(store);

writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5));

var commitTask = writer.Commit(CancellationToken.None).AsTask();
await store.WaitForWriteStarted();

writer.Track(CreateResolvedEvent(commitPosition: 20, preparePosition: 15));

store.ReleaseWrite();
await commitTask;

Assert.Equal(1, store.WriteCalls);
Assert.Equal(new IndexCheckpoint(10, 5), store.LastWritten);

await writer.Commit(CancellationToken.None);

Assert.Equal(2, store.WriteCalls);
Assert.Equal(new IndexCheckpoint(20, 15), store.LastWritten);

await writer.Commit(CancellationToken.None);

Assert.Equal(2, store.WriteCalls);
}

private static ResolvedEvent CreateResolvedEvent(long commitPosition, long preparePosition, bool isSelfCommitted = true)
{
var flags = PrepareFlags.SingleWrite | PrepareFlags.Data;
Expand Down Expand Up @@ -145,6 +280,7 @@ private sealed class FakeIndexCheckpointStore : IIndexCheckpointStore
public IndexCheckpoint? Checkpoint { get; set; }
public bool CancelRead { get; init; }
public bool CancelWrite { get; init; }
public bool FailWrite { get; set; }
public int ReadCalls { get; private set; }
public int WriteCalls { get; private set; }
public CancellationToken LastReadToken { get; private set; }
Expand All @@ -167,6 +303,12 @@ public ValueTask Write(IndexCheckpoint checkpoint, CancellationToken token)
{
WriteCalls++;
LastWriteToken = token;

if (FailWrite)
{
throw new InvalidOperationException("Simulated index checkpoint write failure.");
}

Checkpoint = checkpoint;

if (CancelWrite)
Expand All @@ -177,4 +319,30 @@ public ValueTask Write(IndexCheckpoint checkpoint, CancellationToken token)
return ValueTask.CompletedTask;
}
}

private sealed class BlockingIndexCheckpointStore : IIndexCheckpointStore
{
private readonly TaskCompletionSource _writeStarted = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource _releaseWrite = new(TaskCreationOptions.RunContinuationsAsynchronously);

public IndexCheckpoint? LastWritten { get; private set; }
public int WriteCalls { get; private set; }

public ValueTask<IndexCheckpoint?> Read(CancellationToken token) =>
ValueTask.FromResult(LastWritten);

public async ValueTask Write(IndexCheckpoint checkpoint, CancellationToken token)
{
WriteCalls++;
_writeStarted.TrySetResult();

await _releaseWrite.Task;

LastWritten = checkpoint;
}

public Task WaitForWriteStarted() => _writeStarted.Task;

public void ReleaseWrite() => _releaseWrite.TrySetResult();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,43 @@ public sealed class IndexCheckpointWriter
{
private readonly IIndexCheckpointStore _store;
private readonly object _lock = new();
private IndexCheckpoint? _latestCheckpoint;
private IndexCheckpoint? _persistedCheckpoint;
private IndexCheckpoint? _pendingCheckpoint;

public IndexCheckpointWriter(IIndexCheckpointStore store)
{
_store = store ?? throw new ArgumentNullException(nameof(store));
}

public ValueTask<IndexCheckpoint?> Read(CancellationToken token) => _store.Read(token);
public async ValueTask<IndexCheckpoint?> Read(CancellationToken token)
{
var checkpoint = await _store.Read(token);
if (checkpoint is not { } latest)
{
return checkpoint;
}

lock (_lock)
{
if (IsAheadOfLatest(latest))
{
_latestCheckpoint = latest;
}

if (IsAheadOfPersisted(latest))
{
_persistedCheckpoint = latest;
}

if (_pendingCheckpoint is { } pending && !IsAheadOfPersisted(pending))
{
_pendingCheckpoint = null;
}
}

return checkpoint;
}

public void Track(ResolvedEvent resolvedEvent)
{
Expand All @@ -31,6 +60,24 @@ public void Track(ResolvedEvent resolvedEvent)

lock (_lock)
{
if (_latestCheckpoint is { } latest)
{
var latestPosition = latest.ToPosition();
var checkpointPosition = checkpoint.ToPosition();

if (checkpointPosition < latestPosition)
{
throw new InvalidOperationException(
$"Cannot track index checkpoint progress backwards from {latestPosition} to {checkpointPosition}.");
}

if (checkpointPosition == latestPosition)
{
return;
}
}

_latestCheckpoint = checkpoint;
Comment thread
cursor[bot] marked this conversation as resolved.
_pendingCheckpoint = checkpoint;
}
}
Expand All @@ -46,17 +93,39 @@ public async ValueTask Commit(CancellationToken token)
return;
}

if (!IsAheadOfPersisted(pending))
{
_pendingCheckpoint = null;
return;
}

checkpoint = pending;
}

await _store.Write(checkpoint, token);

lock (_lock)
{
if (IsAheadOfPersisted(checkpoint))
{
_persistedCheckpoint = checkpoint;
}

if (IsAheadOfLatest(checkpoint))
{
_latestCheckpoint = checkpoint;
}

if (_pendingCheckpoint == checkpoint)
{
_pendingCheckpoint = null;
}
}
}

private bool IsAheadOfLatest(IndexCheckpoint checkpoint) =>
_latestCheckpoint is not { } latest || checkpoint.ToPosition() > latest.ToPosition();

private bool IsAheadOfPersisted(IndexCheckpoint checkpoint) =>
_persistedCheckpoint is not { } persisted || checkpoint.ToPosition() > persisted.ToPosition();
}
Loading