diff --git a/CHANGELOG.md b/CHANGELOG.md index dbf36269bc..f1ec8d96bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Drain the pending tx queue in merged batches with a durable WAL-backed ack, fixing severe queue backlog under heavy tx load. Tx dedup moved from the reaper cache into the sequencer queue [#3351](https://github.com/evstack/ev-node/pull/3351) + ## v1.1.2 ### Changes diff --git a/block/components.go b/block/components.go index 94e3c1fbc4..94085031c5 100644 --- a/block/components.go +++ b/block/components.go @@ -278,7 +278,6 @@ func newAggregatorComponents( sequencer, genesis, logger, - cacheManager, config.Node.ScrapeInterval.Duration, executor.NotifyNewTransactions, ) diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index dc3e1b7d14..0ef068c51a 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -58,19 +58,6 @@ func (c *Cache) isSeen(hash string) bool { return c.hashes[hash] } -// areSeen checks which hashes have been seen. Returns a boolean slice -// parallel to the input where result[i] is true if hashes[i] is in the -// cache. Acquires the read lock once for the entire batch. -func (c *Cache) areSeen(hashes []string) []bool { - c.mu.RLock() - defer c.mu.RUnlock() - result := make([]bool, len(hashes)) - for i, h := range hashes { - result[i] = c.hashes[h] - } - return result -} - func (c *Cache) setSeen(hash string, height uint64) { c.mu.Lock() defer c.mu.Unlock() @@ -82,37 +69,6 @@ func (c *Cache) setSeen(hash string, height uint64) { c.hashByHeight[height] = hash } -func (c *Cache) removeSeen(hash string) { - c.mu.Lock() - defer c.mu.Unlock() - delete(c.hashes, hash) -} - -// setSeenBatch marks all hashes as seen under a single write lock. -// For height 0 (transactions), the hashByHeight bookkeeping is skipped -// since all txs share the same sentinel height — the map lookup and -// overwrite on every entry is pure overhead with no benefit. -func (c *Cache) setSeenBatch(hashes []string, height uint64) { - c.mu.Lock() - defer c.mu.Unlock() - if height == 0 { - for _, h := range hashes { - c.hashes[h] = true - } - return - } - - // currently not used, but there for completeness against setSeen - for _, h := range hashes { - if existing, ok := c.hashByHeight[height]; ok && existing == h { - c.hashes[existing] = true - continue - } - c.hashes[h] = true - c.hashByHeight[height] = h - } -} - func (c *Cache) getDAIncluded(hash string) (uint64, bool) { c.mu.RLock() defer c.mu.RUnlock() diff --git a/block/internal/cache/generic_cache_test.go b/block/internal/cache/generic_cache_test.go index ec4e92e7f7..4af79f7b33 100644 --- a/block/internal/cache/generic_cache_test.go +++ b/block/internal/cache/generic_cache_test.go @@ -199,8 +199,6 @@ func TestCache_BasicOperations(t *testing.T) { assert.False(t, c.isSeen("hash1")) c.setSeen("hash1", 1) assert.True(t, c.isSeen("hash1")) - c.removeSeen("hash1") - assert.False(t, c.isSeen("hash1")) _, ok := c.getDAIncluded("hash2") assert.False(t, ok) diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index 4d95a7d7e5..cbf1b8934d 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "fmt" "sync" - "time" "github.com/rs/zerolog" @@ -24,10 +23,6 @@ const ( // DataDAIncludedPrefix is the store key prefix for data DA inclusion tracking. DataDAIncludedPrefix = "cache/data-da-included/" - - // DefaultTxCacheRetention is the default time to keep transaction hashes in cache. - // Keeping a too high value can lead to OOM during heavy transaction load. - DefaultTxCacheRetention = 30 * time.Minute ) // CacheManager provides thread-safe cache operations for tracking seen blocks @@ -51,13 +46,6 @@ type CacheManager interface { SetDataDAIncluded(daCommitmentHash string, daHeight uint64, blockHeight uint64) RemoveDataDAIncluded(hash string) - // Transaction operations - IsTxSeen(hash string) bool - AreTxsSeen(hashes []string) []bool - SetTxSeen(hash string) - SetTxsSeen(hashes []string) - CleanupOldTxs(olderThan time.Duration) int - // Pending events syncing coordination GetNextPendingEvent(blockHeight uint64) *common.DAHeightEvent SetPendingEvent(blockHeight uint64, event *common.DAHeightEvent) @@ -94,8 +82,6 @@ var _ Manager = (*implementation)(nil) type implementation struct { headerCache *Cache dataCache *Cache - txCache *Cache - txTimestamps *sync.Map // map[string]time.Time pendingEvents map[uint64]*common.DAHeightEvent pendingMu sync.Mutex pendingHeaders *PendingHeaders @@ -109,7 +95,6 @@ type implementation struct { func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manager, error) { headerCache := NewCache(st, HeaderDAIncludedPrefix) dataCache := NewCache(st, DataDAIncludedPrefix) - txCache := NewCache(nil, "") pendingHeaders, err := NewPendingHeaders(st, logger) if err != nil { @@ -124,8 +109,6 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag impl := &implementation{ headerCache: headerCache, dataCache: dataCache, - txCache: txCache, - txTimestamps: new(sync.Map), pendingEvents: make(map[uint64]*common.DAHeightEvent), pendingHeaders: pendingHeaders, pendingData: pendingData, @@ -202,59 +185,6 @@ func (m *implementation) RemoveDataDAIncluded(hash string) { m.dataCache.removeDAIncluded(hash) } -func (m *implementation) IsTxSeen(hash string) bool { - return m.txCache.isSeen(hash) -} - -func (m *implementation) AreTxsSeen(hashes []string) []bool { - return m.txCache.areSeen(hashes) -} - -func (m *implementation) SetTxSeen(hash string) { - // Use 0 as height since transactions don't have a block height yet - m.txCache.setSeen(hash, 0) - // Track timestamp for cleanup purposes - m.txTimestamps.Store(hash, time.Now()) -} - -func (m *implementation) SetTxsSeen(hashes []string) { - m.txCache.setSeenBatch(hashes, 0) - now := time.Now() - for _, hash := range hashes { - m.txTimestamps.Store(hash, now) - } -} - -// CleanupOldTxs removes transaction hashes older than olderThan and returns -// the count removed. Defaults to DefaultTxCacheRetention if olderThan <= 0. -func (m *implementation) CleanupOldTxs(olderThan time.Duration) int { - if olderThan <= 0 { - olderThan = DefaultTxCacheRetention - } - - cutoff := time.Now().Add(-olderThan) - removed := 0 - - m.txTimestamps.Range(func(key, value any) bool { - hash, ok := key.(string) - if !ok { - return true - } - timestamp, ok := value.(time.Time) - if !ok { - return true - } - if timestamp.Before(cutoff) { - m.txCache.removeSeen(hash) - m.txTimestamps.Delete(hash) - removed++ - } - return true - }) - - return removed -} - // DeleteHeight removes from all caches the given height. // This can be done when a height has been da included. func (m *implementation) DeleteHeight(blockHeight uint64) { @@ -263,12 +193,6 @@ func (m *implementation) DeleteHeight(blockHeight uint64) { m.pendingMu.Lock() delete(m.pendingEvents, blockHeight) m.pendingMu.Unlock() - - // Note: txCache is intentionally NOT deleted here because: - // 1. Transactions are tracked by hash, not by block height (they use height 0) - // 2. A transaction seen at one height may be resubmitted at a different height - // 3. The cache prevents duplicate submissions across block heights - // 4. Cleanup is handled separately via CleanupOldTxs() based on time, not height } // Pending operations @@ -363,7 +287,7 @@ func (m *implementation) SaveToStore() error { return fmt.Errorf("failed to save data cache to store: %w", err) } - // TX cache and pending events are ephemeral - not persisted + // pending events are ephemeral - not persisted return nil } @@ -406,7 +330,6 @@ func (m *implementation) ClearFromStore() error { m.headerCache = NewCache(m.store, HeaderDAIncludedPrefix) m.dataCache = NewCache(m.store, DataDAIncludedPrefix) - m.txCache = NewCache(nil, "") m.pendingEvents = make(map[uint64]*common.DAHeightEvent) // Initialize DA height from store metadata to ensure DaHeight() is never 0. diff --git a/block/internal/cache/manager_test.go b/block/internal/cache/manager_test.go index fa5aebf34b..943c4cdaff 100644 --- a/block/internal/cache/manager_test.go +++ b/block/internal/cache/manager_test.go @@ -4,7 +4,6 @@ import ( "context" "encoding/binary" "testing" - "time" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -237,188 +236,6 @@ func TestPendingHeadersAndData_Flow(t *testing.T) { assert.Equal(t, uint64(1), cm.NumPendingData()) } -func TestManager_TxOperations(t *testing.T) { - t.Parallel() - cfg := tempConfig(t) - st := testMemStore(t) - - m, err := NewManager(cfg, st, zerolog.Nop()) - require.NoError(t, err) - - // Initially not seen - assert.False(t, m.IsTxSeen("tx1")) - assert.False(t, m.IsTxSeen("tx2")) - - // Mark as seen - m.SetTxSeen("tx1") - m.SetTxSeen("tx2") - - // Should now be seen - assert.True(t, m.IsTxSeen("tx1")) - assert.True(t, m.IsTxSeen("tx2")) -} - -func TestManager_CleanupOldTxs(t *testing.T) { - t.Parallel() - cfg := tempConfig(t) - st := testMemStore(t) - - m, err := NewManager(cfg, st, zerolog.Nop()) - require.NoError(t, err) - - // Add some transactions - m.SetTxSeen("tx1") - m.SetTxSeen("tx2") - m.SetTxSeen("tx3") - - // Verify all are seen - assert.True(t, m.IsTxSeen("tx1")) - assert.True(t, m.IsTxSeen("tx2")) - assert.True(t, m.IsTxSeen("tx3")) - - // Cleanup with a very short duration (should remove all) - removed := m.CleanupOldTxs(1 * time.Nanosecond) - assert.Equal(t, 3, removed) - - // All should now be gone - assert.False(t, m.IsTxSeen("tx1")) - assert.False(t, m.IsTxSeen("tx2")) - assert.False(t, m.IsTxSeen("tx3")) -} - -func TestManager_CleanupOldTxs_SelectiveRemoval(t *testing.T) { - t.Parallel() - cfg := tempConfig(t) - st := testMemStore(t) - - m, err := NewManager(cfg, st, zerolog.Nop()) - require.NoError(t, err) - - impl := m.(*implementation) - - // Add old transactions with backdated timestamps - oldTime := time.Now().Add(-48 * time.Hour) - impl.txCache.setSeen("old-tx1", 0) - impl.txTimestamps.Store("old-tx1", oldTime) - impl.txCache.setSeen("old-tx2", 0) - impl.txTimestamps.Store("old-tx2", oldTime) - - // Add recent transactions - m.SetTxSeen("new-tx1") - m.SetTxSeen("new-tx2") - - // Cleanup transactions older than 24 hours - removed := m.CleanupOldTxs(24 * time.Hour) - assert.Equal(t, 2, removed) - - // Old transactions should be gone - assert.False(t, m.IsTxSeen("old-tx1")) - assert.False(t, m.IsTxSeen("old-tx2")) - - // New transactions should still be present - assert.True(t, m.IsTxSeen("new-tx1")) - assert.True(t, m.IsTxSeen("new-tx2")) -} - -func TestManager_CleanupOldTxs_DefaultDuration(t *testing.T) { - t.Parallel() - cfg := tempConfig(t) - st := testMemStore(t) - - m, err := NewManager(cfg, st, zerolog.Nop()) - require.NoError(t, err) - - impl := m.(*implementation) - - // Add transactions with specific old timestamp (older than default 24h) - veryOldTime := time.Now().Add(-25 * time.Hour) - impl.txCache.setSeen("very-old-tx", 0) - impl.txTimestamps.Store("very-old-tx", veryOldTime) - - // Add recent transaction - m.SetTxSeen("recent-tx") - - // Call cleanup with 0 duration (should use default) - removed := m.CleanupOldTxs(0) - assert.Equal(t, 1, removed) - - // Very old transaction should be gone - assert.False(t, m.IsTxSeen("very-old-tx")) - - // Recent transaction should still be present - assert.True(t, m.IsTxSeen("recent-tx")) -} - -func TestManager_CleanupOldTxs_NoTransactions(t *testing.T) { - t.Parallel() - cfg := tempConfig(t) - st := testMemStore(t) - - m, err := NewManager(cfg, st, zerolog.Nop()) - require.NoError(t, err) - - // Cleanup on empty cache should return 0 - removed := m.CleanupOldTxs(24 * time.Hour) - assert.Equal(t, 0, removed) -} - -func TestManager_TxCache_NotPersistedToStore(t *testing.T) { - t.Parallel() - cfg := tempConfig(t) - st := testMemStore(t) - - // Create first manager and add transactions - m1, err := NewManager(cfg, st, zerolog.Nop()) - require.NoError(t, err) - - m1.SetTxSeen("persistent-tx1") - m1.SetTxSeen("persistent-tx2") - - assert.True(t, m1.IsTxSeen("persistent-tx1")) - assert.True(t, m1.IsTxSeen("persistent-tx2")) - - // Save to store - err = m1.SaveToStore() - require.NoError(t, err) - - // Create new manager - tx cache should be empty (not persisted) - m2, err := NewManager(cfg, st, zerolog.Nop()) - require.NoError(t, err) - - // TX cache is ephemeral and not persisted - assert.False(t, m2.IsTxSeen("persistent-tx1")) - assert.False(t, m2.IsTxSeen("persistent-tx2")) -} - -func TestManager_DeleteHeight_PreservesTxCache(t *testing.T) { - t.Parallel() - cfg := tempConfig(t) - st := testMemStore(t) - - m, err := NewManager(cfg, st, zerolog.Nop()) - require.NoError(t, err) - - // Add items to various caches at height 5 - m.SetHeaderSeen("header-5", 5) - m.SetDataSeen("data-5", 5) - m.SetTxSeen("tx-persistent") - - // Verify all are present - assert.True(t, m.IsHeaderSeen("header-5")) - assert.True(t, m.IsDataSeen("data-5")) - assert.True(t, m.IsTxSeen("tx-persistent")) - - // Delete height 5 - m.DeleteHeight(5) - - // Header and data should be gone - assert.False(t, m.IsHeaderSeen("header-5")) - assert.False(t, m.IsDataSeen("data-5")) - - // Transaction should still be present (height-independent) - assert.True(t, m.IsTxSeen("tx-persistent")) -} - func TestManager_DAInclusionPersistence(t *testing.T) { t.Parallel() cfg := tempConfig(t) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 753ec9b94b..088bc6d7be 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -87,11 +87,25 @@ type Executor struct { cancel context.CancelFunc wg sync.WaitGroup + // onBatchCommitted is called after a block is committed to ack drained queue entries. + // if the ack fails it is retried before the next block is produced. + onBatchCommitted func(ctx context.Context) error + + // pendingBatchAck is set when a committed block's batch ack failed and + // must be retried before producing the next block. + pendingBatchAck atomic.Bool + // blockProducer is the interface used for block production operations. // defaults to self, but can be wrapped with tracing. blockProducer BlockProducer } +// batchAcknowledger is implemented by sequencers whose drained queue +// entries must be acknowledged after the block is durably committed. +type batchAcknowledger interface { + AckBatch(ctx context.Context) error +} + // NewExecutor creates a new block executor. // The executor is responsible for: // - Block production from sequencer batches @@ -154,6 +168,17 @@ func NewExecutor( logger: logger.With().Str("component", "executor").Logger(), } e.blockProducer = e + + // wire the batch ack so drained queue entries are committed after block + // commit. tracing wrappers forward AckBatch to the underlying sequencer. + if acker, ok := sequencer.(batchAcknowledger); ok { + e.onBatchCommitted = acker.AckBatch + } else if !config.Node.BasedSequencer { + // without an ack, drained queue entries are rolled back on every + // retrieval and the same transactions would be re-included each block + e.logger.Warn().Msg("sequencer does not implement AckBatch; drained batch entries will not be acknowledged after block commit") + } + return e, nil } @@ -163,6 +188,20 @@ func (e *Executor) SetBlockProducer(bp BlockProducer) { e.blockProducer = bp } +// ackCommittedBatch invokes the batch ack callback and tracks failures so +// they can be retried before the next block is produced. +func (e *Executor) ackCommittedBatch(ctx context.Context) error { + if e.onBatchCommitted == nil { + return nil + } + if err := e.onBatchCommitted(ctx); err != nil { + e.pendingBatchAck.Store(true) + return err + } + e.pendingBatchAck.Store(false) + return nil +} + // Start begins the execution component func (e *Executor) Start(ctx context.Context) (err error) { if e.cancel != nil { @@ -471,6 +510,15 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { return errors.New("raft cluster does not have quorum") } + // retry a failed ack from the previous block before retrieving a new batch. + // without this, the un-acked drained entries would be rolled back and + // re-included in the next block. + if e.pendingBatchAck.Load() { + if err := e.ackCommittedBatch(ctx); err != nil { + return fmt.Errorf("failed to ack previously committed batch: %w", err) + } + } + currentState := e.getLastState() newHeight := currentState.LastBlockHeight + 1 @@ -621,6 +669,12 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { // Update in-memory state after successful commit e.setLastState(newState) + // ack the drained queue entries now that the block is durably committed. + // failure is not fatal: the ack is retried before the next block. + if err := e.ackCommittedBatch(ctx); err != nil { + e.logger.Warn().Err(err).Msg("failed to ack batch after commit, will retry before next block") + } + // Update last-block cache so the next CreateBlock avoids a store read. e.lastBlockInfo.Store(&lastBlockInfo{ headerHash: newState.LastHeaderHash, diff --git a/block/internal/executing/executor_logic_test.go b/block/internal/executing/executor_logic_test.go index 1498bf5f79..18ec951d3e 100644 --- a/block/internal/executing/executor_logic_test.go +++ b/block/internal/executing/executor_logic_test.go @@ -122,6 +122,61 @@ func TestPendingLimit_SkipsProduction(t *testing.T) { assert.Equal(t, h1, h2, "height should not change when production is skipped") } +func TestProduceBlock_RetriesFailedAckBeforeNextBlock(t *testing.T) { + fx := setupTestExecutor(t, 1000) + defer fx.Cancel() + + var ackCalls int + fx.Exec.onBatchCommitted = func(ctx context.Context) error { + ackCalls++ + if ackCalls <= 2 { + return assert.AnError + } + return nil + } + + fx.MockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). + RunAndReturn(func(ctx context.Context, req coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { + return &coreseq.GetNextBatchResponse{Batch: &coreseq.Batch{Transactions: nil}, Timestamp: time.Now()}, nil + }).Once() + fx.MockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.AnythingOfType("time.Time"), fx.InitStateRoot). + Return([]byte("root1"), nil).Once() + fx.MockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() + + // block 1 commits, but the post-commit ack fails (non-fatal) + require.NoError(t, fx.Exec.ProduceBlock(fx.Exec.ctx)) + assert.Equal(t, 1, ackCalls) + assert.True(t, fx.Exec.pendingBatchAck.Load()) + + // the pending ack is retried before draining a new batch; if the retry + // fails again block production aborts without calling GetNextBatch + err := fx.Exec.ProduceBlock(fx.Exec.ctx) + require.Error(t, err) + assert.Equal(t, 2, ackCalls) + assert.True(t, fx.Exec.pendingBatchAck.Load()) + + h, err := fx.MemStore.Height(context.Background()) + require.NoError(t, err) + assert.Equal(t, uint64(1), h, "no block should be produced while the ack retry fails") + + // once the retry succeeds production resumes: retry ack + post-commit ack + fx.MockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). + RunAndReturn(func(ctx context.Context, req coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { + return &coreseq.GetNextBatchResponse{Batch: &coreseq.Batch{Transactions: nil}, Timestamp: time.Now()}, nil + }).Once() + fx.MockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(2), mock.AnythingOfType("time.Time"), []byte("root1")). + Return([]byte("root2"), nil).Once() + fx.MockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() + + require.NoError(t, fx.Exec.ProduceBlock(fx.Exec.ctx)) + assert.Equal(t, 4, ackCalls) + assert.False(t, fx.Exec.pendingBatchAck.Load()) + + h, err = fx.MemStore.Height(context.Background()) + require.NoError(t, err) + assert.Equal(t, uint64(2), h) +} + func TestExecutor_executeTxsWithRetry(t *testing.T) { t.Parallel() diff --git a/block/internal/reaping/bench_test.go b/block/internal/reaping/bench_test.go index 5ec0aaa69d..a7a268f842 100644 --- a/block/internal/reaping/bench_test.go +++ b/block/internal/reaping/bench_test.go @@ -9,30 +9,13 @@ import ( "testing" "time" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" - "github.com/evstack/ev-node/block/internal/cache" coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" - "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/store" ) -func newBenchCache(b *testing.B) cache.CacheManager { - b.Helper() - cfg := config.Config{RootDir: b.TempDir()} - memDS := dssync.MutexWrap(ds.NewMapDatastore()) - st := store.New(memDS) - cm, err := cache.NewManager(cfg, st, zerolog.Nop()) - if err != nil { - b.Fatal(err) - } - return cm -} - type infiniteExecutor struct { mu sync.Mutex batch [][]byte @@ -105,7 +88,6 @@ func benchmarkReaperFlow(b *testing.B, batchSize int, txSize int, feedInterval t exec := &infiniteExecutor{} seq := &countingSequencer{} - cm := newBenchCache(b) var notified atomic.Int64 r, err := NewReaper( @@ -113,7 +95,6 @@ func benchmarkReaperFlow(b *testing.B, batchSize int, txSize int, feedInterval t seq, genesis.Genesis{ChainID: "bench"}, zerolog.Nop(), - cm, 50*time.Millisecond, func() { notified.Add(1) }, ) @@ -191,13 +172,12 @@ func BenchmarkReaperFlow_Sustained(b *testing.B) { b.Run("steady_100txs_256B", func(b *testing.B) { exec := &infiniteExecutor{} seq := &countingSequencer{} - cm := newBenchCache(b) var notified atomic.Int64 r, err := NewReaper( exec, seq, genesis.Genesis{ChainID: "bench"}, - zerolog.Nop(), cm, + zerolog.Nop(), 10*time.Millisecond, func() { notified.Add(1) }, ) @@ -250,13 +230,12 @@ func BenchmarkReaperFlow_StartStop(b *testing.B) { b.Run("lifecycle", func(b *testing.B) { exec := &infiniteExecutor{} seq := &countingSequencer{} - cm := newBenchCache(b) for i := 0; i < b.N; i++ { r, err := NewReaper( exec, seq, genesis.Genesis{ChainID: "bench"}, - zerolog.Nop(), cm, + zerolog.Nop(), 100*time.Millisecond, func() {}, ) @@ -283,7 +262,6 @@ func BenchmarkReaperFlow_StartStop(b *testing.B) { exec := &infiniteExecutor{} seq := &countingSequencer{} - cm := newBenchCache(b) txs := make([][]byte, batchSize) for j := range txs { @@ -297,7 +275,7 @@ func BenchmarkReaperFlow_StartStop(b *testing.B) { r, err := NewReaper( exec, seq, genesis.Genesis{ChainID: "bench"}, - zerolog.Nop(), cm, + zerolog.Nop(), 10*time.Millisecond, func() {}, ) diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index 1f51c08f10..9616c8c5c9 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -2,17 +2,13 @@ package reaping import ( "context" - "crypto/sha256" - "encoding/hex" "errors" "fmt" - "runtime" "sync" "time" "github.com/rs/zerolog" - "github.com/evstack/ev-node/block/internal/cache" coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/genesis" @@ -21,20 +17,15 @@ import ( const ( // MaxBackoffInterval is the maximum backoff interval for retries MaxBackoffInterval = 30 * time.Second - - // CleanupInterval is how often the reaper sweeps expired hashes - // out of the seen-tx cache. - CleanupInterval = max(cache.DefaultTxCacheRetention/10, 15*time.Second) ) -// Reaper is responsible for periodically retrieving transactions from the executor, -// filtering out already seen transactions, and submitting new transactions to the sequencer. +// Reaper is responsible for periodically retrieving transactions from the executor +// and submitting them to the sequencer. type Reaper struct { exec coreexecutor.Executor sequencer coresequencer.Sequencer chainID string interval time.Duration - cache cache.CacheManager onTxsSubmitted func() logger zerolog.Logger @@ -49,13 +40,9 @@ func NewReaper( sequencer coresequencer.Sequencer, genesis genesis.Genesis, logger zerolog.Logger, - cache cache.CacheManager, scrapeInterval time.Duration, onTxsSubmitted func(), ) (*Reaper, error) { - if cache == nil { - return nil, errors.New("cache cannot be nil") - } if scrapeInterval == 0 { return nil, errors.New("scrape interval cannot be empty") } @@ -66,7 +53,6 @@ func NewReaper( chainID: genesis.ChainID, interval: scrapeInterval, logger: logger.With().Str("component", "reaper").Logger(), - cache: cache, onTxsSubmitted: onTxsSubmitted, }, nil } @@ -85,13 +71,10 @@ func (r *Reaper) Start(ctx context.Context) error { } func (r *Reaper) reaperLoop() { - cleanupTicker := time.NewTicker(CleanupInterval) - defer cleanupTicker.Stop() - consecutiveFailures := 0 for { - submitted, err := r.drainMempool(cleanupTicker.C) + err := r.drainMempool() if err != nil && r.ctx.Err() == nil { consecutiveFailures++ @@ -113,11 +96,6 @@ func (r *Reaper) reaperLoop() { consecutiveFailures = 0 } - if submitted { - runtime.Gosched() - continue - } - if r.wait(r.interval) { return } @@ -146,78 +124,33 @@ func (r *Reaper) Stop() error { return nil } -func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) { - var totalSubmitted int - - defer func() { - if totalSubmitted > 0 && r.onTxsSubmitted != nil { - r.onTxsSubmitted() - } - }() - - for { - select { - case <-cleanupCh: - removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention) - if removed > 0 { - r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes") - } - default: - } - - txs, err := r.exec.GetTxs(r.ctx) - if err != nil { - return totalSubmitted > 0, fmt.Errorf("failed to get txs from executor: %w", err) - } - if len(txs) == 0 { - break - } - - hashes := hashTxs(txs) - seen := r.cache.AreTxsSeen(hashes) - - newTxs := make([][]byte, 0, len(txs)) - newHashes := make([]string, 0, len(txs)) - for i, tx := range txs { - if !seen[i] { - newTxs = append(newTxs, tx) - newHashes = append(newHashes, hashes[i]) - } - } - - if len(newTxs) == 0 { - break - } - - _, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{ - Id: []byte(r.chainID), - Batch: &coresequencer.Batch{Transactions: newTxs}, - }) - if err != nil { - return totalSubmitted > 0, fmt.Errorf("failed to submit txs to sequencer: %w", err) - } - - r.cache.SetTxsSeen(newHashes) - totalSubmitted += len(newTxs) +func (r *Reaper) drainMempool() error { + txs, err := r.exec.GetTxs(r.ctx) + if err != nil { + return fmt.Errorf("failed to get txs from executor: %w", err) } - - if totalSubmitted > 0 { - r.logger.Debug().Int("total_txs", totalSubmitted).Msg("drained mempool") + if len(txs) == 0 { + return nil } - return totalSubmitted > 0, nil -} + _, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte(r.chainID), + Batch: &coresequencer.Batch{Transactions: txs}, + }) + if err != nil { + return fmt.Errorf("failed to submit txs to sequencer: %w", err) + } -func hashTxs(txs [][]byte) []string { - hashes := make([]string, len(txs)) - for i, tx := range txs { - h := sha256.Sum256(tx) - hashes[i] = hex.EncodeToString(h[:]) + // the sequencer dedups resubmitted txs, so this may notify for txs that + // were already queued. at worst this triggers an unnecessary (possibly + // empty) block in lazy mode. + if r.onTxsSubmitted != nil { + r.onTxsSubmitted() } - return hashes -} -func hashTx(tx []byte) string { - h := sha256.Sum256(tx) - return hex.EncodeToString(h[:]) + r.logger.Debug(). + Int("seen_txs", len(txs)). + Msg("drained mempool") + + return nil } diff --git a/block/internal/reaping/reaper_test.go b/block/internal/reaping/reaper_test.go index 6bf4426d4a..6aa682bf76 100644 --- a/block/internal/reaping/reaper_test.go +++ b/block/internal/reaping/reaper_test.go @@ -6,35 +6,19 @@ import ( "testing" "time" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/evstack/ev-node/block/internal/cache" coresequencer "github.com/evstack/ev-node/core/sequencer" - "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/store" testmocks "github.com/evstack/ev-node/test/mocks" ) -func newTestCache(t *testing.T) cache.CacheManager { - t.Helper() - cfg := config.Config{RootDir: t.TempDir()} - memDS := dssync.MutexWrap(ds.NewMapDatastore()) - st := store.New(memDS) - cm, err := cache.NewManager(cfg, st, zerolog.Nop()) - require.NoError(t, err) - return cm -} - type testEnv struct { execMock *testmocks.MockExecutor seqMock *testmocks.MockSequencer - cache cache.CacheManager reaper *Reaper notified atomic.Bool } @@ -43,18 +27,16 @@ func newTestEnv(t *testing.T) *testEnv { t.Helper() mockExec := testmocks.NewMockExecutor(t) mockSeq := testmocks.NewMockSequencer(t) - cm := newTestCache(t) env := &testEnv{ execMock: mockExec, seqMock: mockSeq, - cache: cm, } r, err := NewReaper( mockExec, mockSeq, genesis.Genesis{ChainID: "test-chain"}, - zerolog.Nop(), cm, + zerolog.Nop(), 100*time.Millisecond, env.notify, ) @@ -72,14 +54,13 @@ func (e *testEnv) wasNotified() bool { return e.notified.Load() } -func TestReaper_NewTxs_SubmitsAndPersistsAndNotifies(t *testing.T) { +func TestReaper_NewTxs_SubmitsAndNotifies(t *testing.T) { env := newTestEnv(t) tx1 := []byte("tx1") tx2 := []byte("tx2") env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() - env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). RunAndReturn(func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { @@ -87,57 +68,12 @@ func TestReaper_NewTxs_SubmitsAndPersistsAndNotifies(t *testing.T) { return &coresequencer.SubmitBatchTxsResponse{}, nil }).Once() - submitted, err := env.reaper.drainMempool(nil) + err := env.reaper.drainMempool() assert.NoError(t, err) - assert.True(t, submitted) - assert.True(t, env.cache.IsTxSeen(hashTx(tx1))) - assert.True(t, env.cache.IsTxSeen(hashTx(tx2))) assert.True(t, env.wasNotified()) } -func TestReaper_AllSeen_NoSubmit(t *testing.T) { - env := newTestEnv(t) - - tx1 := []byte("tx1") - tx2 := []byte("tx2") - - env.cache.SetTxSeen(hashTx(tx1)) - env.cache.SetTxSeen(hashTx(tx2)) - - env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() - - submitted, err := env.reaper.drainMempool(nil) - assert.NoError(t, err) - assert.False(t, submitted) - assert.False(t, env.wasNotified()) -} - -func TestReaper_PartialSeen_FiltersAndPersists(t *testing.T) { - env := newTestEnv(t) - - txOld := []byte("old") - txNew := []byte("new") - - env.cache.SetTxSeen(hashTx(txOld)) - - env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{txOld, txNew}, nil).Once() - env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() - - env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). - RunAndReturn(func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { - assert.Equal(t, [][]byte{txNew}, req.Batch.Transactions) - return &coresequencer.SubmitBatchTxsResponse{}, nil - }).Once() - - submitted, err := env.reaper.drainMempool(nil) - assert.NoError(t, err) - assert.True(t, submitted) - assert.True(t, env.cache.IsTxSeen(hashTx(txOld))) - assert.True(t, env.cache.IsTxSeen(hashTx(txNew))) - assert.True(t, env.wasNotified()) -} - -func TestReaper_SequencerError_NoPersistence_NoNotify(t *testing.T) { +func TestReaper_SequencerError_NoNotify(t *testing.T) { env := newTestEnv(t) tx := []byte("oops") @@ -147,75 +83,50 @@ func TestReaper_SequencerError_NoPersistence_NoNotify(t *testing.T) { env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). Return((*coresequencer.SubmitBatchTxsResponse)(nil), assert.AnError).Once() - _, err := env.reaper.drainMempool(nil) + err := env.reaper.drainMempool() assert.Error(t, err) - assert.False(t, env.cache.IsTxSeen(hashTx(tx))) assert.False(t, env.wasNotified()) } -func TestReaper_DrainsMempoolInMultipleRounds(t *testing.T) { - env := newTestEnv(t) - - tx1 := []byte("tx1") - tx2 := []byte("tx2") - tx3 := []byte("tx3") - - env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() - env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx3}, nil).Once() - env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() - - env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). - RunAndReturn(func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { - return &coresequencer.SubmitBatchTxsResponse{}, nil - }).Twice() - - submitted, err := env.reaper.drainMempool(nil) - assert.NoError(t, err) - assert.True(t, submitted) - assert.True(t, env.cache.IsTxSeen(hashTx(tx1))) - assert.True(t, env.cache.IsTxSeen(hashTx(tx2))) - assert.True(t, env.cache.IsTxSeen(hashTx(tx3))) - assert.True(t, env.wasNotified()) -} - func TestReaper_EmptyMempool_NoAction(t *testing.T) { env := newTestEnv(t) env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() - submitted, err := env.reaper.drainMempool(nil) + err := env.reaper.drainMempool() assert.NoError(t, err) - assert.False(t, submitted) assert.False(t, env.wasNotified()) } -func TestReaper_HashComputedOnce(t *testing.T) { +func TestReaper_SinglePass_SubmitsAll(t *testing.T) { env := newTestEnv(t) - tx := []byte("unique-tx") - expectedHash := hashTx(tx) + tx1 := []byte("tx1") + tx2 := []byte("tx2") + tx3 := []byte("tx3") - env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx}, nil).Once() - env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() + // single GetTxs call returns all txs + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2, tx3}, nil).Once() env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). - Return(&coresequencer.SubmitBatchTxsResponse{}, nil).Once() + RunAndReturn(func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { + assert.Equal(t, [][]byte{tx1, tx2, tx3}, req.Batch.Transactions) + return &coresequencer.SubmitBatchTxsResponse{}, nil + }).Once() - submitted, err := env.reaper.drainMempool(nil) + err := env.reaper.drainMempool() assert.NoError(t, err) - assert.True(t, submitted) - assert.True(t, env.cache.IsTxSeen(expectedHash)) + assert.True(t, env.wasNotified()) } func TestReaper_NilCallback_NoPanic(t *testing.T) { mockExec := testmocks.NewMockExecutor(t) mockSeq := testmocks.NewMockSequencer(t) - cm := newTestCache(t) r, err := NewReaper( mockExec, mockSeq, genesis.Genesis{ChainID: "test-chain"}, - zerolog.Nop(), cm, + zerolog.Nop(), 100*time.Millisecond, nil, ) @@ -223,13 +134,11 @@ func TestReaper_NilCallback_NoPanic(t *testing.T) { tx := []byte("tx") mockExec.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx}, nil).Once() - mockExec.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() mockSeq.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). Return(&coresequencer.SubmitBatchTxsResponse{}, nil).Once() - submitted, err := r.drainMempool(nil) + err = r.drainMempool() assert.NoError(t, err) - assert.True(t, submitted) } func TestReaper_StopTerminates(t *testing.T) { diff --git a/pkg/sequencers/single/queue.go b/pkg/sequencers/single/queue.go index 6940e66c7f..1aad952dc8 100644 --- a/pkg/sequencers/single/queue.go +++ b/pkg/sequencers/single/queue.go @@ -2,14 +2,17 @@ package single import ( "context" + "crypto/sha256" "encoding/binary" "encoding/hex" "fmt" "strconv" "sync" + "time" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" + "github.com/rs/zerolog" "google.golang.org/protobuf/proto" coresequencer "github.com/evstack/ev-node/core/sequencer" @@ -37,24 +40,48 @@ type BatchQueue struct { head int // index of the first element in the queue maxQueueSize int // maximum number of batches allowed in queue (0 = unlimited) + // inFlight holds items returned by Drain that haven't been acked yet. + // A subsequent Drain rolls these back to the front of the queue. + inFlight []queuedItem + + // inFlightPostponed holds txs that should be requeued on Ack. + // Set via SetPostponed between Drain and Ack. Cleared only on successful Ack. + inFlightPostponed [][]byte + // postponedItem holds a postponed batch persisted to the WAL during Ack. + // It is only prepended to the in-memory queue once Ack fully succeeds, so + // a direct Ack retry does not persist a duplicate entry. If a Drain rolls + // the in-flight state back instead, the entry is discarded again because + // its txs are still covered by the rolled-back WAL entries. + postponedItem *queuedItem + + // txSeen is an in-memory dedup set keyed by sha256 hash of each tx. + // hashes are added in AddBatch and removed on successful Ack. + // prevents the reaper from enqueuing the same tx multiple scrape cycles. + // its size tracks queued + in-flight txs, so it is bounded indirectly + // by maxQueueSize (unbounded when maxQueueSize is 0). + txSeen map[[32]byte]struct{} + // Sequence numbers for generating new keys nextAddSeq uint64 nextPrependSeq uint64 - mu sync.Mutex - db ds.Batching + mu sync.Mutex + db ds.Batching + logger zerolog.Logger } // NewBatchQueue creates a new BatchQueue with the specified maximum size. // If maxSize is 0, the queue will be unlimited. -func NewBatchQueue(db ds.Batching, prefix string, maxSize int) *BatchQueue { +func NewBatchQueue(db ds.Batching, prefix string, maxSize int, logger zerolog.Logger) *BatchQueue { return &BatchQueue{ queue: make([]queuedItem, 0), head: 0, maxQueueSize: maxSize, + txSeen: make(map[[32]byte]struct{}), db: store.NewPrefixKVStore(db, prefix), nextAddSeq: initialSeqNum, nextPrependSeq: initialSeqNum - 1, + logger: logger, } } @@ -66,99 +93,348 @@ func seqToKey(seq uint64) string { return hex.EncodeToString(b) } -// AddBatch adds a new transaction to the queue and writes it to the WAL. +// txHash returns the sha256 of a transaction. +func txHash(tx []byte) [32]byte { + return sha256.Sum256(tx) +} + +// AddBatch adds a new transaction batch to the queue and writes it to the WAL. +// Duplicate transactions (by hash) already in the queue or in-flight are silently skipped. // Returns ErrQueueFull if the queue has reached its maximum size. func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) error { bq.mu.Lock() defer bq.mu.Unlock() + // dedup: skip txs already queued or in-flight, track accepted hashes immediately + unique := make([][]byte, 0, len(batch.Transactions)) + hashes := make([][32]byte, 0, len(batch.Transactions)) + for _, tx := range batch.Transactions { + h := txHash(tx) + if _, dup := bq.txSeen[h]; dup { + continue + } + unique = append(unique, tx) + hashes = append(hashes, h) + bq.txSeen[h] = struct{}{} + } + if len(unique) == 0 { + return nil + } + batch = coresequencer.Batch{Transactions: unique} + // Check if queue is full (maxQueueSize of 0 means unlimited) - // Use effective queue size (total length minus processed head items) - effectiveSize := len(bq.queue) - bq.head + // effective size includes both queued and drained-but-unacked entries + effectiveSize := len(bq.queue) - bq.head + len(bq.inFlight) if bq.maxQueueSize > 0 && effectiveSize >= bq.maxQueueSize { + bq.rollbackSeenLocked(hashes) return ErrQueueFull } key := seqToKey(bq.nextAddSeq) if err := bq.persistBatch(ctx, batch, key); err != nil { - return err + bq.rollbackSeenLocked(hashes) + return fmt.Errorf("failed to persist batch %s to WAL: %w", key, err) } bq.nextAddSeq++ - // Then add to in-memory queue bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: key}) return nil } -// Prepend adds a batch to the front of the queue (before head position). -// This is used to return transactions that couldn't fit in the current batch. -// The batch is persisted to the DB to ensure durability in case of crashes. -// -// NOTE: Prepend intentionally bypasses the maxQueueSize limit to ensure high-priority -// transactions can always be re-queued. -func (bq *BatchQueue) Prepend(ctx context.Context, batch coresequencer.Batch) error { +// rollbackSeenLocked removes the given hashes from the dedup set. +// Must be called with bq.mu held. +func (bq *BatchQueue) rollbackSeenLocked(hashes [][32]byte) { + for _, h := range hashes { + delete(bq.txSeen, h) + } +} + +// Drain merges multiple queue entries into a single batch up to maxBytes. +// If maxBytes is 0, all available entries are drained. +// Any previously un-acked inFlight entries are rolled back to the front first. +// Drained entries move to inFlight state; WAL entries are NOT deleted until Ack. +func (bq *BatchQueue) Drain(ctx context.Context, maxBytes uint64) (*coresequencer.Batch, error) { bq.mu.Lock() defer bq.mu.Unlock() - key := seqToKey(bq.nextPrependSeq) - if err := bq.persistBatch(ctx, batch, key); err != nil { - return err + bq.rollbackInFlightLocked(ctx) + + if bq.head >= len(bq.queue) { + return &coresequencer.Batch{Transactions: nil}, nil + } + + var totalBytes uint64 + var allTxs [][]byte + + for bq.head < len(bq.queue) { + item := bq.queue[bq.head] + + var entryBytes uint64 + for _, tx := range item.Batch.Transactions { + entryBytes += uint64(len(tx)) + } + + if maxBytes > 0 && totalBytes+entryBytes > maxBytes && len(allTxs) > 0 { + break + } + + allTxs = append(allTxs, item.Batch.Transactions...) + totalBytes += entryBytes + bq.inFlight = append(bq.inFlight, item) + bq.queue[bq.head] = queuedItem{} + bq.head++ + } + + bq.compactLocked() + + if len(allTxs) == 0 { + return &coresequencer.Batch{Transactions: nil}, nil + } + + return &coresequencer.Batch{Transactions: allTxs}, nil +} + +// SetPostponed records txs that should be requeued on the next Ack. +// Must be called at most once between a Drain and its Ack — a later call in +// the same cycle would overwrite the recorded txs. The postponedItem guard +// makes a repeated call after a failed Ack a no-op, so the already-persisted +// entry is not replaced. The queue owns this state so it is only cleared on +// successful Ack — no data loss on failure. +func (bq *BatchQueue) SetPostponed(txs [][]byte) { + bq.mu.Lock() + defer bq.mu.Unlock() + if bq.postponedItem != nil { + return + } + bq.inFlightPostponed = txs +} + +// Ack commits the current inFlight entries: durably requeues any postponed +// transactions first, then deletes committed WAL entries. On failure neither +// inFlight nor inFlightPostponed is cleared, so the next Drain will roll +// entries back and a retry is safe. +func (bq *BatchQueue) Ack(ctx context.Context) error { + bq.mu.Lock() + defer bq.mu.Unlock() + + // persist postponed txs BEFORE deleting source WAL entries. + // if this fails the original entries still exist — no data loss. + // the item is only prepended to the in-memory queue after the WAL + // deletes succeed, so a rollback never sees its txs twice. + if len(bq.inFlightPostponed) > 0 && bq.postponedItem == nil { + batch := coresequencer.Batch{Transactions: bq.inFlightPostponed} + key := seqToKey(bq.nextPrependSeq) + if err := bq.persistBatch(ctx, batch, key); err != nil { + return fmt.Errorf("failed to persist postponed txs: %w", err) + } + bq.nextPrependSeq-- + bq.postponedItem = &queuedItem{Batch: batch, Key: key} + } + + // delete WAL entries for committed inFlight items in one batch. + // on failure, return error WITHOUT clearing state so next Drain + // rolls them back and they can be retried. + if len(bq.inFlight) > 0 { + b, err := bq.db.Batch(ctx) + if err != nil { + return fmt.Errorf("failed to create WAL delete batch: %w", err) + } + for _, item := range bq.inFlight { + if err := b.Delete(ctx, ds.NewKey(item.Key)); err != nil { + return fmt.Errorf("failed to delete committed WAL entry %s: %w", item.Key, err) + } + } + if err := b.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit WAL deletes: %w", err) + } } - bq.nextPrependSeq-- - item := queuedItem{Batch: batch, Key: key} + // success — remove committed tx hashes from dedup set. + // postponed txs are a subset of inFlight but stay in txSeen + // since they're re-queued via the prepended item. + var postponed map[[32]byte]struct{} + if len(bq.inFlightPostponed) > 0 { + postponed = make(map[[32]byte]struct{}, len(bq.inFlightPostponed)) + for _, tx := range bq.inFlightPostponed { + postponed[txHash(tx)] = struct{}{} + } + } + for _, item := range bq.inFlight { + for _, tx := range item.Batch.Transactions { + h := txHash(tx) + if _, ok := postponed[h]; ok { + continue + } + delete(bq.txSeen, h) + } + } + + // requeue the persisted postponed entry now that the commit is durable + if bq.postponedItem != nil { + bq.prependItemLocked(*bq.postponedItem) + } - // Then add to in-memory queue - // If we have room before head, use it + clear(bq.inFlight) + bq.inFlight = bq.inFlight[:0] + bq.inFlightPostponed = nil + bq.postponedItem = nil + + return nil +} + +// prependItemLocked inserts an item at the front of the queue. +// Must be called with bq.mu held. +func (bq *BatchQueue) prependItemLocked(item queuedItem) { if bq.head > 0 { bq.head-- bq.queue[bq.head] = item } else { - // Need to expand the queue at the front bq.queue = append([]queuedItem{item}, bq.queue...) } - - return nil } -// Next extracts a batch of transactions from the queue and marks it as processed in the WAL -func (bq *BatchQueue) Next(ctx context.Context) (*coresequencer.Batch, error) { - bq.mu.Lock() - defer bq.mu.Unlock() +// rollbackInFlightLocked moves un-acked inFlight items back to the front of the queue. +// Postponed state is discarded: the postponed txs are still covered by the +// rolled-back WAL entries, so a persisted postponed entry would duplicate +// them and is deleted (best-effort; Load dedups any leftover on restart). +// The caller is expected to make a fresh SetPostponed decision after the +// next Drain. Must be called with bq.mu held. +func (bq *BatchQueue) rollbackInFlightLocked(ctx context.Context) { + if len(bq.inFlight) == 0 { + return + } - // Check if queue is empty - if bq.head >= len(bq.queue) { - return &coresequencer.Batch{Transactions: nil}, nil + if bq.postponedItem != nil { + // detach from the caller's context so this best-effort cleanup still + // runs during graceful shutdown when the drain context is cancelled + delCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second) + defer cancel() + if err := bq.db.Delete(delCtx, ds.NewKey(bq.postponedItem.Key)); err != nil { + bq.logger.Warn().Err(err).Str("key", bq.postponedItem.Key). + Msg("failed to delete rolled-back postponed WAL entry") + } + bq.postponedItem = nil } + bq.inFlightPostponed = nil - item := bq.queue[bq.head] - // Release memory for the dequeued element - bq.queue[bq.head] = queuedItem{} - bq.head++ + if bq.head >= len(bq.inFlight) { + // enough head slots — fill them directly + for i := len(bq.inFlight) - 1; i >= 0; i-- { + bq.head-- + bq.queue[bq.head] = bq.inFlight[i] + } + } else { + // not enough head slots — single bulk prepend O(n) + tail := bq.queue[bq.head:] + newQueue := make([]queuedItem, 0, len(bq.inFlight)+len(tail)) + newQueue = append(newQueue, bq.inFlight...) + newQueue = append(newQueue, tail...) + bq.queue = newQueue + bq.head = 0 + } + + clear(bq.inFlight) + bq.inFlight = bq.inFlight[:0] +} - // Compact when head gets too large to prevent memory leaks - // Only compact when we have significant waste (more than half processed) - // and when we have a reasonable number of processed items to avoid - // frequent compactions on small queues +// compactLocked compacts the queue when head gets too large. +// Must be called with bq.mu held. +func (bq *BatchQueue) compactLocked() { if bq.head > len(bq.queue)/2 && bq.head > 100 { remaining := copy(bq.queue, bq.queue[bq.head:]) - // Zero out the rest of the slice for i := remaining; i < len(bq.queue); i++ { bq.queue[i] = queuedItem{} } bq.queue = bq.queue[:remaining] bq.head = 0 } +} - // Delete the batch from the WAL since it's been processed - // Use the stored key directly - if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil { - // Log the error but continue - fmt.Printf("Error deleting processed batch: %v\n", err) +// dedupAndEnqueueLocked filters duplicate txs from batch and enqueues the remainder. +// The WAL is kept in sync: fully-duplicate entries are deleted and partially-duplicate +// entries are rewritten, so stale duplicate txs cannot be resurrected by a later reload. +// Cleanup failures are non-fatal — the filtered in-memory state stays authoritative. +// Must be called with bq.mu held. +func (bq *BatchQueue) dedupAndEnqueueLocked(ctx context.Context, batch coresequencer.Batch, key string) { + filtered := make([][]byte, 0, len(batch.Transactions)) + for _, tx := range batch.Transactions { + h := txHash(tx) + if _, dup := bq.txSeen[h]; dup { + continue + } + filtered = append(filtered, tx) + bq.txSeen[h] = struct{}{} + } + + switch { + case len(filtered) == 0: + if err := bq.db.Delete(ctx, ds.NewKey(key)); err != nil { + bq.logger.Error().Err(err).Str("key", key).Msg("failed to delete duplicate WAL entry") + } + return + case len(filtered) < len(batch.Transactions): + batch = coresequencer.Batch{Transactions: filtered} + if err := bq.persistBatch(ctx, batch, key); err != nil { + bq.logger.Error().Err(err).Str("key", key).Msg("failed to rewrite partially duplicate WAL entry") + } } - return &item.Batch, nil + bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: key}) +} + +// DropIncluded removes the given transactions from queued entries and the WAL. +// It reconciles a crash between block commit and Ack: after a restart the WAL +// may still hold entries whose txs were already committed in the last block. +// Entries are rewritten in place (or deleted when emptied) so a subsequent +// reload stays consistent. Returns the number of dropped transactions. +// It must be called on a freshly loaded queue: only queued entries are +// scanned, so any in-flight entries would be missed. +func (bq *BatchQueue) DropIncluded(ctx context.Context, included [][]byte) (int, error) { + bq.mu.Lock() + defer bq.mu.Unlock() + + includedSet := make(map[[32]byte]struct{}, len(included)) + for _, tx := range included { + includedSet[txHash(tx)] = struct{}{} + } + + var dropped int + // in-place compaction: kept aliases bq.queue's backing array. This is safe + // because range evaluates the slice header once and writes always trail + // reads (at most one item is appended per item iterated). + kept := bq.queue[:bq.head] + for _, item := range bq.queue[bq.head:] { + remaining := make([][]byte, 0, len(item.Batch.Transactions)) + for _, tx := range item.Batch.Transactions { + h := txHash(tx) + if _, ok := includedSet[h]; ok { + delete(bq.txSeen, h) + dropped++ + continue + } + remaining = append(remaining, tx) + } + + switch { + case len(remaining) == len(item.Batch.Transactions): + // nothing dropped, keep as is + case len(remaining) == 0: + if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil { + return dropped, fmt.Errorf("failed to delete included WAL entry %s: %w", item.Key, err) + } + continue + default: + item.Batch = coresequencer.Batch{Transactions: remaining} + if err := bq.persistBatch(ctx, item.Batch, item.Key); err != nil { + return dropped, fmt.Errorf("failed to rewrite WAL entry %s: %w", item.Key, err) + } + } + kept = append(kept, item) + } + bq.queue = kept + + return dropped, nil } // Load reloads all batches from WAL file into the in-memory queue after a crash or restart @@ -166,9 +442,13 @@ func (bq *BatchQueue) Load(ctx context.Context) error { bq.mu.Lock() defer bq.mu.Unlock() - // Clear the current queue and reset sequences + // Clear the current queue, dedup set, and reset sequences bq.queue = make([]queuedItem, 0) bq.head = 0 + bq.txSeen = make(map[[32]byte]struct{}) + bq.inFlight = nil + bq.inFlightPostponed = nil + bq.postponedItem = nil bq.nextAddSeq = initialSeqNum bq.nextPrependSeq = initialSeqNum - 1 @@ -184,8 +464,9 @@ func (bq *BatchQueue) Load(ctx context.Context) error { var legacyItems []queuedItem for result := range results.Next() { if result.Error != nil { - fmt.Printf("Error reading entry from datastore: %v\n", result.Error) - continue + // a datastore read failure means the WAL cannot be trusted as + // loaded — fail startup rather than silently dropping txs. + return fmt.Errorf("failed to read WAL entry from datastore: %w", result.Error) } // We care about the last part of the key (the sequence number) // ds.Key usually has a leading slash. @@ -194,7 +475,7 @@ func (bq *BatchQueue) Load(ctx context.Context) error { var pbBatch pb.Batch err := proto.Unmarshal(result.Value, &pbBatch) if err != nil { - fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", keyName, err) + bq.logger.Error().Err(err).Str("key", keyName).Msg("failed to decode batch, skipping entry") continue } @@ -215,7 +496,7 @@ func (bq *BatchQueue) Load(ctx context.Context) error { } } if isValid { - bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: keyName}) + bq.dedupAndEnqueueLocked(ctx, batch, keyName) } else { legacyItems = append(legacyItems, queuedItem{Batch: batch, Key: result.Key}) } @@ -223,33 +504,32 @@ func (bq *BatchQueue) Load(ctx context.Context) error { if len(legacyItems) == 0 { return nil } - fmt.Printf("Found %d legacy items to migrate...\n", len(legacyItems)) + bq.logger.Info().Int("count", len(legacyItems)).Msg("found legacy items to migrate") for _, item := range legacyItems { newKeyName := seqToKey(bq.nextAddSeq) if err := bq.persistBatch(ctx, item.Batch, newKeyName); err != nil { - fmt.Printf("Failed to migrate legacy item %s: %v\n", item.Key, err) + bq.logger.Error().Err(err).Str("key", item.Key).Msg("failed to migrate legacy item") continue } if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil { - fmt.Printf("Failed to delete legacy key %s after migration: %v\n", item.Key, err) + bq.logger.Error().Err(err).Str("key", item.Key).Msg("failed to delete legacy key after migration") } - bq.queue = append(bq.queue, queuedItem{Batch: item.Batch, Key: newKeyName}) + bq.dedupAndEnqueueLocked(ctx, item.Batch, newKeyName) bq.nextAddSeq++ } return nil } -// Size returns the effective number of batches in the queue -// This method is primarily for testing and monitoring purposes +// Size returns the total number of pending batches (queued + drained-but-unacked). func (bq *BatchQueue) Size() int { bq.mu.Lock() defer bq.mu.Unlock() - return len(bq.queue) - bq.head + return len(bq.queue) - bq.head + len(bq.inFlight) } // persistBatch persists a batch to the datastore with the given key diff --git a/pkg/sequencers/single/queue_migration_test.go b/pkg/sequencers/single/queue_migration_test.go index 3556765534..c6e85ca97a 100644 --- a/pkg/sequencers/single/queue_migration_test.go +++ b/pkg/sequencers/single/queue_migration_test.go @@ -6,6 +6,7 @@ import ( ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" + "github.com/rs/zerolog" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -53,7 +54,7 @@ func TestLoad_MigratesLegacyKeys(t *testing.T) { require.NoError(err) // 2. Create Queue and call Load - bq := NewBatchQueue(memdb, prefix, 0) + bq := NewBatchQueue(memdb, prefix, 0, zerolog.Nop()) err = bq.Load(ctx) require.NoError(err) @@ -64,7 +65,7 @@ func TestLoad_MigratesLegacyKeys(t *testing.T) { // Check Order: // Appended items should be LAST. // Valid strings are < Appended strings (because we use nextAddSeq which is > validSeq) - // So Next() should return Valid item first. + // So draining should return Valid item first. // Between legacy items, the order depends on how they were iterated from DB and appended. // Iteration order: L2 (11...), L1 (aa...) // Process L2: append -> [Valid, L2] @@ -73,18 +74,15 @@ func TestLoad_MigratesLegacyKeys(t *testing.T) { // So expected retrieval order: Valid, L2, L1. // 1st Item (Valid) - item1, err := bq.Next(ctx) - require.NoError(err) + item1 := drainOne(ctx, t, bq) require.Equal(validBatch.Transactions, item1.Transactions) // 2nd Item (L2) - item2, err := bq.Next(ctx) - require.NoError(err) + item2 := drainOne(ctx, t, bq) require.Equal(legacyBatch2.Transactions, item2.Transactions) // 3rd Item (L1) - item3, err := bq.Next(ctx) - require.NoError(err) + item3 := drainOne(ctx, t, bq) require.Equal(legacyBatch.Transactions, item3.Transactions) // Queue empty @@ -104,7 +102,7 @@ func TestLoad_MigratesLegacyKeys(t *testing.T) { keys = append(keys, "found") } - // We expect 0 keys in DB because we called Next() 3 times which deletes them from DB as well! + // We expect 0 keys in DB because draining and acking all 3 items deletes them from DB as well! require.Empty(keys, "Expected DB to be empty after processing all items") } @@ -132,7 +130,7 @@ func TestLoad_Migration_DBCheck(t *testing.T) { require.NoError(err) // Load - bq := NewBatchQueue(memdb, prefix, 0) + bq := NewBatchQueue(memdb, prefix, 0, zerolog.Nop()) require.NoError(bq.Load(ctx)) // Verify DB keys diff --git a/pkg/sequencers/single/queue_test.go b/pkg/sequencers/single/queue_test.go index 259b96aa23..2c0461f3b7 100644 --- a/pkg/sequencers/single/queue_test.go +++ b/pkg/sequencers/single/queue_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/hex" "errors" + "fmt" "sync" "sync/atomic" "testing" @@ -12,6 +13,7 @@ import ( ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" dssync "github.com/ipfs/go-datastore/sync" + "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -21,11 +23,43 @@ import ( pb "github.com/evstack/ev-node/types/pb/evnode/v1" ) -// createTestBatch creates a batch with dummy transactions for testing +var testBatchNonce atomic.Uint64 + +// failingDeleteOnceDatastore fails the first batched WAL delete commit. +type failingDeleteOnceDatastore struct { + ds.Batching + failed bool +} + +func (d *failingDeleteOnceDatastore) Batch(ctx context.Context) (ds.Batch, error) { + b, err := d.Batching.Batch(ctx) + if err != nil { + return nil, err + } + return &failingOnceBatch{Batch: b, parent: d}, nil +} + +// failingOnceBatch fails its first Commit, then delegates. +type failingOnceBatch struct { + ds.Batch + parent *failingDeleteOnceDatastore +} + +func (b *failingOnceBatch) Commit(ctx context.Context) error { + if !b.parent.failed { + b.parent.failed = true + return assert.AnError + } + return b.Batch.Commit(ctx) +} + +// createTestBatch creates a batch with unique dummy transactions for testing. +// each tx is globally unique via an incrementing nonce. func createTestBatch(t *testing.T, txCount int) coresequencer.Batch { txs := make([][]byte, txCount) for i := range txCount { - txs[i] = []byte{byte(i), byte(i + 1), byte(i + 2)} + n := testBatchNonce.Add(1) + txs[i] = []byte(fmt.Sprintf("tx-%d-%d", n, i)) } return coresequencer.Batch{Transactions: txs} } @@ -33,7 +67,16 @@ func createTestBatch(t *testing.T, txCount int) coresequencer.Batch { func setupTestQueue(t *testing.T) *BatchQueue { // Create an in-memory thread-safe datastore memdb := store.NewPrefixKVStore(ds.NewMapDatastore(), "single") - return NewBatchQueue(memdb, "batching", 0) // 0 = unlimited for existing tests + return NewBatchQueue(memdb, "batching", 0, zerolog.Nop()) // 0 = unlimited for existing tests +} + +// drainOne pops exactly one queue entry and acks it immediately. +func drainOne(ctx context.Context, t *testing.T, bq *BatchQueue) *coresequencer.Batch { + t.Helper() + batch, err := bq.Drain(ctx, 1) + require.NoError(t, err) + require.NoError(t, bq.Ack(ctx)) + return batch } func TestNewBatchQueue(t *testing.T) { @@ -85,7 +128,7 @@ func TestAddBatch(t *testing.T) { { name: "add empty batch", batchesToAdd: []int{0}, - expectQueueLen: 1, + expectQueueLen: 0, // empty batches are no-ops after dedup expectErr: false, }, } @@ -130,12 +173,12 @@ func TestAddBatch(t *testing.T) { } } -func TestNextBatch(t *testing.T) { +func TestDrainOneByOne(t *testing.T) { tests := []struct { name string batchesToAdd []int callNextCount int - expectEmptyAt int // At which call to Next() we expect an empty batch + expectEmptyAt int // At which drain we expect an empty batch expectErrors []bool }{ { @@ -177,9 +220,12 @@ func TestNextBatch(t *testing.T) { } } - // Call Next the specified number of times + // Drain one entry at a time the specified number of times for i := 0; i < tc.callNextCount; i++ { - batch, err := bq.Next(ctx) + batch, err := bq.Drain(ctx, 1) + if err == nil { + err = bq.Ack(ctx) + } // Check error as expected if i < len(tc.expectErrors) && tc.expectErrors[i] { @@ -224,7 +270,7 @@ func TestLoad_WithMixedData(t *testing.T) { queuePrefix := "/batches/" // Define a specific prefix for the queue // Create the BatchQueue using the raw DB and the prefix - bq := NewBatchQueue(rawDB, queuePrefix, 0) // 0 = unlimited for test + bq := NewBatchQueue(rawDB, queuePrefix, 0, zerolog.Nop()) // 0 = unlimited for test require.NotNil(bq) // 1. Add valid batch data under the correct prefix @@ -311,19 +357,19 @@ func TestBatchQueue_Load_SetsSequencesProperly(t *testing.T) { db := ds.NewMapDatastore() prefix := "test-load-sequences" - // Build some persisted state with both AddBatch and Prepend so we have - // keys on both sides of the initialSeqNum. - q1 := NewBatchQueue(db, prefix, 0) + // Build some persisted state with keys on both sides of the initialSeqNum. + // Prepend-side keys are written the same way the postponed-ack path does. + q1 := NewBatchQueue(db, prefix, 0, zerolog.Nop()) require.NoError(t, q1.Load(ctx)) require.NoError(t, q1.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("add-1")}})) // initialSeqNum require.NoError(t, q1.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("add-2")}})) // initialSeqNum+1 - require.NoError(t, q1.Prepend(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-1")}})) // initialSeqNum-1 - require.NoError(t, q1.Prepend(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-2")}})) // initialSeqNum-2 + require.NoError(t, q1.persistBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-1")}}, seqToKey(initialSeqNum-1))) + require.NoError(t, q1.persistBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-2")}}, seqToKey(initialSeqNum-2))) // Simulate restart. - q2 := NewBatchQueue(db, prefix, 0) + q2 := NewBatchQueue(db, prefix, 0, zerolog.Nop()) require.NoError(t, q2.Load(ctx)) // After Load(), the sequencers should be positioned to avoid collisions: @@ -337,9 +383,13 @@ func TestBatchQueue_Load_SetsSequencesProperly(t *testing.T) { _, err := q2.db.Get(ctx, ds.NewKey(seqToKey(initialSeqNum+2))) require.NoError(t, err, "expected AddBatch after Load to persist using nextAddSeq key") - require.NoError(t, q2.Prepend(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-after-load")}})) + // postponed txs are requeued during Ack using nextPrependSeq + _, err = q2.Drain(ctx, 1) + require.NoError(t, err) + q2.SetPostponed([][]byte{[]byte("pre-after-load")}) + require.NoError(t, q2.Ack(ctx)) _, err = q2.db.Get(ctx, ds.NewKey(seqToKey(initialSeqNum-3))) - require.NoError(t, err, "expected Prepend after Load to persist using nextPrependSeq key") + require.NoError(t, err, "expected postponed requeue after Load to persist using nextPrependSeq key") } func TestConcurrency(t *testing.T) { @@ -372,27 +422,15 @@ func TestConcurrency(t *testing.T) { t.Errorf("expected %d batches, got %d", numOperations, bq.Size()) } - // Next operations concurrently (only half) - nextWg := new(sync.WaitGroup) + // drain half the entries one at a time (single consumer) nextCount := numOperations / 2 - nextWg.Add(nextCount) - for range nextCount { - go func() { - defer nextWg.Done() - batch, err := bq.Next(ctx) - if err != nil { - t.Errorf("unexpected error getting batch: %v", err) - } - if batch == nil { - t.Error("expected non-nil batch") - } - }() + batch := drainOne(ctx, t, bq) + if len(batch.Transactions) == 0 { + t.Error("expected non-empty batch") + } } - // Wait for all nexts to complete - nextWg.Wait() - // Verify we have expected number of batches left if bq.Size() != numOperations-nextCount { t.Errorf("expected %d batches left, got %d", numOperations-nextCount, bq.Size()) @@ -447,7 +485,7 @@ func TestBatchQueue_QueueLimit(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // Create in-memory datastore and queue with specified limit memdb := store.NewPrefixKVStore(ds.NewMapDatastore(), "single") - bq := NewBatchQueue(memdb, "batching", tc.maxSize) + bq := NewBatchQueue(memdb, "batching", tc.maxSize, zerolog.Nop()) ctx := context.Background() var lastErr error @@ -491,11 +529,11 @@ func TestBatchQueue_QueueLimit(t *testing.T) { } } -func TestBatchQueue_QueueLimit_WithNext(t *testing.T) { - // Test that removing batches with Next() allows adding more batches +func TestBatchQueue_QueueLimit_WithDrain(t *testing.T) { + // Test that removing batches with Drain+Ack allows adding more batches maxSize := 3 memdb := store.NewPrefixKVStore(ds.NewMapDatastore(), "single") - bq := NewBatchQueue(memdb, "batching", maxSize) + bq := NewBatchQueue(memdb, "batching", maxSize, zerolog.Nop()) ctx := context.Background() // Fill the queue to capacity @@ -519,25 +557,22 @@ func TestBatchQueue_QueueLimit_WithNext(t *testing.T) { t.Errorf("expected ErrQueueFull, got %v", err) } - // Remove one batch using Next() - batch, err := bq.Next(ctx) - if err != nil { - t.Fatalf("unexpected error in Next(): %v", err) - } - if batch == nil || len(batch.Transactions) == 0 { - t.Error("expected non-empty batch from Next()") + // Remove one batch + batch := drainOne(ctx, t, bq) + if len(batch.Transactions) == 0 { + t.Error("expected non-empty batch from drain") } // Verify queue size decreased if bq.Size() != maxSize-1 { - t.Errorf("expected queue size %d after Next(), got %d", maxSize-1, bq.Size()) + t.Errorf("expected queue size %d after drain, got %d", maxSize-1, bq.Size()) } // Now adding a batch should succeed newBatch := createTestBatch(t, 1000) err = bq.AddBatch(ctx, newBatch) if err != nil { - t.Errorf("unexpected error adding batch after Next(): %v", err) + t.Errorf("unexpected error adding batch after drain: %v", err) } // Verify queue is full again @@ -550,7 +585,7 @@ func TestBatchQueue_QueueLimit_Concurrency(t *testing.T) { // Test thread safety of queue limits under concurrent access maxSize := 10 memdb := dssync.MutexWrap(ds.NewMapDatastore()) // Thread-safe datastore - bq := NewBatchQueue(memdb, "batching", maxSize) + bq := NewBatchQueue(memdb, "batching", maxSize, zerolog.Nop()) ctx := context.Background() numWorkers := 20 @@ -609,204 +644,566 @@ func TestBatchQueue_QueueLimit_Concurrency(t *testing.T) { t.Logf("Successfully added %d batches, rejected %d due to queue being full", addedCount, errorCount) } -func TestBatchQueue_Prepend(t *testing.T) { +func TestBatchQueue_Drain_MergesMultipleEntries(t *testing.T) { ctx := context.Background() db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-drain-merge", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}})) + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx2")}})) + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx3")}})) + + batch, err := queue.Drain(ctx, 0) + require.NoError(t, err) + assert.Equal(t, 3, len(batch.Transactions)) + assert.Equal(t, []byte("tx1"), batch.Transactions[0]) + assert.Equal(t, []byte("tx2"), batch.Transactions[1]) + assert.Equal(t, []byte("tx3"), batch.Transactions[2]) + + // Size includes inFlight (3 drained entries) + assert.Equal(t, 3, queue.Size()) +} - t.Run("prepend to empty queue", func(t *testing.T) { - queue := NewBatchQueue(db, "test-prepend-empty", 0) - err := queue.Load(ctx) - require.NoError(t, err) +func TestBatchQueue_Drain_RespectsMaxBytes(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-drain-maxbytes", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + // each tx is 3 bytes + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("aaa")}})) + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("bbb")}})) + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("ccc")}})) + + // maxBytes=5 should only fit the first entry (3 bytes), second would exceed + batch, err := queue.Drain(ctx, 5) + require.NoError(t, err) + assert.Equal(t, 1, len(batch.Transactions)) + assert.Equal(t, []byte("aaa"), batch.Transactions[0]) + + // 2 entries in queue + 1 inFlight = 3 total + assert.Equal(t, 3, queue.Size()) +} - batch := coresequencer.Batch{ - Transactions: [][]byte{[]byte("tx1"), []byte("tx2")}, - } +func TestBatchQueue_Drain_AlwaysTakesAtLeastOne(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-drain-atleastone", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + // entry is 10 bytes, maxBytes is 1 + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("0123456789")}})) + + batch, err := queue.Drain(ctx, 1) + require.NoError(t, err) + assert.Equal(t, 1, len(batch.Transactions)) + assert.Equal(t, 1, queue.Size()) // inFlight counts +} + +func TestBatchQueue_Drain_EmptyQueue(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-drain-empty", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + batch, err := queue.Drain(ctx, 0) + require.NoError(t, err) + assert.Nil(t, batch.Transactions) +} + +func TestBatchQueue_Drain_RollsBackUnackedInFlight(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-drain-rollback", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}})) + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx2")}})) + + // first drain takes both entries + batch1, err := queue.Drain(ctx, 0) + require.NoError(t, err) + assert.Equal(t, 2, len(batch1.Transactions)) + assert.Equal(t, 2, queue.Size()) // inFlight counts + + // second drain without ack should roll back inFlight + batch2, err := queue.Drain(ctx, 0) + require.NoError(t, err) + assert.Equal(t, 2, len(batch2.Transactions)) + assert.Equal(t, []byte("tx1"), batch2.Transactions[0]) + assert.Equal(t, []byte("tx2"), batch2.Transactions[1]) +} + +func TestBatchQueue_Drain_RollbackBulkPrependAfterCompact(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-drain-rollback-bulk", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + // drain >100 entries so compactLocked resets head to 0 while they + // are in flight, forcing the bulk-prepend rollback path on next drain + const n = 150 + for i := range n { + tx := []byte(fmt.Sprintf("tx-%03d", i)) + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{tx}})) + } + + batch1, err := queue.Drain(ctx, 0) + require.NoError(t, err) + require.Len(t, batch1.Transactions, n) + require.Equal(t, 0, queue.head, "compact should reset head while entries are in flight") + + // enqueue one more so the rollback has a tail to prepend in front of + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx-new")}})) + + // second drain without ack rolls back via the bulk-prepend path + batch2, err := queue.Drain(ctx, 0) + require.NoError(t, err) + require.Len(t, batch2.Transactions, n+1) + for i := range n { + assert.Equal(t, []byte(fmt.Sprintf("tx-%03d", i)), batch2.Transactions[i]) + } + assert.Equal(t, []byte("tx-new"), batch2.Transactions[n]) +} + +func TestBatchQueue_Ack_DeletesWALEntries(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-ack-wal", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}})) + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx2")}})) + + _, err := queue.Drain(ctx, 0) + require.NoError(t, err) + + require.NoError(t, queue.Ack(ctx)) + + // WAL should be empty — reload and verify + queue2 := NewBatchQueue(db, "test-ack-wal", 0, zerolog.Nop()) + require.NoError(t, queue2.Load(ctx)) + assert.Equal(t, 0, queue2.Size()) +} + +func TestBatchQueue_Ack_RequeuesPostponedTxs(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-ack-postponed", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx1"), []byte("tx2")}})) + + _, err := queue.Drain(ctx, 0) + require.NoError(t, err) + + // set postponed txs and ack + queue.SetPostponed([][]byte{[]byte("postponed1"), []byte("postponed2")}) + require.NoError(t, queue.Ack(ctx)) + + // queue should have the postponed txs + assert.Equal(t, 1, queue.Size()) + + batch, err := queue.Drain(ctx, 0) + require.NoError(t, err) + assert.Equal(t, 2, len(batch.Transactions)) + assert.Equal(t, []byte("postponed1"), batch.Transactions[0]) + assert.Equal(t, []byte("postponed2"), batch.Transactions[1]) +} + +func TestBatchQueue_Ack_PostponedTxsStayDeduped(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-ack-postponed-dedup", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + postponedTx := []byte("postponed") + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx1"), postponedTx}})) + + _, err := queue.Drain(ctx, 0) + require.NoError(t, err) + + queue.SetPostponed([][]byte{postponedTx}) + require.NoError(t, queue.Ack(ctx)) + assert.Equal(t, 1, queue.Size()) + + // re-submitting the postponed tx (e.g. reaper rescrape) must be deduped, + // otherwise it would be included twice in the next drained batch + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{postponedTx}})) + assert.Equal(t, 1, queue.Size()) + + batch, err := queue.Drain(ctx, 0) + require.NoError(t, err) + assert.Equal(t, [][]byte{postponedTx}, batch.Transactions) + + // committed (non-postponed) txs are released from the dedup set on ack + require.NoError(t, queue.Ack(ctx)) + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}})) + assert.Equal(t, 1, queue.Size()) +} + +func TestBatchQueue_InFlight_SurvivesRestart(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-inflight-restart", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}})) + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx2")}})) + + // drain without ack + _, err := queue.Drain(ctx, 0) + require.NoError(t, err) + + // simulate restart — WAL entries should still exist since no ack + queue2 := NewBatchQueue(db, "test-inflight-restart", 0, zerolog.Nop()) + require.NoError(t, queue2.Load(ctx)) + assert.Equal(t, 2, queue2.Size()) + + batch, err := queue2.Drain(ctx, 1) + require.NoError(t, err) + assert.Equal(t, []byte("tx1"), batch.Transactions[0]) +} + +func TestBatchQueue_InFlight_CountsTowardQueueLimit(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + maxSize := 3 + queue := NewBatchQueue(db, "test-inflight-limit", maxSize, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + // fill to capacity + for i := range maxSize { + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{ + Transactions: [][]byte{[]byte(fmt.Sprintf("tx%d", i))}, + })) + } + + // drain all — moves entries to inFlight + _, err := queue.Drain(ctx, 0) + require.NoError(t, err) + + // Size includes inFlight + assert.Equal(t, 3, queue.Size()) + + // adding should still be rejected because inFlight counts + err = queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("overflow")}}) + assert.ErrorIs(t, err, ErrQueueFull) + + // after ack, adding should succeed + require.NoError(t, queue.Ack(ctx)) + err = queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("ok")}}) + assert.NoError(t, err) +} + +func TestBatchQueue_Ack_PersistsPostponedBeforeDeletingWAL(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-ack-order", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}})) + + _, err := queue.Drain(ctx, 0) + require.NoError(t, err) + + // set postponed txs and ack + queue.SetPostponed([][]byte{[]byte("postponed")}) + require.NoError(t, queue.Ack(ctx)) + + // simulate restart — postponed tx should survive + queue2 := NewBatchQueue(db, "test-ack-order", 0, zerolog.Nop()) + require.NoError(t, queue2.Load(ctx)) + assert.Equal(t, 1, queue2.Size()) + + batch, err := queue2.Drain(ctx, 0) + require.NoError(t, err) + assert.Equal(t, []byte("postponed"), batch.Transactions[0]) +} + +func TestBatchQueue_AckRetry_DoesNotDuplicatePostponed(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-ack-retry-postponed", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}})) + _, err := queue.Drain(ctx, 0) + require.NoError(t, err) + queue.SetPostponed([][]byte{[]byte("postponed")}) + + // Simulate the first WAL delete failing after postponed txs were already persisted. + originalDB := queue.db + queue.db = &failingDeleteOnceDatastore{Batching: originalDB} + err = queue.Ack(ctx) + require.Error(t, err) + + // Retry with the original datastore. The postponed batch should not be persisted twice. + queue.db = originalDB + require.NoError(t, queue.Ack(ctx)) + + reloaded := NewBatchQueue(db, "test-ack-retry-postponed", 0, zerolog.Nop()) + require.NoError(t, reloaded.Load(ctx)) + assert.Equal(t, 1, reloaded.Size()) + + batch, err := reloaded.Drain(ctx, 0) + require.NoError(t, err) + assert.Equal(t, [][]byte{[]byte("postponed")}, batch.Transactions) +} + +func TestBatchQueue_AckFailureThenDrain_AllowsNewPostponedDecision(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-ack-fail-drain", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + tx1, tx2 := []byte("tx1"), []byte("tx2") + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{tx1, tx2}})) + + _, err := queue.Drain(ctx, 0) + require.NoError(t, err) + queue.SetPostponed([][]byte{tx2}) + + // postponed entry is persisted, then the WAL delete fails + originalDB := queue.db + queue.db = &failingDeleteOnceDatastore{Batching: originalDB} + require.Error(t, queue.Ack(ctx)) + queue.db = originalDB + + // a new drain rolls the in-flight entries back; each tx must appear + // exactly once even though a postponed entry was persisted + batch, err := queue.Drain(ctx, 0) + require.NoError(t, err) + assert.Equal(t, [][]byte{tx1, tx2}, batch.Transactions) + + // the filter decision may differ on retry — it must not be ignored + queue.SetPostponed([][]byte{tx1}) + require.NoError(t, queue.Ack(ctx)) + + // only the new postponed tx is re-queued + batch, err = queue.Drain(ctx, 0) + require.NoError(t, err) + assert.Equal(t, [][]byte{tx1}, batch.Transactions) + require.NoError(t, queue.Ack(ctx)) + + // tx2 was committed: its hash must be released for re-submission + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{tx2}})) + assert.Equal(t, 1, queue.Size()) + + // a reload must not resurrect the stale postponed WAL entry + reloaded := NewBatchQueue(db, "test-ack-fail-drain", 0, zerolog.Nop()) + require.NoError(t, reloaded.Load(ctx)) + batch, err = reloaded.Drain(ctx, 0) + require.NoError(t, err) + assert.Equal(t, [][]byte{tx2}, batch.Transactions) +} + +func TestBatchQueue_DropIncluded_CrashBetweenCommitAndAck(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-drop-included", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + committed1 := []byte("committed1") + committed2 := []byte("committed2") + postponed := []byte("postponed") + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{committed1, postponed}})) + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{committed2}})) + + // drain, block commits with committed1+committed2, then crash before Ack + _, err := queue.Drain(ctx, 0) + require.NoError(t, err) + + // restart — WAL reloads all entries + queue2 := NewBatchQueue(db, "test-drop-included", 0, zerolog.Nop()) + require.NoError(t, queue2.Load(ctx)) + assert.Equal(t, 2, queue2.Size()) + + dropped, err := queue2.DropIncluded(ctx, [][]byte{committed1, committed2}) + require.NoError(t, err) + assert.Equal(t, 2, dropped) + + // only the postponed tx remains + batch, err := queue2.Drain(ctx, 0) + require.NoError(t, err) + assert.Equal(t, [][]byte{postponed}, batch.Transactions) + require.NoError(t, queue2.Ack(ctx)) + + // dropped txs are released from the dedup set + require.NoError(t, queue2.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{committed1}})) + assert.Equal(t, 1, queue2.Size()) + + // rewritten WAL is consistent across another reload + queue3 := NewBatchQueue(db, "test-drop-included", 0, zerolog.Nop()) + require.NoError(t, queue3.Load(ctx)) + batch, err = queue3.Drain(ctx, 0) + require.NoError(t, err) + assert.Equal(t, [][]byte{committed1}, batch.Transactions) +} + +func TestBatchQueue_DropIncluded_NoMatches(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-drop-none", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}})) + + dropped, err := queue.DropIncluded(ctx, [][]byte{[]byte("other")}) + require.NoError(t, err) + assert.Equal(t, 0, dropped) + assert.Equal(t, 1, queue.Size()) +} + +// countWALEntries returns the number of WAL keys stored under the queue's prefix. +func countWALEntries(t *testing.T, bq *BatchQueue) int { + t.Helper() + results, err := bq.db.Query(context.Background(), query.Query{}) + require.NoError(t, err) + defer results.Close() + count := 0 + for result := range results.Next() { + require.NoError(t, result.Error) + count++ + } + return count +} + +func TestBatchQueue_Load_DeletesFullyDuplicateWALEntries(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-load-dup-clean", 0, zerolog.Nop()) + + // craft a WAL with the same tx persisted under two keys, + // simulating a partially failed ack/cleanup from a previous run + tx := []byte("dup-tx") + batch := coresequencer.Batch{Transactions: [][]byte{tx}} + require.NoError(t, queue.persistBatch(ctx, batch, seqToKey(initialSeqNum))) + require.NoError(t, queue.persistBatch(ctx, batch, seqToKey(initialSeqNum+1))) + + require.NoError(t, queue.Load(ctx)) + assert.Equal(t, 1, queue.Size()) + + // the duplicate WAL entry must be deleted, not just dropped in memory + assert.Equal(t, 1, countWALEntries(t, queue)) + + // a fresh reload sees the same clean state + queue2 := NewBatchQueue(db, "test-load-dup-clean", 0, zerolog.Nop()) + require.NoError(t, queue2.Load(ctx)) + assert.Equal(t, 1, queue2.Size()) +} + +func TestBatchQueue_Load_RewritesPartiallyDuplicateWALEntries(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-load-partial-clean", 0, zerolog.Nop()) + + // a prepend-keyed entry (loads first) shares one tx with an add-keyed entry + shared := []byte("shared") + other := []byte("other") + require.NoError(t, queue.persistBatch(ctx, coresequencer.Batch{Transactions: [][]byte{shared}}, seqToKey(initialSeqNum-1))) + require.NoError(t, queue.persistBatch(ctx, coresequencer.Batch{Transactions: [][]byte{shared, other}}, seqToKey(initialSeqNum))) + + require.NoError(t, queue.Load(ctx)) + assert.Equal(t, 2, queue.Size()) + + // consume and ack the entry holding the shared tx, releasing its hash + batch, err := queue.Drain(ctx, 1) // small cap drains only the first entry + require.NoError(t, err) + assert.Equal(t, [][]byte{shared}, batch.Transactions) + require.NoError(t, queue.Ack(ctx)) + + // reload: the second entry's WAL value must have been rewritten without + // the shared tx, so it cannot be resurrected after the first was consumed + queue2 := NewBatchQueue(db, "test-load-partial-clean", 0, zerolog.Nop()) + require.NoError(t, queue2.Load(ctx)) + nextBatch, err := queue2.Drain(ctx, 0) + require.NoError(t, err) + assert.Equal(t, [][]byte{other}, nextBatch.Transactions) +} + +func TestBatchQueue_Dedup_SkipsDuplicateTxs(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-dedup", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + tx := []byte("same-tx") + + // first add succeeds + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{tx}})) + assert.Equal(t, 1, queue.Size()) + + // second add of same tx is silently skipped (dedup) + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{tx}})) + assert.Equal(t, 1, queue.Size()) + + // after drain + ack, the hash is freed and the tx can be re-enqueued + _, err := queue.Drain(ctx, 0) + require.NoError(t, err) + require.NoError(t, queue.Ack(ctx)) + + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{tx}})) + assert.Equal(t, 1, queue.Size()) +} + +func TestBatchQueue_Dedup_PartialBatch(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-dedup-partial", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + tx1 := []byte("tx1") + tx2 := []byte("tx2") + + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{tx1}})) + + // batch with one dup and one new tx: only new tx enqueued + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{tx1, tx2}})) + assert.Equal(t, 2, queue.Size()) + + // drain entries one at a time: batch1 has tx1, batch2 has only tx2 + batch1 := drainOne(ctx, t, queue) + assert.Equal(t, [][]byte{tx1}, batch1.Transactions) + + batch2 := drainOne(ctx, t, queue) + assert.Equal(t, [][]byte{tx2}, batch2.Transactions) +} + +func TestBatchQueue_Dedup_InFlightBlocksReenqueue(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-dedup-inflight", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + tx := []byte("inflight-tx") + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{tx}})) + + // drain moves to inFlight — tx hash stays in dedup set + _, err := queue.Drain(ctx, 0) + require.NoError(t, err) + + // re-add same tx while in-flight: silently skipped + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{tx}})) + // Size is 1 (inFlight), not 2 + assert.Equal(t, 1, queue.Size()) +} + +func TestBatchQueue_Dedup_SurvivesLoad(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + queue := NewBatchQueue(db, "test-dedup-load", 0, zerolog.Nop()) + require.NoError(t, queue.Load(ctx)) + + tx := []byte("persist-tx") + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{tx}})) + + // simulate restart + queue2 := NewBatchQueue(db, "test-dedup-load", 0, zerolog.Nop()) + require.NoError(t, queue2.Load(ctx)) - err = queue.Prepend(ctx, batch) - require.NoError(t, err) - - assert.Equal(t, 1, queue.Size()) - - // Next should return the prepended batch - nextBatch, err := queue.Next(ctx) - require.NoError(t, err) - assert.Equal(t, 2, len(nextBatch.Transactions)) - assert.Equal(t, []byte("tx1"), nextBatch.Transactions[0]) - }) - - t.Run("prepend to queue with items", func(t *testing.T) { - queue := NewBatchQueue(db, "test-prepend-with-items", 0) - err := queue.Load(ctx) - require.NoError(t, err) - - // Add some batches first - batch1 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}} - batch2 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx2")}} - err = queue.AddBatch(ctx, batch1) - require.NoError(t, err) - err = queue.AddBatch(ctx, batch2) - require.NoError(t, err) - - assert.Equal(t, 2, queue.Size()) - - // Prepend a batch - prependedBatch := coresequencer.Batch{Transactions: [][]byte{[]byte("prepended")}} - err = queue.Prepend(ctx, prependedBatch) - require.NoError(t, err) - - assert.Equal(t, 3, queue.Size()) - - // Next should return the prepended batch first - nextBatch, err := queue.Next(ctx) - require.NoError(t, err) - assert.Equal(t, 1, len(nextBatch.Transactions)) - assert.Equal(t, []byte("prepended"), nextBatch.Transactions[0]) - - // Then the original batches - nextBatch, err = queue.Next(ctx) - require.NoError(t, err) - assert.Equal(t, []byte("tx1"), nextBatch.Transactions[0]) - - nextBatch, err = queue.Next(ctx) - require.NoError(t, err) - assert.Equal(t, []byte("tx2"), nextBatch.Transactions[0]) - }) - - t.Run("prepend after consuming some items", func(t *testing.T) { - queue := NewBatchQueue(db, "test-prepend-after-consume", 0) - err := queue.Load(ctx) - require.NoError(t, err) - - // Add batches - batch1 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}} - batch2 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx2")}} - batch3 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx3")}} - err = queue.AddBatch(ctx, batch1) - require.NoError(t, err) - err = queue.AddBatch(ctx, batch2) - require.NoError(t, err) - err = queue.AddBatch(ctx, batch3) - require.NoError(t, err) - - assert.Equal(t, 3, queue.Size()) - - // Consume first batch - nextBatch, err := queue.Next(ctx) - require.NoError(t, err) - assert.Equal(t, []byte("tx1"), nextBatch.Transactions[0]) - assert.Equal(t, 2, queue.Size()) - - // Prepend - should reuse the head position - prependedBatch := coresequencer.Batch{Transactions: [][]byte{[]byte("prepended")}} - err = queue.Prepend(ctx, prependedBatch) - require.NoError(t, err) - - assert.Equal(t, 3, queue.Size()) - - // Should get prepended, then tx2, then tx3 - nextBatch, err = queue.Next(ctx) - require.NoError(t, err) - assert.Equal(t, []byte("prepended"), nextBatch.Transactions[0]) - - nextBatch, err = queue.Next(ctx) - require.NoError(t, err) - assert.Equal(t, []byte("tx2"), nextBatch.Transactions[0]) - - nextBatch, err = queue.Next(ctx) - require.NoError(t, err) - assert.Equal(t, []byte("tx3"), nextBatch.Transactions[0]) - - assert.Equal(t, 0, queue.Size()) - }) - - t.Run("multiple prepends", func(t *testing.T) { - queue := NewBatchQueue(db, "test-multiple-prepends", 0) - err := queue.Load(ctx) - require.NoError(t, err) - - // Add a batch - batch1 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}} - err = queue.AddBatch(ctx, batch1) - require.NoError(t, err) - - // Prepend multiple batches - prepend1 := coresequencer.Batch{Transactions: [][]byte{[]byte("prepend1")}} - prepend2 := coresequencer.Batch{Transactions: [][]byte{[]byte("prepend2")}} - prepend3 := coresequencer.Batch{Transactions: [][]byte{[]byte("prepend3")}} - - err = queue.Prepend(ctx, prepend1) - require.NoError(t, err) - err = queue.Prepend(ctx, prepend2) - require.NoError(t, err) - err = queue.Prepend(ctx, prepend3) - require.NoError(t, err) - - assert.Equal(t, 4, queue.Size()) - - // Should get in reverse order of prepending (LIFO for prepended items) - nextBatch, err := queue.Next(ctx) - require.NoError(t, err) - assert.Equal(t, []byte("prepend3"), nextBatch.Transactions[0]) - - nextBatch, err = queue.Next(ctx) - require.NoError(t, err) - assert.Equal(t, []byte("prepend2"), nextBatch.Transactions[0]) - - nextBatch, err = queue.Next(ctx) - require.NoError(t, err) - assert.Equal(t, []byte("prepend1"), nextBatch.Transactions[0]) - - nextBatch, err = queue.Next(ctx) - require.NoError(t, err) - assert.Equal(t, []byte("tx1"), nextBatch.Transactions[0]) - }) - - t.Run("prepend persistence across restarts", func(t *testing.T) { - prefix := "test-prepend-persistence" - queue := NewBatchQueue(db, prefix, 0) - err := queue.Load(ctx) - require.NoError(t, err) - - // Add some batches - batch1 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}} - batch2 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx2")}} - err = queue.AddBatch(ctx, batch1) - require.NoError(t, err) - err = queue.AddBatch(ctx, batch2) - require.NoError(t, err) - - // Consume first batch - _, err = queue.Next(ctx) - require.NoError(t, err) - - // Prepend a batch (simulating transactions that couldn't fit) - prependedBatch := coresequencer.Batch{Transactions: [][]byte{[]byte("prepended")}} - err = queue.Prepend(ctx, prependedBatch) - require.NoError(t, err) - - assert.Equal(t, 2, queue.Size()) - - // Simulate restart by creating a new queue with same prefix - queue2 := NewBatchQueue(db, prefix, 0) - err = queue2.Load(ctx) - require.NoError(t, err) - - // Should have both the prepended batch and tx2 - assert.Equal(t, 2, queue2.Size()) - - // First should be prepended batch - nextBatch, err := queue2.Next(ctx) - require.NoError(t, err) - assert.Equal(t, 1, len(nextBatch.Transactions)) - assert.Contains(t, nextBatch.Transactions, []byte("prepended")) - - // Then tx2 - nextBatch, err = queue2.Next(ctx) - require.NoError(t, err) - assert.Equal(t, 1, len(nextBatch.Transactions)) - assert.Contains(t, nextBatch.Transactions, []byte("tx2")) - - // Queue should be empty now - assert.Equal(t, 0, queue2.Size()) - }) + // dedup set is rebuilt from WAL — re-add should be skipped + require.NoError(t, queue2.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{tx}})) + assert.Equal(t, 1, queue2.Size()) } diff --git a/pkg/sequencers/single/sequencer.go b/pkg/sequencers/single/sequencer.go index 7b63334703..dd30e73f41 100644 --- a/pkg/sequencers/single/sequencer.go +++ b/pkg/sequencers/single/sequencer.go @@ -85,7 +85,7 @@ func NewSequencer( cfg: cfg, batchTime: cfg.Node.BlockTime.Duration, Id: id, - queue: NewBatchQueue(db, "batches", maxQueueSize), + queue: NewBatchQueue(db, "batches", maxQueueSize, logger), checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/single/checkpoint")), genesis: genesis, currentDAEndTime: genesis.StartTime.UTC(), @@ -94,7 +94,9 @@ func NewSequencer( s.SetDAHeight(genesis.DAStartHeight) // default value, will be overridden by executor or submitter s.daStartHeight.Store(genesis.DAStartHeight) - loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + // generous timeout: loading and reconciling a large WAL backlog + // (e.g. after a burst followed by a crash) can take a while + loadCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) defer cancel() // Load batch queue from DB @@ -102,6 +104,12 @@ func NewSequencer( return nil, fmt.Errorf("failed to load batch queue from DB: %w", err) } + // reconcile a crash between block commit and queue ack: drop reloaded + // WAL entries whose txs were already committed in the last block + if err := s.reconcileQueueWithLastBlock(loadCtx); err != nil { + return nil, fmt.Errorf("failed to reconcile batch queue with last committed block: %w", err) + } + // Load checkpoint from DB or initialize checkpoint, err := s.checkpointStore.Load(loadCtx) if err != nil { @@ -134,6 +142,53 @@ func NewSequencer( return s, nil } +// reconcileQueueWithLastBlock drops queued txs that were already committed in +// the last block. This covers a crash between block commit and queue Ack: +// the un-acked WAL entries are reloaded by Load, but their txs are already +// on chain. Only the last block needs checking — block production retries a +// failed ack before draining again, so at most one committed block can be +// un-acked at any time. Postponed txs were not included in the block and +// therefore survive reconciliation. +func (c *Sequencer) reconcileQueueWithLastBlock(ctx context.Context) error { + if c.queue.Size() == 0 { + return nil + } + + s := store.New(store.NewEvNodeKVStore(c.db)) + height, err := s.Height(ctx) + if err != nil { + return fmt.Errorf("failed to read chain height: %w", err) + } + if height == 0 { + return nil + } + + _, data, err := s.GetBlockData(ctx, height) + if err != nil { + return fmt.Errorf("failed to read block data at height %d: %w", height, err) + } + if data == nil || len(data.Txs) == 0 { + return nil + } + + included := make([][]byte, len(data.Txs)) + for i, tx := range data.Txs { + included[i] = tx + } + + dropped, err := c.queue.DropIncluded(ctx, included) + if err != nil { + return err + } + if dropped > 0 { + c.logger.Info(). + Int("dropped_txs", dropped). + Uint64("height", height). + Msg("dropped queued txs already committed in last block (crash recovery)") + } + return nil +} + // getInitialDAStartHeight retrieves the DA height of the first included chain height from store. func (c *Sequencer) getInitialDAStartHeight(ctx context.Context) uint64 { if daStartHeight := c.daStartHeight.Load(); daStartHeight != 0 { @@ -226,7 +281,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB var mempoolBatch *coresequencer.Batch if c.catchUpState.Load() != catchUpInProgress { var err error - mempoolBatch, err = c.queue.Next(ctx) + mempoolBatch, err = c.queue.Drain(ctx, req.MaxBytes) if err != nil { return nil, err } @@ -268,6 +323,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB // TxIndex tracks consumed txs from the start of the epoch, so we must process in order. var validForcedTxs [][]byte var validMempoolTxs [][]byte + var postponedMempoolTxs [][]byte var forcedTxConsumedCount uint64 var forcedTxPostponed bool @@ -294,25 +350,16 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB switch status { case execution.FilterOK: validMempoolTxs = append(validMempoolTxs, allTxs[i]) - case execution.FilterPostpone, execution.FilterRemove: - // Mempool txs that are postponed/removed are handled separately + case execution.FilterPostpone: + // requeued at ack time (after block commit) + postponedMempoolTxs = append(postponedMempoolTxs, allTxs[i]) + case execution.FilterRemove: + // dropped permanently } } } - // Return any postponed mempool txs to the queue for the next batch - // (they were valid but didn't fit due to size/gas limits) - var postponedMempoolTxs [][]byte - for i, status := range filterStatuses { - if i >= forcedTxCount && status == execution.FilterPostpone { - postponedMempoolTxs = append(postponedMempoolTxs, allTxs[i]) - } - } - if len(postponedMempoolTxs) > 0 { - if err := c.queue.Prepend(ctx, coresequencer.Batch{Transactions: postponedMempoolTxs}); err != nil { - c.logger.Error().Err(err).Int("count", len(postponedMempoolTxs)).Msg("failed to prepend postponed mempool txs") - } - } + c.queue.SetPostponed(postponedMempoolTxs) // Update checkpoint after consuming forced inclusion transactions. // txIndexForTimestamp is captured before the epoch-boundary reset so the @@ -373,6 +420,11 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB }, nil } +// AckBatch commits the current in-flight queue entries and requeues any postponed txs. +func (c *Sequencer) AckBatch(ctx context.Context) error { + return c.queue.Ack(ctx) +} + // VerifyBatch implements sequencing.Sequencer. func (c *Sequencer) VerifyBatch(ctx context.Context, req coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) { if !c.isValid(req.Id) { diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index 5395aabc16..a2374bb192 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -19,8 +19,10 @@ import ( "github.com/evstack/ev-node/pkg/config" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/test/mocks" "github.com/evstack/ev-node/test/testda" + "github.com/evstack/ev-node/types" ) // MockFullDAClient combines MockClient and MockVerifier to implement FullDAClient @@ -623,6 +625,9 @@ func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) assert.Equal(t, uint64(1), seq.checkpoint.TxIndex, "TxIndex should be 1 (consumed first forced tx)") assert.Equal(t, 2, len(seq.cachedForcedInclusionTxs), "Cache should still contain all original txs") + // ack the first batch so drained queue entries are committed + require.NoError(t, seq.AckBatch(ctx)) + // Second call with larger maxBytes = 200 // Should process pending forced tx first getReq2 := coresequencer.GetNextBatchRequest{ @@ -718,7 +723,7 @@ func TestSequencer_QueueLimit_Integration(t *testing.T) { t.Error("expected nil response when submission fails") } - // Test that getting a batch frees up space + // Test that getting a batch + acking frees up space nextResp, err := seq.GetNextBatch(ctx, coresequencer.GetNextBatchRequest{Id: seq.Id}) if err != nil { t.Fatalf("unexpected error getting next batch: %v", err) @@ -726,6 +731,8 @@ func TestSequencer_QueueLimit_Integration(t *testing.T) { if nextResp == nil || nextResp.Batch == nil { t.Fatal("expected non-nil batch response") } + // ack the drained batch to free queue capacity + require.NoError(t, seq.AckBatch(ctx)) // Now the third batch should succeed resp3_retry, err := seq.SubmitBatchTxs(ctx, req3) @@ -829,6 +836,8 @@ func TestSequencer_DAFailureAndQueueThrottling_Integration(t *testing.T) { require.NoError(t, err) require.NotNil(t, nextResp) require.NotNil(t, nextResp.Batch) + // ack the drained batch to free queue capacity + require.NoError(t, seq.AckBatch(ctx)) // Now we should be able to add the overflow batch resp, err = seq.SubmitBatchTxs(ctx, overflowReq) @@ -2302,3 +2311,35 @@ func TestSequencer_GetNextBatch_GasFilteringPreservesUnprocessedTxs(t *testing.T assert.True(t, txFound[string(tx2)], "tx2 should have been processed (must not be lost)") assert.True(t, txFound[string(tx3)], "tx3 should have been processed (must not be lost)") } + +func TestSequencer_ReconcileQueueWithLastBlock_CrashRecovery(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + + // commit a block at height 1 containing two txs + header, data := types.GetRandomBlock(1, 2, "test") + evStore := store.New(store.NewEvNodeKVStore(db)) + batch, err := evStore.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(header, data, &header.Signature)) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.Commit()) + + committed1 := []byte(data.Txs[0]) + committed2 := []byte(data.Txs[1]) + pending := []byte("tx-still-pending") + + // simulate a crash between block commit and queue ack: WAL still holds + // the committed txs alongside a tx that was never included + queue := NewBatchQueue(db, "batches", 0, zerolog.Nop()) + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{committed1, committed2}})) + require.NoError(t, queue.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{pending}})) + + // restart: NewSequencer loads the queue and reconciles against the last block + seq := newTestSequencer(t, db, newDummyDA(100_000_000)) + + drained, err := seq.queue.Drain(ctx, 0) + require.NoError(t, err) + require.Len(t, drained.Transactions, 1, "committed txs should have been dropped on reconcile") + assert.Equal(t, pending, drained.Transactions[0]) +} diff --git a/pkg/telemetry/sequencer_tracing.go b/pkg/telemetry/sequencer_tracing.go index bd1d8b24a5..a28258ca61 100644 --- a/pkg/telemetry/sequencer_tracing.go +++ b/pkg/telemetry/sequencer_tracing.go @@ -20,12 +20,35 @@ type tracedSequencer struct { tracer trace.Tracer } +// batchAcknowledger is implemented by sequencers that require an ack +// after drained queue entries are durably committed in a block. +type batchAcknowledger interface { + AckBatch(ctx context.Context) error +} + // WithTracingSequencer decorates the provided Sequencer with tracing spans. +// If the inner sequencer implements AckBatch, the returned sequencer +// forwards it so consumers can still detect and use the ack capability. func WithTracingSequencer(inner coresequencer.Sequencer) coresequencer.Sequencer { - return &tracedSequencer{ + ts := &tracedSequencer{ inner: inner, tracer: otel.Tracer("ev-node/sequencer"), } + if acker, ok := inner.(batchAcknowledger); ok { + return &tracedAckSequencer{tracedSequencer: ts, acker: acker} + } + return ts +} + +// tracedAckSequencer is a tracedSequencer whose inner sequencer also +// implements AckBatch. +type tracedAckSequencer struct { + *tracedSequencer + acker batchAcknowledger +} + +func (t *tracedAckSequencer) AckBatch(ctx context.Context) error { + return t.acker.AckBatch(ctx) } func (t *tracedSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) {