From c369153251b02faa85009bb875a7615c0d976544 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Tue, 30 Jun 2026 22:53:41 -0400 Subject: [PATCH] chore(indexing): remove core indexing Signed-off-by: Yordis Prieto --- .../ClusterVNodeHostedService.cs | 6 +- .../when_cancelling_storage_reader_worker.cs | 2 - .../VirtualStreamReaderProviderTests.cs | 72 --- .../InMemory/VirtualStreamReaderTests.cs | 14 +- ...lStreamsIndexingEventSourceFactoryTests.cs | 16 - .../InMemoryIndexCheckpointStoreTests.cs | 68 --- .../InMemoryIndexDefinitionStoreTests.cs | 225 -------- .../InMemoryIndexVirtualStreamReaderTests.cs | 319 ----------- .../IndexCheckpointCommitTrackerTests.cs | 344 ----------- .../Storage/Indexing/IndexCheckpointTests.cs | 54 -- .../Indexing/IndexCheckpointWriterTests.cs | 348 ------------ .../Storage/Indexing/IndexDefinitionTests.cs | 132 ----- .../Storage/Indexing/IndexNameTests.cs | 51 -- .../Storage/Indexing/IndexStreamIdTests.cs | 47 -- .../Indexing/IndexVirtualStreamReaderTests.cs | 69 --- .../Indexing/IndexingComponentHostTests.cs | 286 ---------- .../Storage/Indexing/IndexingServiceTests.cs | 266 --------- .../IndexingSubscriptionOptionsTests.cs | 39 -- .../Indexing/IndexingSubscriptionTests.cs | 532 ------------------ src/EventStore.Core/ClusterVNode.cs | 10 +- .../Storage/InMemory/IVirtualStreamReader.cs | 2 - .../InMemory/IVirtualStreamReaderProvider.cs | 16 - .../InMemory/SingleEventInMemoryStream.cs | 2 - .../Storage/InMemory/VirtualStreamReader.cs | 3 - .../Storage/Indexing/IIndexCheckpointStore.cs | 11 - .../Storage/Indexing/IIndexDefinitionStore.cs | 39 -- .../Storage/Indexing/IIndexingComponent.cs | 23 - .../Storage/Indexing/IIndexingEventSource.cs | 53 -- .../Indexing/InMemoryIndexCheckpointStore.cs | 32 -- .../Indexing/InMemoryIndexDefinitionStore.cs | 69 --- .../Indexing/InMemoryIndexEventBuffer.cs | 89 --- .../InMemoryIndexVirtualStreamReader.cs | 166 ------ .../Storage/Indexing/IndexCheckpoint.cs | 27 - .../Indexing/IndexCheckpointCommitTracker.cs | 195 ------- .../Storage/Indexing/IndexCheckpointWriter.cs | 131 ----- .../Storage/Indexing/IndexDefinition.cs | 102 ---- .../Services/Storage/Indexing/IndexName.cs | 32 -- .../Storage/Indexing/IndexStreamId.cs | 26 - .../Indexing/IndexVirtualStreamReader.cs | 32 -- .../Storage/Indexing/IndexingComponentHost.cs | 96 ---- .../Storage/Indexing/IndexingService.cs | 102 ---- .../Storage/Indexing/IndexingSubscription.cs | 316 ----------- .../Services/SubscriptionsService.cs | 5 +- 43 files changed, 10 insertions(+), 4459 deletions(-) delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/InMemory/VirtualStreamReaderProviderTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/AllStreamsIndexingEventSourceFactoryTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexCheckpointStoreTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexDefinitionStoreTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexVirtualStreamReaderTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointCommitTrackerTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointWriterTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexDefinitionTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexNameTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexStreamIdTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexVirtualStreamReaderTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingComponentHostTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingServiceTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionOptionsTests.cs delete mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs delete mode 100644 src/EventStore.Core/Services/Storage/InMemory/IVirtualStreamReaderProvider.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IIndexCheckpointStore.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IIndexDefinitionStore.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IIndexingComponent.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexCheckpointStore.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexDefinitionStore.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexEventBuffer.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexVirtualStreamReader.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexCheckpoint.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexCheckpointCommitTracker.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexCheckpointWriter.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexDefinition.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexName.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexStreamId.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexVirtualStreamReader.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexingComponentHost.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexingService.cs delete mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs diff --git a/src/EventStore.ClusterNode/ClusterVNodeHostedService.cs b/src/EventStore.ClusterNode/ClusterVNodeHostedService.cs index ca9090d11..04835eec1 100644 --- a/src/EventStore.ClusterNode/ClusterVNodeHostedService.cs +++ b/src/EventStore.ClusterNode/ClusterVNodeHostedService.cs @@ -21,7 +21,6 @@ using EventStore.Core.LogAbstraction; using EventStore.Core.PluginModel; using EventStore.Core.Services.PersistentSubscription.ConsumerStrategy; -using EventStore.Core.Services.Storage.InMemory; using EventStore.PluginHosting; using EventStore.Plugins; using EventStore.Plugins.Authentication; @@ -125,10 +124,7 @@ public ClusterVNodeHostedService( Node = ClusterVNode.Create(_options, logFormatFactory, GetAuthenticationProviderFactory(), authProviderFactory, GetPersistentSubscriptionConsumerStrategyFactories(), certificateProvider, - configuration, - additionalVirtualStreamReaders: _options.PlugableComponents - .OfType() - .GetVirtualStreamReaders()); + configuration); EnabledNodeSubsystems = projectionMode >= ProjectionType.System ? new[] { NodeSubsystems.Projections } diff --git a/src/EventStore.Core.Tests/Services/Storage/when_cancelling_storage_reader_worker.cs b/src/EventStore.Core.Tests/Services/Storage/when_cancelling_storage_reader_worker.cs index 8ad0b9d16..4063f98e0 100644 --- a/src/EventStore.Core.Tests/Services/Storage/when_cancelling_storage_reader_worker.cs +++ b/src/EventStore.Core.Tests/Services/Storage/when_cancelling_storage_reader_worker.cs @@ -357,8 +357,6 @@ private sealed class StubVirtualStreamReader : IVirtualStreamReader public long GetLastEventNumber(string streamId) => ExpectedVersion.NoStream; - public long GetLastIndexedPosition(string streamId) => -1; - public bool CanReadStream(string streamId) => false; } } diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/InMemory/VirtualStreamReaderProviderTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/InMemory/VirtualStreamReaderProviderTests.cs deleted file mode 100644 index a5f0047df..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/InMemory/VirtualStreamReaderProviderTests.cs +++ /dev/null @@ -1,72 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Messages; -using EventStore.Core.Services.Storage.InMemory; -using EventStore.Plugins; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.InMemory; - -public class VirtualStreamReaderProviderTests -{ - [Fact] - public void returns_no_readers_when_no_provider_components_are_configured() - { - var readers = new ClusterVNodeOptions() - .PlugableComponents - .OfType() - .GetVirtualStreamReaders(); - - Assert.Empty(readers); - } - - [Fact] - public void collects_readers_from_provider_components_in_registration_order() - { - var first = new FakeVirtualStreamReader("$mem-first"); - var second = new FakeVirtualStreamReader("$mem-second"); - var third = new FakeVirtualStreamReader("$mem-third"); - - var options = new ClusterVNodeOptions() - .WithPlugableComponent(new FakeComponent("regular")) - .WithPlugableComponent(new FakeProvider("first-provider", first, second)) - .WithPlugableComponent(new FakeProvider("second-provider", third)); - - var readers = options - .PlugableComponents - .OfType() - .GetVirtualStreamReaders(); - - Assert.Equal([first, second, third], readers); - } - - private sealed class FakeComponent(string name) : Plugin(name); - - private sealed class FakeProvider(string name, params IVirtualStreamReader[] readers) - : Plugin(name), IVirtualStreamReaderProvider - { - public IReadOnlyList VirtualStreamReaders { get; } = readers; - } - - private sealed class FakeVirtualStreamReader(string streamId) : IVirtualStreamReader - { - public ValueTask ReadForwards( - ClientMessage.ReadStreamEventsForward msg, - CancellationToken token) => - throw new NotSupportedException(); - - public ValueTask ReadBackwards( - ClientMessage.ReadStreamEventsBackward msg, - CancellationToken token) => - throw new NotSupportedException(); - - public long GetLastEventNumber(string streamId) => 0; - - public long GetLastIndexedPosition(string streamId) => 0; - - public bool CanReadStream(string candidateStreamId) => candidateStreamId == streamId; - } -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/InMemory/VirtualStreamReaderTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/InMemory/VirtualStreamReaderTests.cs index 6dfa4c174..c30d6c4d5 100644 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/InMemory/VirtualStreamReaderTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Services/Storage/InMemory/VirtualStreamReaderTests.cs @@ -402,12 +402,11 @@ public async Task read_forwards_uses_the_reader_that_claims_the_stream() [Fact] public void metadata_uses_the_reader_that_claims_the_stream() { - var firstReader = new FakeVirtualStreamReader(FirstStream, lastEventNumber: 10, lastIndexedPosition: 100); - var secondReader = new FakeVirtualStreamReader(SecondStream, lastEventNumber: 20, lastIndexedPosition: 200); + var firstReader = new FakeVirtualStreamReader(FirstStream, lastEventNumber: 10); + var secondReader = new FakeVirtualStreamReader(SecondStream, lastEventNumber: 20); var sut = new VirtualStreamReader([firstReader, secondReader]); Assert.Equal(20, sut.GetLastEventNumber(SecondStream)); - Assert.Equal(200, sut.GetLastIndexedPosition(SecondStream)); } [Fact] @@ -429,8 +428,7 @@ public async Task earlier_reader_wins_when_more_than_one_reader_claims_the_strea private sealed class FakeVirtualStreamReader( string streamId, - long lastEventNumber, - long lastIndexedPosition = -1) : IVirtualStreamReader + long lastEventNumber) : IVirtualStreamReader { public int ForwardReadCount { get; private set; } @@ -452,7 +450,7 @@ private sealed class FakeVirtualStreamReader( nextEventNumber: lastEventNumber + 1, lastEventNumber: lastEventNumber, isEndOfStream: true, - tfLastCommitPosition: lastIndexedPosition)); + tfLastCommitPosition: -1)); } public ValueTask ReadBackwards( @@ -471,12 +469,10 @@ private sealed class FakeVirtualStreamReader( nextEventNumber: -1, lastEventNumber: lastEventNumber, isEndOfStream: true, - tfLastCommitPosition: lastIndexedPosition)); + tfLastCommitPosition: -1)); public long GetLastEventNumber(string streamId) => lastEventNumber; - public long GetLastIndexedPosition(string streamId) => lastIndexedPosition; - public bool CanReadStream(string candidate) => candidate == streamId; } } diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/AllStreamsIndexingEventSourceFactoryTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/AllStreamsIndexingEventSourceFactoryTests.cs deleted file mode 100644 index 653c54823..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/AllStreamsIndexingEventSourceFactoryTests.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System; -using EventStore.Core.Services.Storage.Indexing; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class AllStreamsIndexingEventSourceFactoryTests -{ - [Fact] - public void constructor_rejects_missing_publisher() - { - var exception = Assert.Throws(() => new AllStreamsIndexingEventSourceFactory(null!)); - - Assert.Equal("publisher", exception.ParamName); - } -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexCheckpointStoreTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexCheckpointStoreTests.cs deleted file mode 100644 index f28d016fd..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexCheckpointStoreTests.cs +++ /dev/null @@ -1,68 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Services.Storage.Indexing; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class InMemoryIndexCheckpointStoreTests -{ - [Fact] - public async Task read_returns_null_when_empty() - { - var store = new InMemoryIndexCheckpointStore(); - - var checkpoint = await store.Read(CancellationToken.None); - - Assert.Null(checkpoint); - } - - [Fact] - public async Task write_and_read_round_trip() - { - var store = new InMemoryIndexCheckpointStore(); - var expected = new IndexCheckpoint(10, 5); - - await store.Write(expected, CancellationToken.None); - var checkpoint = await store.Read(CancellationToken.None); - - Assert.Equal(expected, checkpoint); - } - - [Fact] - public async Task write_overwrites_stored_checkpoint() - { - var store = new InMemoryIndexCheckpointStore(); - var first = new IndexCheckpoint(10, 5); - var second = new IndexCheckpoint(20, 15); - - await store.Write(first, CancellationToken.None); - await store.Write(second, CancellationToken.None); - var checkpoint = await store.Read(CancellationToken.None); - - Assert.Equal(second, checkpoint); - } - - [Fact] - public async Task read_honors_cancelled_token() - { - var store = new InMemoryIndexCheckpointStore(); - using var cancellation = new CancellationTokenSource(); - await cancellation.CancelAsync(); - - await Assert.ThrowsAsync(() => - store.Read(cancellation.Token).AsTask()); - } - - [Fact] - public async Task write_honors_cancelled_token() - { - var store = new InMemoryIndexCheckpointStore(); - using var cancellation = new CancellationTokenSource(); - await cancellation.CancelAsync(); - - await Assert.ThrowsAsync(() => - store.Write(new IndexCheckpoint(10, 5), cancellation.Token).AsTask()); - } -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexDefinitionStoreTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexDefinitionStoreTests.cs deleted file mode 100644 index a2b7bd6f6..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexDefinitionStoreTests.cs +++ /dev/null @@ -1,225 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Services.Storage.Indexing; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class InMemoryIndexDefinitionStoreTests -{ - [Fact] - public async Task read_returns_null_when_missing() - { - var store = new InMemoryIndexDefinitionStore(); - - var definition = await store.Read(new IndexName("orders"), CancellationToken.None); - - Assert.Null(definition); - } - - [Fact] - public async Task create_stores_definition() - { - var store = new InMemoryIndexDefinitionStore(); - var name = new IndexName("orders"); - var definition = CreateDefinition("event.type == 'order'"); - - var result = await store.Create(name, definition, CancellationToken.None); - var stored = await store.Read(name, CancellationToken.None); - - Assert.Equal(IndexDefinitionCreateResult.Created, result); - Assert.Equal(new StoredIndexDefinition(name, definition), stored); - } - - [Fact] - public async Task create_is_idempotent_for_same_definition() - { - var store = new InMemoryIndexDefinitionStore(); - var name = new IndexName("orders"); - var definition = CreateDefinition("event.type == 'order'"); - - await store.Create(name, definition, CancellationToken.None); - var result = await store.Create(name, definition, CancellationToken.None); - var stored = await store.Read(name, CancellationToken.None); - - Assert.Equal(IndexDefinitionCreateResult.AlreadyExists, result); - Assert.Equal(definition, stored.Definition); - } - - [Fact] - public async Task create_is_idempotent_for_equivalent_definition() - { - var store = new InMemoryIndexDefinitionStore(); - var name = new IndexName("orders"); - var first = CreateDefinition("event.type == 'order'"); - var second = CreateDefinition("event.type == 'order'"); - - await store.Create(name, first, CancellationToken.None); - var result = await store.Create(name, second, CancellationToken.None); - var stored = await store.Read(name, CancellationToken.None); - - Assert.Equal(IndexDefinitionCreateResult.AlreadyExists, result); - Assert.Equal(first, stored.Definition); - } - - [Fact] - public async Task create_reports_conflict_for_different_definition() - { - var store = new InMemoryIndexDefinitionStore(); - var name = new IndexName("orders"); - var first = CreateDefinition("event.type == 'order'"); - var second = CreateDefinition("event.type == 'invoice'"); - - await store.Create(name, first, CancellationToken.None); - var result = await store.Create(name, second, CancellationToken.None); - var stored = await store.Read(name, CancellationToken.None); - - Assert.Equal(IndexDefinitionCreateResult.Conflicts, result); - Assert.Equal(first, stored.Definition); - } - - [Fact] - public async Task list_returns_stored_definitions() - { - var store = new InMemoryIndexDefinitionStore(); - var orders = new StoredIndexDefinition(new IndexName("orders"), CreateDefinition("event.type == 'order'")); - var invoices = new StoredIndexDefinition(new IndexName("invoices"), CreateDefinition("event.type == 'invoice'")); - - await store.Create(orders.Name, orders.Definition, CancellationToken.None); - await store.Create(invoices.Name, invoices.Definition, CancellationToken.None); - var definitions = await store.List(CancellationToken.None); - - Assert.Equal([invoices, orders], definitions); - } - - [Fact] - public async Task list_returns_snapshot() - { - var store = new InMemoryIndexDefinitionStore(); - var orders = new StoredIndexDefinition(new IndexName("orders"), CreateDefinition("event.type == 'order'")); - - await store.Create(orders.Name, orders.Definition, CancellationToken.None); - var first = await store.List(CancellationToken.None); - await store.Delete(orders.Name, CancellationToken.None); - var second = await store.List(CancellationToken.None); - - Assert.Equal([orders], first); - Assert.Empty(second); - } - - [Fact] - public async Task delete_removes_definition() - { - var store = new InMemoryIndexDefinitionStore(); - var name = new IndexName("orders"); - - await store.Create(name, CreateDefinition("event.type == 'order'"), CancellationToken.None); - var deleted = await store.Delete(name, CancellationToken.None); - var stored = await store.Read(name, CancellationToken.None); - - Assert.True(deleted); - Assert.Null(stored); - } - - [Fact] - public async Task delete_returns_false_when_missing() - { - var store = new InMemoryIndexDefinitionStore(); - - var deleted = await store.Delete(new IndexName("orders"), CancellationToken.None); - - Assert.False(deleted); - } - - [Fact] - public async Task create_rejects_missing_name() - { - var store = new InMemoryIndexDefinitionStore(); - - var exception = await Assert.ThrowsAsync(() => - store.Create(null, CreateDefinition("event.type == 'order'"), CancellationToken.None).AsTask()); - - Assert.Equal("name", exception.ParamName); - } - - [Fact] - public async Task create_rejects_missing_definition() - { - var store = new InMemoryIndexDefinitionStore(); - - var exception = await Assert.ThrowsAsync(() => - store.Create(new IndexName("orders"), null, CancellationToken.None).AsTask()); - - Assert.Equal("definition", exception.ParamName); - } - - [Fact] - public async Task read_rejects_missing_name() - { - var store = new InMemoryIndexDefinitionStore(); - - var exception = await Assert.ThrowsAsync(() => - store.Read(null, CancellationToken.None).AsTask()); - - Assert.Equal("name", exception.ParamName); - } - - [Fact] - public async Task delete_rejects_missing_name() - { - var store = new InMemoryIndexDefinitionStore(); - - var exception = await Assert.ThrowsAsync(() => - store.Delete(null, CancellationToken.None).AsTask()); - - Assert.Equal("name", exception.ParamName); - } - - [Fact] - public async Task create_honors_cancelled_token() - { - var store = new InMemoryIndexDefinitionStore(); - using var cancellation = new CancellationTokenSource(); - await cancellation.CancelAsync(); - - await Assert.ThrowsAsync(() => - store.Create(new IndexName("orders"), CreateDefinition("event.type == 'order'"), cancellation.Token).AsTask()); - } - - [Fact] - public async Task read_honors_cancelled_token() - { - var store = new InMemoryIndexDefinitionStore(); - using var cancellation = new CancellationTokenSource(); - await cancellation.CancelAsync(); - - await Assert.ThrowsAsync(() => - store.Read(new IndexName("orders"), cancellation.Token).AsTask()); - } - - [Fact] - public async Task list_honors_cancelled_token() - { - var store = new InMemoryIndexDefinitionStore(); - using var cancellation = new CancellationTokenSource(); - await cancellation.CancelAsync(); - - await Assert.ThrowsAsync(() => - store.List(cancellation.Token).AsTask()); - } - - [Fact] - public async Task delete_honors_cancelled_token() - { - var store = new InMemoryIndexDefinitionStore(); - using var cancellation = new CancellationTokenSource(); - await cancellation.CancelAsync(); - - await Assert.ThrowsAsync(() => - store.Delete(new IndexName("orders"), cancellation.Token).AsTask()); - } - - private static IndexDefinition CreateDefinition(string filter) => - new(new IndexEventFilter(filter), [new IndexFieldDefinition("id", new IndexFieldSelector("event.body.id"))]); -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexVirtualStreamReaderTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexVirtualStreamReaderTests.cs deleted file mode 100644 index e1d37a350..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexVirtualStreamReaderTests.cs +++ /dev/null @@ -1,319 +0,0 @@ -using System; -using System.Security.Claims; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Data; -using EventStore.Core.Messages; -using EventStore.Core.Messaging; -using EventStore.Core.Services.Storage.Indexing; -using EventStore.Core.Services.Storage.InMemory; -using EventStore.Core.TransactionLog.LogRecords; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class InMemoryIndexVirtualStreamReaderTests -{ - private const string IndexStream = "$index-stream"; - - [Fact] - public void buffer_constructor_rejects_missing_stream_id() - { - var exception = Assert.Throws(() => new InMemoryIndexEventBuffer(null!)); - - Assert.Equal("streamId", exception.ParamName); - } - - [Fact] - public void reader_constructor_rejects_missing_buffer() - { - var exception = Assert.Throws(() => new InMemoryIndexVirtualStreamReader(null!)); - - Assert.Equal("buffer", exception.ParamName); - } - - [Fact] - public void append_rejects_missing_record() - { - var buffer = new InMemoryIndexEventBuffer(new IndexStreamId(IndexStream)); - - var exception = Assert.Throws(() => buffer.Append(null!, 1)); - - Assert.Equal("record", exception.ParamName); - } - - [Fact] - public void append_rejects_mismatched_stream_id() - { - var buffer = new InMemoryIndexEventBuffer(new IndexStreamId(IndexStream)); - - var exception = Assert.Throws(() => - buffer.Append(CreateEventRecord("other-stream", 0, 1), 1)); - - Assert.Equal("record", exception.ParamName); - } - - [Fact] - public void append_rejects_non_sequential_event_number() - { - var buffer = new InMemoryIndexEventBuffer(new IndexStreamId(IndexStream)); - buffer.Append(CreateEventRecord(IndexStream, 0, 1), 1); - - var exception = Assert.Throws(() => - buffer.Append(CreateEventRecord(IndexStream, 2, 3), 3)); - - Assert.Equal("record", exception.ParamName); - } - - [Fact] - public async Task read_forwards_empty_returns_no_stream() - { - var reader = CreateReader(); - - var result = await reader.ReadForwards(GenReadForwards(Guid.NewGuid(), 0, 10), CancellationToken.None); - - Assert.Equal(ReadStreamResult.NoStream, result.Result); - Assert.Equal(-1, result.NextEventNumber); - Assert.Equal(ExpectedVersion.NoStream, result.LastEventNumber); - Assert.Equal(-1, result.TfLastCommitPosition); - Assert.True(result.IsEndOfStream); - Assert.Empty(result.Events); - } - - [Fact] - public async Task read_backwards_empty_returns_no_stream() - { - var reader = CreateReader(); - - var result = await reader.ReadBackwards(GenReadBackwards(Guid.NewGuid(), 0, 10), CancellationToken.None); - - Assert.Equal(ReadStreamResult.NoStream, result.Result); - Assert.Equal(-1, result.NextEventNumber); - Assert.Equal(ExpectedVersion.NoStream, result.LastEventNumber); - Assert.Equal(-1, result.TfLastCommitPosition); - Assert.True(result.IsEndOfStream); - Assert.Empty(result.Events); - } - - [Fact] - public async Task read_forwards_pages_events() - { - var buffer = CreateBufferWithEvents(5); - var reader = new InMemoryIndexVirtualStreamReader(buffer); - var correlation = Guid.NewGuid(); - - var firstPage = await reader.ReadForwards(GenReadForwards(correlation, 0, 2), CancellationToken.None); - Assert.Equal(ReadStreamResult.Success, firstPage.Result); - Assert.Equal(2, firstPage.Events.Count); - Assert.Equal(0, firstPage.Events[0].Event.EventNumber); - Assert.Equal(1, firstPage.Events[1].Event.EventNumber); - Assert.Equal(2, firstPage.NextEventNumber); - Assert.Equal(4, firstPage.LastEventNumber); - Assert.False(firstPage.IsEndOfStream); - Assert.Equal(5, firstPage.TfLastCommitPosition); - - var secondPage = await reader.ReadForwards( - GenReadForwards(correlation, firstPage.NextEventNumber, 2), - CancellationToken.None); - Assert.Equal(2, secondPage.Events.Count); - Assert.Equal(2, secondPage.Events[0].Event.EventNumber); - Assert.Equal(3, secondPage.Events[1].Event.EventNumber); - Assert.Equal(4, secondPage.NextEventNumber); - Assert.False(secondPage.IsEndOfStream); - - var finalPage = await reader.ReadForwards( - GenReadForwards(correlation, secondPage.NextEventNumber, 2), - CancellationToken.None); - Assert.Single(finalPage.Events); - Assert.Equal(4, finalPage.Events[0].Event.EventNumber); - Assert.Equal(5, finalPage.NextEventNumber); - Assert.True(finalPage.IsEndOfStream); - } - - [Fact] - public async Task read_forwards_treats_negative_from_as_zero() - { - var buffer = CreateBufferWithEvents(1); - var reader = new InMemoryIndexVirtualStreamReader(buffer); - - var result = await reader.ReadForwards(GenReadForwards(Guid.NewGuid(), -1, 10), CancellationToken.None); - - Assert.Equal(ReadStreamResult.Success, result.Result); - Assert.Single(result.Events); - Assert.Equal(0, result.Events[0].Event.EventNumber); - } - - [Fact] - public async Task read_forwards_beyond_latest_event_returns_empty_success() - { - var buffer = CreateBufferWithEvents(1); - var reader = new InMemoryIndexVirtualStreamReader(buffer); - - var result = await reader.ReadForwards(GenReadForwards(Guid.NewGuid(), 100, 10), CancellationToken.None); - - Assert.Equal(ReadStreamResult.Success, result.Result); - Assert.Empty(result.Events); - Assert.Equal(1, result.NextEventNumber); - Assert.Equal(0, result.LastEventNumber); - Assert.True(result.IsEndOfStream); - } - - [Fact] - public async Task read_backwards_pages_events_from_latest() - { - var buffer = CreateBufferWithEvents(5); - var reader = new InMemoryIndexVirtualStreamReader(buffer); - var correlation = Guid.NewGuid(); - - var firstPage = await reader.ReadBackwards(GenReadBackwards(correlation, -1, 2), CancellationToken.None); - Assert.Equal(ReadStreamResult.Success, firstPage.Result); - Assert.Equal(2, firstPage.Events.Count); - Assert.Equal(4, firstPage.Events[0].Event.EventNumber); - Assert.Equal(3, firstPage.Events[1].Event.EventNumber); - Assert.Equal(2, firstPage.NextEventNumber); - Assert.Equal(4, firstPage.LastEventNumber); - Assert.False(firstPage.IsEndOfStream); - Assert.Equal(4, firstPage.FromEventNumber); - - var secondPage = await reader.ReadBackwards( - GenReadBackwards(correlation, firstPage.NextEventNumber, 2), - CancellationToken.None); - Assert.Equal(2, secondPage.Events.Count); - Assert.Equal(2, secondPage.Events[0].Event.EventNumber); - Assert.Equal(1, secondPage.Events[1].Event.EventNumber); - Assert.Equal(0, secondPage.NextEventNumber); - Assert.False(secondPage.IsEndOfStream); - - var finalPage = await reader.ReadBackwards( - GenReadBackwards(correlation, secondPage.NextEventNumber, 2), - CancellationToken.None); - Assert.Single(finalPage.Events); - Assert.Equal(0, finalPage.Events[0].Event.EventNumber); - Assert.Equal(-1, finalPage.NextEventNumber); - Assert.True(finalPage.IsEndOfStream); - } - - [Fact] - public async Task read_backwards_from_explicit_position() - { - var buffer = CreateBufferWithEvents(2); - var reader = new InMemoryIndexVirtualStreamReader(buffer); - - var result = await reader.ReadBackwards(GenReadBackwards(Guid.NewGuid(), 0, 10), CancellationToken.None); - - Assert.Equal(ReadStreamResult.Success, result.Result); - Assert.Single(result.Events); - Assert.Equal(0, result.Events[0].Event.EventNumber); - Assert.Equal(-1, result.NextEventNumber); - Assert.Equal(1, result.LastEventNumber); - Assert.True(result.IsEndOfStream); - } - - [Fact] - public async Task read_backwards_beyond_latest_event_reads_from_latest() - { - var buffer = CreateBufferWithEvents(2); - var reader = new InMemoryIndexVirtualStreamReader(buffer); - - var result = await reader.ReadBackwards(GenReadBackwards(Guid.NewGuid(), 100, 10), CancellationToken.None); - - Assert.Equal(ReadStreamResult.Success, result.Result); - Assert.Equal(2, result.Events.Count); - Assert.Equal(1, result.Events[0].Event.EventNumber); - Assert.Equal(0, result.Events[1].Event.EventNumber); - Assert.Equal(-1, result.NextEventNumber); - Assert.Equal(1, result.LastEventNumber); - Assert.True(result.IsEndOfStream); - Assert.Equal(100, result.FromEventNumber); - } - - [Fact] - public void metadata_reflects_buffer_state() - { - var buffer = CreateBufferWithEvents(3); - var reader = new InMemoryIndexVirtualStreamReader(buffer); - - Assert.Equal(2, reader.GetLastEventNumber(IndexStream)); - Assert.Equal(3, reader.GetLastIndexedPosition(IndexStream)); - Assert.Equal(ExpectedVersion.NoStream, reader.GetLastEventNumber("other-stream")); - Assert.Equal(-1, reader.GetLastIndexedPosition("other-stream")); - } - - [Fact] - public void can_read_stream_routes_through_virtual_stream_reader() - { - var buffer = CreateBufferWithEvents(1); - var reader = new InMemoryIndexVirtualStreamReader(buffer); - var composite = new VirtualStreamReader([reader]); - - Assert.True(composite.CanReadStream(IndexStream)); - Assert.False(composite.CanReadStream("other-stream")); - Assert.Equal(0, composite.GetLastEventNumber(IndexStream)); - Assert.Equal(1, composite.GetLastIndexedPosition(IndexStream)); - } - - private static InMemoryIndexVirtualStreamReader CreateReader() => - new(new InMemoryIndexEventBuffer(new IndexStreamId(IndexStream))); - - private static InMemoryIndexEventBuffer CreateBufferWithEvents(int count) - { - var buffer = new InMemoryIndexEventBuffer(new IndexStreamId(IndexStream)); - for (var i = 0; i < count; i++) - { - buffer.Append(CreateEventRecord(IndexStream, i, i + 1), i + 1); - } - - return buffer; - } - - private static EventRecord CreateEventRecord(string streamId, long number, long commitPosition) => - new( - number, - commitPosition, - Guid.NewGuid(), - Guid.NewGuid(), - commitPosition, - 0, - streamId, - number - 1, - DateTime.UtcNow, - PrepareFlags.SingleWrite | PrepareFlags.IsCommitted | PrepareFlags.Data, - $"event-{number}", - Array.Empty(), - Array.Empty()); - - private static ClientMessage.ReadStreamEventsForward GenReadForwards( - Guid correlation, - long fromEventNumber, - int maxCount, - string eventStreamId = IndexStream) => - new( - internalCorrId: correlation, - correlationId: correlation, - envelope: new NoopEnvelope(), - eventStreamId: eventStreamId, - fromEventNumber: fromEventNumber, - maxCount: maxCount, - resolveLinkTos: false, - requireLeader: false, - validationStreamVersion: null, - user: ClaimsPrincipal.Current, - replyOnExpired: true); - - private static ClientMessage.ReadStreamEventsBackward GenReadBackwards( - Guid correlation, - long fromEventNumber, - int maxCount, - string eventStreamId = IndexStream) => - new( - internalCorrId: correlation, - correlationId: correlation, - envelope: new NoopEnvelope(), - eventStreamId: eventStreamId, - fromEventNumber: fromEventNumber, - maxCount: maxCount, - resolveLinkTos: false, - requireLeader: false, - validationStreamVersion: null, - user: ClaimsPrincipal.Current); -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointCommitTrackerTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointCommitTrackerTests.cs deleted file mode 100644 index f0cb66b94..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointCommitTrackerTests.cs +++ /dev/null @@ -1,344 +0,0 @@ -using System; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Services.Storage.Indexing; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class IndexCheckpointCommitTrackerTests -{ - [Fact] - public async Task commits_when_batch_size_is_reached() - { - var calls = 0; - var committed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - await using var tracker = new IndexCheckpointCommitTracker( - batchSize: 5, - maxCommitDelay: TimeSpan.FromSeconds(30), - commit: _ => - { - Interlocked.Increment(ref calls); - committed.SetResult(); - return ValueTask.CompletedTask; - }, - cancellationToken: CancellationToken.None); - - for (var index = 0; index < 5; index++) - tracker.Track(); - - await committed.Task.WaitAsync(CommitTimeout); - - Assert.Equal(1, calls); - } - - [Fact] - public async Task commits_pending_changes_when_delay_elapses() - { - var calls = 0; - var committed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - await using var tracker = new IndexCheckpointCommitTracker( - batchSize: 100, - maxCommitDelay: TimeSpan.FromMilliseconds(25), - commit: _ => - { - Interlocked.Increment(ref calls); - committed.SetResult(); - return ValueTask.CompletedTask; - }, - cancellationToken: CancellationToken.None); - - tracker.Track(); - - await committed.Task.WaitAsync(CommitTimeout); - - Assert.Equal(1, calls); - } - - [Fact] - public async Task does_not_commit_when_no_changes_were_tracked() - { - var calls = 0; - await using (new IndexCheckpointCommitTracker( - batchSize: 100, - maxCommitDelay: TimeSpan.FromMilliseconds(25), - commit: _ => - { - Interlocked.Increment(ref calls); - return ValueTask.CompletedTask; - }, - cancellationToken: CancellationToken.None)) - { - await Task.Delay(TimeSpan.FromMilliseconds(75)); - } - - Assert.Equal(0, calls); - } - - [Fact] - public async Task does_not_run_commits_reentrantly() - { - var currentCommits = 0; - var maxConcurrentCommits = 0; - var firstCommitStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var releaseCommit = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - await using var tracker = new IndexCheckpointCommitTracker( - batchSize: 3, - maxCommitDelay: TimeSpan.FromSeconds(30), - commit: async _ => - { - var active = Interlocked.Increment(ref currentCommits); - SetMax(ref maxConcurrentCommits, active); - firstCommitStarted.TrySetResult(); - - await releaseCommit.Task; - - Interlocked.Decrement(ref currentCommits); - }, - cancellationToken: CancellationToken.None); - - for (var index = 0; index < 3; index++) - tracker.Track(); - - await firstCommitStarted.Task.WaitAsync(CommitTimeout); - - for (var index = 0; index < 3; index++) - tracker.Track(); - - releaseCommit.SetResult(); - - await Task.Delay(TimeSpan.FromMilliseconds(50)); - - Assert.Equal(1, maxConcurrentCommits); - } - - [Fact] - public async Task concurrent_tracks_trigger_one_batch_commit() - { - var calls = 0; - var committed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - await using var tracker = new IndexCheckpointCommitTracker( - batchSize: 100, - maxCommitDelay: TimeSpan.FromSeconds(30), - commit: _ => - { - Interlocked.Increment(ref calls); - committed.SetResult(); - return ValueTask.CompletedTask; - }, - cancellationToken: CancellationToken.None); - - await Task.WhenAll(Enumerable - .Range(0, 100) - .Select(_ => Task.Run(tracker.Track))); - - await committed.Task.WaitAsync(CommitTimeout); - - Assert.Equal(1, calls); - } - - [Fact] - public async Task failed_commit_keeps_pending_changes_for_retry() - { - var calls = 0; - var committed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - await using var tracker = new IndexCheckpointCommitTracker( - batchSize: 1, - maxCommitDelay: TimeSpan.FromSeconds(30), - commit: _ => - { - if (Interlocked.Increment(ref calls) == 1) - throw new InvalidOperationException("transient failure"); - - committed.SetResult(); - return ValueTask.CompletedTask; - }, - cancellationToken: CancellationToken.None); - - tracker.Track(); - - await committed.Task.WaitAsync(CommitTimeout); - - Assert.Equal(2, calls); - } - - [Fact] - public async Task cancellation_stops_future_tracks() - { - var calls = 0; - using var cancellation = new CancellationTokenSource(); - await using var tracker = new IndexCheckpointCommitTracker( - batchSize: 1, - maxCommitDelay: TimeSpan.FromSeconds(30), - commit: _ => - { - Interlocked.Increment(ref calls); - return ValueTask.CompletedTask; - }, - cancellationToken: cancellation.Token); - - await cancellation.CancelAsync(); - await Task.Delay(TimeSpan.FromMilliseconds(50)); - - var exception = Assert.Throws(tracker.Track); - await Task.Delay(TimeSpan.FromMilliseconds(50)); - - Assert.Contains(nameof(IndexCheckpointCommitTracker), exception.Message); - Assert.Equal(0, calls); - } - - [Fact] - public async Task cancellation_commits_pending_changes_before_stopping() - { - var calls = 0; - var committed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - using var cancellation = new CancellationTokenSource(); - await using var tracker = new IndexCheckpointCommitTracker( - batchSize: 100, - maxCommitDelay: TimeSpan.FromSeconds(30), - commit: _ => - { - Interlocked.Increment(ref calls); - committed.SetResult(); - return ValueTask.CompletedTask; - }, - cancellationToken: cancellation.Token); - - tracker.Track(); - await cancellation.CancelAsync(); - - await committed.Task.WaitAsync(CommitTimeout); - - Assert.Equal(1, calls); - } - - [Fact] - public async Task dispose_commits_pending_changes_before_stopping() - { - var calls = 0; - var tracker = new IndexCheckpointCommitTracker( - batchSize: 100, - maxCommitDelay: TimeSpan.FromSeconds(30), - commit: _ => - { - Interlocked.Increment(ref calls); - return ValueTask.CompletedTask; - }, - cancellationToken: CancellationToken.None); - - tracker.Track(); - await tracker.DisposeAsync(); - - Assert.Equal(1, calls); - } - - [Fact] - public async Task dispose_surfaces_pending_commit_failure() - { - var tracker = new IndexCheckpointCommitTracker( - batchSize: 100, - maxCommitDelay: TimeSpan.FromSeconds(30), - commit: _ => throw new InvalidOperationException("commit failed"), - cancellationToken: CancellationToken.None); - - tracker.Track(); - - var exception = await Assert.ThrowsAsync( - () => tracker.DisposeAsync().AsTask()); - - Assert.Equal("commit failed", exception.Message); - } - - [Fact] - public async Task track_after_dispose_throws() - { - var tracker = new IndexCheckpointCommitTracker( - batchSize: 10, - maxCommitDelay: TimeSpan.FromSeconds(30), - commit: _ => ValueTask.CompletedTask, - cancellationToken: CancellationToken.None); - - await tracker.DisposeAsync(); - - var exception = Assert.Throws(tracker.Track); - Assert.Contains(nameof(IndexCheckpointCommitTracker), exception.Message); - } - - [Fact] - public async Task concurrent_dispose_calls_are_safe() - { - var commitStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var releaseCommit = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var tracker = new IndexCheckpointCommitTracker( - batchSize: 1, - maxCommitDelay: TimeSpan.FromSeconds(30), - commit: async _ => - { - commitStarted.SetResult(); - await releaseCommit.Task; - }, - cancellationToken: CancellationToken.None); - - tracker.Track(); - await commitStarted.Task.WaitAsync(CommitTimeout); - - var disposeTasks = Enumerable - .Range(0, 5) - .Select(_ => Task.Run(async () => await tracker.DisposeAsync())) - .ToArray(); - - await Task.Delay(TimeSpan.FromMilliseconds(50)); - - foreach (var disposeTask in disposeTasks) - Assert.False(disposeTask.IsCompleted); - - releaseCommit.SetResult(); - await Task.WhenAll(disposeTasks).WaitAsync(CommitTimeout); - } - - [Fact] - public async Task dispose_waits_for_active_commit_to_finish() - { - var commitFinished = false; - var commitStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var releaseCommit = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var tracker = new IndexCheckpointCommitTracker( - batchSize: 1, - maxCommitDelay: TimeSpan.FromSeconds(30), - commit: async _ => - { - commitStarted.SetResult(); - await releaseCommit.Task; - commitFinished = true; - }, - cancellationToken: CancellationToken.None); - - tracker.Track(); - await commitStarted.Task.WaitAsync(CommitTimeout); - - var disposeTask = tracker.DisposeAsync().AsTask(); - - Assert.False(disposeTask.IsCompleted); - - releaseCommit.SetResult(); - await disposeTask.WaitAsync(CommitTimeout); - - Assert.True(commitFinished); - } - - private static void SetMax(ref int target, int candidate) - { - while (true) - { - var current = Volatile.Read(ref target); - if (current >= candidate) - return; - - if (Interlocked.CompareExchange(ref target, candidate, current) == current) - return; - } - } - - private static readonly TimeSpan CommitTimeout = TimeSpan.FromSeconds(5); -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointTests.cs deleted file mode 100644 index 4ed6c5702..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointTests.cs +++ /dev/null @@ -1,54 +0,0 @@ -using System; -using EventStore.Core.Services.Storage.Indexing; -using EventStore.Core.Services.Transport.Common; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class IndexCheckpointTests -{ - [Theory] - [InlineData(0, 0)] - [InlineData(10, 5)] - public void constructor_accepts_valid_positions(long commitPosition, long preparePosition) - { - var checkpoint = new IndexCheckpoint(commitPosition, preparePosition); - - Assert.Equal(commitPosition, checkpoint.CommitPosition); - Assert.Equal(preparePosition, checkpoint.PreparePosition); - } - - [Fact] - public void constructor_rejects_negative_commit_position() - { - var exception = Assert.Throws(() => new IndexCheckpoint(-1, 0)); - - Assert.Equal("commitPosition", exception.ParamName); - } - - [Fact] - public void constructor_rejects_negative_prepare_position() - { - var exception = Assert.Throws(() => new IndexCheckpoint(0, -1)); - - Assert.Equal("preparePosition", exception.ParamName); - } - - [Fact] - public void constructor_rejects_commit_position_before_prepare_position() - { - var exception = Assert.Throws(() => new IndexCheckpoint(5, 10)); - - Assert.Equal("CommitPosition", exception.ParamName); - } - - [Fact] - public void converts_to_all_stream_position() - { - var checkpoint = new IndexCheckpoint(10, 5); - - var position = checkpoint.ToPosition(); - - Assert.Equal(Position.FromInt64(10, 5), position); - } -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointWriterTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointWriterTests.cs deleted file mode 100644 index aaa000a02..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexCheckpointWriterTests.cs +++ /dev/null @@ -1,348 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Data; -using EventStore.Core.Services.Storage.Indexing; -using EventStore.Core.TransactionLog.LogRecords; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class IndexCheckpointWriterTests -{ - [Fact] - public async Task read_delegates_to_store() - { - var expected = new IndexCheckpoint(10, 5); - var store = new FakeIndexCheckpointStore { Checkpoint = expected }; - var writer = new IndexCheckpointWriter(store); - - var checkpoint = await writer.Read(CancellationToken.None); - - Assert.Equal(expected, checkpoint); - Assert.Equal(1, store.ReadCalls); - } - - [Fact] - public async Task read_seeds_latest_checkpoint() - { - var store = new FakeIndexCheckpointStore { Checkpoint = new IndexCheckpoint(20, 15) }; - var writer = new IndexCheckpointWriter(store); - - await writer.Read(CancellationToken.None); - - var exception = Assert.Throws(() => - writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5))); - - Assert.Contains("backwards", exception.Message, StringComparison.OrdinalIgnoreCase); - } - - [Fact] - public async Task read_discards_stale_pending_checkpoint() - { - var store = new FakeIndexCheckpointStore { Checkpoint = new IndexCheckpoint(20, 15) }; - var writer = new IndexCheckpointWriter(store); - - writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); - await writer.Read(CancellationToken.None); - await writer.Commit(CancellationToken.None); - - Assert.Equal(0, store.WriteCalls); - Assert.Equal(new IndexCheckpoint(20, 15), store.Checkpoint); - } - - [Fact] - public async Task commit_before_tracking_is_no_op() - { - var store = new FakeIndexCheckpointStore(); - var writer = new IndexCheckpointWriter(store); - - await writer.Commit(CancellationToken.None); - - Assert.Equal(0, store.WriteCalls); - Assert.Null(await store.Read(CancellationToken.None)); - } - - [Fact] - public async Task tracking_then_commit_writes_commit_and_prepare_positions() - { - var store = new InMemoryIndexCheckpointStore(); - var writer = new IndexCheckpointWriter(store); - - writer.Track(CreateResolvedEvent(commitPosition: 20, preparePosition: 15)); - await writer.Commit(CancellationToken.None); - - var checkpoint = await store.Read(CancellationToken.None); - - Assert.Equal(new IndexCheckpoint(20, 15), checkpoint); - } - - [Fact] - public async Task later_tracked_positions_win() - { - var store = new InMemoryIndexCheckpointStore(); - var writer = new IndexCheckpointWriter(store); - - writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); - writer.Track(CreateResolvedEvent(commitPosition: 20, preparePosition: 15)); - await writer.Commit(CancellationToken.None); - - var checkpoint = await store.Read(CancellationToken.None); - - Assert.Equal(new IndexCheckpoint(20, 15), checkpoint); - } - - [Fact] - public void track_without_original_position_throws() - { - var writer = new IndexCheckpointWriter(new InMemoryIndexCheckpointStore()); - var resolvedEvent = CreateResolvedEvent( - commitPosition: 10, - preparePosition: 5, - isSelfCommitted: false).WithoutPosition(); - - var exception = Assert.Throws(() => writer.Track(resolvedEvent)); - - Assert.Contains("original position", exception.Message, StringComparison.OrdinalIgnoreCase); - } - - [Fact] - public void constructor_rejects_null_store() - { - var exception = Assert.Throws(() => new IndexCheckpointWriter(null)); - - Assert.Equal("store", exception.ParamName); - } - - [Fact] - public async Task read_passes_cancellation_token_to_store() - { - using var cancellation = new CancellationTokenSource(); - await cancellation.CancelAsync(); - var store = new FakeIndexCheckpointStore { CancelRead = true }; - var writer = new IndexCheckpointWriter(store); - - await Assert.ThrowsAsync(() => writer.Read(cancellation.Token).AsTask()); - - Assert.Equal(cancellation.Token, store.LastReadToken); - } - - [Fact] - public async Task commit_passes_cancellation_token_to_store() - { - using var cancellation = new CancellationTokenSource(); - await cancellation.CancelAsync(); - var store = new FakeIndexCheckpointStore { CancelWrite = true }; - var writer = new IndexCheckpointWriter(store); - - writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); - - await Assert.ThrowsAsync(() => writer.Commit(cancellation.Token).AsTask()); - - Assert.Equal(cancellation.Token, store.LastWriteToken); - } - - [Fact] - public void track_rejects_regressive_pending_position() - { - var writer = new IndexCheckpointWriter(new InMemoryIndexCheckpointStore()); - - writer.Track(CreateResolvedEvent(commitPosition: 20, preparePosition: 15)); - - var exception = Assert.Throws(() => - writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5))); - - Assert.Contains("backwards", exception.Message, StringComparison.OrdinalIgnoreCase); - } - - [Fact] - public async Task tracking_equal_position_is_idempotent_and_commits_once() - { - var store = new FakeIndexCheckpointStore(); - var writer = new IndexCheckpointWriter(store); - - writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); - writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); - - await writer.Commit(CancellationToken.None); - await writer.Commit(CancellationToken.None); - - Assert.Equal(1, store.WriteCalls); - Assert.Equal(new IndexCheckpoint(10, 5), store.Checkpoint); - } - - [Fact] - public async Task commit_failure_leaves_pending_for_retry() - { - var store = new FakeIndexCheckpointStore { FailWrite = true }; - var writer = new IndexCheckpointWriter(store); - - writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); - - await Assert.ThrowsAsync(() => writer.Commit(CancellationToken.None).AsTask()); - - Assert.Equal(1, store.WriteCalls); - - store.FailWrite = false; - - await writer.Commit(CancellationToken.None); - - Assert.Equal(2, store.WriteCalls); - Assert.Equal(new IndexCheckpoint(10, 5), store.Checkpoint); - } - - [Fact] - public async Task successful_commit_clears_pending_so_second_commit_is_no_op() - { - var store = new FakeIndexCheckpointStore(); - var writer = new IndexCheckpointWriter(store); - - writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); - - await writer.Commit(CancellationToken.None); - await writer.Commit(CancellationToken.None); - - Assert.Equal(1, store.WriteCalls); - Assert.Equal(new IndexCheckpoint(10, 5), store.Checkpoint); - } - - [Fact] - public async Task successful_commit_prevents_later_regression() - { - var writer = new IndexCheckpointWriter(new InMemoryIndexCheckpointStore()); - - writer.Track(CreateResolvedEvent(commitPosition: 20, preparePosition: 15)); - await writer.Commit(CancellationToken.None); - - var exception = Assert.Throws(() => - writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5))); - - Assert.Contains("backwards", exception.Message, StringComparison.OrdinalIgnoreCase); - } - - [Fact] - public async Task tracking_higher_position_during_async_commit_preserves_pending_for_next_commit() - { - var store = new BlockingIndexCheckpointStore(); - var writer = new IndexCheckpointWriter(store); - - writer.Track(CreateResolvedEvent(commitPosition: 10, preparePosition: 5)); - - var commitTask = writer.Commit(CancellationToken.None).AsTask(); - await store.WaitForWriteStarted(); - - writer.Track(CreateResolvedEvent(commitPosition: 20, preparePosition: 15)); - - store.ReleaseWrite(); - await commitTask; - - Assert.Equal(1, store.WriteCalls); - Assert.Equal(new IndexCheckpoint(10, 5), store.LastWritten); - - await writer.Commit(CancellationToken.None); - - Assert.Equal(2, store.WriteCalls); - Assert.Equal(new IndexCheckpoint(20, 15), store.LastWritten); - - await writer.Commit(CancellationToken.None); - - Assert.Equal(2, store.WriteCalls); - } - - private static ResolvedEvent CreateResolvedEvent(long commitPosition, long preparePosition, bool isSelfCommitted = true) - { - var flags = PrepareFlags.SingleWrite | PrepareFlags.Data; - if (isSelfCommitted) - { - flags |= PrepareFlags.IsCommitted; - } - - var record = new EventRecord( - eventNumber: 0, - logPosition: preparePosition, - correlationId: Guid.NewGuid(), - eventId: Guid.NewGuid(), - transactionPosition: commitPosition, - transactionOffset: 0, - eventStreamId: "stream-1", - expectedVersion: -1, - timeStamp: DateTime.UtcNow, - flags: flags, - eventType: "event-type", - data: Array.Empty(), - metadata: Array.Empty()); - - return ResolvedEvent.ForUnresolvedEvent(record, commitPosition); - } - - private sealed class FakeIndexCheckpointStore : IIndexCheckpointStore - { - public IndexCheckpoint? Checkpoint { get; set; } - public bool CancelRead { get; init; } - public bool CancelWrite { get; init; } - public bool FailWrite { get; set; } - public int ReadCalls { get; private set; } - public int WriteCalls { get; private set; } - public CancellationToken LastReadToken { get; private set; } - public CancellationToken LastWriteToken { get; private set; } - - public ValueTask Read(CancellationToken token) - { - ReadCalls++; - LastReadToken = token; - - if (CancelRead) - { - token.ThrowIfCancellationRequested(); - } - - return ValueTask.FromResult(Checkpoint); - } - - public ValueTask Write(IndexCheckpoint checkpoint, CancellationToken token) - { - WriteCalls++; - LastWriteToken = token; - - if (FailWrite) - { - throw new InvalidOperationException("Simulated index checkpoint write failure."); - } - - Checkpoint = checkpoint; - - if (CancelWrite) - { - token.ThrowIfCancellationRequested(); - } - - return ValueTask.CompletedTask; - } - } - - private sealed class BlockingIndexCheckpointStore : IIndexCheckpointStore - { - private readonly TaskCompletionSource _writeStarted = new(TaskCreationOptions.RunContinuationsAsynchronously); - private readonly TaskCompletionSource _releaseWrite = new(TaskCreationOptions.RunContinuationsAsynchronously); - - public IndexCheckpoint? LastWritten { get; private set; } - public int WriteCalls { get; private set; } - - public ValueTask Read(CancellationToken token) => - ValueTask.FromResult(LastWritten); - - public async ValueTask Write(IndexCheckpoint checkpoint, CancellationToken token) - { - WriteCalls++; - _writeStarted.TrySetResult(); - - await _releaseWrite.Task; - - LastWritten = checkpoint; - } - - public Task WaitForWriteStarted() => _writeStarted.Task; - - public void ReleaseWrite() => _releaseWrite.TrySetResult(); - } -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexDefinitionTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexDefinitionTests.cs deleted file mode 100644 index d2436670a..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexDefinitionTests.cs +++ /dev/null @@ -1,132 +0,0 @@ -using System; -using EventStore.Core.Services.Storage.Indexing; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class IndexDefinitionTests -{ - [Fact] - public void constructor_rejects_missing_fields() - { - var exception = Assert.Throws(() => new IndexDefinition(new IndexEventFilter("event.type == 'order'"), null!)); - - Assert.Equal("fields", exception.ParamName); - } - - [Fact] - public void constructor_rejects_null_field() - { - var exception = Assert.Throws(() => new IndexDefinition(new IndexEventFilter("event.type == 'order'"), [null!])); - - Assert.Equal("fields", exception.ParamName); - } - - [Fact] - public void constructor_requires_filter_or_field() - { - var exception = Assert.Throws(() => new IndexDefinition(null, [])); - - Assert.Equal("fields", exception.ParamName); - } - - [Fact] - public void constructor_accepts_filter_without_fields() - { - var filter = new IndexEventFilter("event.type == 'order'"); - var definition = new IndexDefinition(filter, []); - - Assert.Same(filter, definition.Filter); - Assert.Empty(definition.Fields); - } - - [Fact] - public void constructor_accepts_field_without_filter() - { - var field = new IndexFieldDefinition("customerId", new IndexFieldSelector("event.body.customerId")); - var definition = new IndexDefinition(null, [field]); - - Assert.Null(definition.Filter); - Assert.Equal([field], definition.Fields); - } - - [Fact] - public void constructor_copies_fields() - { - var field = new IndexFieldDefinition("customerId", new IndexFieldSelector("event.body.customerId")); - var fields = new[] { field }; - var definition = new IndexDefinition(null, fields); - - fields[0] = new IndexFieldDefinition("tenantId", new IndexFieldSelector("event.body.tenantId")); - - Assert.Equal([field], definition.Fields); - } - - [Fact] - public void equivalent_definitions_are_equal() - { - var first = new IndexDefinition( - new IndexEventFilter("event.type == 'order'"), - [new IndexFieldDefinition("customerId", new IndexFieldSelector("event.body.customerId"))]); - var second = new IndexDefinition( - new IndexEventFilter("event.type == 'order'"), - [new IndexFieldDefinition("customerId", new IndexFieldSelector("event.body.customerId"))]); - - Assert.Equal(first, second); - Assert.Equal(first.GetHashCode(), second.GetHashCode()); - } - - [Fact] - public void different_fields_are_not_equal() - { - var first = new IndexDefinition( - new IndexEventFilter("event.type == 'order'"), - [new IndexFieldDefinition("customerId", new IndexFieldSelector("event.body.customerId"))]); - var second = new IndexDefinition( - new IndexEventFilter("event.type == 'order'"), - [new IndexFieldDefinition("tenantId", new IndexFieldSelector("event.body.tenantId"))]); - - Assert.NotEqual(first, second); - } - - [Theory] - [InlineData(null)] - [InlineData("")] - [InlineData(" ")] - public void filter_rejects_empty_value(string value) - { - var exception = Assert.Throws(() => new IndexEventFilter(value)); - - Assert.Equal("value", exception.ParamName); - } - - [Theory] - [InlineData(null)] - [InlineData("")] - [InlineData(" ")] - public void field_rejects_empty_name(string name) - { - var exception = Assert.Throws(() => new IndexFieldDefinition(name, new IndexFieldSelector("event.body.customerId"))); - - Assert.Equal("name", exception.ParamName); - } - - [Fact] - public void field_rejects_missing_selector() - { - var exception = Assert.Throws(() => new IndexFieldDefinition("customerId", null!)); - - Assert.Equal("selector", exception.ParamName); - } - - [Theory] - [InlineData(null)] - [InlineData("")] - [InlineData(" ")] - public void field_selector_rejects_empty_value(string value) - { - var exception = Assert.Throws(() => new IndexFieldSelector(value)); - - Assert.Equal("value", exception.ParamName); - } -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexNameTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexNameTests.cs deleted file mode 100644 index 05f2b91bf..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexNameTests.cs +++ /dev/null @@ -1,51 +0,0 @@ -using System; -using EventStore.Core.Services.Storage.Indexing; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class IndexNameTests -{ - [Theory] - [InlineData("orders")] - [InlineData("orders_by_customer")] - [InlineData("orders-by-customer")] - [InlineData("orders2")] - public void constructor_accepts_valid_name(string value) - { - var name = new IndexName(value); - - Assert.Equal(value, name.Value); - } - - [Theory] - [InlineData(null)] - [InlineData("")] - [InlineData(" ")] - public void constructor_rejects_empty_name(string value) - { - var exception = Assert.Throws(() => new IndexName(value)); - - Assert.Equal("value", exception.ParamName); - } - - [Theory] - [InlineData("Orders")] - [InlineData("orders by customer")] - [InlineData("orders.by.customer")] - [InlineData("orders/active")] - public void constructor_rejects_invalid_characters(string value) - { - var exception = Assert.Throws(() => new IndexName(value)); - - Assert.Equal("value", exception.ParamName); - } - - [Fact] - public void to_string_returns_name() - { - var name = new IndexName("orders"); - - Assert.Equal("orders", name.ToString()); - } -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexStreamIdTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexStreamIdTests.cs deleted file mode 100644 index 04dd57c1d..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexStreamIdTests.cs +++ /dev/null @@ -1,47 +0,0 @@ -using System; -using EventStore.Core.Services.Storage.Indexing; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class IndexStreamIdTests -{ - [Theory] - [InlineData("index-stream")] - [InlineData("$index-stream")] - public void constructor_accepts_valid_stream_id(string value) - { - var streamId = new IndexStreamId(value); - - Assert.Equal(value, streamId.Value); - } - - [Theory] - [InlineData(null)] - [InlineData("")] - [InlineData("$$")] - public void constructor_rejects_invalid_stream_id(string value) - { - var exception = Assert.Throws(() => new IndexStreamId(value)); - - Assert.Equal("value", exception.ParamName); - Assert.Equal("Index stream id must be a valid stream id. (Parameter 'value')", exception.Message); - } - - [Fact] - public void constructor_rejects_metastream_id() - { - var exception = Assert.Throws(() => new IndexStreamId("$$index-stream")); - - Assert.Equal("value", exception.ParamName); - Assert.Equal("Index stream id cannot be a metastream id. (Parameter 'value')", exception.Message); - } - - [Fact] - public void to_string_returns_stream_id() - { - var streamId = new IndexStreamId("$index-stream"); - - Assert.Equal("$index-stream", streamId.ToString()); - } -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexVirtualStreamReaderTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexVirtualStreamReaderTests.cs deleted file mode 100644 index 76ae22ac5..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexVirtualStreamReaderTests.cs +++ /dev/null @@ -1,69 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Messages; -using EventStore.Core.Services.Storage.Indexing; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class IndexVirtualStreamReaderTests -{ - [Fact] - public void constructor_rejects_missing_stream_id() - { - var exception = Assert.Throws(() => new TestIndexVirtualStreamReader(null!)); - - Assert.Equal("streamId", exception.ParamName); - } - - [Fact] - public void exposes_stream_id() - { - var streamId = new IndexStreamId("$index-stream"); - var reader = new TestIndexVirtualStreamReader(streamId); - - Assert.Same(streamId, reader.StreamId); - } - - [Fact] - public void can_read_stream_when_candidate_matches() - { - var reader = new TestIndexVirtualStreamReader(new IndexStreamId("$index-stream")); - - Assert.True(reader.CanReadStream("$index-stream")); - } - - [Fact] - public void cannot_read_stream_when_candidate_differs() - { - var reader = new TestIndexVirtualStreamReader(new IndexStreamId("$index-stream")); - - Assert.False(reader.CanReadStream("other-stream")); - } - - [Fact] - public void cannot_read_stream_when_candidate_is_null() - { - var reader = new TestIndexVirtualStreamReader(new IndexStreamId("$index-stream")); - - Assert.False(reader.CanReadStream(null!)); - } - - private sealed class TestIndexVirtualStreamReader(IndexStreamId streamId) : IndexVirtualStreamReader(streamId) - { - public override ValueTask ReadForwards( - ClientMessage.ReadStreamEventsForward msg, - CancellationToken token) => - throw new NotSupportedException(); - - public override ValueTask ReadBackwards( - ClientMessage.ReadStreamEventsBackward msg, - CancellationToken token) => - throw new NotSupportedException(); - - public override long GetLastEventNumber(string streamId) => throw new NotSupportedException(); - - public override long GetLastIndexedPosition(string streamId) => throw new NotSupportedException(); - } -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingComponentHostTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingComponentHostTests.cs deleted file mode 100644 index e2c8344eb..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingComponentHostTests.cs +++ /dev/null @@ -1,286 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Bus; -using EventStore.Core.Data; -using EventStore.Core.Messages; -using EventStore.Core.Messaging; -using EventStore.Core.Services.Storage.Indexing; -using EventStore.Core.Services.Storage.InMemory; -using EventStore.Core.Services.Transport.Enumerators; -using Microsoft.AspNetCore.Builder; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class IndexingComponentHostTests -{ - [Fact] - public void configure_services_rejects_missing_services() - { - var host = new IndexingComponentHost(new FakeIndexingComponent()); - var configuration = new ConfigurationBuilder().Build(); - - var exception = Assert.Throws(() => - host.ConfigureServices(null!, configuration)); - - Assert.Equal("services", exception.ParamName); - } - - [Fact] - public void configure_services_rejects_missing_configuration() - { - var host = new IndexingComponentHost(new FakeIndexingComponent()); - - var exception = Assert.Throws(() => - host.ConfigureServices(new ServiceCollection(), null!)); - - Assert.Equal("configuration", exception.ParamName); - } - - [Fact] - public void configure_application_rejects_missing_builder() - { - var host = new IndexingComponentHost(new FakeIndexingComponent()); - var configuration = new ConfigurationBuilder().Build(); - - var exception = Assert.Throws(() => - host.ConfigureApplication(null!, configuration)); - - Assert.Equal("builder", exception.ParamName); - } - - [Fact] - public void configure_application_rejects_missing_configuration() - { - var host = new IndexingComponentHost(new FakeIndexingComponent()); - using var provider = new ServiceCollection().BuildServiceProvider(); - - var exception = Assert.Throws(() => - host.ConfigureApplication(new ApplicationBuilder(provider), null!)); - - Assert.Equal("configuration", exception.ParamName); - } - - [Fact] - public void constructor_rejects_missing_components() - { - var exception = Assert.Throws(() => - new IndexingComponentHost((IReadOnlyList)null!)); - - Assert.Equal("components", exception.ParamName); - } - - [Fact] - public void constructor_rejects_empty_components() - { - var exception = Assert.Throws(() => - new IndexingComponentHost([])); - - Assert.Equal("components", exception.ParamName); - Assert.Equal("At least one indexing component is required. (Parameter 'components')", exception.Message); - } - - [Fact] - public void constructor_rejects_missing_component() - { - var exception = Assert.Throws(() => - new IndexingComponentHost([null!])); - - Assert.Equal("components", exception.ParamName); - Assert.Equal("Indexing components cannot contain null. (Parameter 'components')", exception.Message); - } - - [Fact] - public void constructor_rejects_component_with_missing_virtual_stream_readers() - { - var exception = Assert.Throws(() => - new IndexingComponentHost(new MissingVirtualStreamReadersComponent())); - - Assert.Equal("components", exception.ParamName); - Assert.Equal("Indexing component virtual stream readers cannot be null. (Parameter 'components')", exception.Message); - } - - [Fact] - public void constructor_rejects_component_with_missing_virtual_stream_reader() - { - var exception = Assert.Throws(() => - new IndexingComponentHost(new FakeIndexingComponent([null!]))); - - Assert.Equal("components", exception.ParamName); - Assert.Equal("Indexing component virtual stream readers cannot contain null. (Parameter 'components')", exception.Message); - } - - [Fact] - public void exposes_component_virtual_stream_readers() - { - var first = new FakeVirtualStreamReader("$idx-first"); - var second = new FakeVirtualStreamReader("$idx-second"); - var component = new FakeIndexingComponent(first, second); - var host = new IndexingComponentHost(component); - - Assert.Equal([first, second], host.VirtualStreamReaders); - } - - [Fact] - public void exposes_grouped_component_virtual_stream_readers() - { - var first = new FakeVirtualStreamReader("$idx-first"); - var second = new FakeVirtualStreamReader("$idx-second"); - var firstComponent = new FakeIndexingComponent(first); - var secondComponent = new FakeIndexingComponent(second); - var host = new IndexingComponentHost([firstComponent, secondComponent]); - - Assert.Equal([first, second], host.VirtualStreamReaders); - } - - [Fact] - public async Task configure_application_activates_registered_indexing_service() - { - var subscriber = new RecordingSubscriber(); - var services = new ServiceCollection(); - var component = new FakeIndexingComponent(); - var host = new IndexingComponentHost(component); - var configuration = new ConfigurationBuilder().Build(); - - services.AddSingleton(subscriber); - services.AddSingleton(new RecordingPublisher()); - host.ConfigureServices(services, configuration); - - await using var provider = services.BuildServiceProvider(); - _ = provider.GetRequiredService(); - - Assert.False(subscriber.Has()); - Assert.False(subscriber.Has()); - - host.ConfigureApplication(new ApplicationBuilder(provider), configuration); - - Assert.True(subscriber.Has()); - Assert.True(subscriber.Has()); - } - - [Fact] - public async Task configure_application_activates_grouped_indexing_services() - { - var subscriber = new RecordingSubscriber(); - var services = new ServiceCollection(); - var first = new FakeIndexingComponent(); - var second = new FakeIndexingComponent(); - var host = new IndexingComponentHost([first, second]); - var configuration = new ConfigurationBuilder().Build(); - - services.AddSingleton(subscriber); - services.AddSingleton(new RecordingPublisher()); - host.ConfigureServices(services, configuration); - - await using var provider = services.BuildServiceProvider(); - Assert.Equal(2, provider.GetServices().Count()); - - host.ConfigureApplication(new ApplicationBuilder(provider), configuration); - - Assert.Equal(2, subscriber.SubscriptionCount()); - Assert.Equal(2, subscriber.SubscriptionCount()); - } - - [Fact] - public async Task configure_application_can_register_indexing_service_more_than_once() - { - var subscriber = new RecordingSubscriber(); - var services = new ServiceCollection(); - var component = new FakeIndexingComponent(); - var host = new IndexingComponentHost(component); - var configuration = new ConfigurationBuilder().Build(); - - services.AddSingleton(subscriber); - services.AddSingleton(new RecordingPublisher()); - host.ConfigureServices(services, configuration); - - await using var provider = services.BuildServiceProvider(); - var application = new ApplicationBuilder(provider); - - host.ConfigureApplication(application, configuration); - host.ConfigureApplication(application, configuration); - - Assert.Equal(1, subscriber.SubscriptionCount()); - Assert.Equal(1, subscriber.SubscriptionCount()); - } - - private sealed class RecordingSubscriber : ISubscriber - { - private readonly Dictionary _subscriptions = []; - - public void Subscribe(IAsyncHandle handler) where T : Message => - _subscriptions[typeof(T)] = SubscriptionCount() + 1; - - public void Unsubscribe(IAsyncHandle handler) where T : Message => - _subscriptions.Remove(typeof(T)); - - public bool Has() where T : Message => SubscriptionCount() > 0; - - public int SubscriptionCount() where T : Message => - _subscriptions.GetValueOrDefault(typeof(T)); - } - - private sealed class RecordingPublisher : IPublisher - { - public void Publish(Message message) - { - } - } - - private sealed class FakeIndexingComponent(params IVirtualStreamReader[] virtualStreamReaders) : IIndexingComponent - { - public IIndexingProcessor Processor { get; } = new FakeIndexingProcessor(); - - public IReadOnlyList VirtualStreamReaders { get; } = virtualStreamReaders; - - public ValueTask Initialize(CancellationToken token) => ValueTask.CompletedTask; - - public ValueTask ReadCheckpoint(CancellationToken token) => ValueTask.FromResult(null); - - public ValueTask DisposeAsync() => ValueTask.CompletedTask; - } - - private sealed class MissingVirtualStreamReadersComponent : IIndexingComponent - { - public IIndexingProcessor Processor { get; } = new FakeIndexingProcessor(); - - public IReadOnlyList VirtualStreamReaders => null!; - - public ValueTask Initialize(CancellationToken token) => ValueTask.CompletedTask; - - public ValueTask ReadCheckpoint(CancellationToken token) => ValueTask.FromResult(null); - - public ValueTask DisposeAsync() => ValueTask.CompletedTask; - } - - private sealed class FakeIndexingProcessor : IIndexingProcessor - { - public ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token) => ValueTask.CompletedTask; - - public ValueTask Commit(CancellationToken token) => ValueTask.CompletedTask; - } - - private sealed class FakeVirtualStreamReader(string streamId) : IVirtualStreamReader - { - public ValueTask ReadForwards( - ClientMessage.ReadStreamEventsForward msg, - CancellationToken token) => - throw new NotSupportedException(); - - public ValueTask ReadBackwards( - ClientMessage.ReadStreamEventsBackward msg, - CancellationToken token) => - throw new NotSupportedException(); - - public long GetLastEventNumber(string streamId) => 0; - - public long GetLastIndexedPosition(string streamId) => 0; - - public bool CanReadStream(string candidateStreamId) => candidateStreamId == streamId; - } -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingServiceTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingServiceTests.cs deleted file mode 100644 index 4e5051c4c..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingServiceTests.cs +++ /dev/null @@ -1,266 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Bus; -using EventStore.Core.Data; -using EventStore.Core.Messages; -using EventStore.Core.Messaging; -using EventStore.Core.Services.Storage.Indexing; -using EventStore.Core.Services.Storage.InMemory; -using EventStore.Core.Services.Transport.Enumerators; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class IndexingServiceTests -{ - [Fact] - public void constructor_rejects_missing_component() - { - var exception = Assert.Throws(() => new IndexingService( - null!, - new FakeIndexingEventSourceFactory(new FakeIndexingEventSource()), - new RecordingSubscriber(), - IndexingSubscriptionOptions.Default)); - - Assert.Equal("component", exception.ParamName); - } - - [Fact] - public void constructor_rejects_missing_event_source_factory() - { - var exception = Assert.Throws(() => new IndexingService( - new FakeIndexingComponent(), - null!, - new RecordingSubscriber(), - IndexingSubscriptionOptions.Default)); - - Assert.Equal("eventSourceFactory", exception.ParamName); - } - - [Fact] - public void constructor_rejects_missing_subscriber() - { - var exception = Assert.Throws(() => new IndexingService( - new FakeIndexingComponent(), - new FakeIndexingEventSourceFactory(new FakeIndexingEventSource()), - null!, - IndexingSubscriptionOptions.Default)); - - Assert.Equal("subscriber", exception.ParamName); - } - - [Fact] - public void constructor_rejects_missing_options() - { - var exception = Assert.Throws(() => new IndexingService( - new FakeIndexingComponent(), - new FakeIndexingEventSourceFactory(new FakeIndexingEventSource()), - new RecordingSubscriber(), - null!)); - - Assert.Equal("options", exception.ParamName); - } - - [Fact] - public async Task shutdown_disposes_subscription_and_unsubscribes() - { - var subscriber = new RecordingSubscriber(); - var component = new FakeIndexingComponent(); - var eventSource = new FakeIndexingEventSource(); - var service = new IndexingService( - component, - new FakeIndexingEventSourceFactory(eventSource), - subscriber, - IndexingSubscriptionOptions.Default); - - service.Register(); - await service.HandleAsync(new SystemMessage.SystemReady(), CancellationToken.None); - await service.HandleAsync(new SystemMessage.BecomeShuttingDown(Guid.NewGuid(), false, false), CancellationToken.None); - - Assert.True(component.Disposed); - Assert.True(eventSource.Disposed); - Assert.False(subscriber.Has()); - Assert.False(subscriber.Has()); - } - - [Fact] - public async Task repeated_system_ready_does_not_restart_subscription() - { - var subscriber = new RecordingSubscriber(); - var component = new FakeIndexingComponent(); - var service = new IndexingService( - component, - new FakeIndexingEventSourceFactory(new FakeIndexingEventSource()), - subscriber, - IndexingSubscriptionOptions.Default); - - await service.HandleAsync(new SystemMessage.SystemReady(), CancellationToken.None); - await service.HandleAsync(new SystemMessage.SystemReady(), CancellationToken.None); - - Assert.Equal(1, component.InitializeCount); - - await service.DisposeAsync(); - } - - [Fact] - public async Task failed_system_ready_can_be_retried() - { - var subscriber = new RecordingSubscriber(); - var component = new FakeIndexingComponent(initializeFailures: 1); - var service = new IndexingService( - component, - new FakeIndexingEventSourceFactory(new FakeIndexingEventSource()), - subscriber, - IndexingSubscriptionOptions.Default); - - await Assert.ThrowsAsync(() => - service.HandleAsync(new SystemMessage.SystemReady(), CancellationToken.None).AsTask()); - await service.HandleAsync(new SystemMessage.SystemReady(), CancellationToken.None); - - Assert.Equal(2, component.InitializeCount); - - await service.DisposeAsync(); - } - - [Fact] - public async Task system_ready_after_dispose_is_rejected() - { - var service = new IndexingService( - new FakeIndexingComponent(), - new FakeIndexingEventSourceFactory(new FakeIndexingEventSource()), - new RecordingSubscriber(), - IndexingSubscriptionOptions.Default); - - await service.DisposeAsync(); - - var exception = await Assert.ThrowsAsync(() => - service.HandleAsync(new SystemMessage.SystemReady(), CancellationToken.None).AsTask()); - - Assert.Contains(nameof(IndexingService), exception.Message); - } - - [Fact] - public async Task system_ready_after_started_service_is_disposed_is_rejected() - { - var service = new IndexingService( - new FakeIndexingComponent(), - new FakeIndexingEventSourceFactory(new FakeIndexingEventSource()), - new RecordingSubscriber(), - IndexingSubscriptionOptions.Default); - - await service.HandleAsync(new SystemMessage.SystemReady(), CancellationToken.None); - await service.DisposeAsync(); - - var exception = await Assert.ThrowsAsync(() => - service.HandleAsync(new SystemMessage.SystemReady(), CancellationToken.None).AsTask()); - - Assert.Contains(nameof(IndexingService), exception.Message); - } - - [Fact] - public async Task dispose_unsubscribes_when_subscription_cleanup_fails() - { - var subscriber = new RecordingSubscriber(); - var service = new IndexingService( - new FakeIndexingComponent(throwOnDispose: true), - new FakeIndexingEventSourceFactory(new FakeIndexingEventSource()), - subscriber, - IndexingSubscriptionOptions.Default); - - service.Register(); - var exception = await Assert.ThrowsAsync( - () => service.DisposeAsync().AsTask()); - - Assert.Equal("dispose failed", exception.Message); - Assert.False(subscriber.Has()); - Assert.False(subscriber.Has()); - } - - private sealed class RecordingSubscriber : ISubscriber - { - private readonly HashSet _subscriptions = []; - - public void Subscribe(IAsyncHandle handler) where T : Message => - _subscriptions.Add(typeof(T)); - - public void Unsubscribe(IAsyncHandle handler) where T : Message => - _subscriptions.Remove(typeof(T)); - - public bool Has() where T : Message => _subscriptions.Contains(typeof(T)); - } - - private sealed class FakeIndexingComponent(bool throwOnDispose = false, int initializeFailures = 0) : IIndexingComponent - { - private int _initializeFailures = initializeFailures; - - public IIndexingProcessor Processor { get; } = new FakeIndexingProcessor(); - - public IReadOnlyList VirtualStreamReaders { get; } = []; - - public bool Disposed { get; private set; } - - public int InitializeCount { get; private set; } - - public ValueTask Initialize(CancellationToken token) - { - InitializeCount++; - if (Interlocked.Decrement(ref _initializeFailures) >= 0) - { - throw new InvalidOperationException("initialize failed"); - } - - return ValueTask.CompletedTask; - } - - public ValueTask ReadCheckpoint(CancellationToken token) => ValueTask.FromResult(null); - - public ValueTask DisposeAsync() - { - Disposed = true; - return throwOnDispose - ? ValueTask.FromException(new InvalidOperationException("dispose failed")) - : ValueTask.CompletedTask; - } - } - - private sealed class FakeIndexingProcessor : IIndexingProcessor - { - public ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token) => ValueTask.CompletedTask; - - public ValueTask Commit(CancellationToken token) => ValueTask.CompletedTask; - } - - private sealed class FakeIndexingEventSourceFactory(FakeIndexingEventSource source) : IIndexingEventSourceFactory - { - public IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToken token) - { - source.Bind(token); - return source; - } - } - - private sealed class FakeIndexingEventSource : IIndexingEventSource - { - private CancellationToken _token; - - public ReadResponse Current => null; - - public bool Disposed { get; private set; } - - public void Bind(CancellationToken token) => _token = token; - - public async ValueTask MoveNextAsync() - { - await Task.Delay(Timeout.InfiniteTimeSpan, _token); - return false; - } - - public ValueTask DisposeAsync() - { - Disposed = true; - return ValueTask.CompletedTask; - } - } -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionOptionsTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionOptionsTests.cs deleted file mode 100644 index cc7151488..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionOptionsTests.cs +++ /dev/null @@ -1,39 +0,0 @@ -using System; -using EventStore.Core.Services.Storage.Indexing; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class IndexingSubscriptionOptionsTests -{ - [Fact] - public void constructor_accepts_valid_values() - { - var options = new IndexingSubscriptionOptions(100, TimeSpan.FromSeconds(5)); - - Assert.Equal(100, options.CheckpointCommitBatchSize); - Assert.Equal(TimeSpan.FromSeconds(5), options.CheckpointCommitDelay); - } - - [Theory] - [InlineData(0)] - [InlineData(-1)] - public void constructor_rejects_non_positive_checkpoint_commit_batch_size(int batchSize) - { - var exception = Assert.Throws(() => - new IndexingSubscriptionOptions(batchSize, TimeSpan.FromSeconds(5))); - - Assert.Equal("checkpointCommitBatchSize", exception.ParamName); - } - - [Theory] - [InlineData(0)] - [InlineData(-1)] - public void constructor_rejects_non_positive_checkpoint_commit_delay(int milliseconds) - { - var exception = Assert.Throws(() => - new IndexingSubscriptionOptions(100, TimeSpan.FromMilliseconds(milliseconds))); - - Assert.Equal("checkpointCommitDelay", exception.ParamName); - } -} diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs deleted file mode 100644 index 1636b0d3b..000000000 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs +++ /dev/null @@ -1,532 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Data; -using EventStore.Core.Services.Storage.Indexing; -using EventStore.Core.Services.Storage.InMemory; -using EventStore.Core.Services.Transport.Common; -using EventStore.Core.Services.Transport.Enumerators; -using EventStore.Core.TransactionLog.LogRecords; -using Xunit; - -namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; - -public class IndexingSubscriptionTests -{ - [Fact] - public void constructor_rejects_missing_component() - { - var exception = Assert.Throws(() => new IndexingSubscription( - null!, - new FakeIndexingEventSourceFactory(new FakeIndexingEventSource()), - IndexingSubscriptionOptions.Default)); - - Assert.Equal("component", exception.ParamName); - } - - [Fact] - public void constructor_rejects_missing_event_source_factory() - { - var exception = Assert.Throws(() => new IndexingSubscription( - new FakeIndexingComponent(), - null!, - IndexingSubscriptionOptions.Default)); - - Assert.Equal("eventSourceFactory", exception.ParamName); - } - - [Fact] - public void constructor_rejects_missing_options() - { - var exception = Assert.Throws(() => new IndexingSubscription( - new FakeIndexingComponent(), - new FakeIndexingEventSourceFactory(new FakeIndexingEventSource()), - null!)); - - Assert.Equal("options", exception.ParamName); - } - - [Fact] - public async Task starts_from_component_checkpoint() - { - var checkpoint = new IndexCheckpoint(10, 5); - var component = new FakeIndexingComponent(checkpoint); - var eventSource = new FakeIndexingEventSource(); - var eventSources = new FakeIndexingEventSourceFactory(eventSource); - await using var subscription = new IndexingSubscription( - component, - eventSources, - IndexingSubscriptionOptions.Default); - - await subscription.Start(CancellationToken.None); - - Assert.Equal(checkpoint, eventSources.Checkpoint); - } - - [Fact] - public async Task can_start_again_after_startup_failure() - { - var component = new FakeIndexingComponent(initializeFailures: 1); - var eventSource = new FakeIndexingEventSource(); - await using var subscription = new IndexingSubscription( - component, - new FakeIndexingEventSourceFactory(eventSource), - IndexingSubscriptionOptions.Default); - - await Assert.ThrowsAsync(() => subscription.Start(CancellationToken.None).AsTask()); - - await subscription.Start(CancellationToken.None); - } - - [Fact] - public async Task start_honors_cancelled_token_before_activation() - { - var checkpoint = new IndexCheckpoint(10, 5); - var component = new FakeIndexingComponent(checkpoint); - var eventSources = new FakeIndexingEventSourceFactory(new FakeIndexingEventSource()); - using var cancellation = new CancellationTokenSource(); - await cancellation.CancelAsync(); - await using var subscription = new IndexingSubscription( - component, - eventSources, - IndexingSubscriptionOptions.Default); - - await Assert.ThrowsAsync(() => - subscription.Start(cancellation.Token).AsTask()); - - Assert.Null(eventSources.Checkpoint); - } - - [Fact] - public async Task start_disposes_event_source_when_cancelled_during_activation() - { - var eventSource = new FakeIndexingEventSource(); - using var cancellation = new CancellationTokenSource(); - await using var subscription = new IndexingSubscription( - new FakeIndexingComponent(), - new CancellingIndexingEventSourceFactory(eventSource, cancellation), - IndexingSubscriptionOptions.Default); - - await Assert.ThrowsAsync(() => - subscription.Start(cancellation.Token).AsTask()); - - Assert.True(eventSource.Disposed); - } - - [Fact] - public async Task start_rejects_missing_event_source() - { - await using var subscription = new IndexingSubscription( - new FakeIndexingComponent(), - new NullIndexingEventSourceFactory(), - IndexingSubscriptionOptions.Default); - - var exception = await Assert.ThrowsAsync(() => - subscription.Start(CancellationToken.None).AsTask()); - - Assert.Equal("Indexing event source factory returned null.", exception.Message); - } - - [Fact] - public async Task start_rejects_missing_processor() - { - await using var subscription = new IndexingSubscription( - new MissingProcessorComponent(), - new FakeIndexingEventSourceFactory(new FakeIndexingEventSource()), - IndexingSubscriptionOptions.Default); - - var exception = await Assert.ThrowsAsync(() => - subscription.Start(CancellationToken.None).AsTask()); - - Assert.Equal("Indexing component returned null processor.", exception.Message); - } - - [Fact] - public async Task waits_for_in_flight_start_before_disposing_component() - { - var component = new FakeIndexingComponent(pauseInitializeCompletion: true); - var eventSource = new FakeIndexingEventSource(); - var subscription = new IndexingSubscription( - component, - new FakeIndexingEventSourceFactory(eventSource), - IndexingSubscriptionOptions.Default); - - var startup = subscription.Start(CancellationToken.None).AsTask(); - await component.WaitForInitializeEntered(); - var disposal = subscription.DisposeAsync().AsTask(); - - component.ReleaseInitialize(); - await Assert.ThrowsAsync(() => startup.WaitAsync(Timeout)); - await disposal.WaitAsync(Timeout); - - Assert.False(component.DisposedBeforeInitializeCompleted); - Assert.True(component.Disposed); - } - - [Fact] - public async Task dispose_does_not_surface_failed_startup() - { - var component = new FakeIndexingComponent(initializeFailures: 1); - var eventSource = new FakeIndexingEventSource(); - var subscription = new IndexingSubscription( - component, - new FakeIndexingEventSourceFactory(eventSource), - IndexingSubscriptionOptions.Default); - - await Assert.ThrowsAsync(() => subscription.Start(CancellationToken.None).AsTask()); - await subscription.DisposeAsync(); - - Assert.True(component.Disposed); - } - - [Fact] - public async Task indexes_events_from_source() - { - var first = CreateResolvedEvent(1); - var second = CreateResolvedEvent(2); - var component = new FakeIndexingComponent(); - var eventSource = new FakeIndexingEventSource( - new ReadResponse.EventReceived(first), - new ReadResponse.SubscriptionCaughtUp(new TFPos(1, 1)), - new ReadResponse.EventReceived(second)); - await using var subscription = new IndexingSubscription( - component, - new FakeIndexingEventSourceFactory(eventSource), - new IndexingSubscriptionOptions(2, TimeSpan.FromSeconds(30))); - - await subscription.Start(CancellationToken.None); - await component.Processor.WaitForIndexed(2); - - Assert.Equal(new[] { first, second }, component.Processor.Indexed); - } - - [Fact] - public async Task commits_when_batch_size_is_reached() - { - var component = new FakeIndexingComponent(); - var eventSource = new FakeIndexingEventSource( - new ReadResponse.EventReceived(CreateResolvedEvent(1)), - new ReadResponse.EventReceived(CreateResolvedEvent(2))); - await using var subscription = new IndexingSubscription( - component, - new FakeIndexingEventSourceFactory(eventSource), - new IndexingSubscriptionOptions(2, TimeSpan.FromSeconds(30))); - - await subscription.Start(CancellationToken.None); - await component.Processor.WaitForCommits(1); - - Assert.Equal(1, component.Processor.CommitCount); - } - - [Fact] - public async Task commits_when_delay_elapses() - { - var component = new FakeIndexingComponent(); - var eventSource = new FakeIndexingEventSource(new ReadResponse.EventReceived(CreateResolvedEvent(1))); - await using var subscription = new IndexingSubscription( - component, - new FakeIndexingEventSourceFactory(eventSource), - new IndexingSubscriptionOptions(100, TimeSpan.FromMilliseconds(25))); - - await subscription.Start(CancellationToken.None); - await component.Processor.WaitForCommits(1); - - Assert.Equal(1, component.Processor.CommitCount); - } - - [Fact] - public async Task commits_pending_events_when_disposed() - { - var component = new FakeIndexingComponent(); - var eventSource = new FakeIndexingEventSource(new ReadResponse.EventReceived(CreateResolvedEvent(1))); - var subscription = new IndexingSubscription( - component, - new FakeIndexingEventSourceFactory(eventSource), - new IndexingSubscriptionOptions(100, TimeSpan.FromSeconds(30))); - - await subscription.Start(CancellationToken.None); - await component.Processor.WaitForIndexed(1); - await subscription.DisposeAsync(); - - Assert.Equal(1, component.Processor.CommitCount); - Assert.True(component.Disposed); - Assert.True(eventSource.Disposed); - } - - [Fact] - public async Task commits_in_flight_event_when_disposed() - { - var component = new FakeIndexingComponent(pauseIndexCompletion: true); - var eventSource = new FakeIndexingEventSource(new ReadResponse.EventReceived(CreateResolvedEvent(1))); - var subscription = new IndexingSubscription( - component, - new FakeIndexingEventSourceFactory(eventSource), - new IndexingSubscriptionOptions(100, TimeSpan.FromSeconds(30))); - - await subscription.Start(CancellationToken.None); - await component.Processor.WaitForIndexEntered(); - var disposal = subscription.DisposeAsync().AsTask(); - - component.Processor.ReleaseIndex(); - await disposal.WaitAsync(Timeout); - - Assert.Equal(1, component.Processor.CommitCount); - Assert.True(component.Disposed); - Assert.True(eventSource.Disposed); - } - - [Fact] - public async Task cleans_up_after_event_source_completion_faults_worker() - { - var component = new FakeIndexingComponent(); - var eventSource = new FakeIndexingEventSource(completeWhenDrained: true); - var subscription = new IndexingSubscription( - component, - new FakeIndexingEventSourceFactory(eventSource), - IndexingSubscriptionOptions.Default); - - await subscription.Start(CancellationToken.None); - await eventSource.WaitForDrained(); - await eventSource.WaitForDisposed(); - await component.WaitForDisposed(); - await subscription.DisposeAsync(); - - Assert.True(component.Disposed); - Assert.True(eventSource.Disposed); - } - - private static ResolvedEvent CreateResolvedEvent(long number) - { - var record = new EventRecord( - number, - number, - Guid.NewGuid(), - Guid.NewGuid(), - number, - 0, - $"stream-{number}", - number - 1, - DateTime.UtcNow, - PrepareFlags.SingleWrite | PrepareFlags.IsCommitted | PrepareFlags.Data, - $"event-{number}", - Array.Empty(), - Array.Empty()); - - return ResolvedEvent.ForUnresolvedEvent(record, number); - } - - private sealed class FakeIndexingComponent( - IndexCheckpoint? checkpoint = null, - int initializeFailures = 0, - bool pauseInitializeCompletion = false, - bool pauseIndexCompletion = false) : IIndexingComponent - { - private readonly TaskCompletionSource _initializeEntered = new(TaskCreationOptions.RunContinuationsAsynchronously); - private readonly TaskCompletionSource _releaseInitialize = new(TaskCreationOptions.RunContinuationsAsynchronously); - private readonly TaskCompletionSource _disposed = new(TaskCreationOptions.RunContinuationsAsynchronously); - private int _initializeFailures = initializeFailures; - - public FakeIndexingProcessor Processor { get; } = new(pauseIndexCompletion); - - IIndexingProcessor IIndexingComponent.Processor => Processor; - - public IReadOnlyList VirtualStreamReaders { get; } = []; - - public bool Disposed { get; private set; } - public bool DisposedBeforeInitializeCompleted { get; private set; } - - public async ValueTask Initialize(CancellationToken token) - { - if (Interlocked.Decrement(ref _initializeFailures) >= 0) - { - throw new InvalidOperationException("initialize failed"); - } - - if (pauseInitializeCompletion) - { - _initializeEntered.TrySetResult(); - await _releaseInitialize.Task; - } - } - - public ValueTask ReadCheckpoint(CancellationToken token) => ValueTask.FromResult(checkpoint); - - public ValueTask DisposeAsync() - { - DisposedBeforeInitializeCompleted = pauseInitializeCompletion && !_releaseInitialize.Task.IsCompleted; - Disposed = true; - _disposed.TrySetResult(); - return ValueTask.CompletedTask; - } - - public Task WaitForInitializeEntered() => _initializeEntered.Task.WaitAsync(Timeout); - - public Task WaitForDisposed() => _disposed.Task.WaitAsync(Timeout); - - public void ReleaseInitialize() => _releaseInitialize.TrySetResult(); - } - - private sealed class MissingProcessorComponent : IIndexingComponent - { - public IIndexingProcessor Processor => null!; - - public IReadOnlyList VirtualStreamReaders { get; } = []; - - public ValueTask Initialize(CancellationToken token) => ValueTask.CompletedTask; - - public ValueTask ReadCheckpoint(CancellationToken token) => ValueTask.FromResult(null); - - public ValueTask DisposeAsync() => ValueTask.CompletedTask; - } - - private sealed class FakeIndexingProcessor(bool pauseIndexCompletion = false) : IIndexingProcessor - { - private readonly List _indexed = []; - private readonly TaskCompletionSource _indexedEvents = new(TaskCreationOptions.RunContinuationsAsynchronously); - private readonly TaskCompletionSource _committed = new(TaskCreationOptions.RunContinuationsAsynchronously); - private readonly TaskCompletionSource _indexEntered = new(TaskCreationOptions.RunContinuationsAsynchronously); - private readonly TaskCompletionSource _releaseIndex = new(TaskCreationOptions.RunContinuationsAsynchronously); - private int _commitCount; - private int _waitForIndexed = int.MaxValue; - private int _waitForCommits = int.MaxValue; - - public IReadOnlyList Indexed => _indexed; - - public int CommitCount => Volatile.Read(ref _commitCount); - - public async ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token) - { - lock (_indexed) - { - _indexed.Add(resolvedEvent); - if (_indexed.Count >= Volatile.Read(ref _waitForIndexed)) - { - _indexedEvents.TrySetResult(); - } - } - - if (pauseIndexCompletion) - { - _indexEntered.TrySetResult(); - await _releaseIndex.Task.WaitAsync(token); - } - } - - public ValueTask Commit(CancellationToken token) - { - if (Interlocked.Increment(ref _commitCount) >= Volatile.Read(ref _waitForCommits)) - { - _committed.TrySetResult(); - } - - return ValueTask.CompletedTask; - } - - public Task WaitForIndexed(int count) - { - lock (_indexed) - { - _waitForIndexed = count; - if (_indexed.Count >= count) - { - return Task.CompletedTask; - } - } - - return _indexedEvents.Task.WaitAsync(Timeout); - } - - public Task WaitForCommits(int count) - { - _waitForCommits = count; - return CommitCount >= count - ? Task.CompletedTask - : _committed.Task.WaitAsync(Timeout); - } - - public Task WaitForIndexEntered() => _indexEntered.Task.WaitAsync(Timeout); - - public void ReleaseIndex() => _releaseIndex.TrySetResult(); - } - - private sealed class NullIndexingEventSourceFactory : IIndexingEventSourceFactory - { - public IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToken token) => null!; - } - - private sealed class CancellingIndexingEventSourceFactory( - FakeIndexingEventSource source, - CancellationTokenSource startupCancellation) : IIndexingEventSourceFactory - { - public IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToken token) - { - source.Bind(token); - startupCancellation.Cancel(); - return source; - } - } - - private sealed class FakeIndexingEventSourceFactory(FakeIndexingEventSource source) : IIndexingEventSourceFactory - { - public IndexCheckpoint? Checkpoint { get; private set; } - - public IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToken token) - { - Checkpoint = checkpoint; - source.Bind(token); - return source; - } - } - - private sealed class FakeIndexingEventSource(params ReadResponse[] responses) : IIndexingEventSource - { - private readonly Queue _responses = new(responses); - private readonly TaskCompletionSource _drained = new(TaskCreationOptions.RunContinuationsAsynchronously); - private readonly TaskCompletionSource _disposed = new(TaskCreationOptions.RunContinuationsAsynchronously); - private readonly bool _completeWhenDrained; - private CancellationToken _token; - - public FakeIndexingEventSource(bool completeWhenDrained, params ReadResponse[] responses) : this(responses) - { - _completeWhenDrained = completeWhenDrained; - } - - public ReadResponse Current { get; private set; } - - public bool Disposed { get; private set; } - - public void Bind(CancellationToken token) => _token = token; - - public Task WaitForDrained() => _drained.Task.WaitAsync(Timeout); - - public Task WaitForDisposed() => _disposed.Task.WaitAsync(Timeout); - - public async ValueTask MoveNextAsync() - { - if (!_responses.TryDequeue(out var response)) - { - if (_completeWhenDrained) - { - _drained.TrySetResult(); - return false; - } - - await Task.Delay(global::System.Threading.Timeout.InfiniteTimeSpan, _token); - return false; - } - - Current = response; - return true; - } - - public ValueTask DisposeAsync() - { - Disposed = true; - _disposed.TrySetResult(); - return ValueTask.CompletedTask; - } - } - - private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5); -} diff --git a/src/EventStore.Core/ClusterVNode.cs b/src/EventStore.Core/ClusterVNode.cs index eff049e40..625701159 100644 --- a/src/EventStore.Core/ClusterVNode.cs +++ b/src/EventStore.Core/ClusterVNode.cs @@ -97,7 +97,6 @@ public static ClusterVNode Create( IReadOnlyList factories = null, CertificateProvider certificateProvider = null, IConfiguration configuration = null, - IReadOnlyList additionalVirtualStreamReaders = null, Guid? instanceId = null, int debugIndex = 0) { @@ -110,7 +109,6 @@ public static ClusterVNode Create( factories, certificateProvider, configuration, - additionalVirtualStreamReaders: additionalVirtualStreamReaders, instanceId: instanceId, debugIndex: debugIndex); } @@ -247,8 +245,7 @@ public ClusterVNode(ClusterVNodeOptions options, IConfiguration configuration = null, IExpiryStrategy expiryStrategy = null, Guid? instanceId = null, int debugIndex = 0, - Action configureAdditionalNodeServices = null, - IReadOnlyList additionalVirtualStreamReaders = null) + Action configureAdditionalNodeServices = null) { configuration ??= new ConfigurationBuilder().Build(); @@ -814,11 +811,6 @@ void StartSubsystems() nodeStatusListener.Stream, }; - if (additionalVirtualStreamReaders is not null) - { - virtualStreamReaders.AddRange(additionalVirtualStreamReaders); - } - var virtualStreamReader = new VirtualStreamReader(virtualStreamReaders.ToArray()); // Storage Reader diff --git a/src/EventStore.Core/Services/Storage/InMemory/IVirtualStreamReader.cs b/src/EventStore.Core/Services/Storage/InMemory/IVirtualStreamReader.cs index 0b745b440..d9a62c28e 100644 --- a/src/EventStore.Core/Services/Storage/InMemory/IVirtualStreamReader.cs +++ b/src/EventStore.Core/Services/Storage/InMemory/IVirtualStreamReader.cs @@ -16,7 +16,5 @@ public interface IVirtualStreamReader long GetLastEventNumber(string streamId); - long GetLastIndexedPosition(string streamId); - bool CanReadStream(string streamId); } diff --git a/src/EventStore.Core/Services/Storage/InMemory/IVirtualStreamReaderProvider.cs b/src/EventStore.Core/Services/Storage/InMemory/IVirtualStreamReaderProvider.cs deleted file mode 100644 index 8f3f4c8fc..000000000 --- a/src/EventStore.Core/Services/Storage/InMemory/IVirtualStreamReaderProvider.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System.Collections.Generic; -using System.Linq; - -namespace EventStore.Core.Services.Storage.InMemory; - -public interface IVirtualStreamReaderProvider -{ - IReadOnlyList VirtualStreamReaders { get; } -} - -public static class VirtualStreamReaderProviderExtensions -{ - public static IReadOnlyList GetVirtualStreamReaders( - this IEnumerable providers) => - providers.SelectMany(provider => provider.VirtualStreamReaders).ToArray(); -} diff --git a/src/EventStore.Core/Services/Storage/InMemory/SingleEventInMemoryStream.cs b/src/EventStore.Core/Services/Storage/InMemory/SingleEventInMemoryStream.cs index 417164c77..cff43e2bf 100644 --- a/src/EventStore.Core/Services/Storage/InMemory/SingleEventInMemoryStream.cs +++ b/src/EventStore.Core/Services/Storage/InMemory/SingleEventInMemoryStream.cs @@ -135,8 +135,6 @@ public SingleEventInMemoryStream(IPublisher publisher, InMemoryLog memLog, strin public long GetLastEventNumber(string streamId) => _lastEvent?.EventNumber ?? ExpectedVersion.NoStream; - public long GetLastIndexedPosition(string streamId) => -1; - public bool CanReadStream(string streamId) => streamId == _streamName; public void Write(string eventType, ReadOnlyMemory data) diff --git a/src/EventStore.Core/Services/Storage/InMemory/VirtualStreamReader.cs b/src/EventStore.Core/Services/Storage/InMemory/VirtualStreamReader.cs index d8352bb9a..71cab8cec 100644 --- a/src/EventStore.Core/Services/Storage/InMemory/VirtualStreamReader.cs +++ b/src/EventStore.Core/Services/Storage/InMemory/VirtualStreamReader.cs @@ -74,9 +74,6 @@ public VirtualStreamReader(IVirtualStreamReader[] readers) public long GetLastEventNumber(string streamId) => TryGetReader(streamId, out var reader) ? reader.GetLastEventNumber(streamId) : ExpectedVersion.NoStream; - public long GetLastIndexedPosition(string streamId) => - TryGetReader(streamId, out var reader) ? reader.GetLastIndexedPosition(streamId) : -1; - public bool CanReadStream(string streamId) => TryGetReader(streamId, out _); private bool TryGetReader(string streamId, out IVirtualStreamReader reader) diff --git a/src/EventStore.Core/Services/Storage/Indexing/IIndexCheckpointStore.cs b/src/EventStore.Core/Services/Storage/Indexing/IIndexCheckpointStore.cs deleted file mode 100644 index fc21fc505..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IIndexCheckpointStore.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System.Threading; -using System.Threading.Tasks; - -namespace EventStore.Core.Services.Storage.Indexing; - -public interface IIndexCheckpointStore -{ - ValueTask Read(CancellationToken token); - - ValueTask Write(IndexCheckpoint checkpoint, CancellationToken token); -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IIndexDefinitionStore.cs b/src/EventStore.Core/Services/Storage/Indexing/IIndexDefinitionStore.cs deleted file mode 100644 index 9fd34cdfb..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IIndexDefinitionStore.cs +++ /dev/null @@ -1,39 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -#nullable enable - -namespace EventStore.Core.Services.Storage.Indexing; - -public interface IIndexDefinitionStore -{ - ValueTask Create(IndexName name, IndexDefinition definition, CancellationToken token); - - ValueTask Read(IndexName name, CancellationToken token); - - ValueTask> List(CancellationToken token); - - ValueTask Delete(IndexName name, CancellationToken token); -} - -public enum IndexDefinitionCreateResult -{ - Created, - AlreadyExists, - Conflicts -} - -public sealed record StoredIndexDefinition -{ - public IndexName Name { get; } - - public IndexDefinition Definition { get; } - - public StoredIndexDefinition(IndexName name, IndexDefinition definition) - { - Name = name ?? throw new ArgumentNullException(nameof(name)); - Definition = definition ?? throw new ArgumentNullException(nameof(definition)); - } -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IIndexingComponent.cs b/src/EventStore.Core/Services/Storage/Indexing/IIndexingComponent.cs deleted file mode 100644 index f1d62a06f..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IIndexingComponent.cs +++ /dev/null @@ -1,23 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Data; -using EventStore.Core.Services.Storage.InMemory; - -namespace EventStore.Core.Services.Storage.Indexing; - -public interface IIndexingComponent : IAsyncDisposable, IVirtualStreamReaderProvider -{ - ValueTask Initialize(CancellationToken token); - - ValueTask ReadCheckpoint(CancellationToken token); - - IIndexingProcessor Processor { get; } -} - -public interface IIndexingProcessor -{ - ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token); - - ValueTask Commit(CancellationToken token); -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs b/src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs deleted file mode 100644 index e4155ba94..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs +++ /dev/null @@ -1,53 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Bus; -using EventStore.Core.Services.Storage.ReaderIndex; -using EventStore.Core.Services.Transport.Common; -using EventStore.Core.Services.Transport.Enumerators; -using EventStore.Core.Services.UserManagement; - -namespace EventStore.Core.Services.Storage.Indexing; - -public interface IIndexingEventSource : IAsyncDisposable -{ - ReadResponse Current { get; } - - ValueTask MoveNextAsync(); -} - -public interface IIndexingEventSourceFactory -{ - IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToken token); -} - -public sealed class AllStreamsIndexingEventSourceFactory : IIndexingEventSourceFactory -{ - private readonly IPublisher _publisher; - - public AllStreamsIndexingEventSourceFactory(IPublisher publisher) => - _publisher = publisher ?? throw new ArgumentNullException(nameof(publisher)); - - public IIndexingEventSource Create(IndexCheckpoint? checkpoint, CancellationToken token) - { - var subscription = new Enumerator.AllSubscription( - _publisher, - new DefaultExpiryStrategy(), - checkpoint?.ToPosition(), - resolveLinks: false, - SystemAccounts.System, - requiresLeader: false, - token); - - return new AllStreamsIndexingEventSource(subscription); - } - - private sealed class AllStreamsIndexingEventSource(Enumerator.AllSubscription subscription) : IIndexingEventSource - { - public ReadResponse Current => subscription.Current; - - public ValueTask MoveNextAsync() => subscription.MoveNextAsync(); - - public ValueTask DisposeAsync() => subscription.DisposeAsync(); - } -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexCheckpointStore.cs b/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexCheckpointStore.cs deleted file mode 100644 index 2867f0d55..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexCheckpointStore.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System.Threading; -using System.Threading.Tasks; - -namespace EventStore.Core.Services.Storage.Indexing; - -public sealed class InMemoryIndexCheckpointStore : IIndexCheckpointStore -{ - private readonly object _lock = new(); - private IndexCheckpoint? _checkpoint; - - public ValueTask Read(CancellationToken token) - { - token.ThrowIfCancellationRequested(); - - lock (_lock) - { - return ValueTask.FromResult(_checkpoint); - } - } - - public ValueTask Write(IndexCheckpoint checkpoint, CancellationToken token) - { - token.ThrowIfCancellationRequested(); - - lock (_lock) - { - _checkpoint = checkpoint; - } - - return ValueTask.CompletedTask; - } -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexDefinitionStore.cs b/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexDefinitionStore.cs deleted file mode 100644 index 3a4053e51..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexDefinitionStore.cs +++ /dev/null @@ -1,69 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; - -#nullable enable - -namespace EventStore.Core.Services.Storage.Indexing; - -public sealed class InMemoryIndexDefinitionStore : IIndexDefinitionStore -{ - private readonly object _lock = new(); - private readonly Dictionary _definitions = new(StringComparer.Ordinal); - - public ValueTask Create(IndexName name, IndexDefinition definition, CancellationToken token) - { - ArgumentNullException.ThrowIfNull(name); - ArgumentNullException.ThrowIfNull(definition); - token.ThrowIfCancellationRequested(); - - lock (_lock) - { - if (!_definitions.TryGetValue(name.Value, out var existing)) - { - _definitions.Add(name.Value, new StoredIndexDefinition(name, definition)); - return ValueTask.FromResult(IndexDefinitionCreateResult.Created); - } - - return ValueTask.FromResult(existing.Definition == definition - ? IndexDefinitionCreateResult.AlreadyExists - : IndexDefinitionCreateResult.Conflicts); - } - } - - public ValueTask Read(IndexName name, CancellationToken token) - { - ArgumentNullException.ThrowIfNull(name); - token.ThrowIfCancellationRequested(); - - lock (_lock) - { - _definitions.TryGetValue(name.Value, out var definition); - return ValueTask.FromResult(definition); - } - } - - public ValueTask> List(CancellationToken token) - { - token.ThrowIfCancellationRequested(); - - lock (_lock) - { - return ValueTask.FromResult>( - _definitions.Values.OrderBy(static definition => definition.Name.Value, StringComparer.Ordinal).ToArray()); - } - } - - public ValueTask Delete(IndexName name, CancellationToken token) - { - ArgumentNullException.ThrowIfNull(name); - token.ThrowIfCancellationRequested(); - - lock (_lock) - { - return ValueTask.FromResult(_definitions.Remove(name.Value)); - } - } -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexEventBuffer.cs b/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexEventBuffer.cs deleted file mode 100644 index f05347da3..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexEventBuffer.cs +++ /dev/null @@ -1,89 +0,0 @@ -using System; -using System.Collections.Generic; -using EventStore.Core.Data; - -namespace EventStore.Core.Services.Storage.Indexing; - -public readonly record struct InMemoryIndexEventBufferSnapshot( - ResolvedEvent[] Events, - long LastEventNumber, - long LastIndexedPosition); - -public sealed class InMemoryIndexEventBuffer -{ - private readonly object _lock = new(); - private readonly List _events = []; - private long _lastIndexedPosition = -1; - - public InMemoryIndexEventBuffer(IndexStreamId streamId) - { - ArgumentNullException.ThrowIfNull(streamId); - StreamId = streamId; - } - - public IndexStreamId StreamId { get; } - - public long LastEventNumber => - ReadLastEventNumber(); - - public long LastIndexedPosition - { - get - { - lock (_lock) - { - return _lastIndexedPosition; - } - } - } - - public void Append(EventRecord record, long? commitPosition) - { - ArgumentNullException.ThrowIfNull(record); - - if (record.EventStreamId != StreamId.Value) - { - throw new ArgumentException("Event stream id must match the index stream id.", nameof(record)); - } - - lock (_lock) - { - var expectedEventNumber = _events.Count; - if (record.EventNumber != expectedEventNumber) - { - throw new ArgumentException( - $"Event number must be appended sequentially starting at 0. Expected {expectedEventNumber}, got {record.EventNumber}.", - nameof(record)); - } - - _events.Add(ResolvedEvent.ForUnresolvedEvent(record, commitPosition)); - - if (commitPosition.HasValue) - { - _lastIndexedPosition = commitPosition.Value; - } - } - } - - public InMemoryIndexEventBufferSnapshot CreateSnapshot() - { - lock (_lock) - { - return new InMemoryIndexEventBufferSnapshot( - _events.ToArray(), - ReadLastEventNumberCore(), - _lastIndexedPosition); - } - } - - private long ReadLastEventNumber() - { - lock (_lock) - { - return ReadLastEventNumberCore(); - } - } - - private long ReadLastEventNumberCore() => - _events.Count == 0 ? ExpectedVersion.NoStream : _events[^1].Event.EventNumber; -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexVirtualStreamReader.cs b/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexVirtualStreamReader.cs deleted file mode 100644 index 7d5dab1c9..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexVirtualStreamReader.cs +++ /dev/null @@ -1,166 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Data; -using EventStore.Core.Messages; - -namespace EventStore.Core.Services.Storage.Indexing; - -public sealed class InMemoryIndexVirtualStreamReader : IndexVirtualStreamReader -{ - private readonly InMemoryIndexEventBuffer _buffer; - - public InMemoryIndexVirtualStreamReader(InMemoryIndexEventBuffer buffer) - : base(GetStreamId(buffer)) - { - _buffer = buffer; - } - - public override ValueTask ReadForwards( - ClientMessage.ReadStreamEventsForward msg, - CancellationToken token) - { - var snapshot = _buffer.CreateSnapshot(); - var lastEventNumber = snapshot.LastEventNumber; - var tfLastCommitPosition = snapshot.LastIndexedPosition; - - if (lastEventNumber == ExpectedVersion.NoStream) - { - return ValueTask.FromResult(new ClientMessage.ReadStreamEventsForwardCompleted( - msg.CorrelationId, - msg.EventStreamId, - msg.FromEventNumber, - msg.MaxCount, - ReadStreamResult.NoStream, - Array.Empty(), - StreamMetadata.Empty, - isCachePublic: false, - error: string.Empty, - nextEventNumber: -1, - lastEventNumber: ExpectedVersion.NoStream, - isEndOfStream: true, - tfLastCommitPosition: tfLastCommitPosition)); - } - - var fromEventNumber = msg.FromEventNumber < 0 ? 0 : msg.FromEventNumber; - var maxCount = Math.Max(0, msg.MaxCount); - - if (fromEventNumber > lastEventNumber || maxCount == 0) - { - var emptyNextEventNumber = fromEventNumber > lastEventNumber ? lastEventNumber + 1 : fromEventNumber; - var emptyIsEndOfStream = fromEventNumber > lastEventNumber; - - return ValueTask.FromResult(new ClientMessage.ReadStreamEventsForwardCompleted( - msg.CorrelationId, - msg.EventStreamId, - msg.FromEventNumber, - msg.MaxCount, - ReadStreamResult.Success, - Array.Empty(), - StreamMetadata.Empty, - isCachePublic: false, - error: string.Empty, - nextEventNumber: emptyNextEventNumber, - lastEventNumber: lastEventNumber, - isEndOfStream: emptyIsEndOfStream, - tfLastCommitPosition: tfLastCommitPosition)); - } - - var eventCount = (int)Math.Min(maxCount, lastEventNumber - fromEventNumber + 1); - var events = new ResolvedEvent[eventCount]; - for (var index = 0; index < events.Length; index++) - { - events[index] = snapshot.Events[(int)(fromEventNumber + index)]; - } - - var nextEventNumber = fromEventNumber + events.Length; - var isEndOfStream = nextEventNumber > lastEventNumber; - - return ValueTask.FromResult(new ClientMessage.ReadStreamEventsForwardCompleted( - msg.CorrelationId, - msg.EventStreamId, - msg.FromEventNumber, - msg.MaxCount, - ReadStreamResult.Success, - events, - StreamMetadata.Empty, - isCachePublic: false, - error: string.Empty, - nextEventNumber: nextEventNumber, - lastEventNumber: lastEventNumber, - isEndOfStream: isEndOfStream, - tfLastCommitPosition: tfLastCommitPosition)); - } - - public override ValueTask ReadBackwards( - ClientMessage.ReadStreamEventsBackward msg, - CancellationToken token) - { - var snapshot = _buffer.CreateSnapshot(); - var lastEventNumber = snapshot.LastEventNumber; - var tfLastCommitPosition = snapshot.LastIndexedPosition; - - if (lastEventNumber == ExpectedVersion.NoStream) - { - return ValueTask.FromResult(new ClientMessage.ReadStreamEventsBackwardCompleted( - msg.CorrelationId, - msg.EventStreamId, - msg.FromEventNumber, - msg.MaxCount, - ReadStreamResult.NoStream, - Array.Empty(), - StreamMetadata.Empty, - isCachePublic: false, - error: string.Empty, - nextEventNumber: -1, - lastEventNumber: ExpectedVersion.NoStream, - isEndOfStream: true, - tfLastCommitPosition: tfLastCommitPosition)); - } - - var requestedEventNumber = msg.FromEventNumber < 0 ? lastEventNumber : msg.FromEventNumber; - var readFromEventNumber = Math.Min(requestedEventNumber, lastEventNumber); - var maxCount = Math.Max(0, msg.MaxCount); - - var eventCount = (int)Math.Min(maxCount, readFromEventNumber + 1); - var events = new ResolvedEvent[eventCount]; - for (var index = 0; index < events.Length; index++) - { - events[index] = snapshot.Events[(int)(readFromEventNumber - index)]; - } - - var nextEventNumber = readFromEventNumber - events.Length; - var isEndOfStream = nextEventNumber < 0; - if (isEndOfStream) - { - nextEventNumber = -1; - } - - return ValueTask.FromResult(new ClientMessage.ReadStreamEventsBackwardCompleted( - msg.CorrelationId, - msg.EventStreamId, - requestedEventNumber, - msg.MaxCount, - ReadStreamResult.Success, - events, - StreamMetadata.Empty, - isCachePublic: false, - error: string.Empty, - nextEventNumber: nextEventNumber, - lastEventNumber: lastEventNumber, - isEndOfStream: isEndOfStream, - tfLastCommitPosition: tfLastCommitPosition)); - } - - public override long GetLastEventNumber(string streamId) => - CanReadStream(streamId) ? _buffer.LastEventNumber : ExpectedVersion.NoStream; - - public override long GetLastIndexedPosition(string streamId) => - CanReadStream(streamId) ? _buffer.LastIndexedPosition : -1; - - private static IndexStreamId GetStreamId(InMemoryIndexEventBuffer buffer) - { - ArgumentNullException.ThrowIfNull(buffer); - return buffer.StreamId; - } -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpoint.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpoint.cs deleted file mode 100644 index be3dd478e..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpoint.cs +++ /dev/null @@ -1,27 +0,0 @@ -using System; -using EventStore.Core.Services.Transport.Common; - -namespace EventStore.Core.Services.Storage.Indexing; - -public readonly record struct IndexCheckpoint -{ - public long CommitPosition { get; } - public long PreparePosition { get; } - - public IndexCheckpoint(long commitPosition, long preparePosition) - { - ArgumentOutOfRangeException.ThrowIfNegative(commitPosition); - ArgumentOutOfRangeException.ThrowIfNegative(preparePosition); - - if (commitPosition < preparePosition) - { - throw new ArgumentOutOfRangeException(nameof(CommitPosition), - "The commit position cannot be less than the prepare position."); - } - - CommitPosition = commitPosition; - PreparePosition = preparePosition; - } - - public Position ToPosition() => Position.FromInt64(CommitPosition, PreparePosition); -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpointCommitTracker.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpointCommitTracker.cs deleted file mode 100644 index de87f2a24..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpointCommitTracker.cs +++ /dev/null @@ -1,195 +0,0 @@ -using System; -using System.Runtime.ExceptionServices; -using System.Threading; -using System.Threading.Tasks; -using Serilog; - -namespace EventStore.Core.Services.Storage.Indexing; - -public sealed class IndexCheckpointCommitTracker : IAsyncDisposable -{ - private readonly int _batchSize; - private readonly TimeSpan _maxCommitDelay; - private readonly Func _commit; - private readonly object _stateLock = new(); - private readonly CancellationTokenSource _lifetime; - private readonly CancellationTokenRegistration _stopTracking; - private readonly SemaphoreSlim _commitRequested = new(0, 1); - private readonly TaskCompletionSource _disposeCompleted = new(TaskCreationOptions.RunContinuationsAsynchronously); - private readonly Task _runTask; - - private int _disposed; - private int _pending; - private bool _stopped; - - public IndexCheckpointCommitTracker( - int batchSize, - TimeSpan maxCommitDelay, - Func commit, - CancellationToken cancellationToken) - { - ArgumentOutOfRangeException.ThrowIfNegativeOrZero(batchSize); - ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(maxCommitDelay, TimeSpan.Zero); - - _batchSize = batchSize; - _maxCommitDelay = maxCommitDelay; - _commit = commit ?? throw new ArgumentNullException(nameof(commit)); - _lifetime = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - _stopTracking = _lifetime.Token.Register(static state => - ((IndexCheckpointCommitTracker)state!).StopTracking(), this); - _runTask = Task.Run(RunAsync); - } - - public void Track() - { - lock (_stateLock) - { - ObjectDisposedException.ThrowIf(_stopped, this); - - if (Interlocked.Increment(ref _pending) >= _batchSize) - RequestCommit(); - } - } - - public async ValueTask DisposeAsync() - { - var dispose = false; - lock (_stateLock) - { - if (_disposed is 0) - { - _disposed = 1; - _stopped = true; - dispose = true; - } - } - - if (!dispose) - { - await _disposeCompleted.Task; - return; - } - - Exception failure = null; - - try - { - StopTracking(); - await _lifetime.CancelAsync(); - - try - { - await _runTask; - } - catch (OperationCanceledException) - { - } - } - catch (Exception ex) - { - failure = ex; - } - - try - { - _stopTracking.Dispose(); - _commitRequested.Dispose(); - _lifetime.Dispose(); - } - catch (Exception ex) when (failure is null) - { - failure = ex; - } - - if (failure is null) - { - _disposeCompleted.SetResult(); - return; - } - - _disposeCompleted.SetException(failure); - ExceptionDispatchInfo.Capture(failure).Throw(); - } - - private async Task RunAsync() - { - var token = _lifetime.Token; - - try - { - while (!token.IsCancellationRequested) - { - try - { - await _commitRequested.WaitAsync(_maxCommitDelay, token); - } - catch (OperationCanceledException) when (token.IsCancellationRequested) - { - break; - } - - try - { - await CommitPending(token, requestRetry: true); - } - catch (OperationCanceledException) when (token.IsCancellationRequested) - { - break; - } - } - } - finally - { - StopTracking(); - await CommitPending(CancellationToken.None, requestRetry: false); - } - } - - private async ValueTask CommitPending(CancellationToken token, bool requestRetry) - { - var pending = Interlocked.Exchange(ref _pending, 0); - if (pending is 0) - return; - - try - { - await _commit(token); - } - catch (OperationCanceledException) when (token.IsCancellationRequested) - { - Interlocked.Add(ref _pending, pending); - throw; - } - catch (Exception ex) - { - var restoredPending = Interlocked.Add(ref _pending, pending); - if (!requestRetry) - { - Log.Error(ex, "Error while committing an index checkpoint"); - throw; - } - - if (restoredPending >= _batchSize) - RequestCommit(); - - Log.Error(ex, "Error while committing an index checkpoint"); - } - } - - private void StopTracking() - { - lock (_stateLock) - _stopped = true; - } - - private void RequestCommit() - { - try - { - _commitRequested.Release(); - } - catch (SemaphoreFullException) - { - } - } -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpointWriter.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpointWriter.cs deleted file mode 100644 index 4652d60f1..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexCheckpointWriter.cs +++ /dev/null @@ -1,131 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Data; - -namespace EventStore.Core.Services.Storage.Indexing; - -public sealed class IndexCheckpointWriter -{ - private readonly IIndexCheckpointStore _store; - private readonly object _lock = new(); - private IndexCheckpoint? _latestCheckpoint; - private IndexCheckpoint? _persistedCheckpoint; - private IndexCheckpoint? _pendingCheckpoint; - - public IndexCheckpointWriter(IIndexCheckpointStore store) - { - _store = store ?? throw new ArgumentNullException(nameof(store)); - } - - public async ValueTask Read(CancellationToken token) - { - var checkpoint = await _store.Read(token); - if (checkpoint is not { } latest) - { - return checkpoint; - } - - lock (_lock) - { - if (IsAheadOfLatest(latest)) - { - _latestCheckpoint = latest; - } - - if (IsAheadOfPersisted(latest)) - { - _persistedCheckpoint = latest; - } - - if (_pendingCheckpoint is { } pending && !IsAheadOfPersisted(pending)) - { - _pendingCheckpoint = null; - } - } - - return checkpoint; - } - - public void Track(ResolvedEvent resolvedEvent) - { - if (!resolvedEvent.OriginalPosition.HasValue) - { - throw new InvalidOperationException( - "Cannot track index checkpoint progress for an event without an original position."); - } - - var position = resolvedEvent.OriginalPosition.Value; - var checkpoint = new IndexCheckpoint(position.CommitPosition, position.PreparePosition); - - lock (_lock) - { - if (_latestCheckpoint is { } latest) - { - var latestPosition = latest.ToPosition(); - var checkpointPosition = checkpoint.ToPosition(); - - if (checkpointPosition < latestPosition) - { - throw new InvalidOperationException( - $"Cannot track index checkpoint progress backwards from {latestPosition} to {checkpointPosition}."); - } - - if (checkpointPosition == latestPosition) - { - return; - } - } - - _latestCheckpoint = checkpoint; - _pendingCheckpoint = checkpoint; - } - } - - public async ValueTask Commit(CancellationToken token) - { - IndexCheckpoint checkpoint; - - lock (_lock) - { - if (_pendingCheckpoint is not { } pending) - { - return; - } - - if (!IsAheadOfPersisted(pending)) - { - _pendingCheckpoint = null; - return; - } - - checkpoint = pending; - } - - await _store.Write(checkpoint, token); - - lock (_lock) - { - if (IsAheadOfPersisted(checkpoint)) - { - _persistedCheckpoint = checkpoint; - } - - if (IsAheadOfLatest(checkpoint)) - { - _latestCheckpoint = checkpoint; - } - - if (_pendingCheckpoint == checkpoint) - { - _pendingCheckpoint = null; - } - } - } - - private bool IsAheadOfLatest(IndexCheckpoint checkpoint) => - _latestCheckpoint is not { } latest || checkpoint.ToPosition() > latest.ToPosition(); - - private bool IsAheadOfPersisted(IndexCheckpoint checkpoint) => - _persistedCheckpoint is not { } persisted || checkpoint.ToPosition() > persisted.ToPosition(); -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexDefinition.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexDefinition.cs deleted file mode 100644 index 80ed66c45..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexDefinition.cs +++ /dev/null @@ -1,102 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; - -namespace EventStore.Core.Services.Storage.Indexing; - -public sealed record IndexDefinition -{ - public IndexEventFilter Filter { get; } - - public IReadOnlyList Fields { get; } - - public IndexDefinition(IndexEventFilter filter, IReadOnlyList fields) - { - ArgumentNullException.ThrowIfNull(fields); - - if (fields.Any(static field => field is null)) - { - throw new ArgumentException("Index fields cannot contain null.", nameof(fields)); - } - - if (filter is null && fields.Count == 0) - { - throw new ArgumentException("Index definition must specify at least one filter or field.", nameof(fields)); - } - - Filter = filter; - Fields = fields.ToArray(); - } - - public bool Equals(IndexDefinition other) => - other is not null - && Equals(Filter, other.Filter) - && Fields.SequenceEqual(other.Fields); - - public override int GetHashCode() - { - var hash = new HashCode(); - hash.Add(Filter); - - foreach (var field in Fields) - { - hash.Add(field); - } - - return hash.ToHashCode(); - } -} - -public sealed record IndexEventFilter -{ - public string Value { get; } - - public IndexEventFilter(string value) - { - if (string.IsNullOrWhiteSpace(value)) - { - throw new ArgumentException("Index filter cannot be empty.", nameof(value)); - } - - Value = value; - } - - public override string ToString() => Value; -} - -public sealed record IndexFieldDefinition -{ - public string Name { get; } - - public IndexFieldSelector Selector { get; } - - public IndexFieldDefinition(string name, IndexFieldSelector selector) - { - if (string.IsNullOrWhiteSpace(name)) - { - throw new ArgumentException("Index field name cannot be empty.", nameof(name)); - } - - ArgumentNullException.ThrowIfNull(selector); - - Name = name; - Selector = selector; - } -} - -public sealed record IndexFieldSelector -{ - public string Value { get; } - - public IndexFieldSelector(string value) - { - if (string.IsNullOrWhiteSpace(value)) - { - throw new ArgumentException("Index field selector cannot be empty.", nameof(value)); - } - - Value = value; - } - - public override string ToString() => Value; -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexName.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexName.cs deleted file mode 100644 index 6aaa4c727..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexName.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System; -using System.Linq; - -namespace EventStore.Core.Services.Storage.Indexing; - -public sealed record IndexName -{ - public string Value { get; } - - public IndexName(string value) - { - if (string.IsNullOrWhiteSpace(value)) - { - throw new ArgumentException("Index name cannot be empty.", nameof(value)); - } - - if (!value.All(IsValidCharacter)) - { - throw new ArgumentException("Index name can contain only lowercase alphanumeric characters, underscores and dashes.", nameof(value)); - } - - Value = value; - } - - public override string ToString() => Value; - - private static bool IsValidCharacter(char value) => - value is >= 'a' and <= 'z' - or >= '0' and <= '9' - or '_' - or '-'; -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexStreamId.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexStreamId.cs deleted file mode 100644 index 9c3f869a1..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexStreamId.cs +++ /dev/null @@ -1,26 +0,0 @@ -using System; -using EventStore.Core.Services; - -namespace EventStore.Core.Services.Storage.Indexing; - -public sealed record IndexStreamId -{ - public string Value { get; } - - public IndexStreamId(string value) - { - if (SystemStreams.IsInvalidStream(value)) - { - throw new ArgumentException("Index stream id must be a valid stream id.", nameof(value)); - } - - if (SystemStreams.IsMetastream(value)) - { - throw new ArgumentException("Index stream id cannot be a metastream id.", nameof(value)); - } - - Value = value; - } - - public override string ToString() => Value; -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexVirtualStreamReader.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexVirtualStreamReader.cs deleted file mode 100644 index 171699899..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexVirtualStreamReader.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Messages; -using EventStore.Core.Services.Storage.InMemory; - -namespace EventStore.Core.Services.Storage.Indexing; - -public abstract class IndexVirtualStreamReader : IVirtualStreamReader -{ - protected IndexVirtualStreamReader(IndexStreamId streamId) - { - ArgumentNullException.ThrowIfNull(streamId); - StreamId = streamId; - } - - public IndexStreamId StreamId { get; } - - public abstract ValueTask ReadForwards( - ClientMessage.ReadStreamEventsForward msg, - CancellationToken token); - - public abstract ValueTask ReadBackwards( - ClientMessage.ReadStreamEventsBackward msg, - CancellationToken token); - - public abstract long GetLastEventNumber(string streamId); - - public abstract long GetLastIndexedPosition(string streamId); - - public bool CanReadStream(string streamId) => streamId == StreamId.Value; -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexingComponentHost.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexingComponentHost.cs deleted file mode 100644 index 4167c5614..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexingComponentHost.cs +++ /dev/null @@ -1,96 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using EventStore.Core.Bus; -using EventStore.Core.Services.Storage.InMemory; -using Microsoft.AspNetCore.Builder; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; - -namespace EventStore.Core.Services.Storage.Indexing; - -public sealed class IndexingComponentHost : IVirtualStreamReaderProvider -{ - private readonly IReadOnlyList _components; - private readonly IReadOnlyList _virtualStreamReaders; - private readonly IndexingSubscriptionOptions _options; - - public IndexingComponentHost(IIndexingComponent component) - : this(component, IndexingSubscriptionOptions.Default) - { - } - - public IndexingComponentHost(IIndexingComponent component, IndexingSubscriptionOptions options) - : this([component], options) - { - } - - public IndexingComponentHost(IReadOnlyList components) - : this(components, IndexingSubscriptionOptions.Default) - { - } - - public IndexingComponentHost(IReadOnlyList components, IndexingSubscriptionOptions options) - { - ArgumentNullException.ThrowIfNull(components); - ArgumentNullException.ThrowIfNull(options); - - var componentArray = components.ToArray(); - if (componentArray.Length == 0) - { - throw new ArgumentException("At least one indexing component is required.", nameof(components)); - } - - if (componentArray.Any(static component => component is null)) - { - throw new ArgumentException("Indexing components cannot contain null.", nameof(components)); - } - - _components = componentArray; - _options = options; - _virtualStreamReaders = GetVirtualStreamReaders(componentArray); - } - - public IReadOnlyList VirtualStreamReaders => _virtualStreamReaders; - - public void ConfigureServices(IServiceCollection services, IConfiguration configuration) - { - ArgumentNullException.ThrowIfNull(services); - ArgumentNullException.ThrowIfNull(configuration); - - foreach (var component in _components) - { - services.AddSingleton(serviceProvider => new IndexingService( - component, - new AllStreamsIndexingEventSourceFactory(serviceProvider.GetRequiredService()), - serviceProvider.GetRequiredService(), - _options)); - } - } - - public void ConfigureApplication(IApplicationBuilder builder, IConfiguration configuration) - { - ArgumentNullException.ThrowIfNull(builder); - ArgumentNullException.ThrowIfNull(configuration); - - foreach (var service in builder.ApplicationServices.GetServices()) - { - service.Register(); - } - } - - private static IReadOnlyList GetVirtualStreamReaders(IReadOnlyList components) - { - var readers = components.SelectMany(static component => - component.VirtualStreamReaders - ?? throw new ArgumentException("Indexing component virtual stream readers cannot be null.", nameof(components))) - .ToArray(); - - if (readers.Any(static reader => reader is null)) - { - throw new ArgumentException("Indexing component virtual stream readers cannot contain null.", nameof(components)); - } - - return readers; - } -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexingService.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexingService.cs deleted file mode 100644 index c0db4ecba..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexingService.cs +++ /dev/null @@ -1,102 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Bus; -using EventStore.Core.Messages; - -namespace EventStore.Core.Services.Storage.Indexing; - -public sealed class IndexingService : IAsyncHandle, IAsyncHandle, IAsyncDisposable -{ - private readonly ISubscriber _subscriber; - private readonly IndexingSubscription _subscription; - private readonly object _registrationLock = new(); - private int _disposed; - private int _started; - private bool _registered; - - public IndexingService( - IIndexingComponent component, - IIndexingEventSourceFactory eventSourceFactory, - ISubscriber subscriber, - IndexingSubscriptionOptions options) - { - ArgumentNullException.ThrowIfNull(component); - ArgumentNullException.ThrowIfNull(eventSourceFactory); - ArgumentNullException.ThrowIfNull(subscriber); - ArgumentNullException.ThrowIfNull(options); - - _subscriber = subscriber; - _subscription = new IndexingSubscription(component, eventSourceFactory, options); - } - - public void Register() - { - lock (_registrationLock) - { - ObjectDisposedException.ThrowIf(_disposed is not 0, this); - if (_registered) - { - return; - } - - _subscriber.Subscribe(this); - _subscriber.Subscribe(this); - _registered = true; - } - } - - public async ValueTask HandleAsync(SystemMessage.SystemReady message, CancellationToken token) - { - ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposed) is not 0, this); - - if (Interlocked.CompareExchange(ref _started, 1, 0) is not 0) - { - return; - } - - try - { - await _subscription.Start(token); - } - catch - { - Volatile.Write(ref _started, 0); - throw; - } - } - - public ValueTask HandleAsync(SystemMessage.BecomeShuttingDown message, CancellationToken token) => - DisposeAsync(); - - public async ValueTask DisposeAsync() - { - if (Interlocked.Exchange(ref _disposed, 1) is not 0) - { - return; - } - - try - { - await _subscription.DisposeAsync(); - } - finally - { - var registered = false; - lock (_registrationLock) - { - if (_registered) - { - _registered = false; - registered = true; - } - } - - if (registered) - { - _subscriber.Unsubscribe(this); - _subscriber.Unsubscribe(this); - } - } - } -} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs deleted file mode 100644 index 41c9a168e..000000000 --- a/src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs +++ /dev/null @@ -1,316 +0,0 @@ -using System; -using System.Runtime.ExceptionServices; -using System.Threading; -using System.Threading.Tasks; -using EventStore.Core.Services.Transport.Enumerators; -using Serilog; - -namespace EventStore.Core.Services.Storage.Indexing; - -public sealed record IndexingSubscriptionOptions -{ - public static readonly IndexingSubscriptionOptions Default = new(50000, TimeSpan.FromSeconds(10)); - - public int CheckpointCommitBatchSize { get; } - public TimeSpan CheckpointCommitDelay { get; } - - public IndexingSubscriptionOptions(int checkpointCommitBatchSize, TimeSpan checkpointCommitDelay) - { - ArgumentOutOfRangeException.ThrowIfNegativeOrZero(checkpointCommitBatchSize); - ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(checkpointCommitDelay, TimeSpan.Zero); - - CheckpointCommitBatchSize = checkpointCommitBatchSize; - CheckpointCommitDelay = checkpointCommitDelay; - } -} - -public sealed class IndexingSubscription : IAsyncDisposable -{ - private static readonly ILogger Log = Serilog.Log.ForContext(); - - private readonly IIndexingComponent _component; - private readonly IIndexingEventSourceFactory _eventSourceFactory; - private readonly IndexingSubscriptionOptions _options; - private readonly CancellationTokenSource _stop = new(); - private readonly object _stateLock = new(); - - private IIndexingProcessor _processor; - private IIndexingEventSource _eventSource; - private IndexCheckpointCommitTracker _commitTracker; - private Task _startup; - private Task _processing; - private bool _starting; - private bool _started; - private bool _disposed; - - public IndexingSubscription( - IIndexingComponent component, - IIndexingEventSourceFactory eventSourceFactory, - IndexingSubscriptionOptions options) - { - _component = component ?? throw new ArgumentNullException(nameof(component)); - _eventSourceFactory = eventSourceFactory ?? throw new ArgumentNullException(nameof(eventSourceFactory)); - _options = options ?? throw new ArgumentNullException(nameof(options)); - } - - public ValueTask Start(CancellationToken token) - { - Task startup; - lock (_stateLock) - { - ObjectDisposedException.ThrowIf(_disposed, this); - if (_started || _starting) - { - throw new InvalidOperationException($"{nameof(IndexingSubscription)} has already been started."); - } - - _starting = true; - startup = StartCore(token); - _startup = startup; - } - - return new ValueTask(startup); - } - - private async Task StartCore(CancellationToken token) - { - using var linked = CancellationTokenSource.CreateLinkedTokenSource(token, _stop.Token); - - IIndexingEventSource eventSource = null; - IndexCheckpointCommitTracker commitTracker = null; - - try - { - token.ThrowIfCancellationRequested(); - await _component.Initialize(linked.Token); - var checkpoint = await _component.ReadCheckpoint(linked.Token); - token.ThrowIfCancellationRequested(); - - var processor = _component.Processor - ?? throw new InvalidOperationException("Indexing component returned null processor."); - - commitTracker = new IndexCheckpointCommitTracker( - _options.CheckpointCommitBatchSize, - _options.CheckpointCommitDelay, - processor.Commit, - CancellationToken.None); - - eventSource = _eventSourceFactory.Create(checkpoint, _stop.Token) - ?? throw new InvalidOperationException("Indexing event source factory returned null."); - token.ThrowIfCancellationRequested(); - - lock (_stateLock) - { - ObjectDisposedException.ThrowIf(_disposed, this); - - _commitTracker = commitTracker; - _processor = processor; - _eventSource = eventSource; - _processing = Task.Run(ProcessEvents); - ObserveProcessingFault(_processing); - _started = true; - _starting = false; - } - } - catch - { - lock (_stateLock) - { - _starting = false; - } - - if (commitTracker is not null) - { - await commitTracker.DisposeAsync(); - } - - if (eventSource is not null) - { - await eventSource.DisposeAsync(); - } - - throw; - } - } - - public ValueTask DisposeAsync() => DisposeAsync(ignoreProcessingFailure: false); - - private async ValueTask DisposeAsync(bool ignoreProcessingFailure) - { - Task startup; - Task processing; - IIndexingEventSource eventSource; - IndexCheckpointCommitTracker commitTracker; - - lock (_stateLock) - { - if (_disposed) - { - return; - } - - _disposed = true; - startup = _startup; - } - - await _stop.CancelAsync(); - - Exception failure = null; - - try - { - if (startup is not null) - { - await startup; - } - } - catch (OperationCanceledException) when (_stop.IsCancellationRequested) - { - } - catch (ObjectDisposedException) when (_stop.IsCancellationRequested) - { - } - catch - { - } - - lock (_stateLock) - { - processing = _processing; - eventSource = _eventSource; - commitTracker = _commitTracker; - } - - try - { - if (processing is not null) - { - try - { - await processing; - } - catch (OperationCanceledException) when (_stop.IsCancellationRequested) - { - } - } - } - catch (Exception) when (ignoreProcessingFailure) - { - } - catch (Exception ex) - { - failure = ex; - } - - try - { - if (commitTracker is not null) - { - await commitTracker.DisposeAsync(); - } - } - catch (Exception ex) when (failure is null) - { - failure = ex; - } - - try - { - if (eventSource is not null) - { - await eventSource.DisposeAsync(); - } - } - catch (Exception ex) when (failure is null) - { - failure = ex; - } - - try - { - await _component.DisposeAsync(); - } - catch (Exception ex) when (failure is null) - { - failure = ex; - } - - try - { - _stop.Dispose(); - } - catch (Exception ex) when (failure is null) - { - failure = ex; - } - - if (failure is not null) - { - ExceptionDispatchInfo.Capture(failure).Throw(); - } - } - - private void ObserveProcessingFault(Task processing) - { - processing.ContinueWith( - task => _ = DisposeAfterProcessingFault(task), - CancellationToken.None, - TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously, - TaskScheduler.Default); - } - - private async Task DisposeAfterProcessingFault(Task processing) - { - Log.Error(processing.Exception, "Indexing subscription stopped unexpectedly"); - - try - { - await DisposeAsync(ignoreProcessingFailure: true); - } - catch (Exception ex) - { - Log.Error(ex, "Error while disposing failed indexing subscription"); - } - } - - private async Task ProcessEvents() - { - while (!_stop.IsCancellationRequested) - { - bool hasNext; - try - { - hasNext = await _eventSource.MoveNextAsync(); - } - catch (OperationCanceledException) when (_stop.IsCancellationRequested) - { - return; - } - - if (!hasNext) - { - throw new InvalidOperationException("Indexing event source completed unexpectedly."); - } - - if (_eventSource.Current is not ReadResponse.EventReceived eventReceived) - { - continue; - } - - try - { - await _processor.Index(eventReceived.Event, CancellationToken.None); - _commitTracker.Track(); - } - catch (OperationCanceledException) when (_stop.IsCancellationRequested) - { - return; - } - catch (Exception ex) - { - Log.Error(ex, "Error while indexing event {eventType}", eventReceived.Event.Event.EventType); - throw; - } - } - } -} diff --git a/src/EventStore.Core/Services/SubscriptionsService.cs b/src/EventStore.Core/Services/SubscriptionsService.cs index db2da48c2..dba64708b 100644 --- a/src/EventStore.Core/Services/SubscriptionsService.cs +++ b/src/EventStore.Core/Services/SubscriptionsService.cs @@ -48,6 +48,7 @@ public class SubscriptionsService : IAsyncHandle { private const int DontReportCheckpointReached = -1; + private const long InMemoryStreamLastIndexedPosition = -1; private static readonly TimeSpan TimeoutPeriod = TimeSpan.FromSeconds(1); @@ -169,7 +170,7 @@ public void Handle(TcpMessage.ConnectionClosed message) } var lastIndexedPos = isVirtualStream - ? _virtualStreamReader.GetLastIndexedPosition(msg.EventStreamId) + ? InMemoryStreamLastIndexedPosition : _readIndex.LastIndexedPosition; SubscribeToStream(msg.CorrelationId, msg.Envelope, msg.ConnectionId, msg.EventStreamId, @@ -199,7 +200,7 @@ public void Handle(TcpMessage.ConnectionClosed message) } var lastIndexedPos = isVirtualStream - ? _virtualStreamReader.GetLastIndexedPosition(msg.EventStreamId) + ? InMemoryStreamLastIndexedPosition : _readIndex.LastIndexedPosition; SubscribeToStream(msg.CorrelationId, msg.Envelope, msg.ConnectionId, msg.EventStreamId,