From 7690ef6abdedb599ac73165c11f5f522d48e5b22 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Thu, 25 Jun 2026 03:04:04 -0400 Subject: [PATCH 1/8] feat(indexing): support durable event processing Signed-off-by: Yordis Prieto --- .../Indexing/IndexingSubscriptionTests.cs | 244 ++++++++++++++++++ .../Storage/Indexing/IIndexingComponent.cs | 22 ++ .../Storage/Indexing/IIndexingEventSource.cs | 48 ++++ .../Storage/Indexing/IndexCheckpoint.cs | 27 ++ .../Storage/Indexing/IndexingService.cs | 32 +++ .../Storage/Indexing/IndexingSubscription.cs | 156 +++++++++++ 6 files changed, 529 insertions(+) create mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs create mode 100644 src/EventStore.Core/Services/Storage/Indexing/IIndexingComponent.cs create mode 100644 src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs create mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexCheckpoint.cs create mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexingService.cs create mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs new file mode 100644 index 000000000..77e163ac3 --- /dev/null +++ b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs @@ -0,0 +1,244 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Data; +using EventStore.Core.Services.Storage.Indexing; +using EventStore.Core.Services.Transport.Common; +using EventStore.Core.Services.Transport.Enumerators; +using EventStore.Core.TransactionLog.LogRecords; +using Xunit; + +namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; + +public class IndexingSubscriptionTests +{ + [Fact] + public async Task starts_from_component_checkpoint() + { + var checkpoint = new IndexCheckpoint(10, 5); + var component = new FakeIndexingComponent(checkpoint); + var eventSource = new FakeIndexingEventSource(); + var eventSources = new FakeIndexingEventSourceFactory(eventSource); + await using var subscription = new IndexingSubscription( + component, + eventSources, + IndexingSubscriptionOptions.Default); + + await subscription.Start(CancellationToken.None); + + Assert.Equal(checkpoint, eventSources.Checkpoint); + } + + [Fact] + public async Task indexes_events_from_source() + { + var first = CreateResolvedEvent(1); + var second = CreateResolvedEvent(2); + var component = new FakeIndexingComponent(); + var eventSource = new FakeIndexingEventSource( + new ReadResponse.EventReceived(first), + new ReadResponse.SubscriptionCaughtUp(new TFPos(1, 1)), + new ReadResponse.EventReceived(second)); + await using var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + new IndexingSubscriptionOptions(2, TimeSpan.FromSeconds(30))); + + await subscription.Start(CancellationToken.None); + await component.Processor.WaitForIndexed(2); + + Assert.Equal(new[] { first, second }, component.Processor.Indexed); + } + + [Fact] + public async Task commits_when_batch_size_is_reached() + { + var component = new FakeIndexingComponent(); + var eventSource = new FakeIndexingEventSource( + new ReadResponse.EventReceived(CreateResolvedEvent(1)), + new ReadResponse.EventReceived(CreateResolvedEvent(2))); + await using var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + new IndexingSubscriptionOptions(2, TimeSpan.FromSeconds(30))); + + await subscription.Start(CancellationToken.None); + await component.Processor.WaitForCommits(1); + + Assert.Equal(1, component.Processor.CommitCount); + } + + [Fact] + public async Task commits_when_delay_elapses() + { + var component = new FakeIndexingComponent(); + var eventSource = new FakeIndexingEventSource(new ReadResponse.EventReceived(CreateResolvedEvent(1))); + await using var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + new IndexingSubscriptionOptions(100, TimeSpan.FromMilliseconds(25))); + + await subscription.Start(CancellationToken.None); + await component.Processor.WaitForCommits(1); + + Assert.Equal(1, component.Processor.CommitCount); + } + + [Fact] + public async Task commits_pending_events_when_disposed() + { + var component = new FakeIndexingComponent(); + var eventSource = new FakeIndexingEventSource(new ReadResponse.EventReceived(CreateResolvedEvent(1))); + var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + new IndexingSubscriptionOptions(100, TimeSpan.FromSeconds(30))); + + await subscription.Start(CancellationToken.None); + await component.Processor.WaitForIndexed(1); + await subscription.DisposeAsync(); + + Assert.Equal(1, component.Processor.CommitCount); + Assert.True(component.Disposed); + Assert.True(eventSource.Disposed); + } + + private static ResolvedEvent CreateResolvedEvent(long number) + { + var record = new EventRecord( + number, + number, + Guid.NewGuid(), + Guid.NewGuid(), + number, + 0, + $"stream-{number}", + number - 1, + DateTime.UtcNow, + PrepareFlags.SingleWrite | PrepareFlags.IsCommitted | PrepareFlags.Data, + $"event-{number}", + Array.Empty(), + Array.Empty()); + + return ResolvedEvent.ForUnresolvedEvent(record, number); + } + + private sealed class FakeIndexingComponent(IndexCheckpoint? checkpoint = null) : IIndexingComponent + { + public FakeIndexingProcessor Processor { get; } = new(); + + IIndexingProcessor IIndexingComponent.Processor => Processor; + + public bool Disposed { get; private set; } + + public ValueTask Initialize(CancellationToken token) => ValueTask.CompletedTask; + + public ValueTask ReadCheckpoint(CancellationToken token) => ValueTask.FromResult(checkpoint); + + public ValueTask DisposeAsync() + { + Disposed = true; + return ValueTask.CompletedTask; + } + } + + private sealed class FakeIndexingProcessor : IIndexingProcessor + { + private readonly List _indexed = []; + private readonly TaskCompletionSource _indexedEvents = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _committed = new(TaskCreationOptions.RunContinuationsAsynchronously); + private int _commitCount; + private int _waitForIndexed; + private int _waitForCommits; + + public IReadOnlyList Indexed => _indexed; + + public int CommitCount => Volatile.Read(ref _commitCount); + + public ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token) + { + lock (_indexed) + { + _indexed.Add(resolvedEvent); + if (_indexed.Count >= Volatile.Read(ref _waitForIndexed)) + { + _indexedEvents.TrySetResult(); + } + } + + return ValueTask.CompletedTask; + } + + public ValueTask Commit(CancellationToken token) + { + if (Interlocked.Increment(ref _commitCount) >= Volatile.Read(ref _waitForCommits)) + { + _committed.TrySetResult(); + } + + return ValueTask.CompletedTask; + } + + public Task WaitForIndexed(int count) + { + lock (_indexed) + { + _waitForIndexed = count; + if (_indexed.Count >= count) + { + return Task.CompletedTask; + } + } + + return _indexedEvents.Task.WaitAsync(Timeout); + } + + public Task WaitForCommits(int count) + { + _waitForCommits = count; + return CommitCount >= count + ? Task.CompletedTask + : _committed.Task.WaitAsync(Timeout); + } + } + + private sealed class FakeIndexingEventSourceFactory(FakeIndexingEventSource source) : IIndexingEventSourceFactory + { + public IndexCheckpoint? Checkpoint { get; private set; } + + public IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToken token) + { + Checkpoint = checkpoint; + return source; + } + } + + private sealed class FakeIndexingEventSource(params ReadResponse[] responses) : IIndexingEventSource + { + private readonly Queue _responses = new(responses); + + public ReadResponse Current { get; private set; } + + public bool Disposed { get; private set; } + + public ValueTask MoveNextAsync() + { + if (!_responses.TryDequeue(out var response)) + { + return ValueTask.FromResult(false); + } + + Current = response; + return ValueTask.FromResult(true); + } + + public ValueTask DisposeAsync() + { + Disposed = true; + return ValueTask.CompletedTask; + } + } + + private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5); +} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IIndexingComponent.cs b/src/EventStore.Core/Services/Storage/Indexing/IIndexingComponent.cs new file mode 100644 index 000000000..ca7cbea36 --- /dev/null +++ b/src/EventStore.Core/Services/Storage/Indexing/IIndexingComponent.cs @@ -0,0 +1,22 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Data; + +namespace EventStore.Core.Services.Storage.Indexing; + +public interface IIndexingComponent : IAsyncDisposable +{ + ValueTask Initialize(CancellationToken token); + + ValueTask ReadCheckpoint(CancellationToken token); + + IIndexingProcessor Processor { get; } +} + +public interface IIndexingProcessor +{ + ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token); + + ValueTask Commit(CancellationToken token); +} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs b/src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs new file mode 100644 index 000000000..5107efcb6 --- /dev/null +++ b/src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs @@ -0,0 +1,48 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Bus; +using EventStore.Core.Services.Storage.ReaderIndex; +using EventStore.Core.Services.Transport.Common; +using EventStore.Core.Services.Transport.Enumerators; +using EventStore.Core.Services.UserManagement; + +namespace EventStore.Core.Services.Storage.Indexing; + +public interface IIndexingEventSource : IAsyncDisposable +{ + ReadResponse Current { get; } + + ValueTask MoveNextAsync(); +} + +public interface IIndexingEventSourceFactory +{ + IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToken token); +} + +public sealed class AllStreamsIndexingEventSourceFactory(IPublisher publisher) : IIndexingEventSourceFactory +{ + public IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToken token) + { + var subscription = new Enumerator.AllSubscription( + publisher, + new DefaultExpiryStrategy(), + checkpoint?.ToPosition(), + resolveLinks: false, + SystemAccounts.System, + requiresLeader: false, + token); + + return new AllStreamsIndexingEventSource(subscription); + } + + private sealed class AllStreamsIndexingEventSource(Enumerator.AllSubscription subscription) : IIndexingEventSource + { + public ReadResponse Current => subscription.Current; + + public ValueTask MoveNextAsync() => subscription.MoveNextAsync(); + + public ValueTask DisposeAsync() => subscription.DisposeAsync(); + } +} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpoint.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpoint.cs new file mode 100644 index 000000000..be3dd478e --- /dev/null +++ b/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpoint.cs @@ -0,0 +1,27 @@ +using System; +using EventStore.Core.Services.Transport.Common; + +namespace EventStore.Core.Services.Storage.Indexing; + +public readonly record struct IndexCheckpoint +{ + public long CommitPosition { get; } + public long PreparePosition { get; } + + public IndexCheckpoint(long commitPosition, long preparePosition) + { + ArgumentOutOfRangeException.ThrowIfNegative(commitPosition); + ArgumentOutOfRangeException.ThrowIfNegative(preparePosition); + + if (commitPosition < preparePosition) + { + throw new ArgumentOutOfRangeException(nameof(CommitPosition), + "The commit position cannot be less than the prepare position."); + } + + CommitPosition = commitPosition; + PreparePosition = preparePosition; + } + + public Position ToPosition() => Position.FromInt64(CommitPosition, PreparePosition); +} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexingService.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexingService.cs new file mode 100644 index 000000000..78298c8bb --- /dev/null +++ b/src/EventStore.Core/Services/Storage/Indexing/IndexingService.cs @@ -0,0 +1,32 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Bus; +using EventStore.Core.Messages; + +namespace EventStore.Core.Services.Storage.Indexing; + +public sealed class IndexingService : IAsyncHandle, IAsyncHandle, IAsyncDisposable +{ + private readonly IndexingSubscription _subscription; + + public IndexingService( + IIndexingComponent component, + IIndexingEventSourceFactory eventSourceFactory, + ISubscriber subscriber, + IndexingSubscriptionOptions options) + { + _subscription = new IndexingSubscription(component, eventSourceFactory, options); + + subscriber.Subscribe(this); + subscriber.Subscribe(this); + } + + public ValueTask HandleAsync(SystemMessage.SystemReady message, CancellationToken token) => + _subscription.Start(token); + + public ValueTask HandleAsync(SystemMessage.BecomeShuttingDown message, CancellationToken token) => + _subscription.DisposeAsync(); + + public ValueTask DisposeAsync() => _subscription.DisposeAsync(); +} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs new file mode 100644 index 000000000..caefd7764 --- /dev/null +++ b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs @@ -0,0 +1,156 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Services.Transport.Enumerators; +using Serilog; + +namespace EventStore.Core.Services.Storage.Indexing; + +public sealed record IndexingSubscriptionOptions +{ + public static readonly IndexingSubscriptionOptions Default = new(50000, TimeSpan.FromSeconds(10)); + + public int CheckpointCommitBatchSize { get; } + public TimeSpan CheckpointCommitDelay { get; } + + public IndexingSubscriptionOptions(int checkpointCommitBatchSize, TimeSpan checkpointCommitDelay) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(checkpointCommitBatchSize); + ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(checkpointCommitDelay, TimeSpan.Zero); + + CheckpointCommitBatchSize = checkpointCommitBatchSize; + CheckpointCommitDelay = checkpointCommitDelay; + } +} + +public sealed class IndexingSubscription( + IIndexingComponent component, + IIndexingEventSourceFactory eventSourceFactory, + IndexingSubscriptionOptions options) : IAsyncDisposable +{ + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly CancellationTokenSource _stop = new(); + private readonly object _stateLock = new(); + + private IIndexingEventSource _eventSource; + private IndexCheckpointCommitTracker _commitTracker; + private Task _processing; + private bool _started; + private bool _disposed; + + public async ValueTask Start(CancellationToken token) + { + lock (_stateLock) + { + ObjectDisposedException.ThrowIf(_disposed, this); + if (_started) + { + throw new InvalidOperationException($"{nameof(IndexingSubscription)} has already been started."); + } + + _started = true; + } + + using var linked = CancellationTokenSource.CreateLinkedTokenSource(token, _stop.Token); + + await component.Initialize(linked.Token); + var checkpoint = await component.ReadCheckpoint(linked.Token); + + _commitTracker = new IndexCheckpointCommitTracker( + options.CheckpointCommitBatchSize, + options.CheckpointCommitDelay, + component.Processor.Commit, + _stop.Token); + + _eventSource = eventSourceFactory.Create(checkpoint, _stop.Token); + _processing = Task.Run(ProcessEvents); + } + + public async ValueTask DisposeAsync() + { + Task processing; + IIndexingEventSource eventSource; + IndexCheckpointCommitTracker commitTracker; + + lock (_stateLock) + { + if (_disposed) + { + return; + } + + _disposed = true; + processing = _processing; + eventSource = _eventSource; + commitTracker = _commitTracker; + } + + await _stop.CancelAsync(); + + if (processing is not null) + { + try + { + await processing; + } + catch (OperationCanceledException) + { + } + } + + if (commitTracker is not null) + { + await commitTracker.DisposeAsync(); + } + + if (eventSource is not null) + { + await eventSource.DisposeAsync(); + } + + await component.DisposeAsync(); + _stop.Dispose(); + } + + private async Task ProcessEvents() + { + while (!_stop.IsCancellationRequested) + { + bool hasNext; + try + { + hasNext = await _eventSource.MoveNextAsync(); + } + catch (OperationCanceledException) when (_stop.IsCancellationRequested) + { + return; + } + + if (!hasNext) + { + return; + } + + if (_eventSource.Current is not ReadResponse.EventReceived eventReceived) + { + continue; + } + + try + { + await component.Processor.Index(eventReceived.Event, _stop.Token); + _commitTracker.Track(); + } + catch (OperationCanceledException) when (_stop.IsCancellationRequested) + { + return; + } + catch (Exception ex) + { + Log.Error(ex, "Error while indexing event {eventType}", eventReceived.Event.Event.EventType); + throw; + } + } + } +} From fa94d8c8bf4d5106ca860a407b0e602613e7b9a2 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Thu, 25 Jun 2026 03:30:46 -0400 Subject: [PATCH 2/8] fix(indexing): keep lifecycle recovery safe Signed-off-by: Yordis Prieto --- .../Indexing/IndexingSubscriptionTests.cs | 76 ++++++++++- .../Storage/Indexing/IndexingSubscription.cs | 126 +++++++++++++++--- 2 files changed, 175 insertions(+), 27 deletions(-) diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs index 77e163ac3..705949b28 100644 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs @@ -30,6 +30,21 @@ public async Task starts_from_component_checkpoint() Assert.Equal(checkpoint, eventSources.Checkpoint); } + [Fact] + public async Task can_start_again_after_startup_failure() + { + var component = new FakeIndexingComponent(initializeFailures: 1); + var eventSource = new FakeIndexingEventSource(); + await using var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + IndexingSubscriptionOptions.Default); + + await Assert.ThrowsAsync(() => subscription.Start(CancellationToken.None).AsTask()); + + await subscription.Start(CancellationToken.None); + } + [Fact] public async Task indexes_events_from_source() { @@ -104,6 +119,25 @@ public async Task commits_pending_events_when_disposed() Assert.True(eventSource.Disposed); } + [Fact] + public async Task cleans_up_after_event_source_completion_faults_worker() + { + var component = new FakeIndexingComponent(); + var eventSource = new FakeIndexingEventSource(completeWhenDrained: true); + var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + IndexingSubscriptionOptions.Default); + + await subscription.Start(CancellationToken.None); + await eventSource.WaitForDrained(); + var exception = await Assert.ThrowsAsync(() => subscription.DisposeAsync().AsTask()); + + Assert.Contains("completed unexpectedly", exception.Message); + Assert.True(component.Disposed); + Assert.True(eventSource.Disposed); + } + private static ResolvedEvent CreateResolvedEvent(long number) { var record = new EventRecord( @@ -124,15 +158,27 @@ private static ResolvedEvent CreateResolvedEvent(long number) return ResolvedEvent.ForUnresolvedEvent(record, number); } - private sealed class FakeIndexingComponent(IndexCheckpoint? checkpoint = null) : IIndexingComponent + private sealed class FakeIndexingComponent( + IndexCheckpoint? checkpoint = null, + int initializeFailures = 0) : IIndexingComponent { + private int _initializeFailures = initializeFailures; + public FakeIndexingProcessor Processor { get; } = new(); IIndexingProcessor IIndexingComponent.Processor => Processor; public bool Disposed { get; private set; } - public ValueTask Initialize(CancellationToken token) => ValueTask.CompletedTask; + public ValueTask Initialize(CancellationToken token) + { + if (Interlocked.Decrement(ref _initializeFailures) >= 0) + { + throw new InvalidOperationException("initialize failed"); + } + + return ValueTask.CompletedTask; + } public ValueTask ReadCheckpoint(CancellationToken token) => ValueTask.FromResult(checkpoint); @@ -210,6 +256,7 @@ private sealed class FakeIndexingEventSourceFactory(FakeIndexingEventSource sour public IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToken token) { Checkpoint = checkpoint; + source.Bind(token); return source; } } @@ -217,20 +264,39 @@ public IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToke private sealed class FakeIndexingEventSource(params ReadResponse[] responses) : IIndexingEventSource { private readonly Queue _responses = new(responses); + private readonly TaskCompletionSource _drained = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly bool _completeWhenDrained; + private CancellationToken _token; + + public FakeIndexingEventSource(bool completeWhenDrained, params ReadResponse[] responses) : this(responses) + { + _completeWhenDrained = completeWhenDrained; + } public ReadResponse Current { get; private set; } public bool Disposed { get; private set; } - public ValueTask MoveNextAsync() + public void Bind(CancellationToken token) => _token = token; + + public Task WaitForDrained() => _drained.Task.WaitAsync(Timeout); + + public async ValueTask MoveNextAsync() { if (!_responses.TryDequeue(out var response)) { - return ValueTask.FromResult(false); + if (_completeWhenDrained) + { + _drained.TrySetResult(); + return false; + } + + await Task.Delay(global::System.Threading.Timeout.InfiniteTimeSpan, _token); + return false; } Current = response; - return ValueTask.FromResult(true); + return true; } public ValueTask DisposeAsync() diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs index caefd7764..6b8a1dfce 100644 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs +++ b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs @@ -1,4 +1,5 @@ using System; +using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; using EventStore.Core.Services.Transport.Enumerators; @@ -36,6 +37,7 @@ public sealed class IndexingSubscription( private IIndexingEventSource _eventSource; private IndexCheckpointCommitTracker _commitTracker; private Task _processing; + private bool _starting; private bool _started; private bool _disposed; @@ -44,27 +46,57 @@ public async ValueTask Start(CancellationToken token) lock (_stateLock) { ObjectDisposedException.ThrowIf(_disposed, this); - if (_started) + if (_started || _starting) { throw new InvalidOperationException($"{nameof(IndexingSubscription)} has already been started."); } - _started = true; + _starting = true; } using var linked = CancellationTokenSource.CreateLinkedTokenSource(token, _stop.Token); - await component.Initialize(linked.Token); - var checkpoint = await component.ReadCheckpoint(linked.Token); + IIndexingEventSource eventSource = null; + IndexCheckpointCommitTracker commitTracker = null; - _commitTracker = new IndexCheckpointCommitTracker( - options.CheckpointCommitBatchSize, - options.CheckpointCommitDelay, - component.Processor.Commit, - _stop.Token); + try + { + await component.Initialize(linked.Token); + var checkpoint = await component.ReadCheckpoint(linked.Token); + + commitTracker = new IndexCheckpointCommitTracker( + options.CheckpointCommitBatchSize, + options.CheckpointCommitDelay, + component.Processor.Commit, + CancellationToken.None); + + eventSource = eventSourceFactory.Create(checkpoint, _stop.Token); + + lock (_stateLock) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + _commitTracker = commitTracker; + _eventSource = eventSource; + _processing = Task.Run(ProcessEvents); + ObserveProcessingFault(_processing); + _started = true; + _starting = false; + } + } + catch + { + lock (_stateLock) + _starting = false; + + if (commitTracker is not null) + await commitTracker.DisposeAsync(); - _eventSource = eventSourceFactory.Create(checkpoint, _stop.Token); - _processing = Task.Run(ProcessEvents); + if (eventSource is not null) + await eventSource.DisposeAsync(); + + throw; + } } public async ValueTask DisposeAsync() @@ -88,29 +120,79 @@ public async ValueTask DisposeAsync() await _stop.CancelAsync(); - if (processing is not null) + Exception failure = null; + + try { - try + if (processing is not null) { - await processing; + try + { + await processing; + } + catch (OperationCanceledException) when (_stop.IsCancellationRequested) + { + } } - catch (OperationCanceledException) + } + catch (Exception ex) + { + failure = ex; + } + + try + { + if (commitTracker is not null) { + await commitTracker.DisposeAsync(); } } + catch (Exception ex) when (failure is null) + { + failure = ex; + } - if (commitTracker is not null) + try { - await commitTracker.DisposeAsync(); + if (eventSource is not null) + { + await eventSource.DisposeAsync(); + } + } + catch (Exception ex) when (failure is null) + { + failure = ex; } - if (eventSource is not null) + try + { + await component.DisposeAsync(); + } + catch (Exception ex) when (failure is null) { - await eventSource.DisposeAsync(); + failure = ex; } - await component.DisposeAsync(); - _stop.Dispose(); + try + { + _stop.Dispose(); + } + catch (Exception ex) when (failure is null) + { + failure = ex; + } + + if (failure is not null) + ExceptionDispatchInfo.Capture(failure).Throw(); + } + + private static void ObserveProcessingFault(Task processing) + { + processing.ContinueWith( + static task => Log.Error(task.Exception, "Indexing subscription stopped unexpectedly"), + CancellationToken.None, + TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); } private async Task ProcessEvents() @@ -129,7 +211,7 @@ private async Task ProcessEvents() if (!hasNext) { - return; + throw new InvalidOperationException("Indexing event source completed unexpectedly."); } if (_eventSource.Current is not ReadResponse.EventReceived eventReceived) From e660486053d33e2519112f9692d130682def28e8 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Thu, 25 Jun 2026 03:32:34 -0400 Subject: [PATCH 3/8] fix(indexing): preserve in-flight checkpoints Signed-off-by: Yordis Prieto --- .../Indexing/IndexingSubscriptionTests.cs | 43 ++++++++++++++++--- .../Storage/Indexing/IndexingSubscription.cs | 2 +- 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs index 705949b28..804baecbe 100644 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs @@ -119,6 +119,28 @@ public async Task commits_pending_events_when_disposed() Assert.True(eventSource.Disposed); } + [Fact] + public async Task commits_in_flight_event_when_disposed() + { + var component = new FakeIndexingComponent(pauseIndexCompletion: true); + var eventSource = new FakeIndexingEventSource(new ReadResponse.EventReceived(CreateResolvedEvent(1))); + var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + new IndexingSubscriptionOptions(100, TimeSpan.FromSeconds(30))); + + await subscription.Start(CancellationToken.None); + await component.Processor.WaitForIndexEntered(); + var disposal = subscription.DisposeAsync().AsTask(); + + component.Processor.ReleaseIndex(); + await disposal.WaitAsync(Timeout); + + Assert.Equal(1, component.Processor.CommitCount); + Assert.True(component.Disposed); + Assert.True(eventSource.Disposed); + } + [Fact] public async Task cleans_up_after_event_source_completion_faults_worker() { @@ -160,11 +182,12 @@ private static ResolvedEvent CreateResolvedEvent(long number) private sealed class FakeIndexingComponent( IndexCheckpoint? checkpoint = null, - int initializeFailures = 0) : IIndexingComponent + int initializeFailures = 0, + bool pauseIndexCompletion = false) : IIndexingComponent { private int _initializeFailures = initializeFailures; - public FakeIndexingProcessor Processor { get; } = new(); + public FakeIndexingProcessor Processor { get; } = new(pauseIndexCompletion); IIndexingProcessor IIndexingComponent.Processor => Processor; @@ -189,11 +212,13 @@ public ValueTask DisposeAsync() } } - private sealed class FakeIndexingProcessor : IIndexingProcessor + private sealed class FakeIndexingProcessor(bool pauseIndexCompletion = false) : IIndexingProcessor { private readonly List _indexed = []; private readonly TaskCompletionSource _indexedEvents = new(TaskCreationOptions.RunContinuationsAsynchronously); private readonly TaskCompletionSource _committed = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _indexEntered = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _releaseIndex = new(TaskCreationOptions.RunContinuationsAsynchronously); private int _commitCount; private int _waitForIndexed; private int _waitForCommits; @@ -202,7 +227,7 @@ private sealed class FakeIndexingProcessor : IIndexingProcessor public int CommitCount => Volatile.Read(ref _commitCount); - public ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token) + public async ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token) { lock (_indexed) { @@ -213,7 +238,11 @@ public ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token) } } - return ValueTask.CompletedTask; + if (pauseIndexCompletion) + { + _indexEntered.TrySetResult(); + await _releaseIndex.Task.WaitAsync(token); + } } public ValueTask Commit(CancellationToken token) @@ -247,6 +276,10 @@ public Task WaitForCommits(int count) ? Task.CompletedTask : _committed.Task.WaitAsync(Timeout); } + + public Task WaitForIndexEntered() => _indexEntered.Task.WaitAsync(Timeout); + + public void ReleaseIndex() => _releaseIndex.TrySetResult(); } private sealed class FakeIndexingEventSourceFactory(FakeIndexingEventSource source) : IIndexingEventSourceFactory diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs index 6b8a1dfce..3cb67ffcf 100644 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs +++ b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs @@ -221,7 +221,7 @@ private async Task ProcessEvents() try { - await component.Processor.Index(eventReceived.Event, _stop.Token); + await component.Processor.Index(eventReceived.Event, CancellationToken.None); _commitTracker.Track(); } catch (OperationCanceledException) when (_stop.IsCancellationRequested) From 1eac8b710a782c49012481c87ca096a6de3e5b18 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Thu, 25 Jun 2026 03:45:45 -0400 Subject: [PATCH 4/8] fix(tests): reduce scheduler timing flake Signed-off-by: Yordis Prieto --- .../TimerService/ThreadBasedSchedulerTests.cs | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/EventStore.Core.XUnit.Tests/Services/TimerService/ThreadBasedSchedulerTests.cs b/src/EventStore.Core.XUnit.Tests/Services/TimerService/ThreadBasedSchedulerTests.cs index 0f050a043..7dbb7e6fe 100644 --- a/src/EventStore.Core.XUnit.Tests/Services/TimerService/ThreadBasedSchedulerTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Services/TimerService/ThreadBasedSchedulerTests.cs @@ -12,6 +12,8 @@ namespace EventStore.Core.XUnit.Tests.Services.TimerService; public class ThreadBasedSchedulerTests { + private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5); + [Fact] public async Task task_completes_when_stopped() { @@ -19,7 +21,7 @@ public async Task task_completes_when_stopped() scheduler.Stop(); - await scheduler.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await scheduler.Task.WaitAsync(Timeout); } [Fact] @@ -30,7 +32,7 @@ public async Task future_work_does_not_keep_shutdown_incomplete() scheduler.Schedule(TimeSpan.FromMinutes(1), static (_, _) => { }, null); scheduler.Stop(); - await scheduler.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await scheduler.Task.WaitAsync(Timeout); } [Fact] @@ -44,7 +46,7 @@ public async Task statistics_include_scheduled_work() await AssertEventually(() => scheduler.GetStatistics().Length == 2); scheduler.Stop(); - await scheduler.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await scheduler.Task.WaitAsync(Timeout); } [Fact] @@ -60,17 +62,17 @@ public async Task callback_failure_does_not_fault_scheduler_lifetime() throw new InvalidOperationException("boom"); }, failedCallbackRan); - await failedCallbackRan.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await failedCallbackRan.Task.WaitAsync(Timeout); scheduler.Schedule(TimeSpan.Zero, static (_, state) => { ((TaskCompletionSource)state).SetResult(); }, nextCallbackRan); - await nextCallbackRan.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await nextCallbackRan.Task.WaitAsync(Timeout); scheduler.Stop(); - await scheduler.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await scheduler.Task.WaitAsync(Timeout); } [Fact] @@ -90,7 +92,7 @@ public async Task callback_failure_is_counted_as_processed_work() ((TaskCompletionSource)state).SetResult(); }, nextCallbackRan); - await Task.WhenAll(failedCallbackRan.Task, nextCallbackRan.Task).WaitAsync(TimeSpan.FromSeconds(1)); + await Task.WhenAll(failedCallbackRan.Task, nextCallbackRan.Task).WaitAsync(Timeout); await AssertEventually(() => scheduler.GetStatistics().TotalItemsProcessed >= 4); } @@ -109,14 +111,14 @@ public async Task busy_tracker_observes_scheduler_work_and_idle() ((TaskCompletionSource)state).SetResult(); }, callbackRan); - await callbackRan.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await callbackRan.Task.WaitAsync(Timeout); await AssertEventually(() => tracker.BusyCount > busyBefore && tracker.IdleCount > idleBefore); } private static async Task AssertEventually(Func condition) { - var deadline = DateTime.UtcNow.AddSeconds(1); + var deadline = DateTime.UtcNow.Add(Timeout); while (!condition()) { From abd1bf95936e48a0484354054870abc70f584349 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Thu, 25 Jun 2026 04:12:20 -0400 Subject: [PATCH 5/8] fix(indexing): preserve startup disposal ordering Signed-off-by: Yordis Prieto --- .../Indexing/IndexingSubscriptionTests.cs | 39 +++++++++++++++++- .../Storage/Indexing/IndexingSubscription.cs | 41 +++++++++++++++++-- 2 files changed, 74 insertions(+), 6 deletions(-) diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs index 804baecbe..01e49cbd9 100644 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs @@ -45,6 +45,28 @@ public async Task can_start_again_after_startup_failure() await subscription.Start(CancellationToken.None); } + [Fact] + public async Task waits_for_in_flight_start_before_disposing_component() + { + var component = new FakeIndexingComponent(pauseInitializeCompletion: true); + var eventSource = new FakeIndexingEventSource(); + var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + IndexingSubscriptionOptions.Default); + + var startup = subscription.Start(CancellationToken.None).AsTask(); + await component.WaitForInitializeEntered(); + var disposal = subscription.DisposeAsync().AsTask(); + + component.ReleaseInitialize(); + await Assert.ThrowsAsync(() => startup.WaitAsync(Timeout)); + await disposal.WaitAsync(Timeout); + + Assert.False(component.DisposedBeforeInitializeCompleted); + Assert.True(component.Disposed); + } + [Fact] public async Task indexes_events_from_source() { @@ -183,8 +205,11 @@ private static ResolvedEvent CreateResolvedEvent(long number) private sealed class FakeIndexingComponent( IndexCheckpoint? checkpoint = null, int initializeFailures = 0, + bool pauseInitializeCompletion = false, bool pauseIndexCompletion = false) : IIndexingComponent { + private readonly TaskCompletionSource _initializeEntered = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _releaseInitialize = new(TaskCreationOptions.RunContinuationsAsynchronously); private int _initializeFailures = initializeFailures; public FakeIndexingProcessor Processor { get; } = new(pauseIndexCompletion); @@ -192,24 +217,34 @@ private sealed class FakeIndexingComponent( IIndexingProcessor IIndexingComponent.Processor => Processor; public bool Disposed { get; private set; } + public bool DisposedBeforeInitializeCompleted { get; private set; } - public ValueTask Initialize(CancellationToken token) + public async ValueTask Initialize(CancellationToken token) { if (Interlocked.Decrement(ref _initializeFailures) >= 0) { throw new InvalidOperationException("initialize failed"); } - return ValueTask.CompletedTask; + if (pauseInitializeCompletion) + { + _initializeEntered.TrySetResult(); + await _releaseInitialize.Task; + } } public ValueTask ReadCheckpoint(CancellationToken token) => ValueTask.FromResult(checkpoint); public ValueTask DisposeAsync() { + DisposedBeforeInitializeCompleted = pauseInitializeCompletion && !_releaseInitialize.Task.IsCompleted; Disposed = true; return ValueTask.CompletedTask; } + + public Task WaitForInitializeEntered() => _initializeEntered.Task.WaitAsync(Timeout); + + public void ReleaseInitialize() => _releaseInitialize.TrySetResult(); } private sealed class FakeIndexingProcessor(bool pauseIndexCompletion = false) : IIndexingProcessor diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs index 3cb67ffcf..2105d3e78 100644 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs +++ b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs @@ -36,13 +36,15 @@ public sealed class IndexingSubscription( private IIndexingEventSource _eventSource; private IndexCheckpointCommitTracker _commitTracker; + private Task _startup; private Task _processing; private bool _starting; private bool _started; private bool _disposed; - public async ValueTask Start(CancellationToken token) + public ValueTask Start(CancellationToken token) { + Task startup; lock (_stateLock) { ObjectDisposedException.ThrowIf(_disposed, this); @@ -52,8 +54,15 @@ public async ValueTask Start(CancellationToken token) } _starting = true; + startup = StartCore(token); + _startup = startup; } + return new ValueTask(startup); + } + + private async Task StartCore(CancellationToken token) + { using var linked = CancellationTokenSource.CreateLinkedTokenSource(token, _stop.Token); IIndexingEventSource eventSource = null; @@ -101,6 +110,7 @@ public async ValueTask Start(CancellationToken token) public async ValueTask DisposeAsync() { + Task startup; Task processing; IIndexingEventSource eventSource; IndexCheckpointCommitTracker commitTracker; @@ -113,15 +123,38 @@ public async ValueTask DisposeAsync() } _disposed = true; - processing = _processing; - eventSource = _eventSource; - commitTracker = _commitTracker; + startup = _startup; } await _stop.CancelAsync(); Exception failure = null; + try + { + if (startup is not null) + { + await startup; + } + } + catch (OperationCanceledException) when (_stop.IsCancellationRequested) + { + } + catch (ObjectDisposedException) when (_stop.IsCancellationRequested) + { + } + catch (Exception ex) + { + failure = ex; + } + + lock (_stateLock) + { + processing = _processing; + eventSource = _eventSource; + commitTracker = _commitTracker; + } + try { if (processing is not null) From a63ffba6357ea3a7ad9b11a2bfe2b62f755df0e5 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Thu, 25 Jun 2026 04:53:36 -0400 Subject: [PATCH 6/8] fix(indexing): keep startup cleanup quiet Signed-off-by: Yordis Prieto --- .../Indexing/IndexingSubscriptionTests.cs | 16 ++++++++++++++++ .../Storage/Indexing/IndexingSubscription.cs | 3 +-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs index 01e49cbd9..93747f594 100644 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs @@ -67,6 +67,22 @@ public async Task waits_for_in_flight_start_before_disposing_component() Assert.True(component.Disposed); } + [Fact] + public async Task dispose_does_not_surface_failed_startup() + { + var component = new FakeIndexingComponent(initializeFailures: 1); + var eventSource = new FakeIndexingEventSource(); + var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + IndexingSubscriptionOptions.Default); + + await Assert.ThrowsAsync(() => subscription.Start(CancellationToken.None).AsTask()); + await subscription.DisposeAsync(); + + Assert.True(component.Disposed); + } + [Fact] public async Task indexes_events_from_source() { diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs index 2105d3e78..6085ea38e 100644 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs +++ b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs @@ -143,9 +143,8 @@ public async ValueTask DisposeAsync() catch (ObjectDisposedException) when (_stop.IsCancellationRequested) { } - catch (Exception ex) + catch { - failure = ex; } lock (_stateLock) From 12af0562b695d603b3f622fac45ebe0b1248af3e Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Thu, 25 Jun 2026 05:10:41 -0400 Subject: [PATCH 7/8] fix(indexing): keep test waits explicit Signed-off-by: Yordis Prieto --- .../Services/Storage/Indexing/IndexingSubscriptionTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs index 93747f594..3b3c9ef3e 100644 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs @@ -271,8 +271,8 @@ private sealed class FakeIndexingProcessor(bool pauseIndexCompletion = false) : private readonly TaskCompletionSource _indexEntered = new(TaskCreationOptions.RunContinuationsAsynchronously); private readonly TaskCompletionSource _releaseIndex = new(TaskCreationOptions.RunContinuationsAsynchronously); private int _commitCount; - private int _waitForIndexed; - private int _waitForCommits; + private int _waitForIndexed = int.MaxValue; + private int _waitForCommits = int.MaxValue; public IReadOnlyList Indexed => _indexed; From 016775fbf82d0a37ea823df7a919dfba424bc837 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Thu, 25 Jun 2026 05:33:45 -0400 Subject: [PATCH 8/8] fix(indexing): clean up failed workers Signed-off-by: Yordis Prieto --- .../Indexing/IndexingSubscriptionTests.cs | 13 ++++++++-- .../Storage/Indexing/IndexingSubscription.cs | 25 ++++++++++++++++--- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs index 3b3c9ef3e..b6e52b36f 100644 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs @@ -191,9 +191,10 @@ public async Task cleans_up_after_event_source_completion_faults_worker() await subscription.Start(CancellationToken.None); await eventSource.WaitForDrained(); - var exception = await Assert.ThrowsAsync(() => subscription.DisposeAsync().AsTask()); + await eventSource.WaitForDisposed(); + await component.WaitForDisposed(); + await subscription.DisposeAsync(); - Assert.Contains("completed unexpectedly", exception.Message); Assert.True(component.Disposed); Assert.True(eventSource.Disposed); } @@ -226,6 +227,7 @@ private sealed class FakeIndexingComponent( { private readonly TaskCompletionSource _initializeEntered = new(TaskCreationOptions.RunContinuationsAsynchronously); private readonly TaskCompletionSource _releaseInitialize = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _disposed = new(TaskCreationOptions.RunContinuationsAsynchronously); private int _initializeFailures = initializeFailures; public FakeIndexingProcessor Processor { get; } = new(pauseIndexCompletion); @@ -255,11 +257,14 @@ public ValueTask DisposeAsync() { DisposedBeforeInitializeCompleted = pauseInitializeCompletion && !_releaseInitialize.Task.IsCompleted; Disposed = true; + _disposed.TrySetResult(); return ValueTask.CompletedTask; } public Task WaitForInitializeEntered() => _initializeEntered.Task.WaitAsync(Timeout); + public Task WaitForDisposed() => _disposed.Task.WaitAsync(Timeout); + public void ReleaseInitialize() => _releaseInitialize.TrySetResult(); } @@ -349,6 +354,7 @@ private sealed class FakeIndexingEventSource(params ReadResponse[] responses) : { private readonly Queue _responses = new(responses); private readonly TaskCompletionSource _drained = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _disposed = new(TaskCreationOptions.RunContinuationsAsynchronously); private readonly bool _completeWhenDrained; private CancellationToken _token; @@ -365,6 +371,8 @@ public FakeIndexingEventSource(bool completeWhenDrained, params ReadResponse[] r public Task WaitForDrained() => _drained.Task.WaitAsync(Timeout); + public Task WaitForDisposed() => _disposed.Task.WaitAsync(Timeout); + public async ValueTask MoveNextAsync() { if (!_responses.TryDequeue(out var response)) @@ -386,6 +394,7 @@ public async ValueTask MoveNextAsync() public ValueTask DisposeAsync() { Disposed = true; + _disposed.TrySetResult(); return ValueTask.CompletedTask; } } diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs index 6085ea38e..ed1addf8e 100644 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs +++ b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs @@ -108,7 +108,9 @@ private async Task StartCore(CancellationToken token) } } - public async ValueTask DisposeAsync() + public ValueTask DisposeAsync() => DisposeAsync(ignoreProcessingFailure: false); + + private async ValueTask DisposeAsync(bool ignoreProcessingFailure) { Task startup; Task processing; @@ -167,6 +169,9 @@ public async ValueTask DisposeAsync() } } } + catch (Exception) when (ignoreProcessingFailure) + { + } catch (Exception ex) { failure = ex; @@ -218,15 +223,29 @@ public async ValueTask DisposeAsync() ExceptionDispatchInfo.Capture(failure).Throw(); } - private static void ObserveProcessingFault(Task processing) + private void ObserveProcessingFault(Task processing) { processing.ContinueWith( - static task => Log.Error(task.Exception, "Indexing subscription stopped unexpectedly"), + task => _ = DisposeAfterProcessingFault(task), CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } + private async Task DisposeAfterProcessingFault(Task processing) + { + Log.Error(processing.Exception, "Indexing subscription stopped unexpectedly"); + + try + { + await DisposeAsync(ignoreProcessingFailure: true); + } + catch (Exception ex) + { + Log.Error(ex, "Error while disposing failed indexing subscription"); + } + } + private async Task ProcessEvents() { while (!_stop.IsCancellationRequested)