diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs new file mode 100644 index 000000000..b6e52b36f --- /dev/null +++ b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs @@ -0,0 +1,403 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Data; +using EventStore.Core.Services.Storage.Indexing; +using EventStore.Core.Services.Transport.Common; +using EventStore.Core.Services.Transport.Enumerators; +using EventStore.Core.TransactionLog.LogRecords; +using Xunit; + +namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; + +public class IndexingSubscriptionTests +{ + [Fact] + public async Task starts_from_component_checkpoint() + { + var checkpoint = new IndexCheckpoint(10, 5); + var component = new FakeIndexingComponent(checkpoint); + var eventSource = new FakeIndexingEventSource(); + var eventSources = new FakeIndexingEventSourceFactory(eventSource); + await using var subscription = new IndexingSubscription( + component, + eventSources, + IndexingSubscriptionOptions.Default); + + await subscription.Start(CancellationToken.None); + + Assert.Equal(checkpoint, eventSources.Checkpoint); + } + + [Fact] + public async Task can_start_again_after_startup_failure() + { + var component = new FakeIndexingComponent(initializeFailures: 1); + var eventSource = new FakeIndexingEventSource(); + await using var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + IndexingSubscriptionOptions.Default); + + await Assert.ThrowsAsync(() => subscription.Start(CancellationToken.None).AsTask()); + + await subscription.Start(CancellationToken.None); + } + + [Fact] + public async Task waits_for_in_flight_start_before_disposing_component() + { + var component = new FakeIndexingComponent(pauseInitializeCompletion: true); + var eventSource = new FakeIndexingEventSource(); + var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + IndexingSubscriptionOptions.Default); + + var startup = subscription.Start(CancellationToken.None).AsTask(); + await component.WaitForInitializeEntered(); + var disposal = subscription.DisposeAsync().AsTask(); + + component.ReleaseInitialize(); + await Assert.ThrowsAsync(() => startup.WaitAsync(Timeout)); + await disposal.WaitAsync(Timeout); + + Assert.False(component.DisposedBeforeInitializeCompleted); + Assert.True(component.Disposed); + } + + [Fact] + public async Task dispose_does_not_surface_failed_startup() + { + var component = new FakeIndexingComponent(initializeFailures: 1); + var eventSource = new FakeIndexingEventSource(); + var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + IndexingSubscriptionOptions.Default); + + await Assert.ThrowsAsync(() => subscription.Start(CancellationToken.None).AsTask()); + await subscription.DisposeAsync(); + + Assert.True(component.Disposed); + } + + [Fact] + public async Task indexes_events_from_source() + { + var first = CreateResolvedEvent(1); + var second = CreateResolvedEvent(2); + var component = new FakeIndexingComponent(); + var eventSource = new FakeIndexingEventSource( + new ReadResponse.EventReceived(first), + new ReadResponse.SubscriptionCaughtUp(new TFPos(1, 1)), + new ReadResponse.EventReceived(second)); + await using var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + new IndexingSubscriptionOptions(2, TimeSpan.FromSeconds(30))); + + await subscription.Start(CancellationToken.None); + await component.Processor.WaitForIndexed(2); + + Assert.Equal(new[] { first, second }, component.Processor.Indexed); + } + + [Fact] + public async Task commits_when_batch_size_is_reached() + { + var component = new FakeIndexingComponent(); + var eventSource = new FakeIndexingEventSource( + new ReadResponse.EventReceived(CreateResolvedEvent(1)), + new ReadResponse.EventReceived(CreateResolvedEvent(2))); + await using var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + new IndexingSubscriptionOptions(2, TimeSpan.FromSeconds(30))); + + await subscription.Start(CancellationToken.None); + await component.Processor.WaitForCommits(1); + + Assert.Equal(1, component.Processor.CommitCount); + } + + [Fact] + public async Task commits_when_delay_elapses() + { + var component = new FakeIndexingComponent(); + var eventSource = new FakeIndexingEventSource(new ReadResponse.EventReceived(CreateResolvedEvent(1))); + await using var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + new IndexingSubscriptionOptions(100, TimeSpan.FromMilliseconds(25))); + + await subscription.Start(CancellationToken.None); + await component.Processor.WaitForCommits(1); + + Assert.Equal(1, component.Processor.CommitCount); + } + + [Fact] + public async Task commits_pending_events_when_disposed() + { + var component = new FakeIndexingComponent(); + var eventSource = new FakeIndexingEventSource(new ReadResponse.EventReceived(CreateResolvedEvent(1))); + var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + new IndexingSubscriptionOptions(100, TimeSpan.FromSeconds(30))); + + await subscription.Start(CancellationToken.None); + await component.Processor.WaitForIndexed(1); + await subscription.DisposeAsync(); + + Assert.Equal(1, component.Processor.CommitCount); + Assert.True(component.Disposed); + Assert.True(eventSource.Disposed); + } + + [Fact] + public async Task commits_in_flight_event_when_disposed() + { + var component = new FakeIndexingComponent(pauseIndexCompletion: true); + var eventSource = new FakeIndexingEventSource(new ReadResponse.EventReceived(CreateResolvedEvent(1))); + var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + new IndexingSubscriptionOptions(100, TimeSpan.FromSeconds(30))); + + await subscription.Start(CancellationToken.None); + await component.Processor.WaitForIndexEntered(); + var disposal = subscription.DisposeAsync().AsTask(); + + component.Processor.ReleaseIndex(); + await disposal.WaitAsync(Timeout); + + Assert.Equal(1, component.Processor.CommitCount); + Assert.True(component.Disposed); + Assert.True(eventSource.Disposed); + } + + [Fact] + public async Task cleans_up_after_event_source_completion_faults_worker() + { + var component = new FakeIndexingComponent(); + var eventSource = new FakeIndexingEventSource(completeWhenDrained: true); + var subscription = new IndexingSubscription( + component, + new FakeIndexingEventSourceFactory(eventSource), + IndexingSubscriptionOptions.Default); + + await subscription.Start(CancellationToken.None); + await eventSource.WaitForDrained(); + await eventSource.WaitForDisposed(); + await component.WaitForDisposed(); + await subscription.DisposeAsync(); + + Assert.True(component.Disposed); + Assert.True(eventSource.Disposed); + } + + private static ResolvedEvent CreateResolvedEvent(long number) + { + var record = new EventRecord( + number, + number, + Guid.NewGuid(), + Guid.NewGuid(), + number, + 0, + $"stream-{number}", + number - 1, + DateTime.UtcNow, + PrepareFlags.SingleWrite | PrepareFlags.IsCommitted | PrepareFlags.Data, + $"event-{number}", + Array.Empty(), + Array.Empty()); + + return ResolvedEvent.ForUnresolvedEvent(record, number); + } + + private sealed class FakeIndexingComponent( + IndexCheckpoint? checkpoint = null, + int initializeFailures = 0, + bool pauseInitializeCompletion = false, + bool pauseIndexCompletion = false) : IIndexingComponent + { + private readonly TaskCompletionSource _initializeEntered = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _releaseInitialize = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _disposed = new(TaskCreationOptions.RunContinuationsAsynchronously); + private int _initializeFailures = initializeFailures; + + public FakeIndexingProcessor Processor { get; } = new(pauseIndexCompletion); + + IIndexingProcessor IIndexingComponent.Processor => Processor; + + public bool Disposed { get; private set; } + public bool DisposedBeforeInitializeCompleted { get; private set; } + + public async ValueTask Initialize(CancellationToken token) + { + if (Interlocked.Decrement(ref _initializeFailures) >= 0) + { + throw new InvalidOperationException("initialize failed"); + } + + if (pauseInitializeCompletion) + { + _initializeEntered.TrySetResult(); + await _releaseInitialize.Task; + } + } + + public ValueTask ReadCheckpoint(CancellationToken token) => ValueTask.FromResult(checkpoint); + + public ValueTask DisposeAsync() + { + DisposedBeforeInitializeCompleted = pauseInitializeCompletion && !_releaseInitialize.Task.IsCompleted; + Disposed = true; + _disposed.TrySetResult(); + return ValueTask.CompletedTask; + } + + public Task WaitForInitializeEntered() => _initializeEntered.Task.WaitAsync(Timeout); + + public Task WaitForDisposed() => _disposed.Task.WaitAsync(Timeout); + + public void ReleaseInitialize() => _releaseInitialize.TrySetResult(); + } + + private sealed class FakeIndexingProcessor(bool pauseIndexCompletion = false) : IIndexingProcessor + { + private readonly List _indexed = []; + private readonly TaskCompletionSource _indexedEvents = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _committed = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _indexEntered = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _releaseIndex = new(TaskCreationOptions.RunContinuationsAsynchronously); + private int _commitCount; + private int _waitForIndexed = int.MaxValue; + private int _waitForCommits = int.MaxValue; + + public IReadOnlyList Indexed => _indexed; + + public int CommitCount => Volatile.Read(ref _commitCount); + + public async ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token) + { + lock (_indexed) + { + _indexed.Add(resolvedEvent); + if (_indexed.Count >= Volatile.Read(ref _waitForIndexed)) + { + _indexedEvents.TrySetResult(); + } + } + + if (pauseIndexCompletion) + { + _indexEntered.TrySetResult(); + await _releaseIndex.Task.WaitAsync(token); + } + } + + public ValueTask Commit(CancellationToken token) + { + if (Interlocked.Increment(ref _commitCount) >= Volatile.Read(ref _waitForCommits)) + { + _committed.TrySetResult(); + } + + return ValueTask.CompletedTask; + } + + public Task WaitForIndexed(int count) + { + lock (_indexed) + { + _waitForIndexed = count; + if (_indexed.Count >= count) + { + return Task.CompletedTask; + } + } + + return _indexedEvents.Task.WaitAsync(Timeout); + } + + public Task WaitForCommits(int count) + { + _waitForCommits = count; + return CommitCount >= count + ? Task.CompletedTask + : _committed.Task.WaitAsync(Timeout); + } + + public Task WaitForIndexEntered() => _indexEntered.Task.WaitAsync(Timeout); + + public void ReleaseIndex() => _releaseIndex.TrySetResult(); + } + + private sealed class FakeIndexingEventSourceFactory(FakeIndexingEventSource source) : IIndexingEventSourceFactory + { + public IndexCheckpoint? Checkpoint { get; private set; } + + public IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToken token) + { + Checkpoint = checkpoint; + source.Bind(token); + return source; + } + } + + private sealed class FakeIndexingEventSource(params ReadResponse[] responses) : IIndexingEventSource + { + private readonly Queue _responses = new(responses); + private readonly TaskCompletionSource _drained = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _disposed = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly bool _completeWhenDrained; + private CancellationToken _token; + + public FakeIndexingEventSource(bool completeWhenDrained, params ReadResponse[] responses) : this(responses) + { + _completeWhenDrained = completeWhenDrained; + } + + public ReadResponse Current { get; private set; } + + public bool Disposed { get; private set; } + + public void Bind(CancellationToken token) => _token = token; + + public Task WaitForDrained() => _drained.Task.WaitAsync(Timeout); + + public Task WaitForDisposed() => _disposed.Task.WaitAsync(Timeout); + + public async ValueTask MoveNextAsync() + { + if (!_responses.TryDequeue(out var response)) + { + if (_completeWhenDrained) + { + _drained.TrySetResult(); + return false; + } + + await Task.Delay(global::System.Threading.Timeout.InfiniteTimeSpan, _token); + return false; + } + + Current = response; + return true; + } + + public ValueTask DisposeAsync() + { + Disposed = true; + _disposed.TrySetResult(); + return ValueTask.CompletedTask; + } + } + + private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5); +} diff --git a/src/EventStore.Core.XUnit.Tests/Services/TimerService/ThreadBasedSchedulerTests.cs b/src/EventStore.Core.XUnit.Tests/Services/TimerService/ThreadBasedSchedulerTests.cs index 0f050a043..7dbb7e6fe 100644 --- a/src/EventStore.Core.XUnit.Tests/Services/TimerService/ThreadBasedSchedulerTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Services/TimerService/ThreadBasedSchedulerTests.cs @@ -12,6 +12,8 @@ namespace EventStore.Core.XUnit.Tests.Services.TimerService; public class ThreadBasedSchedulerTests { + private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5); + [Fact] public async Task task_completes_when_stopped() { @@ -19,7 +21,7 @@ public async Task task_completes_when_stopped() scheduler.Stop(); - await scheduler.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await scheduler.Task.WaitAsync(Timeout); } [Fact] @@ -30,7 +32,7 @@ public async Task future_work_does_not_keep_shutdown_incomplete() scheduler.Schedule(TimeSpan.FromMinutes(1), static (_, _) => { }, null); scheduler.Stop(); - await scheduler.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await scheduler.Task.WaitAsync(Timeout); } [Fact] @@ -44,7 +46,7 @@ public async Task statistics_include_scheduled_work() await AssertEventually(() => scheduler.GetStatistics().Length == 2); scheduler.Stop(); - await scheduler.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await scheduler.Task.WaitAsync(Timeout); } [Fact] @@ -60,17 +62,17 @@ public async Task callback_failure_does_not_fault_scheduler_lifetime() throw new InvalidOperationException("boom"); }, failedCallbackRan); - await failedCallbackRan.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await failedCallbackRan.Task.WaitAsync(Timeout); scheduler.Schedule(TimeSpan.Zero, static (_, state) => { ((TaskCompletionSource)state).SetResult(); }, nextCallbackRan); - await nextCallbackRan.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await nextCallbackRan.Task.WaitAsync(Timeout); scheduler.Stop(); - await scheduler.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await scheduler.Task.WaitAsync(Timeout); } [Fact] @@ -90,7 +92,7 @@ public async Task callback_failure_is_counted_as_processed_work() ((TaskCompletionSource)state).SetResult(); }, nextCallbackRan); - await Task.WhenAll(failedCallbackRan.Task, nextCallbackRan.Task).WaitAsync(TimeSpan.FromSeconds(1)); + await Task.WhenAll(failedCallbackRan.Task, nextCallbackRan.Task).WaitAsync(Timeout); await AssertEventually(() => scheduler.GetStatistics().TotalItemsProcessed >= 4); } @@ -109,14 +111,14 @@ public async Task busy_tracker_observes_scheduler_work_and_idle() ((TaskCompletionSource)state).SetResult(); }, callbackRan); - await callbackRan.Task.WaitAsync(TimeSpan.FromSeconds(1)); + await callbackRan.Task.WaitAsync(Timeout); await AssertEventually(() => tracker.BusyCount > busyBefore && tracker.IdleCount > idleBefore); } private static async Task AssertEventually(Func condition) { - var deadline = DateTime.UtcNow.AddSeconds(1); + var deadline = DateTime.UtcNow.Add(Timeout); while (!condition()) { diff --git a/src/EventStore.Core/Services/Storage/Indexing/IIndexingComponent.cs b/src/EventStore.Core/Services/Storage/Indexing/IIndexingComponent.cs new file mode 100644 index 000000000..ca7cbea36 --- /dev/null +++ b/src/EventStore.Core/Services/Storage/Indexing/IIndexingComponent.cs @@ -0,0 +1,22 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Data; + +namespace EventStore.Core.Services.Storage.Indexing; + +public interface IIndexingComponent : IAsyncDisposable +{ + ValueTask Initialize(CancellationToken token); + + ValueTask ReadCheckpoint(CancellationToken token); + + IIndexingProcessor Processor { get; } +} + +public interface IIndexingProcessor +{ + ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token); + + ValueTask Commit(CancellationToken token); +} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs b/src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs new file mode 100644 index 000000000..5107efcb6 --- /dev/null +++ b/src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs @@ -0,0 +1,48 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Bus; +using EventStore.Core.Services.Storage.ReaderIndex; +using EventStore.Core.Services.Transport.Common; +using EventStore.Core.Services.Transport.Enumerators; +using EventStore.Core.Services.UserManagement; + +namespace EventStore.Core.Services.Storage.Indexing; + +public interface IIndexingEventSource : IAsyncDisposable +{ + ReadResponse Current { get; } + + ValueTask MoveNextAsync(); +} + +public interface IIndexingEventSourceFactory +{ + IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToken token); +} + +public sealed class AllStreamsIndexingEventSourceFactory(IPublisher publisher) : IIndexingEventSourceFactory +{ + public IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToken token) + { + var subscription = new Enumerator.AllSubscription( + publisher, + new DefaultExpiryStrategy(), + checkpoint?.ToPosition(), + resolveLinks: false, + SystemAccounts.System, + requiresLeader: false, + token); + + return new AllStreamsIndexingEventSource(subscription); + } + + private sealed class AllStreamsIndexingEventSource(Enumerator.AllSubscription subscription) : IIndexingEventSource + { + public ReadResponse Current => subscription.Current; + + public ValueTask MoveNextAsync() => subscription.MoveNextAsync(); + + public ValueTask DisposeAsync() => subscription.DisposeAsync(); + } +} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpoint.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpoint.cs new file mode 100644 index 000000000..be3dd478e --- /dev/null +++ b/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpoint.cs @@ -0,0 +1,27 @@ +using System; +using EventStore.Core.Services.Transport.Common; + +namespace EventStore.Core.Services.Storage.Indexing; + +public readonly record struct IndexCheckpoint +{ + public long CommitPosition { get; } + public long PreparePosition { get; } + + public IndexCheckpoint(long commitPosition, long preparePosition) + { + ArgumentOutOfRangeException.ThrowIfNegative(commitPosition); + ArgumentOutOfRangeException.ThrowIfNegative(preparePosition); + + if (commitPosition < preparePosition) + { + throw new ArgumentOutOfRangeException(nameof(CommitPosition), + "The commit position cannot be less than the prepare position."); + } + + CommitPosition = commitPosition; + PreparePosition = preparePosition; + } + + public Position ToPosition() => Position.FromInt64(CommitPosition, PreparePosition); +} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexingService.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexingService.cs new file mode 100644 index 000000000..78298c8bb --- /dev/null +++ b/src/EventStore.Core/Services/Storage/Indexing/IndexingService.cs @@ -0,0 +1,32 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Bus; +using EventStore.Core.Messages; + +namespace EventStore.Core.Services.Storage.Indexing; + +public sealed class IndexingService : IAsyncHandle, IAsyncHandle, IAsyncDisposable +{ + private readonly IndexingSubscription _subscription; + + public IndexingService( + IIndexingComponent component, + IIndexingEventSourceFactory eventSourceFactory, + ISubscriber subscriber, + IndexingSubscriptionOptions options) + { + _subscription = new IndexingSubscription(component, eventSourceFactory, options); + + subscriber.Subscribe(this); + subscriber.Subscribe(this); + } + + public ValueTask HandleAsync(SystemMessage.SystemReady message, CancellationToken token) => + _subscription.Start(token); + + public ValueTask HandleAsync(SystemMessage.BecomeShuttingDown message, CancellationToken token) => + _subscription.DisposeAsync(); + + public ValueTask DisposeAsync() => _subscription.DisposeAsync(); +} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs new file mode 100644 index 000000000..ed1addf8e --- /dev/null +++ b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs @@ -0,0 +1,289 @@ +using System; +using System.Runtime.ExceptionServices; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Services.Transport.Enumerators; +using Serilog; + +namespace EventStore.Core.Services.Storage.Indexing; + +public sealed record IndexingSubscriptionOptions +{ + public static readonly IndexingSubscriptionOptions Default = new(50000, TimeSpan.FromSeconds(10)); + + public int CheckpointCommitBatchSize { get; } + public TimeSpan CheckpointCommitDelay { get; } + + public IndexingSubscriptionOptions(int checkpointCommitBatchSize, TimeSpan checkpointCommitDelay) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(checkpointCommitBatchSize); + ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(checkpointCommitDelay, TimeSpan.Zero); + + CheckpointCommitBatchSize = checkpointCommitBatchSize; + CheckpointCommitDelay = checkpointCommitDelay; + } +} + +public sealed class IndexingSubscription( + IIndexingComponent component, + IIndexingEventSourceFactory eventSourceFactory, + IndexingSubscriptionOptions options) : IAsyncDisposable +{ + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly CancellationTokenSource _stop = new(); + private readonly object _stateLock = new(); + + private IIndexingEventSource _eventSource; + private IndexCheckpointCommitTracker _commitTracker; + private Task _startup; + private Task _processing; + private bool _starting; + private bool _started; + private bool _disposed; + + public ValueTask Start(CancellationToken token) + { + Task startup; + lock (_stateLock) + { + ObjectDisposedException.ThrowIf(_disposed, this); + if (_started || _starting) + { + throw new InvalidOperationException($"{nameof(IndexingSubscription)} has already been started."); + } + + _starting = true; + startup = StartCore(token); + _startup = startup; + } + + return new ValueTask(startup); + } + + private async Task StartCore(CancellationToken token) + { + using var linked = CancellationTokenSource.CreateLinkedTokenSource(token, _stop.Token); + + IIndexingEventSource eventSource = null; + IndexCheckpointCommitTracker commitTracker = null; + + try + { + await component.Initialize(linked.Token); + var checkpoint = await component.ReadCheckpoint(linked.Token); + + commitTracker = new IndexCheckpointCommitTracker( + options.CheckpointCommitBatchSize, + options.CheckpointCommitDelay, + component.Processor.Commit, + CancellationToken.None); + + eventSource = eventSourceFactory.Create(checkpoint, _stop.Token); + + lock (_stateLock) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + _commitTracker = commitTracker; + _eventSource = eventSource; + _processing = Task.Run(ProcessEvents); + ObserveProcessingFault(_processing); + _started = true; + _starting = false; + } + } + catch + { + lock (_stateLock) + _starting = false; + + if (commitTracker is not null) + await commitTracker.DisposeAsync(); + + if (eventSource is not null) + await eventSource.DisposeAsync(); + + throw; + } + } + + public ValueTask DisposeAsync() => DisposeAsync(ignoreProcessingFailure: false); + + private async ValueTask DisposeAsync(bool ignoreProcessingFailure) + { + Task startup; + Task processing; + IIndexingEventSource eventSource; + IndexCheckpointCommitTracker commitTracker; + + lock (_stateLock) + { + if (_disposed) + { + return; + } + + _disposed = true; + startup = _startup; + } + + await _stop.CancelAsync(); + + Exception failure = null; + + try + { + if (startup is not null) + { + await startup; + } + } + catch (OperationCanceledException) when (_stop.IsCancellationRequested) + { + } + catch (ObjectDisposedException) when (_stop.IsCancellationRequested) + { + } + catch + { + } + + lock (_stateLock) + { + processing = _processing; + eventSource = _eventSource; + commitTracker = _commitTracker; + } + + try + { + if (processing is not null) + { + try + { + await processing; + } + catch (OperationCanceledException) when (_stop.IsCancellationRequested) + { + } + } + } + catch (Exception) when (ignoreProcessingFailure) + { + } + catch (Exception ex) + { + failure = ex; + } + + try + { + if (commitTracker is not null) + { + await commitTracker.DisposeAsync(); + } + } + catch (Exception ex) when (failure is null) + { + failure = ex; + } + + try + { + if (eventSource is not null) + { + await eventSource.DisposeAsync(); + } + } + catch (Exception ex) when (failure is null) + { + failure = ex; + } + + try + { + await component.DisposeAsync(); + } + catch (Exception ex) when (failure is null) + { + failure = ex; + } + + try + { + _stop.Dispose(); + } + catch (Exception ex) when (failure is null) + { + failure = ex; + } + + if (failure is not null) + ExceptionDispatchInfo.Capture(failure).Throw(); + } + + private void ObserveProcessingFault(Task processing) + { + processing.ContinueWith( + task => _ = DisposeAfterProcessingFault(task), + CancellationToken.None, + TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + } + + private async Task DisposeAfterProcessingFault(Task processing) + { + Log.Error(processing.Exception, "Indexing subscription stopped unexpectedly"); + + try + { + await DisposeAsync(ignoreProcessingFailure: true); + } + catch (Exception ex) + { + Log.Error(ex, "Error while disposing failed indexing subscription"); + } + } + + private async Task ProcessEvents() + { + while (!_stop.IsCancellationRequested) + { + bool hasNext; + try + { + hasNext = await _eventSource.MoveNextAsync(); + } + catch (OperationCanceledException) when (_stop.IsCancellationRequested) + { + return; + } + + if (!hasNext) + { + throw new InvalidOperationException("Indexing event source completed unexpectedly."); + } + + if (_eventSource.Current is not ReadResponse.EventReceived eventReceived) + { + continue; + } + + try + { + await component.Processor.Index(eventReceived.Event, CancellationToken.None); + _commitTracker.Track(); + } + catch (OperationCanceledException) when (_stop.IsCancellationRequested) + { + return; + } + catch (Exception ex) + { + Log.Error(ex, "Error while indexing event {eventType}", eventReceived.Event.Event.EventType); + throw; + } + } + } +}