From 26825b266de67c0d315e600388a30dcae583cf80 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Fri, 12 Jun 2026 15:14:48 +0200 Subject: [PATCH 1/5] WIP --- sender/sharded_sender.go | 2 - sender/worker.go | 106 +++++++++++++++------------------------ sender/worker_test.go | 34 +++++++++++++ 3 files changed, 75 insertions(+), 67 deletions(-) diff --git a/sender/sharded_sender.go b/sender/sharded_sender.go index 8bc7c8d..a285106 100644 --- a/sender/sharded_sender.go +++ b/sender/sharded_sender.go @@ -23,7 +23,6 @@ type ShardedSender struct { mu sync.RWMutex collector *stats.Collector logger *stats.Logger - limiter *rate.Limiter // Shared rate limiter for all workers } // NewShardedSender creates a new sharded sender with workers for each endpoint @@ -41,7 +40,6 @@ func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int, limit workers: workerList, numShards: len(cfg.Endpoints), bufferSize: bufferSize, - limiter: limiter, }, nil } diff --git a/sender/worker.go b/sender/worker.go index 9b48a8a..75aedf7 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -1,18 +1,18 @@ package sender import ( - "bytes" "context" "errors" "fmt" - "io" "log" "net" "net/http" + "net/url" "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/rpc" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -90,6 +90,29 @@ func newHttpClient(opts ...HttpClientOption) *http.Client { } } +// newRPCClient returns a go-ethereum client configured for the endpoint scheme. +// HTTP(S) endpoints reuse the tuned otelhttp-backed transport; WS(S) endpoints +// use the default go-ethereum WebSocket transport. +func newRPCClient(ctx context.Context, endpoint string, opts ...HttpClientOption) (*ethclient.Client, error) { + u, err := url.Parse(endpoint) + if err != nil { + return nil, fmt.Errorf("parse endpoint %q: %w", endpoint, err) + } + + switch u.Scheme { + case "http", "https": + rpcClient, err := rpc.DialOptions(ctx, endpoint, rpc.WithHTTPClient(newHttpClient(opts...))) + if err != nil { + return nil, err + } + return ethclient.NewClient(rpcClient), nil + case "ws", "wss", "": + return ethclient.DialContext(ctx, endpoint) + default: + return nil, fmt.Errorf("unsupported RPC scheme %q for endpoint %s", u.Scheme, endpoint) + } +} + // NewWorker creates a new worker for a specific endpoint func NewWorker(id int, seiChainID string, endpoint string, bufferSize int, workers int, limiter *rate.Limiter) *Worker { w := &Worker{ @@ -115,12 +138,17 @@ func (w *Worker) SetStatsCollector(collector *stats.Collector, logger *stats.Log // Start begins the worker's processing loop func (w *Worker) Run(ctx context.Context) error { return service.Run(ctx, func(ctx context.Context, s service.Scope) error { - // Start multiple worker goroutines that share the same channel - client := newHttpClient() + client, err := newRPCClient(ctx, w.endpoint) + if err != nil { + return fmt.Errorf("dial %s: %w", w.endpoint, err) + } + defer client.Close() + + // Start multiple worker goroutines that share the same channel and RPC client. for range w.workers { s.Spawn(func() error { return w.processTransactions(ctx, client) }) } - return w.watchTransactions(ctx) + return w.watchTransactions(ctx, client) }) } @@ -144,22 +172,10 @@ func (w *Worker) SetTrackReceipts(trackReceipts bool) { w.trackReceipts = trackReceipts } -func (w *Worker) watchTransactions(ctx context.Context) error { +func (w *Worker) watchTransactions(ctx context.Context, eth *ethclient.Client) error { if w.dryRun || !w.trackReceipts { return nil } - dialCtx, dialSpan := tracer.Start(ctx, "sender.dial_endpoint", trace.WithAttributes( - attribute.String("seiload.endpoint", w.endpoint), - attribute.String("seiload.chain_id", w.seiChainID), - attribute.Int("seiload.worker_id", w.id), - )) - eth, err := ethclient.DialContext(dialCtx, w.endpoint) - if err != nil { - dialSpan.RecordError(err) - dialSpan.End() - return fmt.Errorf("ethclient.Dial(%q): %w", w.endpoint, err) - } - dialSpan.End() for ctx.Err() == nil { tx, err := utils.Recv(ctx, w.sentTxs) if err != nil { @@ -224,13 +240,11 @@ func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx * } // processTransactions is the main worker loop that processes transactions -func (w *Worker) processTransactions(ctx context.Context, client *http.Client) error { +func (w *Worker) processTransactions(ctx context.Context, client *ethclient.Client) error { for ctx.Err() == nil { // Apply rate limiting before getting the next transaction - if w.limiter != nil { - if !w.limiter.Allow() { - continue - } + if err:=w.limiter.Wait(ctx); err!=nil { + return err } tx, err := utils.Recv(ctx, w.txChan) @@ -255,7 +269,7 @@ func (w *Worker) processTransactions(ctx context.Context, client *http.Client) e } // sendTransaction sends a single transaction to the endpoint -func (w *Worker) sendTransaction(ctx context.Context, client *http.Client, tx *types.LoadTx) (_err error) { +func (w *Worker) sendTransaction(ctx context.Context, client *ethclient.Client, tx *types.LoadTx) (_err error) { ctx, span := tracer.Start(ctx, "sender.send_tx", trace.WithAttributes( attribute.String("seiload.scenario", tx.Scenario.Name), attribute.String("seiload.endpoint", w.endpoint), @@ -282,53 +296,15 @@ func (w *Worker) sendTransaction(ctx context.Context, client *http.Client, tx *t return utils.Sleep(ctx, 10*time.Microsecond) // Much faster simulation } - // Create HTTP request with JSON-RPC payload - req, err := http.NewRequestWithContext(ctx, "POST", w.endpoint, bytes.NewReader(tx.JSONRPCPayload)) - if err != nil { - return fmt.Errorf("Worker %d: Failed to create request: %w", w.id, err) - } - - // Set headers for JSON-RPC - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Accept", "application/json") - - // Send the request - resp, err := client.Do(req) - if err != nil { + // Send through go-ethereum so the same code path supports both HTTP(S) and WS(S) RPC. + if err := client.SendTransaction(ctx, tx.EthTx); err != nil { txsRejected.Add(ctx, 1, metric.WithAttributes( attribute.String("endpoint", w.endpoint), attribute.String("scenario", tx.Scenario.Name), - attribute.String("reason", "transport"), + attribute.String("reason", "rpc"), )) return fmt.Errorf("Worker %d: Failed to send transaction: %w", w.id, err) } - defer func() { - // Limit read to prevent memory issues with large responses - _, err = io.CopyN(io.Discard, resp.Body, 64*1024) // Read up to 64KB - if err != nil && err != io.EOF { - log.Printf("Worker %d: Failed to read response body: %v", w.id, err) - // Log but don't fail - this is just for connection reuse - } - - // Close response body and handle error - if closeErr := resp.Body.Close(); closeErr != nil { - log.Printf("Worker %d: Failed to close response body: %v", w.id, closeErr) - } - }() - - // Check response status - if resp.StatusCode != http.StatusOK { - httpErrors.Add(ctx, 1, metric.WithAttributes( - attribute.Int("status_code", resp.StatusCode), - attribute.String("endpoint", w.endpoint), - )) - txsRejected.Add(ctx, 1, metric.WithAttributes( - attribute.String("endpoint", w.endpoint), - attribute.String("scenario", tx.Scenario.Name), - attribute.String("reason", "http_status"), - )) - return fmt.Errorf("Worker %d: HTTP error %d for transaction to %s", w.id, resp.StatusCode, w.endpoint) - } txsAccepted.Add(ctx, 1, metric.WithAttributes( attribute.String("endpoint", w.endpoint), diff --git a/sender/worker_test.go b/sender/worker_test.go index cc55ec3..c9f30c9 100644 --- a/sender/worker_test.go +++ b/sender/worker_test.go @@ -1,10 +1,14 @@ package sender import ( + "context" "net/http" + "net/http/httptest" + "strings" "testing" "time" + "github.com/ethereum/go-ethereum/rpc" "github.com/stretchr/testify/require" ) @@ -48,3 +52,33 @@ func TestNewHttpClient_Smoke(t *testing.T) { _, isBareTransport := c.Transport.(*http.Transport) require.False(t, isBareTransport, "Transport should be wrapped by otelhttp, not bare *http.Transport") } + +func TestNewRPCClient_HTTP(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + client, err := newRPCClient(context.Background(), srv.URL) + require.NoError(t, err) + require.NotNil(t, client) + client.Close() +} + +func TestNewRPCClient_WS(t *testing.T) { + srv := rpc.NewServer() + ts := httptest.NewServer(srv.WebsocketHandler([]string{"*"})) + defer ts.Close() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + client, err := newRPCClient(context.Background(), wsURL) + require.NoError(t, err) + require.NotNil(t, client) + client.Close() +} + +func TestNewRPCClient_UnsupportedScheme(t *testing.T) { + client, err := newRPCClient(context.Background(), "ftp://example.com") + require.Error(t, err) + require.Nil(t, client) +} From 03d7f02e3aafcd6a028978306e303040c9f2d214 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Fri, 12 Jun 2026 15:41:02 +0200 Subject: [PATCH 2/5] WIP --- main.go | 19 ++++--- sender/metrics.go | 17 +++---- sender/sharded_sender.go | 88 +++++++------------------------- sender/worker.go | 106 ++++++++++++++++----------------------- 4 files changed, 77 insertions(+), 153 deletions(-) diff --git a/main.go b/main.go index e89257e..a9e1d89 100644 --- a/main.go +++ b/main.go @@ -256,23 +256,22 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { } // Create the sender from the config struct - snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter) - if err != nil { - return fmt.Errorf("failed to create sender: %w", err) - } - // Enable dry-run mode in sender if specified if settings.DryRun { - snd.SetDryRun(true) + cfg.Settings.DryRun = true } if settings.Debug { - snd.SetDebug(true) + cfg.Settings.Debug = true } if settings.TrackReceipts { - snd.SetTrackReceipts(true) + cfg.Settings.TrackReceipts = true } if settings.TrackBlocks { - snd.SetTrackBlocks(true) + cfg.Settings.TrackBlocks = true + } + snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter) + if err != nil { + return fmt.Errorf("failed to create sender: %w", err) } // Set statistics collector for sender and its workers @@ -322,7 +321,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { if settings.TxsDir == "" { // Start the sender (starts all workers) s.SpawnBgNamed("sender", func() error { return snd.Run(ctx) }) - log.Printf("✅ Connected to %d endpoints", snd.GetNumShards()) + log.Printf("✅ Connected to %d endpoints", snd.NumShards()) } // Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions) if settings.Prewarm { diff --git a/sender/metrics.go b/sender/metrics.go index ab9d39d..93888fe 100644 --- a/sender/metrics.go +++ b/sender/metrics.go @@ -29,11 +29,6 @@ var ( metric.WithUnit("s"), metric.WithExplicitBucketBoundaries(0.1, 0.2, 0.3, 0.5, 1.0, 2.0, 3.0, 5.0, 10.0, 20.0))) - httpErrors = must(meter.Int64Counter( - "http_errors", - metric.WithDescription("HTTP error responses from the target endpoint, by status code"), - metric.WithUnit("{errors}"))) - txsAccepted = must(meter.Int64Counter( "txs_accepted", metric.WithDescription("Transactions successfully submitted to an endpoint"), @@ -57,10 +52,10 @@ func init() { meteredChainWorkers.lock.RLock() defer meteredChainWorkers.lock.RUnlock() for _, worker := range meteredChainWorkers.workers { - observer.Observe(int64(worker.GetChannelLength()), metric.WithAttributes( - attribute.String("endpoint", worker.GetEndpoint()), - attribute.Int("worker_id", worker.id), - attribute.String("chain_id", worker.seiChainID), + observer.Observe(int64(worker.ChannelLength()), metric.WithAttributes( + attribute.String("endpoint", worker.Endpoint()), + attribute.Int("worker_id", worker.cfg.ID), + attribute.String("chain_id", worker.cfg.SeiChainID), )) } return nil @@ -91,8 +86,8 @@ func meterWorkerQueueLength(worker *Worker) { meteredChainWorkers.lock.Lock() defer meteredChainWorkers.lock.Unlock() id := chainWorkerID{ - workerID: worker.id, - chainID: worker.seiChainID, + workerID: worker.cfg.ID, + chainID: worker.cfg.SeiChainID, } if _, exists := meteredChainWorkers.workers[id]; !exists { meteredChainWorkers.workers[id] = worker diff --git a/sender/sharded_sender.go b/sender/sharded_sender.go index a285106..b67c057 100644 --- a/sender/sharded_sender.go +++ b/sender/sharded_sender.go @@ -16,41 +16,36 @@ import ( // ShardedSender implements TxSender with multiple workers, one per endpoint type ShardedSender struct { workers []*Worker - numShards int - bufferSize int - dryRun bool - debug bool mu sync.RWMutex collector *stats.Collector logger *stats.Logger } // NewShardedSender creates a new sharded sender with workers for each endpoint -func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int, limiter *rate.Limiter) (*ShardedSender, error) { +func NewShardedSender(cfg *config.LoadConfig, bufferSize int, tasksPerWorker int, limiter *rate.Limiter) (*ShardedSender, error) { if len(cfg.Endpoints) == 0 { return nil, fmt.Errorf("no endpoints configured") } - workerList := make([]*Worker, len(cfg.Endpoints)) + workers := make([]*Worker, len(cfg.Endpoints)) for i, endpoint := range cfg.Endpoints { - workerList[i] = NewWorker(i, cfg.SeiChainID, endpoint, bufferSize, workers, limiter) + workers[i] = NewWorker(&WorkerConfig { + ID: i, + SeiChainID: cfg.SeiChainID, + Endpoint: endpoint, + BufferSize: bufferSize, + Tasks: tasksPerWorker, + }, limiter) } - return &ShardedSender{ - workers: workerList, - numShards: len(cfg.Endpoints), - bufferSize: bufferSize, - }, nil + return &ShardedSender{workers: workers}, nil } // Start initializes and starts all workers func (s *ShardedSender) Run(ctx context.Context) error { - s.mu.Lock() - workers := s.workers - s.mu.Unlock() - return service.Run(ctx, func(ctx context.Context, s service.Scope) error { - for _, worker := range workers { - s.Spawn(func() error { return worker.Run(ctx) }) + return service.Run(ctx, func(ctx context.Context, scope service.Scope) error { + for _, worker := range s.workers { + scope.Spawn(func() error { return worker.Run(ctx) }) } return nil }) @@ -59,27 +54,19 @@ func (s *ShardedSender) Run(ctx context.Context) error { // Send implements TxSender interface - calculates shard ID and routes to appropriate worker func (s *ShardedSender) Send(ctx context.Context, tx *types.LoadTx) error { // Calculate shard ID based on the transaction - shardID := tx.ShardID(s.numShards) - + shardID := tx.ShardID(len(s.workers)) // Send to the appropriate worker - s.mu.RLock() - worker := s.workers[shardID] - s.mu.RUnlock() - - return worker.Send(ctx, tx) + return s.workers[shardID].Send(ctx, tx) } // GetWorkerStats returns statistics for all workers func (s *ShardedSender) GetWorkerStats() []WorkerStats { - s.mu.RLock() - defer s.mu.RUnlock() - stats := make([]WorkerStats, len(s.workers)) for i, worker := range s.workers { stats[i] = WorkerStats{ WorkerID: i, - Endpoint: worker.GetEndpoint(), - ChannelLength: worker.GetChannelLength(), + Endpoint: worker.Endpoint(), + ChannelLength: worker.ChannelLength(), } } return stats @@ -93,46 +80,7 @@ type WorkerStats struct { } // GetNumShards returns the number of shards (workers) -func (s *ShardedSender) GetNumShards() int { - return s.numShards -} - -// SetDryRun sets the dry-run flag for the sender and its workers -func (s *ShardedSender) SetDryRun(dryRun bool) { - s.mu.Lock() - defer s.mu.Unlock() - - s.dryRun = dryRun - for _, worker := range s.workers { - worker.SetDryRun(dryRun) - } -} - -func (s *ShardedSender) SetDebug(debug bool) { - s.mu.Lock() - defer s.mu.Unlock() - - s.debug = debug - for _, worker := range s.workers { - worker.SetDebug(debug) - } -} - -// SetTrackReceipts sets the track-receipts flag for the sender and its workers -func (s *ShardedSender) SetTrackReceipts(trackReceipts bool) { - s.mu.Lock() - defer s.mu.Unlock() - - for _, worker := range s.workers { - worker.SetTrackReceipts(trackReceipts) - } -} - -// SetTrackBlocks sets the track-blocks flag (placeholder - blocks are tracked separately) -func (s *ShardedSender) SetTrackBlocks(trackBlocks bool) { - // Block tracking is handled by the BlockCollector, not the sender - // This method exists for consistency with the CLI interface -} +func (s *ShardedSender) NumShards() int { return len(s.workers) } // SetStatsCollector sets the statistics collector for all workers func (s *ShardedSender) SetStatsCollector(collector *stats.Collector, logger *stats.Logger) { diff --git a/sender/worker.go b/sender/worker.go index 75aedf7..5e43f0a 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -28,19 +28,24 @@ import ( var tracer = otel.Tracer("github.com/sei-protocol/sei-load/sender") +type WorkerConfig struct { + ID int + SeiChainID string + Endpoint string + BufferSize int + Tasks int + DryRun bool + Debug bool + TrackReceipts bool +} + // Worker handles sending transactions to a specific endpoint type Worker struct { - id int - seiChainID string - endpoint string + cfg *WorkerConfig txChan chan *types.LoadTx sentTxs chan *types.LoadTx - dryRun bool - debug bool collector *stats.Collector logger *stats.Logger - workers int - trackReceipts bool limiter *rate.Limiter // Shared rate limiter for transaction sending } @@ -114,15 +119,11 @@ func newRPCClient(ctx context.Context, endpoint string, opts ...HttpClientOption } // NewWorker creates a new worker for a specific endpoint -func NewWorker(id int, seiChainID string, endpoint string, bufferSize int, workers int, limiter *rate.Limiter) *Worker { +func NewWorker(cfg *WorkerConfig, limiter *rate.Limiter) *Worker { w := &Worker{ - id: id, - seiChainID: seiChainID, - endpoint: endpoint, - txChan: make(chan *types.LoadTx, bufferSize), - sentTxs: make(chan *types.LoadTx, bufferSize), - workers: workers, - trackReceipts: false, + cfg: cfg, + txChan: make(chan *types.LoadTx, cfg.BufferSize), + sentTxs: make(chan *types.LoadTx, cfg.BufferSize), limiter: limiter, } meterWorkerQueueLength(w) @@ -138,15 +139,15 @@ func (w *Worker) SetStatsCollector(collector *stats.Collector, logger *stats.Log // Start begins the worker's processing loop func (w *Worker) Run(ctx context.Context) error { return service.Run(ctx, func(ctx context.Context, s service.Scope) error { - client, err := newRPCClient(ctx, w.endpoint) + client, err := newRPCClient(ctx, w.cfg.Endpoint) if err != nil { - return fmt.Errorf("dial %s: %w", w.endpoint, err) + return fmt.Errorf("dial %s: %w", w.cfg.Endpoint, err) } defer client.Close() - // Start multiple worker goroutines that share the same channel and RPC client. - for range w.workers { - s.Spawn(func() error { return w.processTransactions(ctx, client) }) + // Start multiple goroutines that share the same channel and RPC client. + for range w.cfg.Tasks { + s.Spawn(func() error { return w.runTxSender(ctx, client) }) } return w.watchTransactions(ctx, client) }) @@ -157,23 +158,8 @@ func (w *Worker) Send(ctx context.Context, tx *types.LoadTx) error { return utils.Send(ctx, w.txChan, tx) } -// SetDebug sets the dry-run mode for the worker -func (w *Worker) SetDebug(debug bool) { - w.debug = debug -} - -// SetDryRun sets the dry-run mode for the worker -func (w *Worker) SetDryRun(dryRun bool) { - w.dryRun = dryRun -} - -// SetTrackReceipts sets the track-receipts mode for the worker -func (w *Worker) SetTrackReceipts(trackReceipts bool) { - w.trackReceipts = trackReceipts -} - func (w *Worker) watchTransactions(ctx context.Context, eth *ethclient.Client) error { - if w.dryRun || !w.trackReceipts { + if w.cfg.DryRun || !w.cfg.TrackReceipts { return nil } for ctx.Err() == nil { @@ -194,9 +180,9 @@ func (w *Worker) watchTransactions(ctx context.Context, eth *ethclient.Client) e func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx *types.LoadTx) (_err error) { ctx, span := tracer.Start(ctx, "sender.check_receipt", trace.WithAttributes( attribute.String("seiload.scenario", tx.Scenario.Name), - attribute.String("seiload.endpoint", w.endpoint), - attribute.Int("seiload.worker_id", w.id), - attribute.String("seiload.chain_id", w.seiChainID), + attribute.String("seiload.endpoint", w.cfg.Endpoint), + attribute.Int("seiload.worker_id", w.cfg.ID), + attribute.String("seiload.chain_id", w.cfg.SeiChainID), )) defer func(start time.Time) { if _err != nil { @@ -208,8 +194,8 @@ func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx * receiptLatency.Record(ctx, time.Since(start).Seconds(), metric.WithAttributes( attribute.String("scenario", tx.Scenario.Name), - attribute.String("endpoint", w.endpoint), - attribute.String("chain_id", w.seiChainID), + attribute.String("endpoint", w.cfg.Endpoint), + attribute.String("chain_id", w.cfg.SeiChainID), statusAttrFromError(_err)), ) }(time.Now()) @@ -231,7 +217,7 @@ func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx * if receipt.Status != 1 { return fmt.Errorf("tx %s failed", tx.EthTx.Hash().Hex()) } - if w.debug { + if w.cfg.Debug { log.Printf("✅ tx %s, %s, gas=%d succeeded\n", tx.Scenario.Name, tx.EthTx.Hash().Hex(), receipt.GasUsed) } return nil @@ -239,8 +225,8 @@ func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx * return ctx.Err() } -// processTransactions is the main worker loop that processes transactions -func (w *Worker) processTransactions(ctx context.Context, client *ethclient.Client) error { +// runTxSender is the main worker loop that processes transactions +func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) error { for ctx.Err() == nil { // Apply rate limiting before getting the next transaction if err:=w.limiter.Wait(ctx); err!=nil { @@ -259,7 +245,7 @@ func (w *Worker) processTransactions(ctx context.Context, client *ethclient.Clie err = w.sendTransaction(ctx, client, tx) // Record statistics if collector is available if w.collector != nil { - w.collector.RecordTransaction(tx.Scenario.Name, w.endpoint, time.Since(startTime), err == nil) + w.collector.RecordTransaction(tx.Scenario.Name, w.cfg.Endpoint, time.Since(startTime), err == nil) } if err != nil { log.Printf("%v", err) @@ -272,9 +258,9 @@ func (w *Worker) processTransactions(ctx context.Context, client *ethclient.Clie func (w *Worker) sendTransaction(ctx context.Context, client *ethclient.Client, tx *types.LoadTx) (_err error) { ctx, span := tracer.Start(ctx, "sender.send_tx", trace.WithAttributes( attribute.String("seiload.scenario", tx.Scenario.Name), - attribute.String("seiload.endpoint", w.endpoint), - attribute.Int("seiload.worker_id", w.id), - attribute.String("seiload.chain_id", w.seiChainID), + attribute.String("seiload.endpoint", w.cfg.Endpoint), + attribute.Int("seiload.worker_id", w.cfg.ID), + attribute.String("seiload.chain_id", w.cfg.SeiChainID), )) defer func(start time.Time) { if _err != nil { @@ -285,12 +271,12 @@ func (w *Worker) sendTransaction(ctx context.Context, client *ethclient.Client, sendLatency.Record(ctx, time.Since(start).Seconds(), metric.WithAttributes( attribute.String("scenario", tx.Scenario.Name), - attribute.String("endpoint", w.endpoint), - attribute.String("chain_id", w.seiChainID), + attribute.String("endpoint", w.cfg.Endpoint), + attribute.String("chain_id", w.cfg.SeiChainID), statusAttrFromError(_err)), ) }(time.Now()) - if w.dryRun { + if w.cfg.DryRun { // In dry-run mode, simulate processing time and mark as successful // Use very minimal delay to avoid channel overflow return utils.Sleep(ctx, 10*time.Microsecond) // Much faster simulation @@ -299,15 +285,15 @@ func (w *Worker) sendTransaction(ctx context.Context, client *ethclient.Client, // Send through go-ethereum so the same code path supports both HTTP(S) and WS(S) RPC. if err := client.SendTransaction(ctx, tx.EthTx); err != nil { txsRejected.Add(ctx, 1, metric.WithAttributes( - attribute.String("endpoint", w.endpoint), + attribute.String("endpoint", w.cfg.Endpoint), attribute.String("scenario", tx.Scenario.Name), attribute.String("reason", "rpc"), )) - return fmt.Errorf("Worker %d: Failed to send transaction: %w", w.id, err) + return fmt.Errorf("Worker %d: Failed to send transaction: %w", w.cfg.ID, err) } txsAccepted.Add(ctx, 1, metric.WithAttributes( - attribute.String("endpoint", w.endpoint), + attribute.String("endpoint", w.cfg.Endpoint), attribute.String("scenario", tx.Scenario.Name), )) @@ -319,13 +305,9 @@ func (w *Worker) sendTransaction(ctx context.Context, client *ethclient.Client, return nil } -// GetChannelLength returns the current length of the worker's channel (for monitoring). +// ChannelLength returns the current length of the worker's channel (for monitoring). // This function is safe for concurrent calls. -func (w *Worker) GetChannelLength() int { - return len(w.txChan) -} +func (w *Worker) ChannelLength() int { return len(w.txChan) } -// GetEndpoint returns the worker's endpoint -func (w *Worker) GetEndpoint() string { - return w.endpoint -} +// Endpoint returns the worker's endpoint +func (w *Worker) Endpoint() string { return w.cfg.Endpoint } From 6c14f3c3ce22eab41573276824ba795d8bed77b0 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Fri, 12 Jun 2026 16:37:58 +0200 Subject: [PATCH 3/5] WIP --- config/settings.go | 110 ++++++++++++++++++------------------ config/settings_test.go | 26 ++++----- main.go | 105 ++++++++++++++-------------------- observability/setup_test.go | 14 ++--- profiles/profiles_test.go | 2 +- sender/sharded_sender.go | 36 ++++-------- sender/worker.go | 51 +++++++---------- utils/channels.go | 7 +-- 8 files changed, 150 insertions(+), 201 deletions(-) diff --git a/config/settings.go b/config/settings.go index 5f31dcd..2c4e2bb 100644 --- a/config/settings.go +++ b/config/settings.go @@ -12,19 +12,19 @@ import ( // Settings holds all CLI-configurable parameters type Settings struct { - Workers int `json:"workers,omitempty"` - TPS float64 `json:"tps,omitempty"` - StatsInterval Duration `json:"statsInterval,omitempty"` - BufferSize int `json:"bufferSize,omitempty"` - DryRun bool `json:"dryRun,omitempty"` - Debug bool `json:"debug,omitempty"` - TrackReceipts bool `json:"trackReceipts,omitempty"` - TrackBlocks bool `json:"trackBlocks,omitempty"` - TrackUserLatency bool `json:"trackUserLatency,omitempty"` - Prewarm bool `json:"prewarm,omitempty"` - RampUp bool `json:"rampUp,omitempty"` - ReportPath string `json:"reportPath,omitempty"` - TxsDir string `json:"txsDir,omitempty"` + TasksPerEndpoint int `json:"workers,omitempty"` + TPS float64 `json:"tps,omitempty"` + StatsInterval Duration `json:"statsInterval,omitempty"` + BufferSize int `json:"bufferSize,omitempty"` + DryRun bool `json:"dryRun,omitempty"` + Debug bool `json:"debug,omitempty"` + TrackReceipts bool `json:"trackReceipts,omitempty"` + TrackBlocks bool `json:"trackBlocks,omitempty"` + TrackUserLatency bool `json:"trackUserLatency,omitempty"` + Prewarm bool `json:"prewarm,omitempty"` + RampUp bool `json:"rampUp,omitempty"` + ReportPath string `json:"reportPath,omitempty"` + TxsDir string `json:"txsDir,omitempty"` TargetGas uint64 `json:"targetGas,omitempty"` NumBlocksToWrite int `json:"numBlocksToWrite,omitempty"` PostSummaryFlushDelay Duration `json:"postSummaryFlushDelay,omitempty"` @@ -33,19 +33,19 @@ type Settings struct { // DefaultSettings returns the default configuration values func DefaultSettings() Settings { return Settings{ - Workers: 1, - TPS: 0.0, - StatsInterval: Duration(10 * time.Second), - BufferSize: 1000, - DryRun: false, - Debug: false, - TrackReceipts: false, - TrackBlocks: false, - TrackUserLatency: false, - Prewarm: false, - RampUp: false, - ReportPath: "", - TxsDir: "", + TasksPerEndpoint: 1, + TPS: 0.0, + StatsInterval: Duration(10 * time.Second), + BufferSize: 1000, + DryRun: false, + Debug: false, + TrackReceipts: false, + TrackBlocks: false, + TrackUserLatency: false, + Prewarm: false, + RampUp: false, + ReportPath: "", + TxsDir: "", TargetGas: 10_000_000, NumBlocksToWrite: 100, PostSummaryFlushDelay: Duration(25 * time.Second), @@ -56,19 +56,19 @@ func DefaultSettings() Settings { func InitializeViper(cmd *cobra.Command) error { // Bind flags to viper with error checking flagBindings := map[string]string{ - "statsInterval": "stats-interval", - "bufferSize": "buffer-size", - "tps": "tps", - "dryRun": "dry-run", - "debug": "debug", - "trackReceipts": "track-receipts", - "trackBlocks": "track-blocks", - "prewarm": "prewarm", - "trackUserLatency": "track-user-latency", - "workers": "workers", - "rampUp": "ramp-up", - "reportPath": "report-path", - "txsDir": "txs-dir", + "statsInterval": "stats-interval", + "bufferSize": "buffer-size", + "tps": "tps", + "dryRun": "dry-run", + "debug": "debug", + "trackReceipts": "track-receipts", + "trackBlocks": "track-blocks", + "prewarm": "prewarm", + "trackUserLatency": "track-user-latency", + "workers": "workers", + "rampUp": "ramp-up", + "reportPath": "report-path", + "txsDir": "txs-dir", "targetGas": "target-gas", "numBlocksToWrite": "num-blocks-to-write", "postSummaryFlushDelay": "post-summary-flush-delay", @@ -91,7 +91,7 @@ func InitializeViper(cmd *cobra.Command) error { viper.SetDefault("trackBlocks", defaults.TrackBlocks) viper.SetDefault("prewarm", defaults.Prewarm) viper.SetDefault("trackUserLatency", defaults.TrackUserLatency) - viper.SetDefault("workers", defaults.Workers) + viper.SetDefault("workers", defaults.TasksPerEndpoint) viper.SetDefault("rampUp", defaults.RampUp) viper.SetDefault("reportPath", defaults.ReportPath) viper.SetDefault("txsDir", defaults.TxsDir) @@ -122,21 +122,21 @@ func LoadSettings(settings *Settings) error { } // ResolveSettings gets the final resolved settings from Viper -func ResolveSettings() Settings { - return Settings{ - Workers: viper.GetInt("workers"), - TPS: viper.GetFloat64("tps"), - StatsInterval: Duration(viper.GetDuration("statsInterval")), - BufferSize: viper.GetInt("bufferSize"), - DryRun: viper.GetBool("dryRun"), - Debug: viper.GetBool("debug"), - TrackReceipts: viper.GetBool("trackReceipts"), - TrackBlocks: viper.GetBool("trackBlocks"), - TrackUserLatency: viper.GetBool("trackUserLatency"), - Prewarm: viper.GetBool("prewarm"), - RampUp: viper.GetBool("rampUp"), - ReportPath: viper.GetString("reportPath"), - TxsDir: viper.GetString("txsDir"), +func ResolveSettings() *Settings { + return &Settings{ + TasksPerEndpoint: viper.GetInt("workers"), + TPS: viper.GetFloat64("tps"), + StatsInterval: Duration(viper.GetDuration("statsInterval")), + BufferSize: viper.GetInt("bufferSize"), + DryRun: viper.GetBool("dryRun"), + Debug: viper.GetBool("debug"), + TrackReceipts: viper.GetBool("trackReceipts"), + TrackBlocks: viper.GetBool("trackBlocks"), + TrackUserLatency: viper.GetBool("trackUserLatency"), + Prewarm: viper.GetBool("prewarm"), + RampUp: viper.GetBool("rampUp"), + ReportPath: viper.GetString("reportPath"), + TxsDir: viper.GetString("txsDir"), TargetGas: viper.GetUint64("targetGas"), NumBlocksToWrite: viper.GetInt("numBlocksToWrite"), PostSummaryFlushDelay: Duration(viper.GetDuration("postSummaryFlushDelay")), diff --git a/config/settings_test.go b/config/settings_test.go index 8773159..4495c8c 100644 --- a/config/settings_test.go +++ b/config/settings_test.go @@ -125,19 +125,19 @@ func TestDefaultSettings(t *testing.T) { defaults := DefaultSettings() expected := Settings{ - Workers: 1, - TPS: 0.0, - StatsInterval: Duration(10 * time.Second), - BufferSize: 1000, - DryRun: false, - Debug: false, - TrackReceipts: false, - TrackBlocks: false, - TrackUserLatency: false, - Prewarm: false, - RampUp: false, - ReportPath: "", - TxsDir: "", + Workers: 1, + TPS: 0.0, + StatsInterval: Duration(10 * time.Second), + BufferSize: 1000, + DryRun: false, + Debug: false, + TrackReceipts: false, + TrackBlocks: false, + TrackUserLatency: false, + Prewarm: false, + RampUp: false, + ReportPath: "", + TxsDir: "", TargetGas: 10_000_000, NumBlocksToWrite: 100, PostSummaryFlushDelay: Duration(25 * time.Second), diff --git a/main.go b/main.go index a9e1d89..8502aea 100644 --- a/main.go +++ b/main.go @@ -45,10 +45,8 @@ to multiple endpoints with account pooling management. Use --dry-run to test configuration and view transaction details without actually sending requests or deploying contracts.`, - Run: func(cmd *cobra.Command, args []string) { - if err := runLoadTest(context.Background(), cmd, args); err != nil { - log.Fatal(err) - } + RunE: func(cmd *cobra.Command, args []string) error { + return runLoadTest(cmd.Context(), cmd) }, } @@ -94,7 +92,7 @@ func main() { } } -func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { +func runLoadTest(ctx context.Context, cmd *cobra.Command) error { // Parse the config file into a config.LoadConfig struct cfg, err := loadConfig(configFile) if err != nil { @@ -107,7 +105,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { } // Get resolved settings from the config package - settings := config.ResolveSettings() + cfg.Settings = config.ResolveSettings() // Handle --nodes flag to limit number of endpoints nodes, _ := cmd.Flags().GetInt("nodes") @@ -115,39 +113,38 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { log.Printf("🔧 Limiting endpoints from %d to %d nodes", len(cfg.Endpoints), nodes) cfg.Endpoints = cfg.Endpoints[:nodes] } + // Enable mock deployment in dry-run mode + if cfg.Settings.DryRun { + cfg.MockDeploy = true + } log.Printf("🚀 Starting Sei Chain Load Test v2") log.Printf("📁 Config file: %s", configFile) log.Printf("🎯 Endpoints: %d", len(cfg.Endpoints)) - log.Printf("👥 Workers per endpoint: %d", settings.Workers) - log.Printf("🔧 Total workers: %d", len(cfg.Endpoints)*settings.Workers) + log.Printf("👥 Tasks per endpoint: %d", cfg.Settings.TasksPerEndpoint) + log.Printf("🔧 Total tasks: %d", len(cfg.Endpoints)*cfg.Settings.TasksPerEndpoint) log.Printf("📊 Scenarios: %d", len(cfg.Scenarios)) - log.Printf("⏱️ Stats interval: %v", settings.StatsInterval.ToDuration()) - log.Printf("📦 Buffer size per worker: %d", settings.BufferSize) - if settings.TPS > 0 { - log.Printf("📈 Transactions per second: %.2f", settings.TPS) + log.Printf("⏱️ Stats interval: %v", cfg.Settings.StatsInterval.ToDuration()) + log.Printf("📦 Buffer size per worker: %d", cfg.Settings.BufferSize) + if cfg.Settings.TPS > 0 { + log.Printf("📈 Transactions per second: %.2f", cfg.Settings.TPS) } - if settings.DryRun { + if cfg.Settings.DryRun { log.Printf("📝 Dry run: enabled") } - if settings.TrackReceipts { + if cfg.Settings.TrackReceipts { log.Printf("📝 Track receipts: enabled") } - if settings.TrackBlocks { + if cfg.Settings.TrackBlocks { log.Printf("📝 Track blocks: enabled") } - if settings.Prewarm { + if cfg.Settings.Prewarm { log.Printf("📝 Prewarm: enabled") } - if settings.TrackUserLatency { + if cfg.Settings.TrackUserLatency { log.Printf("📝 Track user latency: enabled") } - // Enable mock deployment in dry-run mode - if settings.DryRun { - cfg.MockDeploy = true - } - listenAddr := cmd.Flag("metricsListenAddr").Value.String() log.Printf("serving metrics at %s/metrics", listenAddr) @@ -203,7 +200,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { // Create statistics collector and logger collector := stats.NewCollector() - logger := stats.NewLogger(collector, settings.StatsInterval.ToDuration(), settings.ReportPath, settings.Debug) + logger := stats.NewLogger(collector, cfg.Settings.StatsInterval.ToDuration(), cfg.Settings.ReportPath, cfg.Settings.Debug) var ramper *sender.Ramper err = service.Run(ctx, func(ctx context.Context, s service.Scope) error { @@ -215,9 +212,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { // Create shared rate limiter for all workers if TPS is specified var sharedLimiter *rate.Limiter - if settings.TPS > 0 { - sharedLimiter = rate.NewLimiter(rate.Limit(settings.TPS), 1) - log.Printf("📈 Rate limiting enabled: %.2f TPS shared across all workers", settings.TPS) + if cfg.Settings.TPS > 0 { + sharedLimiter = rate.NewLimiter(rate.Limit(cfg.Settings.TPS), 1) + log.Printf("📈 Rate limiting enabled: %.2f TPS shared across all workers", cfg.Settings.TPS) } else { // No rate limiting sharedLimiter = rate.NewLimiter(rate.Inf, 1) @@ -225,7 +222,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { // Create and start block collector if endpoints are available var blockCollector *stats.BlockCollector - if len(cfg.Endpoints) > 0 && settings.TrackBlocks { + if len(cfg.Endpoints) > 0 && cfg.Settings.TrackBlocks { blockCollector = stats.NewBlockCollector(cfg.SeiChainID) collector.SetBlockCollector(blockCollector) s.SpawnBgNamed("block collector", func() error { @@ -233,7 +230,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { }) } - if settings.RampUp { + if cfg.Settings.RampUp { ramperBlockCollector := stats.NewBlockCollector(cfg.SeiChainID) s.SpawnBgNamed("ramper block collector", func() error { return ramperBlockCollector.Run(ctx, cfg.Endpoints[0]) @@ -248,38 +245,22 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { } // Create and start user latency tracker if endpoints are available - if len(cfg.Endpoints) > 0 && settings.TrackUserLatency { - userLatencyTracker := stats.NewUserLatencyTracker(settings.StatsInterval.ToDuration()) + if len(cfg.Endpoints) > 0 && cfg.Settings.TrackUserLatency { + userLatencyTracker := stats.NewUserLatencyTracker(cfg.Settings.StatsInterval.ToDuration()) s.SpawnBgNamed("user latency tracker", func() error { return userLatencyTracker.Run(ctx, cfg.Endpoints[0]) }) } // Create the sender from the config struct - // Enable dry-run mode in sender if specified - if settings.DryRun { - cfg.Settings.DryRun = true - } - if settings.Debug { - cfg.Settings.Debug = true - } - if settings.TrackReceipts { - cfg.Settings.TrackReceipts = true - } - if settings.TrackBlocks { - cfg.Settings.TrackBlocks = true - } - snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter) + snd, err := sender.NewShardedSender(cfg, sharedLimiter, collector) if err != nil { return fmt.Errorf("failed to create sender: %w", err) } - // Set statistics collector for sender and its workers - snd.SetStatsCollector(collector, logger) - // Fund the pool before prewarm/dispatch — both spend gas the accounts // don't have until funded. - if cfg.Funding != nil && !settings.DryRun { + if cfg.Funding != nil && !cfg.Settings.DryRun { if err := funder.FundAccounts(ctx, cfg, gen.GetAccountPools()); err != nil { return fmt.Errorf("failed to fund accounts: %w", err) } @@ -287,7 +268,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { // Create dispatcher var dispatcher *sender.Dispatcher - if settings.TxsDir != "" { + if cfg.Settings.TxsDir != "" { // get latest height ethclient, err := ethclient.Dial(cfg.Endpoints[0]) if err != nil { @@ -297,10 +278,10 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { if err != nil { return fmt.Errorf("failed to get latest height: %w", err) } - numBlocksToWrite := settings.NumBlocksToWrite + numBlocksToWrite := cfg.Settings.NumBlocksToWrite writerHeight := latestHeight + 10 // some buffer log.Printf("🔍 Latest height: %d, writer start height: %d", latestHeight, writerHeight) - writer := sender.NewTxsWriter(settings.TargetGas, settings.TxsDir, writerHeight, uint64(numBlocksToWrite)) + writer := sender.NewTxsWriter(cfg.Settings.TargetGas, cfg.Settings.TxsDir, writerHeight, uint64(numBlocksToWrite)) dispatcher = sender.NewDispatcher(gen, writer) } else { dispatcher = sender.NewDispatcher(gen, snd) @@ -310,7 +291,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { dispatcher.SetStatsCollector(collector) // Set up prewarming if enabled - if settings.Prewarm { + if cfg.Settings.Prewarm { log.Printf("🔥 Creating prewarm generator...") prewarmGen := generator.NewPrewarmGenerator(cfg, gen) dispatcher.SetPrewarmGenerator(prewarmGen) @@ -318,13 +299,13 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { log.Printf("📝 Prewarm mode: Accounts will be prewarmed") } - if settings.TxsDir == "" { + if cfg.Settings.TxsDir == "" { // Start the sender (starts all workers) s.SpawnBgNamed("sender", func() error { return snd.Run(ctx) }) log.Printf("✅ Connected to %d endpoints", snd.NumShards()) } // Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions) - if settings.Prewarm { + if cfg.Settings.Prewarm { if err := dispatcher.Prewarm(ctx); err != nil { return fmt.Errorf("failed to prewarm accounts: %w", err) } @@ -342,20 +323,20 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - log.Printf("📈 Logging statistics every %v (Press Ctrl+C to stop)", settings.StatsInterval.ToDuration()) - if settings.DryRun { + log.Printf("📈 Logging statistics every %v (Press Ctrl+C to stop)", cfg.Settings.StatsInterval.ToDuration()) + if cfg.Settings.DryRun { log.Printf("📝 Dry-run mode: Simulating requests without sending") } - if settings.Debug { + if cfg.Settings.Debug { log.Printf("🐛 Debug mode: Each transaction will be logged") } - if settings.TrackReceipts { + if cfg.Settings.TrackReceipts { log.Printf("📝 Track receipts mode: Receipts will be tracked") } - if settings.TrackBlocks { + if cfg.Settings.TrackBlocks { log.Printf("📝 Track blocks mode: Block data will be collected") } - if settings.TrackUserLatency { + if cfg.Settings.TrackUserLatency { log.Printf("📝 Track user latency mode: User latency will be tracked") } log.Print(strings.Repeat("=", 60)) @@ -369,11 +350,11 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { }) // Print final statistics logger.LogFinalStats() - if settings.RampUp && ramper != nil { + if cfg.Settings.RampUp && ramper != nil { ramper.LogFinalStats() } collector.EmitRunSummary(ctx) - if d := settings.PostSummaryFlushDelay.ToDuration(); d > 0 { + if d := cfg.Settings.PostSummaryFlushDelay.ToDuration(); d > 0 { log.Printf("⏳ Holding pod for post-summary scrape window (%s)...", d) time.Sleep(d) } diff --git a/observability/setup_test.go b/observability/setup_test.go index 1c4b768..f157ad0 100644 --- a/observability/setup_test.go +++ b/observability/setup_test.go @@ -47,14 +47,14 @@ func TestBuildResource_FullScope(t *testing.T) { require.NoError(t, err) want := map[string]string{ - "service.name": "seiload", - "service.version": "v1.2.3", - "service.instance.id": "seiload-abc-0", - "seiload.run_id": "run-42", - "seiload.chain_id": "autobake-42-1", - "seiload.commit_id": "deadbeefcafef00d", + "service.name": "seiload", + "service.version": "v1.2.3", + "service.instance.id": "seiload-abc-0", + "seiload.run_id": "run-42", + "seiload.chain_id": "autobake-42-1", + "seiload.commit_id": "deadbeefcafef00d", "seiload.commit_id_short": "deadbeef", - "seiload.workload": "autobake", + "seiload.workload": "autobake", } got := resourceAttrs(res.Attributes()) for k, v := range want { diff --git a/profiles/profiles_test.go b/profiles/profiles_test.go index d97cd83..aff0eb3 100644 --- a/profiles/profiles_test.go +++ b/profiles/profiles_test.go @@ -80,7 +80,7 @@ func TestProfilesAlignment(t *testing.T) { // Use a decoder with DisallowUnknownFields to catch any extra fields decoder := json.NewDecoder(strings.NewReader(string(data))) decoder.DisallowUnknownFields() - + var strictConfig config.LoadConfig if err := decoder.Decode(&strictConfig); err != nil { t.Errorf("Profile %s contains unexpected/unaligned fields: %v", file.Name(), err) diff --git a/sender/sharded_sender.go b/sender/sharded_sender.go index b67c057..e9e992e 100644 --- a/sender/sharded_sender.go +++ b/sender/sharded_sender.go @@ -3,7 +3,6 @@ package sender import ( "context" "fmt" - "sync" "golang.org/x/time/rate" @@ -15,27 +14,26 @@ import ( // ShardedSender implements TxSender with multiple workers, one per endpoint type ShardedSender struct { - workers []*Worker - mu sync.RWMutex - collector *stats.Collector - logger *stats.Logger + workers []*Worker } // NewShardedSender creates a new sharded sender with workers for each endpoint -func NewShardedSender(cfg *config.LoadConfig, bufferSize int, tasksPerWorker int, limiter *rate.Limiter) (*ShardedSender, error) { +func NewShardedSender(cfg *config.LoadConfig, limiter *rate.Limiter, collector *stats.Collector) (*ShardedSender, error) { if len(cfg.Endpoints) == 0 { return nil, fmt.Errorf("no endpoints configured") } workers := make([]*Worker, len(cfg.Endpoints)) for i, endpoint := range cfg.Endpoints { - workers[i] = NewWorker(&WorkerConfig { - ID: i, + workers[i] = NewWorker(&WorkerConfig{ + ID: i, SeiChainID: cfg.SeiChainID, - Endpoint: endpoint, - BufferSize: bufferSize, - Tasks: tasksPerWorker, - }, limiter) + Endpoint: endpoint, + BufferSize: cfg.Settings.BufferSize, + Tasks: cfg.Settings.TasksPerEndpoint, + Collector: collector, + Limiter: limiter, + }) } return &ShardedSender{workers: workers}, nil @@ -81,17 +79,3 @@ type WorkerStats struct { // GetNumShards returns the number of shards (workers) func (s *ShardedSender) NumShards() int { return len(s.workers) } - -// SetStatsCollector sets the statistics collector for all workers -func (s *ShardedSender) SetStatsCollector(collector *stats.Collector, logger *stats.Logger) { - s.mu.Lock() - defer s.mu.Unlock() - - s.collector = collector - s.logger = logger - - // Pass to all workers - for _, worker := range s.workers { - worker.SetStatsCollector(collector, logger) - } -} diff --git a/sender/worker.go b/sender/worker.go index 5e43f0a..1359ece 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -29,24 +29,23 @@ import ( var tracer = otel.Tracer("github.com/sei-protocol/sei-load/sender") type WorkerConfig struct { - ID int - SeiChainID string - Endpoint string - BufferSize int - Tasks int - DryRun bool - Debug bool + ID int + SeiChainID string + Endpoint string + BufferSize int + Tasks int + DryRun bool + Debug bool TrackReceipts bool + Collector *stats.Collector + Limiter *rate.Limiter // Shared rate limiter for transaction sending } // Worker handles sending transactions to a specific endpoint type Worker struct { - cfg *WorkerConfig - txChan chan *types.LoadTx - sentTxs chan *types.LoadTx - collector *stats.Collector - logger *stats.Logger - limiter *rate.Limiter // Shared rate limiter for transaction sending + cfg *WorkerConfig + txChan chan *types.LoadTx + sentTxs chan *types.LoadTx } // HttpClientOption configures the Transport used by newHttpClient. @@ -119,23 +118,16 @@ func newRPCClient(ctx context.Context, endpoint string, opts ...HttpClientOption } // NewWorker creates a new worker for a specific endpoint -func NewWorker(cfg *WorkerConfig, limiter *rate.Limiter) *Worker { +func NewWorker(cfg *WorkerConfig) *Worker { w := &Worker{ - cfg: cfg, - txChan: make(chan *types.LoadTx, cfg.BufferSize), - sentTxs: make(chan *types.LoadTx, cfg.BufferSize), - limiter: limiter, + cfg: cfg, + txChan: make(chan *types.LoadTx, cfg.BufferSize), + sentTxs: make(chan *types.LoadTx, cfg.BufferSize), } meterWorkerQueueLength(w) return w } -// SetStatsCollector sets the statistics collector for this worker -func (w *Worker) SetStatsCollector(collector *stats.Collector, logger *stats.Logger) { - w.collector = collector - w.logger = logger -} - // Start begins the worker's processing loop func (w *Worker) Run(ctx context.Context) error { return service.Run(ctx, func(ctx context.Context, s service.Scope) error { @@ -229,7 +221,7 @@ func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx * func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) error { for ctx.Err() == nil { // Apply rate limiting before getting the next transaction - if err:=w.limiter.Wait(ctx); err!=nil { + if err := w.cfg.Limiter.Wait(ctx); err != nil { return err } @@ -244,9 +236,7 @@ func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) erro tx.AttemptedSendTime = startTime err = w.sendTransaction(ctx, client, tx) // Record statistics if collector is available - if w.collector != nil { - w.collector.RecordTransaction(tx.Scenario.Name, w.cfg.Endpoint, time.Since(startTime), err == nil) - } + w.cfg.Collector.RecordTransaction(tx.Scenario.Name, w.cfg.Endpoint, time.Since(startTime), err == nil) if err != nil { log.Printf("%v", err) } @@ -298,10 +288,7 @@ func (w *Worker) sendTransaction(ctx context.Context, client *ethclient.Client, )) // Write to sentTxs channel without blocking - select { - case w.sentTxs <- tx: - default: - } + utils.SendOrDrop(w.sentTxs, tx) return nil } diff --git a/utils/channels.go b/utils/channels.go index 9eed500..1e11b90 100644 --- a/utils/channels.go +++ b/utils/channels.go @@ -45,13 +45,10 @@ func Send[T any](ctx context.Context, ch chan<- T, v T) error { } // SendOrDrop send a value to channel if not full or drop the item if the channel is full. -func SendOrDrop[T any](ch chan<- T, v T) error { +func SendOrDrop[T any](ch chan<- T, v T) { select { case ch <- v: - return nil - default: - // drop the item - return nil + default: // drop the item } } From 8be54222e87d90241e2b880500f7ea55b80dce43 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Fri, 12 Jun 2026 16:55:41 +0200 Subject: [PATCH 4/5] unrelated --- generator/scenario.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/generator/scenario.go b/generator/scenario.go index d286118..03b3b2f 100644 --- a/generator/scenario.go +++ b/generator/scenario.go @@ -13,8 +13,7 @@ type scenarioGenerator struct { mu sync.RWMutex } -func NewScenarioGenerator(accounts types.AccountPool, - txg scenarios.TxGenerator) Generator { +func NewScenarioGenerator(accounts types.AccountPool, txg scenarios.TxGenerator) Generator { return &scenarioGenerator{ scenario: txg, accountPool: accounts, @@ -23,7 +22,7 @@ func NewScenarioGenerator(accounts types.AccountPool, func (g *scenarioGenerator) GenerateN(n int) []*types.LoadTx { result := make([]*types.LoadTx, 0, n) - for i := 0; i < n; i++ { + for range n { if tx, ok := g.Generate(); ok { result = append(result, tx) } else { From 1680c9934c78aecb91e07e0060a48909035f8f89 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Fri, 12 Jun 2026 17:03:18 +0200 Subject: [PATCH 5/5] applied comments --- config/settings_test.go | 4 ++-- profiles/profiles_test.go | 2 +- sender/sharded_sender.go | 17 ++++++++++------- sender/worker.go | 11 +++++------ 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/config/settings_test.go b/config/settings_test.go index 4495c8c..ae4d797 100644 --- a/config/settings_test.go +++ b/config/settings_test.go @@ -115,7 +115,7 @@ func TestArgumentPrecedence(t *testing.T) { // Verify expectations require.Equal(t, tt.expectedStats, settings.StatsInterval.ToDuration(), "StatsInterval: expected %v, got %v", tt.expectedStats, settings.StatsInterval.ToDuration()) - require.Equal(t, tt.expectedWorkers, settings.Workers, "Workers: expected %d, got %d", tt.expectedWorkers, settings.Workers) + require.Equal(t, tt.expectedWorkers, settings.TasksPerEndpoint, "TasksPerEndpoint: expected %d, got %d", tt.expectedWorkers, settings.TasksPerEndpoint) require.Equal(t, tt.expectedTPS, settings.TPS, "TPS: expected %f, got %f", tt.expectedTPS, settings.TPS) }) } @@ -125,7 +125,7 @@ func TestDefaultSettings(t *testing.T) { defaults := DefaultSettings() expected := Settings{ - Workers: 1, + TasksPerEndpoint: 1, TPS: 0.0, StatsInterval: Duration(10 * time.Second), BufferSize: 1000, diff --git a/profiles/profiles_test.go b/profiles/profiles_test.go index aff0eb3..cd5cec4 100644 --- a/profiles/profiles_test.go +++ b/profiles/profiles_test.go @@ -72,7 +72,7 @@ func TestProfilesAlignment(t *testing.T) { // Test 4: Validate that all expected settings fields are present settings := loadConfig.Settings - if settings.Workers == 0 && settings.TPS == 0 && settings.BufferSize == 0 { + if settings.TasksPerEndpoint == 0 && settings.TPS == 0 && settings.BufferSize == 0 { t.Errorf("Profile %s appears to have zero values for critical settings fields", file.Name()) } diff --git a/sender/sharded_sender.go b/sender/sharded_sender.go index e9e992e..31b0505 100644 --- a/sender/sharded_sender.go +++ b/sender/sharded_sender.go @@ -26,13 +26,16 @@ func NewShardedSender(cfg *config.LoadConfig, limiter *rate.Limiter, collector * workers := make([]*Worker, len(cfg.Endpoints)) for i, endpoint := range cfg.Endpoints { workers[i] = NewWorker(&WorkerConfig{ - ID: i, - SeiChainID: cfg.SeiChainID, - Endpoint: endpoint, - BufferSize: cfg.Settings.BufferSize, - Tasks: cfg.Settings.TasksPerEndpoint, - Collector: collector, - Limiter: limiter, + ID: i, + SeiChainID: cfg.SeiChainID, + Endpoint: endpoint, + BufferSize: cfg.Settings.BufferSize, + Tasks: cfg.Settings.TasksPerEndpoint, + DryRun: cfg.Settings.DryRun, + TrackReceipts: cfg.Settings.TrackReceipts, + Debug: cfg.Settings.Debug, + Collector: collector, + Limiter: limiter, }) } diff --git a/sender/worker.go b/sender/worker.go index 1359ece..aff47fb 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -130,13 +130,12 @@ func NewWorker(cfg *WorkerConfig) *Worker { // Start begins the worker's processing loop func (w *Worker) Run(ctx context.Context) error { + client, err := newRPCClient(ctx, w.cfg.Endpoint) + if err != nil { + return fmt.Errorf("dial %s: %w", w.cfg.Endpoint, err) + } + defer client.Close() return service.Run(ctx, func(ctx context.Context, s service.Scope) error { - client, err := newRPCClient(ctx, w.cfg.Endpoint) - if err != nil { - return fmt.Errorf("dial %s: %w", w.cfg.Endpoint, err) - } - defer client.Close() - // Start multiple goroutines that share the same channel and RPC client. for range w.cfg.Tasks { s.Spawn(func() error { return w.runTxSender(ctx, client) })