From 96ea9c9f3dd48051e16a980e5edeffeded07da6c Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Sun, 28 Jun 2026 18:08:53 -0400 Subject: [PATCH] feat(indexing): support typed virtual stream routing Signed-off-by: Yordis Prieto --- .../Indexing/IndexVirtualStreamReaderTests.cs | 69 +++++++++++++++++++ .../Indexing/IndexVirtualStreamReader.cs | 32 +++++++++ 2 files changed, 101 insertions(+) create mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexVirtualStreamReaderTests.cs create mode 100644 src/EventStore.Core/Services/Storage/Indexing/IndexVirtualStreamReader.cs diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexVirtualStreamReaderTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexVirtualStreamReaderTests.cs new file mode 100644 index 000000000..76ae22ac5 --- /dev/null +++ b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexVirtualStreamReaderTests.cs @@ -0,0 +1,69 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Messages; +using EventStore.Core.Services.Storage.Indexing; +using Xunit; + +namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; + +public class IndexVirtualStreamReaderTests +{ + [Fact] + public void constructor_rejects_missing_stream_id() + { + var exception = Assert.Throws(() => new TestIndexVirtualStreamReader(null!)); + + Assert.Equal("streamId", exception.ParamName); + } + + [Fact] + public void exposes_stream_id() + { + var streamId = new IndexStreamId("$index-stream"); + var reader = new TestIndexVirtualStreamReader(streamId); + + Assert.Same(streamId, reader.StreamId); + } + + [Fact] + public void can_read_stream_when_candidate_matches() + { + var reader = new TestIndexVirtualStreamReader(new IndexStreamId("$index-stream")); + + Assert.True(reader.CanReadStream("$index-stream")); + } + + [Fact] + public void cannot_read_stream_when_candidate_differs() + { + var reader = new TestIndexVirtualStreamReader(new IndexStreamId("$index-stream")); + + Assert.False(reader.CanReadStream("other-stream")); + } + + [Fact] + public void cannot_read_stream_when_candidate_is_null() + { + var reader = new TestIndexVirtualStreamReader(new IndexStreamId("$index-stream")); + + Assert.False(reader.CanReadStream(null!)); + } + + private sealed class TestIndexVirtualStreamReader(IndexStreamId streamId) : IndexVirtualStreamReader(streamId) + { + public override ValueTask ReadForwards( + ClientMessage.ReadStreamEventsForward msg, + CancellationToken token) => + throw new NotSupportedException(); + + public override ValueTask ReadBackwards( + ClientMessage.ReadStreamEventsBackward msg, + CancellationToken token) => + throw new NotSupportedException(); + + public override long GetLastEventNumber(string streamId) => throw new NotSupportedException(); + + public override long GetLastIndexedPosition(string streamId) => throw new NotSupportedException(); + } +} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IndexVirtualStreamReader.cs b/src/EventStore.Core/Services/Storage/Indexing/IndexVirtualStreamReader.cs new file mode 100644 index 000000000..171699899 --- /dev/null +++ b/src/EventStore.Core/Services/Storage/Indexing/IndexVirtualStreamReader.cs @@ -0,0 +1,32 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Messages; +using EventStore.Core.Services.Storage.InMemory; + +namespace EventStore.Core.Services.Storage.Indexing; + +public abstract class IndexVirtualStreamReader : IVirtualStreamReader +{ + protected IndexVirtualStreamReader(IndexStreamId streamId) + { + ArgumentNullException.ThrowIfNull(streamId); + StreamId = streamId; + } + + public IndexStreamId StreamId { get; } + + public abstract ValueTask ReadForwards( + ClientMessage.ReadStreamEventsForward msg, + CancellationToken token); + + public abstract ValueTask ReadBackwards( + ClientMessage.ReadStreamEventsBackward msg, + CancellationToken token); + + public abstract long GetLastEventNumber(string streamId); + + public abstract long GetLastIndexedPosition(string streamId); + + public bool CanReadStream(string streamId) => streamId == StreamId.Value; +}