diff --git a/forester/src/api_server.rs b/forester/src/api_server.rs index d80c7757b9..fbd9109db2 100644 --- a/forester/src/api_server.rs +++ b/forester/src/api_server.rs @@ -1118,15 +1118,23 @@ pub fn spawn_api_server(config: ApiServerConfig) -> anyhow::Result, - #[arg(long, env = "RPC_POOL_SIZE", default_value = "100")] + #[arg(long, env = "RPC_POOL_SIZE", default_value = "32")] pub rpc_pool_size: u32, #[arg(long, env = "RPC_POOL_CONNECTION_TIMEOUT_SECS", default_value = "15")] @@ -303,6 +303,14 @@ pub struct StartArgs { )] pub enable_v1_multi_nullify: bool, + #[arg( + long, + env = "ENABLE_V1_PRESORT", + help = "Fetch queue leaf indices from the indexer (get_queue_leaf_indices) to pre-sort V1 work items for better dedup grouping. Requires an indexer that implements the endpoint. Best-effort; disabled by default.", + default_value = "false" + )] + pub enable_v1_presort: bool, + #[arg( long, env = "WORK_ITEM_BATCH_SIZE", diff --git a/forester/src/config.rs b/forester/src/config.rs index 8f88cecf22..c944a36634 100644 --- a/forester/src/config.rs +++ b/forester/src/config.rs @@ -36,6 +36,10 @@ pub struct ForesterConfig { /// Enable nullify_state_v1_multi instruction for batching 2-4 V1 state nullifications. /// Requires lookup_table_address to be set. pub enable_v1_multi_nullify: bool, + /// Enable the get_queue_leaf_indices pre-sort path for V1 work items. + /// Best-effort optimization; disabled by default. Only enable against an indexer + /// that implements the endpoint. + pub enable_v1_presort: bool, /// Number of queue items to process per batch cycle. Default: 50. pub work_item_batch_size: usize, } @@ -431,6 +435,7 @@ impl ForesterConfig { .transpose()?, min_queue_items: args.min_queue_items, enable_v1_multi_nullify: args.enable_v1_multi_nullify, + enable_v1_presort: args.enable_v1_presort, work_item_batch_size: args.work_item_batch_size.unwrap_or(50) as usize, }) } @@ -488,6 +493,7 @@ impl ForesterConfig { lookup_table_address: None, min_queue_items: None, enable_v1_multi_nullify: false, + enable_v1_presort: false, work_item_batch_size: 50, }) } @@ -511,6 +517,7 @@ impl Clone for ForesterConfig { lookup_table_address: self.lookup_table_address, min_queue_items: self.min_queue_items, enable_v1_multi_nullify: self.enable_v1_multi_nullify, + enable_v1_presort: self.enable_v1_presort, work_item_batch_size: self.work_item_batch_size, } } diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index a253b4bad3..6cf29d6c4e 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -39,7 +39,7 @@ use solana_sdk::{ transaction::TransactionError, }; use tokio::{ - sync::{mpsc, oneshot, Mutex}, + sync::{mpsc, oneshot, Mutex, Semaphore}, task::JoinHandle, time::{sleep, Instant, MissedTickBehavior}, }; @@ -280,6 +280,8 @@ pub struct EpochManager { run_id: Arc, /// Per-epoch registration trackers to coordinate re-finalization when new foresters register mid-epoch registration_trackers: Arc>>, + /// Process-wide limiter for V1 transaction sends across all trees. + v1_send_permits: Arc, } impl Clone for EpochManager { @@ -310,6 +312,7 @@ impl Clone for EpochManager { heartbeat: self.heartbeat.clone(), run_id: self.run_id.clone(), registration_trackers: self.registration_trackers.clone(), + v1_send_permits: self.v1_send_permits.clone(), } } } @@ -333,6 +336,7 @@ impl EpochManager { run_id: String, ) -> Result { let authority = Arc::new(config.payer_keypair.insecure_clone()); + let v1_send_permit_count = config.transaction_config.max_concurrent_sends.max(1); Ok(Self { config, protocol_config, @@ -359,6 +363,7 @@ impl EpochManager { heartbeat, run_id: Arc::::from(run_id), registration_trackers: Arc::new(DashMap::new()), + v1_send_permits: Arc::new(Semaphore::new(v1_send_permit_count)), }) } @@ -2265,8 +2270,14 @@ impl EpochManager { let mut estimated_slot = self.slot_tracker.estimated_current_slot(); - // Polling interval for checking queue - const POLL_INTERVAL: Duration = Duration::from_millis(200); + // Adaptive queue polling: start responsive, then back off (capped) while the + // queue has nothing ready to process, and reset to the minimum as soon as work + // is found. A fixed 200ms poll made idle V2 trees re-fetch the queue ~5x/sec for + // the whole eligible window, which is the dominant source of wasted RPC/indexer + // load (and can exhaust a shared RPC credit budget). + const POLL_INTERVAL_MIN: Duration = Duration::from_millis(200); + const POLL_INTERVAL_MAX: Duration = Duration::from_secs(10); + let mut poll_interval = POLL_INTERVAL_MIN; 'inner_processing_loop: loop { if estimated_slot >= forester_slot_details.end_solana_slot { @@ -2334,9 +2345,12 @@ impl EpochManager { processing_start_time.elapsed(), ) .await; + // Work found: stay responsive while the queue drains. + poll_interval = POLL_INTERVAL_MIN; } else { - // No items to process, wait before polling again - tokio::time::sleep(POLL_INTERVAL).await; + // Nothing ready: wait, then back off (capped) to avoid hammering RPC. + tokio::time::sleep(poll_interval).await; + poll_interval = (poll_interval * 2).min(POLL_INTERVAL_MAX); } } Err(e) => { @@ -2350,7 +2364,8 @@ impl EpochManager { error = ?e, "V2 processing failed for tree" ); - tokio::time::sleep(POLL_INTERVAL).await; + tokio::time::sleep(poll_interval).await; + poll_interval = (poll_interval * 2).min(POLL_INTERVAL_MAX); } } @@ -3062,8 +3077,7 @@ impl EpochManager { } else { None }, - enable_presort: self.config.enable_v1_multi_nullify - && !self.address_lookup_tables.is_empty(), + enable_presort: self.config.enable_v1_presort, work_item_batch_size: self.config.work_item_batch_size, }; @@ -3083,6 +3097,7 @@ impl EpochManager { &batched_tx_config, *tree_accounts, transaction_builder, + self.v1_send_permits.clone(), ) .await?; @@ -4730,6 +4745,7 @@ mod tests { lookup_table_address: None, min_queue_items: None, enable_v1_multi_nullify: false, + enable_v1_presort: false, work_item_batch_size: 50, } } diff --git a/forester/src/processor/v1/send_transaction.rs b/forester/src/processor/v1/send_transaction.rs index 86e7bfb650..7a67b632ba 100644 --- a/forester/src/processor/v1/send_transaction.rs +++ b/forester/src/processor/v1/send_transaction.rs @@ -16,7 +16,10 @@ use solana_sdk::{ pubkey::Pubkey, signature::{Keypair, Signature, Signer}, }; -use tokio::time::Instant; +use tokio::{ + sync::{OwnedSemaphorePermit, Semaphore}, + time::{timeout, Instant}, +}; use tracing::{error, info, trace, warn}; use crate::{ @@ -40,6 +43,8 @@ struct PreparedBatchData { struct ChunkSendContext { pool: Arc>, max_concurrent_sends: usize, + send_permits: Arc, + permits_pre_acquired: bool, timeout_deadline: Instant, cancel_signal: Arc, num_sent_transactions: Arc, @@ -67,6 +72,7 @@ pub async fn send_batched_transactions, + send_permits: Arc, ) -> std::result::Result { let function_start_time = Instant::now(); @@ -144,18 +150,20 @@ pub async fn send_batched_transactions MAX_ITEMS_PER_CYCLE { @@ -164,17 +172,25 @@ pub async fn send_batched_transactions> = items_to_process .chunks(work_item_batch_size) .map(|c| c.to_vec()) .collect(); let num_chunks = chunks.len(); + let max_concurrent_chunks = effective_max_concurrent_sends + .div_ceil(work_item_batch_size) + .max(1) + .min(num_chunks.max(1)); info!( tree = %tree_accounts.merkle_tree, - "Processing {} concurrent chunks of up to {} items each", - num_chunks, work_item_batch_size + "Processing {} chunks of up to {} items each with chunk_concurrency={} and send_concurrency={}", + num_chunks, + work_item_batch_size, + max_concurrent_chunks, + effective_max_concurrent_sends ); let chunk_futures: Vec<_> = chunks @@ -190,13 +206,37 @@ pub async fn send_batched_transactions= timeout_deadline { return Ok(()); } - // Each chunk gets a fresh blockhash + let permits_to_reserve = work_chunk.len().min(effective_max_concurrent_sends).max(1); + let _chunk_send_permits = match acquire_send_permits( + &send_permits, + permits_to_reserve, + timeout_deadline, + ) + .await + { + Ok(permits) => permits, + Err(e) => { + warn!( + tree = %tree_id, + error = ?e, + "Skipping chunk because send concurrency permits were not available before deadline" + ); + return Ok(()); + } + }; + + if cancel_signal.load(Ordering::SeqCst) || Instant::now() >= timeout_deadline { + return Ok(()); + } + + // Each chunk gets a fresh blockhash after it owns send capacity. let (recent_blockhash, last_valid_block_height) = { let mut rpc = pool.get_connection().await.map_err(ForesterError::from)?; rpc.get_latest_blockhash().await.map_err(|e| { @@ -232,6 +272,8 @@ pub async fn send_batched_transactions>() + .await; for result in results { if let Err(ForesterError::NotEligible) = result { return Err(ForesterError::NotEligible); @@ -392,6 +437,34 @@ fn compute_effective_max_concurrent_sends( effective.max(1) } +async fn acquire_send_permits( + send_permits: &Arc, + permit_count: usize, + timeout_deadline: Instant, +) -> std::result::Result { + if Instant::now() >= timeout_deadline { + return Err(ForesterError::General { + error: "Timed out waiting for send concurrency permits".to_string(), + }); + } + + let permit_count = u32::try_from(permit_count).map_err(|_| ForesterError::General { + error: "send concurrency permit count exceeds supported limit".to_string(), + })?; + let permit_wait = timeout_deadline.saturating_duration_since(Instant::now()); + timeout( + permit_wait, + send_permits.clone().acquire_many_owned(permit_count), + ) + .await + .map_err(|_| ForesterError::General { + error: "Timed out waiting for send concurrency permits".to_string(), + })? + .map_err(|_| ForesterError::General { + error: "send concurrency limiter closed".to_string(), + }) +} + async fn execute_transaction_chunk_sending( transactions: Vec, context: &ChunkSendContext, @@ -406,11 +479,14 @@ async fn execute_transaction_chunk_sending( let num_sent_transactions = Arc::clone(&context.num_sent_transactions); let timeout_deadline = context.timeout_deadline; let max_concurrent_sends = context.max_concurrent_sends; + let send_permits = Arc::clone(&context.send_permits); + let permits_pre_acquired = context.permits_pre_acquired; let confirmation = context.confirmation; let transaction_send_futures = transactions.into_iter().map(|prepared_transaction| { let pool_clone = Arc::clone(&pool); let cancel_signal_clone = Arc::clone(&cancel_signal); let num_sent_transactions_clone = Arc::clone(&num_sent_transactions); + let send_permits_clone = Arc::clone(&send_permits); let tx_label = prepared_transaction.label().to_string(); async move { @@ -423,6 +499,32 @@ async fn execute_transaction_chunk_sending( .unwrap_or_default(); let tx_signature_str = tx_signature.to_string(); + let _send_permit = if permits_pre_acquired { + None + } else { + let permit_wait = timeout_deadline.saturating_duration_since(Instant::now()); + match timeout(permit_wait, send_permits_clone.acquire_owned()).await { + Ok(Ok(permit)) => Some(permit), + Ok(Err(_)) => { + error!( + tx.signature_attempt = %tx_signature_str, + "Send concurrency limiter was closed" + ); + return TransactionSendResult::SendFailure( + ForesterError::General { + error: "send concurrency limiter closed".to_string(), + }, + Some(tx_signature), + ); + } + Err(_) => return TransactionSendResult::Timeout, + } + }; + + if cancel_signal_clone.load(Ordering::SeqCst) || Instant::now() >= timeout_deadline { + return TransactionSendResult::Cancelled; + } + match pool_clone.get_connection().await { Ok(mut rpc) => { if Instant::now() >= timeout_deadline { @@ -562,3 +664,63 @@ async fn execute_transaction_chunk_sending( } Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn acquire_send_permits_acquires_requested_permits() { + let send_permits = Arc::new(Semaphore::new(3)); + let permit = acquire_send_permits( + &send_permits, + 2, + Instant::now() + Duration::from_millis(100), + ) + .await + .expect("permit reservation should succeed"); + + assert_eq!(send_permits.available_permits(), 1); + + drop(permit); + assert_eq!(send_permits.available_permits(), 3); + } + + #[tokio::test] + async fn acquire_send_permits_waits_for_full_reservation() { + let send_permits = Arc::new(Semaphore::new(2)); + let first_permit = acquire_send_permits( + &send_permits, + 2, + Instant::now() + Duration::from_millis(100), + ) + .await + .expect("first reservation should succeed"); + let waiter_permits = Arc::clone(&send_permits); + + let waiter = tokio::spawn(async move { + acquire_send_permits( + &waiter_permits, + 2, + Instant::now() + Duration::from_millis(500), + ) + .await + }); + + tokio::time::sleep(Duration::from_millis(25)).await; + + assert!(!waiter.is_finished()); + + drop(first_permit); + let second_permit = timeout(Duration::from_millis(100), waiter) + .await + .expect("second reservation should complete after permits are released") + .expect("waiter task should not panic") + .expect("second reservation should succeed"); + + assert_eq!(send_permits.available_permits(), 0); + + drop(second_permit); + assert_eq!(send_permits.available_permits(), 2); + } +} diff --git a/forester/src/smart_transaction.rs b/forester/src/smart_transaction.rs index 7f093b0f26..6a3a4b0302 100644 --- a/forester/src/smart_transaction.rs +++ b/forester/src/smart_transaction.rs @@ -45,6 +45,8 @@ impl Default for ConfirmationConfig { } } +const MIN_TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(2); + #[derive(Debug, Clone, Copy)] pub struct TransactionPolicy { pub priority_fee_config: PriorityFeeConfig, @@ -440,6 +442,11 @@ async fn send_prepared_transaction( let last_valid_block_height = transaction.last_valid_block_height(); let rpc = &*rpc; let mut last_send_error = None; + let resend_interval = confirmation + .poll_interval + .saturating_mul(4) + .max(MIN_TRANSACTION_RESEND_INTERVAL); + let mut next_send_at = Instant::now(); for attempt in 0..confirmation.max_attempts { if let Some(signature) = confirmed_signature_or_error(rpc, signature).await? { @@ -457,16 +464,19 @@ async fn send_prepared_transaction( }); } - match transaction.send_with_confirmation_config(rpc).await { - Ok(_) => last_send_error = None, - Err(error) if rpc_is_already_processed(&error) => last_send_error = None, - // BlockhashNotFound is transient: the sending RPC may not have - // propagated the blockhash from the fetcher yet. Retry until the - // blockhash either propagates or the outer `get_block_height > - // last_valid_block_height` check exits with BlockhashExpired. - Err(error) if is_blockhash_not_found(&error) => last_send_error = Some(error), - Err(error) if rpc.should_retry(&error) => last_send_error = Some(error), - Err(error) => return Err(error.into()), + if Instant::now() >= next_send_at { + match transaction.send_with_confirmation_config(rpc).await { + Ok(_) => last_send_error = None, + Err(error) if rpc_is_already_processed(&error) => last_send_error = None, + // BlockhashNotFound is transient: the sending RPC may not have + // propagated the blockhash from the fetcher yet. Retry until the + // blockhash either propagates or the outer `get_block_height > + // last_valid_block_height` check exits with BlockhashExpired. + Err(error) if is_blockhash_not_found(&error) => last_send_error = Some(error), + Err(error) if rpc.should_retry(&error) => last_send_error = Some(error), + Err(error) => return Err(error.into()), + } + next_send_at = Instant::now() + resend_interval; } if let Some(signature) = confirmed_signature_or_error(rpc, signature).await? { diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index c11500f29a..21460bb276 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -303,6 +303,7 @@ async fn e2e_test() { }), min_queue_items: None, enable_v1_multi_nullify: false, + enable_v1_presort: false, work_item_batch_size: 50, }; let test_mode = TestMode::from_env(); diff --git a/forester/tests/legacy/priority_fee_test.rs b/forester/tests/legacy/priority_fee_test.rs index 37f2ed010a..b475e6acd6 100644 --- a/forester/tests/legacy/priority_fee_test.rs +++ b/forester/tests/legacy/priority_fee_test.rs @@ -89,6 +89,7 @@ async fn test_priority_fee_request() { lookup_table_address: None, min_queue_items: None, enable_v1_multi_nullify: false, + enable_v1_presort: false, api_server_port: 8080, api_server_public_bind: false, group_authority: None, diff --git a/forester/tests/legacy/test_utils.rs b/forester/tests/legacy/test_utils.rs index fdac97ba34..960d73873c 100644 --- a/forester/tests/legacy/test_utils.rs +++ b/forester/tests/legacy/test_utils.rs @@ -125,6 +125,7 @@ pub fn forester_config() -> ForesterConfig { lookup_table_address: None, min_queue_items: None, enable_v1_multi_nullify: false, + enable_v1_presort: false, work_item_batch_size: 50, } } diff --git a/forester/tests/priority_fee_test.rs b/forester/tests/priority_fee_test.rs index 5067342d2f..a02df66b56 100644 --- a/forester/tests/priority_fee_test.rs +++ b/forester/tests/priority_fee_test.rs @@ -94,6 +94,7 @@ async fn test_priority_fee_request() { lookup_table_address: None, min_queue_items: None, enable_v1_multi_nullify: false, + enable_v1_presort: false, api_server_port: 8080, group_authority: None, light_pda_programs: vec![], diff --git a/forester/tests/test_utils.rs b/forester/tests/test_utils.rs index 4ae9352482..9061a526f4 100644 --- a/forester/tests/test_utils.rs +++ b/forester/tests/test_utils.rs @@ -138,6 +138,7 @@ pub fn forester_config() -> ForesterConfig { lookup_table_address: None, min_queue_items: None, enable_v1_multi_nullify: false, + enable_v1_presort: false, work_item_batch_size: 50, } } diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 43e5784a18..ff79a41f96 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.90.0" +channel = "1.91" components = ["rustfmt", "clippy"] diff --git a/sdk-libs/client/src/indexer/photon_indexer.rs b/sdk-libs/client/src/indexer/photon_indexer.rs index afc7bd3c21..7e35e9995c 100644 --- a/sdk-libs/client/src/indexer/photon_indexer.rs +++ b/sdk-libs/client/src/indexer/photon_indexer.rs @@ -65,9 +65,18 @@ impl PhotonIndexer { } Err(e) => { let is_retryable = match &e { - IndexerError::ApiError(_) => { - warn!("API Error: {}", e); - true + IndexerError::ApiError(msg) => { + // A 404 / "method not found" means the indexer does not implement + // this endpoint; retrying cannot succeed and only burns the backoff + // budget (up to ~44s with the default config). Treat it as non-retryable. + let lower = msg.to_lowercase(); + if lower.contains("method not found") || lower.contains("status 404") { + warn!("Non-retryable API error (endpoint not available): {}", e); + false + } else { + warn!("API Error: {}", e); + true + } } IndexerError::PhotonError { context: _,