Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ It is built with stateless vertical and horizontal scaling in cloud-native envir
> **The framework is async-only.** Aggregates, repositories, handlers, the commit
> path, and the service bus are all `async`. There is no synchronous repository or
> bus API. Persistence adapters (Postgres, SQLite) and transports (NATS, RabbitMQ,
> Kafka, Knative) implement the async traits directly with no blocking shims.
> Kafka, Knative) expose async traits directly; broker/client blocking primitives,
> where unavoidable, stay internal to async transport methods.

## At a Glance

Expand Down Expand Up @@ -284,7 +285,7 @@ Every infrastructure concern in `distributed` follows the same pattern: an **asy
| Messaging | `Bus` + `BusConsumer` | `InMemoryBus` | `NatsBus`, `PostgresBus`, `RabbitBus`, `KafkaBus`, `KnativeBus` |
| Read model rows | `ReadModelWritePlanStore` + `RelationalReadModelQueryStore` | `InMemoryReadModelStore` | Postgres, SQLite |
| Snapshot store | `SnapshotStore` | `InMemorySnapshotStore` | Postgres, SQLite, … |
| Outbox publishing | `AsyncMessagePublisher` (production; the extension point) — sync `OutboxPublisher` is dev/test only | `LogPublisher` (dev/test) | Any `AsyncMessagePublisher` (e.g. `BusPublisher` over a real `Bus`) |
| Outbox publishing | `OutboxStore` + async `AsyncMessagePublisher` / `OutboxPublisher` | `LogPublisher` (dev/test) | Any `AsyncMessagePublisher` (e.g. `BusPublisher` over a real `Bus`) |
| Locking | `AsyncLock` + `AsyncLockManager` | `InMemoryAsyncLockManager` | `PostgresLockManager`, `SqliteLockManager` (durable leases), Redis, … |

All in-memory defaults are `Clone` and `Send + Sync`, so they work in single-task tests and multi-task servers alike. When you're ready for production, implement the trait for your infrastructure and plug it in — handler code does not change.
Expand Down
9 changes: 4 additions & 5 deletions docs/async-transports.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Async Microservice Transports

Distributed (published from the `distributed` crate) keeps the synchronous
in-memory bus intact and adds an async transport layer under
`bus`. The design line is:
Distributed (published from the `distributed` crate) exposes an async-only
transport layer under `bus`. The in-memory bus is the dev/test implementation of
the same async contracts. The design line is:

- **`microsvc`** owns handler registration, guards, typed input decoding,
dispatch, and handler metadata;
Expand Down Expand Up @@ -202,6 +202,5 @@ outbox dispatcher, the conformance harness, the Postgres / NATS / RabbitMQ /
Kafka adapters, the Knative ingress, and the **bus facade** (`Bus` +
`BusConsumer` with `InMemoryBus` / `NatsBus` / `PostgresBus` / `RabbitBus` /
`KafkaBus` / `KnativeBus`, each with real-broker competing-vs-fan-out tests).
Still open: migrating the in-repo examples to showcase these APIs and removing the
legacy synchronous bus paths (a breaking change). See
Still open: migrating the in-repo examples to showcase these APIs. See
`tasks/transport-docs-examples-cutover`.
19 changes: 5 additions & 14 deletions docs/postgres-event-store.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,19 +323,10 @@ legacy import path. Newly written Postgres rows must always populate

## Async Posture

Do not hide production Postgres access behind the current synchronous repository
traits by blocking inside Tokio worker threads. The production Postgres
repository should be async-first, preferably backed by `sqlx`.

Because the current public traits are synchronous, the Postgres implementation is
deferred behind async trait work. Acceptable interim choices:

- keep `HashMapRepository` as the synchronous behavioral reference;
- design async traits that mirror `Get`, `Commit`, `TransactionalCommit`,
read-model, snapshot, and outbox operations;
- add an explicitly named blocking adapter only for tests or transitional use.

The blocking adapter must not be the final production contract.
Postgres access is async-first through the public repository, read-model,
snapshot, inbox, lock, and outbox traits. Do not add production Postgres access
behind blocking adapters or synchronous repository shims; SQL I/O belongs in
async `sqlx` paths.

## TypeORM Lineage

Expand Down Expand Up @@ -398,4 +389,4 @@ Postgres-specific tests should add:
- Implementing the actual Postgres repository.
- Designing the final read-model table layout.
- Designing the outbox message table in detail.
- Replacing synchronous traits in this document.
- Changing the established async repository trait surface.
5 changes: 1 addition & 4 deletions docs/repositories.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,10 @@ aggregate_type = "...")`, `aggregate!(..., aggregate_type = "..." { ... })`, or
identity. The record envelope carries stream identity, covered event version,
snapshot payload type/version, payload codec metadata, cache metadata, and
timestamp.
- `AsyncOutboxStore` exposes async claim/update operations for durable outbox
- `OutboxStore` exposes async claim/update operations for durable outbox
table stores. Aggregate repositories commit outbox rows transactionally, but
workers do not hydrate outbox messages through aggregate repositories.

`AsyncOutboxStore` keeps its prefix because `OutboxStore` remains the synchronous
worker-facing API for in-memory/local publisher workflows.

## In-Memory Reference

`HashMapRepository`, `InMemoryReadModelStore`, and `InMemorySnapshotStore`
Expand Down
4 changes: 2 additions & 2 deletions docs/research-and-roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ Based on a review of the Rust ES ecosystem (cqrs-es, disintegrate, esrs, eventua
### Core Improvements

#### Async traits
All competing frameworks are async-first. The original traits (`Repository`, `Commit`, `Get`, etc.) are synchronous, which blocks direct integration with:
All competing frameworks are async-first. The current public persistence and transport traits are async-first, which supports direct integration with:
- Async databases (sqlx, sea-orm)
- Async message brokers (rdkafka, lapin)
- Async web frameworks (axum handlers)

Foundation status: stream-aware repository/read-model/snapshot traits now form the async-only persistence surface. See [Repository Boundary](repositories.md). The SQL backends implement those traits directly instead of wrapping database I/O behind synchronous repository traits.
Foundation status: stream-aware repository/read-model/snapshot/outbox traits now form the async-only persistence surface. See [Repository Boundary](repositories.md). The SQL backends implement those traits directly instead of wrapping database I/O behind synchronous repository traits.

### Later

Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ pub use outbox::{

// Outbox Worker: drain and publish concerns
pub use outbox_worker::{
AsyncOutboxStore,
ClaimOutboxMessages,
// Worker
DrainResult,
Expand Down
3 changes: 1 addition & 2 deletions src/microsvc/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use crate::aggregate::AggregateRepository;
use crate::outbox::OutboxPublisherConfig;
use crate::outbox_worker::AsyncOutboxStore;
use crate::repository::{ReadModelWritePlanStore, RelationalReadModelQueryStore, Repository};

/// Dependency capability for services that expose an aggregate repository.
Expand Down Expand Up @@ -59,7 +58,7 @@ impl<R: HasOutboxStore, S> HasOutboxStore for RepoReadModelDependencies<R, S> {
/// (`AggregateRepository` -> `QueuedRepository` -> the leaf SQL/in-memory repo).
pub trait HasOutboxStore {
/// The concrete outbox store this repository produces.
type OutboxStore: AsyncOutboxStore;
type OutboxStore: crate::outbox_worker::OutboxStore;

/// Produce a handle to the durable outbox store.
fn outbox_store(&self) -> Self::OutboxStore;
Expand Down
14 changes: 7 additions & 7 deletions src/microsvc/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ mod tests {

use crate::bus::{Bus, InMemoryBus, RunOptions};
use crate::microsvc::{Context, HandlerError, HasOutboxStore, Service, Session};
use crate::outbox_worker::AsyncOutboxStore;
use crate::outbox_worker::OutboxStore;
use crate::{
sourced, AggregateBuilder, AggregateRepository, Entity, HashMapRepository, OutboxMessage,
OutboxMessageStatus, Queueable, QueuedRepository, Snapshot,
Expand Down Expand Up @@ -180,12 +180,12 @@ mod tests {
assert_eq!(receipt.outbox_message_ids(), ["evt-1".to_string()]);

let published = store
.messages_by_status_async(OutboxMessageStatus::Published)
.messages_by_status(OutboxMessageStatus::Published)
.await
.unwrap();
assert_eq!(published.len(), 1, "row should be published at commit time");
assert_eq!(published[0].id(), "evt-1");
assert!(store.pending_async().await.unwrap().is_empty());
assert!(store.pending().await.unwrap().is_empty());
}

type TouchRepo = AggregateRepository<QueuedRepository<HashMapRepository>, Dummy>;
Expand Down Expand Up @@ -217,12 +217,12 @@ mod tests {

let store = service.repo().outbox_store();
let published = store
.messages_by_status_async(OutboxMessageStatus::Published)
.messages_by_status(OutboxMessageStatus::Published)
.await
.unwrap();
assert_eq!(published.len(), 1, "row should be published immediately");
assert_eq!(published[0].id(), "evt-1");
assert!(store.pending_async().await.unwrap().is_empty());
assert!(store.pending().await.unwrap().is_empty());
}

#[tokio::test]
Expand All @@ -244,7 +244,7 @@ mod tests {
service.run(RunOptions::idempotent()).await.unwrap();

let published = store
.messages_by_status_async(OutboxMessageStatus::Published)
.messages_by_status(OutboxMessageStatus::Published)
.await
.unwrap();
assert_eq!(
Expand Down Expand Up @@ -303,7 +303,7 @@ mod tests {

let store = service.repo().outbox_store();
let published = store
.messages_by_status_async(OutboxMessageStatus::Published)
.messages_by_status(OutboxMessageStatus::Published)
.await
.unwrap();
assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions src/outbox/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ mod tests {
assert!(receipt.has_outbox_messages());
assert_eq!(receipt.outbox_message_ids(), ["msg-1".to_string()]);

let pending = repo.repo().outbox_store().pending().unwrap();
let pending = repo.repo().outbox_store().pending().await.unwrap();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].id(), "msg-1");
}
Expand Down Expand Up @@ -404,7 +404,7 @@ mod tests {
);

// 2) outbox row present (pending — no bus attached here)
let pending = repo.repo().outbox_store().pending().unwrap();
let pending = repo.repo().outbox_store().pending().await.unwrap();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].id(), "evt-c1");

Expand Down
36 changes: 7 additions & 29 deletions src/outbox_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,11 @@
//!
//! This module provides the worker infrastructure for processing outbox messages.
//!
//! There are two drain paths:
//! - **Production / extension point**: the async `OutboxDispatcher` +
//! `AsyncMessagePublisher` (`BusPublisher` over a `Bus`), wired by
//! `service.with_bus(bus)`.
//! - **Dev/test**: the synchronous `OutboxWorker` + `OutboxPublisher` +
//! `LogPublisher` trio — no async runtime, no real transport.
//!
//! Items:
//! - `OutboxStore` - Store operations for claiming and completing messages
//! - `OutboxDispatcher` / `BusPublisher` - the async production drain path
//! - `OutboxWorker` - synchronous dev/test message processor
//! - `OutboxPublisher` - synchronous dev/test publish trait
//! - `OutboxWorker` - async loaded-message processor
//! - `OutboxPublisher` - async loaded-message publish trait
//! - `LogPublisher` - simple logging publisher for tests
//! - `LocalEmitterPublisher` - In-process event emitter (requires `emitter` feature)
//!
Expand All @@ -26,25 +19,12 @@
//! ## Example
//!
//! ```ignore
//! use distributed::{ClaimOutboxMessages, OutboxClaimRef, OutboxStore, OutboxWorker, LogPublisher};
//! use distributed::OutboxDispatcher;
//! use std::time::Duration;
//!
//! // Claim pending messages
//! let worker_id = "worker-1";
//! let messages = outbox.claim(ClaimOutboxMessages::new(worker_id, 10, Duration::from_secs(60)))?;
//!
//! // Process with a worker
//! let mut worker = OutboxWorker::new(LogPublisher::default()).with_worker_id(worker_id);
//! for mut msg in messages {
//! let claim = OutboxClaimRef::from_message(&msg)?;
//! let result = worker.process_message(&mut msg)?;
//! if result.completed {
//! outbox.complete(&claim)?;
//! } else if result.released || result.failed {
//! let error = msg.last_error.as_deref().unwrap_or("publish failed");
//! outbox.record_failure(&claim, error, 3)?;
//! }
//! }
//! let dispatcher =
//! OutboxDispatcher::new(outbox, publisher, "worker-1", Duration::from_secs(60), 3);
//! let outcome = dispatcher.dispatch_batch(10).await?;
//! ```

mod bus_publisher;
Expand All @@ -63,9 +43,7 @@ pub use publisher::{LogPublisher, LogPublisherError, OutboxPublisher};
// Repository helpers
#[cfg(any(feature = "postgres", feature = "sqlite"))]
pub(crate) use store::ensure_active_claim;
pub use store::{
AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef, OutboxPublishFailureAction, OutboxStore,
};
pub use store::{ClaimOutboxMessages, OutboxClaimRef, OutboxPublishFailureAction, OutboxStore};

// Worker
pub use worker::{DrainResult, OutboxWorker, ProcessOneResult};
Expand Down
12 changes: 6 additions & 6 deletions src/outbox_worker/outbox_dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

use std::time::Duration;

use super::{AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef, OutboxPublishFailureAction};
use super::{ClaimOutboxMessages, OutboxClaimRef, OutboxPublishFailureAction, OutboxStore};
use crate::bus::{AsyncMessagePublisher, Message, MessageKind, TransportError, TransportErrorKind};
use crate::outbox::OutboxMessage;
use crate::repository::RepositoryError;
Expand Down Expand Up @@ -149,7 +149,7 @@ pub struct OutboxDispatcher<S, P> {

impl<S, P> OutboxDispatcher<S, P>
where
S: AsyncOutboxStore,
S: OutboxStore,
P: AsyncMessagePublisher,
{
/// Create a dispatcher. `worker_id` scopes claims (use a synthetic id such as
Expand Down Expand Up @@ -190,7 +190,7 @@ where
) -> Result<OutboxDispatchOutcome, TransportError> {
let request =
ClaimOutboxMessages::for_ids(self.worker_id.clone(), ids.to_vec(), self.lease);
let claimed = self.store.claim_async(request).await?;
let claimed = self.store.claim(request).await?;
let mut outcome = self.dispatch_claimed(claimed).await?;
outcome.requested = ids.len();
Ok(outcome)
Expand All @@ -202,7 +202,7 @@ where
batch_size: usize,
) -> Result<OutboxDispatchOutcome, TransportError> {
let request = ClaimOutboxMessages::new(self.worker_id.clone(), batch_size, self.lease);
let claimed = self.store.claim_async(request).await?;
let claimed = self.store.claim(request).await?;
let mut outcome = self.dispatch_claimed(claimed).await?;
outcome.requested = batch_size;
Ok(outcome)
Expand All @@ -224,13 +224,13 @@ where
let transport_message = Message::from(&message);
match self.publisher.publish(transport_message).await {
Ok(()) => {
self.store.complete_async(&claim).await?;
self.store.complete(&claim).await?;
outcome.published += 1;
}
Err(publish_error) => {
match self
.store
.record_failure_async(&claim, &publish_error.to_string(), self.max_attempts)
.record_failure(&claim, &publish_error.to_string(), self.max_attempts)
.await?
{
OutboxPublishFailureAction::Released => outcome.released += 1,
Expand Down
Loading
Loading