Skip to content

feat(indexing): support durable event processing#387

Open
yordis wants to merge 8 commits into
masterfrom
yordis/feat-indexing-event-consumer
Open

feat(indexing): support durable event processing#387
yordis wants to merge 8 commits into
masterfrom
yordis/feat-indexing-event-consumer

Conversation

@yordis

@yordis yordis commented Jun 25, 2026

Copy link
Copy Markdown
Member
  • Internal indexes need a durable way to keep their state aligned with committed events before query surfaces can rely on them.

Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
@cursor

cursor Bot commented Jun 25, 2026

Copy link
Copy Markdown

PR Summary

Medium Risk
Touches core storage indexing and shutdown paths; incorrect checkpoint or commit ordering could affect query consistency, though behavior is isolated behind new interfaces and heavily unit-tested.

Overview
Adds a durable indexing pipeline so internal indexes can catch up from a stored checkpoint, process committed events from an all-streams subscription, and persist progress via batched or time-based commits.

IndexingSubscription initializes an IIndexingComponent, resumes from IndexCheckpoint, feeds EventReceived items through IIndexingProcessor.Index, and drives commits through IndexCheckpointCommitTracker (batch size and delay from IndexingSubscriptionOptions). Startup, shutdown, and processing faults are handled with ordered disposal, flush-on-dispose, and self-cleanup when the event source ends unexpectedly.

IndexingService starts the subscription on SystemReady and disposes it on BecomeShuttingDown. AllStreamsIndexingEventSourceFactory wraps Enumerator.AllSubscription at the checkpoint position for the live event source.

New abstractions: IIndexingComponent / IIndexingProcessor, IIndexingEventSource / factory, and IndexCheckpoint (commit/prepare positions with validation). IndexingSubscriptionTests covers checkpoint resume, commit triggers, dispose semantics, and concurrency edge cases. ThreadBasedSchedulerTests only widens async wait timeouts to a shared 5-second constant.

Reviewed by Cursor Bugbot for commit 016775f. Bugbot is set up for automated code reviews on this repo. Configure here.

@coderabbitai

coderabbitai Bot commented Jun 25, 2026

Copy link
Copy Markdown

Review Change Stack

Warning

Review limit reached

@yordis, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 39 minutes and 14 seconds. Learn how PR review limits work.

Your organization has used up its prepaid credits, and credit purchases are no longer available. Enable the review add-on in the billing tab to keep reviews running — you're only billed for reviews past your plan's rate limits ($0.25/file).

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

To avoid repeated limits, reduce automatic review volume by pausing incremental auto-reviews earlier, using label-based review opt-in, excluding WIP or generated PR titles, or requesting reviews manually when the PR is ready. If your team needs uninterrupted high-volume reviews, an organization admin can enable usage-based credits.

🚦 How do rate limits work?

CodeRabbit enforces per-developer PR review limits for each organization. Most developers receive the normal plan review availability.

For paid Pro and Pro+ PR reviews, CodeRabbit uses adaptive limits for sustained high-volume activity. When a developer's recent PR review activity reaches the 95th percentile or higher among CodeRabbit users, additional reviews become available more gradually as earlier reviews age out of the rolling window.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ef1b94a4-4fea-41ee-ba5a-403b1ec9791a

📥 Commits

Reviewing files that changed from the base of the PR and between abd1bf9 and 016775f.

📒 Files selected for processing (2)
  • src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs
  • src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs

Walkthrough

Introduces indexing checkpoint and source abstractions, an indexing subscription with lifecycle and commit handling, a service that starts and stops it on system bus events, and tests covering checkpoint selection, event ordering, commit timing, and disposal.

Changes

Indexing subscription stack

Layer / File(s) Summary
Checkpoint and interface contracts
src/EventStore.Core/Services/Storage/Indexing/IndexCheckpoint.cs, src/EventStore.Core/Services/Storage/Indexing/IIndexingComponent.cs, src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs
IndexCheckpoint validates commit and prepare positions, and the new component, processor, event-source, and factory interfaces define checkpoint reads, iteration, indexing, and commits.
All-streams event source adapter
src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs
AllStreamsIndexingEventSourceFactory builds an Enumerator.AllSubscription from an optional checkpoint and exposes it through an IIndexingEventSource wrapper.
Subscription lifecycle and validation
src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs, src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs
IndexingSubscription initializes the component, reads checkpoints, indexes events, commits on batch or delay, and disposes resources; the tests use in-file fakes to cover checkpoint selection, retry, disposal, commit timing, and fault handling.
Service wiring
src/EventStore.Core/Services/Storage/Indexing/IndexingService.cs
IndexingService registers SystemReady and BecomeShuttingDown handlers, starts the subscription on readiness, and disposes it during shutdown.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

I hopped through checkpoints, crisp and neat,
With tiny commits on bunny feet.
When shutdown came, I tucked things in,
Closed the burrow, smiled a grin.
🐇✨ The index hums beneath the moon.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main change: durable event processing for indexing.
Description check ✅ Passed The description is clearly related to the indexing durability goal and matches the changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch yordis/feat-indexing-event-consumer

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

Comment thread src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs Outdated
Comment thread src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs`:
- Line 67: The background worker started in IndexingSubscription.ProcessEvents
is faulting asynchronously and only surfacing during DisposeAsync, which can
skip cleanup. Update the Task.Run(ProcessEvents) flow in IndexingSubscription so
worker exceptions are observed/reported as soon as they occur, and wrap the
await of _processing in DisposeAsync with a try/finally so _commitTracker,
_eventSource, and component still get disposed even when ProcessEvents fails.
- Around line 44-67: Move the `_started` state change in
`IndexingSubscription.Start` so it only happens after `component.Initialize`,
`component.ReadCheckpoint`, `_commitTracker` creation, and `_eventSource`
creation all succeed. If startup can fail after the lock is released, either set
`_started = true` at the end of the successful setup or add a `catch` that rolls
it back and disposes any partially created state before rethrowing. Use the
`Start` method and `_started` field in `IndexingSubscription` as the main points
to update.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c6664e69-f34d-41f2-b9b4-7fa59893fa1e

📥 Commits

Reviewing files that changed from the base of the PR and between dee51c3 and 7690ef6.

📒 Files selected for processing (6)
  • src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/IndexingSubscriptionTests.cs
  • src/EventStore.Core/Services/Storage/Indexing/IIndexingComponent.cs
  • src/EventStore.Core/Services/Storage/Indexing/IIndexingEventSource.cs
  • src/EventStore.Core/Services/Storage/Indexing/IndexCheckpoint.cs
  • src/EventStore.Core/Services/Storage/Indexing/IndexingService.cs
  • src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs

Comment thread src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs Outdated
Comment thread src/EventStore.Core/Services/Storage/Indexing/IndexingSubscription.cs Outdated
yordis added 3 commits June 25, 2026 03:30
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 12af056. Configure here.

Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant