From 4c77c34a4050bb37f01d1e6df764b0768645c6ef Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 15:58:57 +0300 Subject: [PATCH 01/17] Drop String alloc in is_hop_by_hop_header --- crates/rproxy/src/utils/utils_http.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/rproxy/src/utils/utils_http.rs b/crates/rproxy/src/utils/utils_http.rs index 3bbba50..2bffd9a 100644 --- a/crates/rproxy/src/utils/utils_http.rs +++ b/crates/rproxy/src/utils/utils_http.rs @@ -1,8 +1,15 @@ // is_hop_by_hop_header ------------------------------------------------ -pub(crate) fn is_hop_by_hop_header(name: &actix_web::http::header::HeaderName) -> bool { +use actix_web::http::header; + +pub(crate) fn is_hop_by_hop_header(name: &header::HeaderName) -> bool { + // Fast path: compare against known HeaderName constants (no allocation). + // The original implementation only filtered these four (per its ASCII + // lowercase match): connection, host, keep-alive, transfer-encoding. matches!( - name.as_str().to_ascii_lowercase().as_str(), - "connection" | "host" | "keep-alive" | "transfer-encoding" - ) + name, + &header::CONNECTION | + &header::HOST | + &header::TRANSFER_ENCODING + ) || name.as_str().eq_ignore_ascii_case("keep-alive") } From 0da807613743174468815a97cb05b8b8bf96e0d7 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:01:16 +0300 Subject: [PATCH 02/17] Cache in-flight metric handles on ProxyHttp --- crates/rproxy/src/server/proxy/http/proxy.rs | 122 +++++++------------ 1 file changed, 43 insertions(+), 79 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 131b928..fd79100 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -40,6 +40,7 @@ use bytes::Bytes; use futures::TryStreamExt; use futures_core::Stream; use pin_project::pin_project; +use prometheus_client::metrics::{counter::Counter, gauge::Gauge}; use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; use tokio::sync::broadcast; @@ -79,6 +80,13 @@ where backend: ProxyHttpBackendEndpoint, requests: HashMap, postprocessor: actix::Addr>, + + // Per-worker cached metric handles. These are cheap to clone (each + // wraps an `Arc` internally) and bypass the per-request + // `Family::get_or_create` lookup on the hot path. + in_flight_client: Gauge, + in_flight_backend: Gauge, + proxy_failure_count: Counter, } impl ProxyHttp @@ -135,7 +143,24 @@ where } .start(); - Self { id, shared, backend, requests: HashMap::default(), postprocessor } + let labels_proxy = LabelsProxy { proxy: P::name() }; + let in_flight_client = + shared.metrics.http_in_flight_requests_client.get_or_create(&labels_proxy).clone(); + let in_flight_backend = + shared.metrics.http_in_flight_requests_backend.get_or_create(&labels_proxy).clone(); + let proxy_failure_count = + shared.metrics.http_proxy_failure_count.get_or_create(&labels_proxy).clone(); + + Self { + id, + shared, + backend, + requests: HashMap::default(), + postprocessor, + in_flight_client, + in_flight_backend, + proxy_failure_count, + } } pub(crate) async fn run( @@ -407,10 +432,8 @@ where .inc(); } - metrics - .http_in_flight_requests_client - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + let in_flight_client = this.in_flight_client.clone(); + in_flight_client.inc(); let res = if this.shared.inner.might_intercept() { Self::send_to_backend_and_maybe_intercept(this, info, clnt_req_body, timestamp).await @@ -418,10 +441,7 @@ where Self::stream_to_backend(this, info, clnt_req_body, timestamp).await }; - metrics - .http_in_flight_requests_client - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + in_flight_client.dec(); res } @@ -444,11 +464,7 @@ where timestamp, ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.inc(); #[cfg(debug_assertions)] debug!( @@ -477,16 +493,8 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.dec(); + this.proxy_failure_count.inc(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } }; @@ -503,11 +511,7 @@ where "Finished streaming http request to backend", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.in_flight_backend.dec(); Self::stream_to_client(this, req_id, conn_id, bknd_res) } @@ -532,11 +536,7 @@ where "Sending http request to backend...", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.inc(); let body = match clnt_req_body.to_bytes_limited(this.shared.config().max_request_size()).await { @@ -554,16 +554,8 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.dec(); + this.proxy_failure_count.inc(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } @@ -583,25 +575,13 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.dec(); + this.proxy_failure_count.inc(); return Ok(HttpResponse::PayloadTooLarge().finish()); } }; - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.in_flight_backend.dec(); #[cfg(debug_assertions)] debug!( @@ -672,11 +652,7 @@ where let bknd_req = this.backend.new_backend_request(&req.info); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.inc(); let bknd_res = match bknd_req.send_body(req.body.clone()).await { Ok(bknd_res) => bknd_res, @@ -692,25 +668,13 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.proxy_failure_count.inc(); + this.in_flight_backend.dec(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } }; - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.in_flight_backend.dec(); this.postprocess_client_request(req); From 761da33165736564f65941f8a84f1c8e93bf54ae Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:02:09 +0300 Subject: [PATCH 03/17] Cache user-agent metric handle per worker --- crates/rproxy/src/server/proxy/http/proxy.rs | 33 +++++++++++++++----- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index fd79100..6ca5cae 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -87,6 +87,11 @@ where in_flight_client: Gauge, in_flight_backend: Gauge, proxy_failure_count: Counter, + + // Per-worker cache of (user_agent -> Counter) so that we only pay the + // `Family::get_or_create` cost the first time a UA is seen on this + // worker. Lookup is `&str`-keyed (no allocation on hit). + client_info_cache: HashMap, } impl ProxyHttp @@ -160,6 +165,7 @@ where in_flight_client, in_flight_backend, proxy_failure_count, + client_info_cache: HashMap::default(), } } @@ -423,13 +429,26 @@ where !user_agent.is_empty() && let Ok(user_agent) = user_agent.to_str() { - metrics - .client_info - .get_or_create(&LabelsProxyClientInfo { - proxy: P::name(), - user_agent: user_agent.to_string(), - }) - .inc(); + // Hot path: read-only lookup keyed by &str (no allocation). + // Cold path: allocate the owned String key and resolve the + // counter from the metrics family exactly once per worker per + // distinct UA. + if let Some(entry) = this.client_info_cache.read_sync(user_agent, |_, c| c.clone()) { + entry.inc(); + } else { + let counter = metrics + .client_info + .get_or_create(&LabelsProxyClientInfo { + proxy: P::name(), + user_agent: user_agent.to_string(), + }) + .clone(); + counter.inc(); + // Best-effort insert; races with another task on the same + // worker thread shouldn't happen given !Send actix workers, + // but tolerate insert errors regardless. + let _ = this.client_info_cache.insert_sync(user_agent.to_string(), counter); + } } let in_flight_client = this.in_flight_client.clone(); From 6a1c041caeebae16bade966130039d387ee11449 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:03:05 +0300 Subject: [PATCH 04/17] Bind connection_info() once per request --- crates/rproxy/src/server/proxy/http/proxy.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 6ca5cae..42f552e 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1471,6 +1471,10 @@ pub(crate) struct ProxyHttpRequestInfo { impl ProxyHttpRequestInfo { pub(crate) fn new(req: &HttpRequest, guard: Option<&ConnectionGuard>) -> Self { + // Bind connection_info() once — actix recomputes/borrows on each + // call and the lookup showed up as a hot spot under load. + let ci = req.connection_info(); + // copy over only non hop-by-hop headers let mut headers = HeaderMap::new(); for (header, value) in req.headers().iter() { @@ -1480,7 +1484,7 @@ impl ProxyHttpRequestInfo { } // append remote ip to x-forwarded-for - if let Some(peer_addr) = req.connection_info().peer_addr() { + if let Some(peer_addr) = ci.peer_addr() { let mut forwarded_for = String::new(); if let Some(ff) = req.headers().get(header::X_FORWARDED_FOR) && let Ok(ff) = ff.to_str() @@ -1495,17 +1499,17 @@ impl ProxyHttpRequestInfo { } // set x-forwarded-proto if it's not already set - if !req.connection_info().scheme().is_empty() && + if !ci.scheme().is_empty() && req.headers().get(header::X_FORWARDED_PROTO).is_none() && - let Ok(forwarded_proto) = HeaderValue::from_str(req.connection_info().scheme()) + let Ok(forwarded_proto) = HeaderValue::from_str(ci.scheme()) { headers.insert(header::X_FORWARDED_PROTO, forwarded_proto); } // set x-forwarded-host if it's not already set - if !req.connection_info().host().is_empty() && + if !ci.host().is_empty() && req.headers().get(header::X_FORWARDED_HOST).is_none() && - let Ok(forwarded_host) = HeaderValue::from_str(req.connection_info().host()) + let Ok(forwarded_host) = HeaderValue::from_str(ci.host()) { headers.insert(header::X_FORWARDED_HOST, forwarded_host); } @@ -1515,9 +1519,9 @@ impl ProxyHttpRequestInfo { let remote_addr = match guard { Some(guard) => match guard.remote_addr.clone() { Some(remote_addr) => Some(remote_addr), - None => req.connection_info().peer_addr().map(String::from), + None => ci.peer_addr().map(String::from), }, - None => req.connection_info().peer_addr().map(String::from), + None => ci.peer_addr().map(String::from), }; let path = match req.path() { From 346b44c975e73b1f1dd5bc727d0786bc61430c9f Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:03:52 +0300 Subject: [PATCH 05/17] Append headers without per-iter revalidation --- crates/rproxy/src/server/proxy/http/proxy.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 42f552e..e8439fe 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1351,8 +1351,13 @@ where let mut req = self.client.request(info.method.clone(), url.as_str()).no_decompress(); - for (header, value) in info.headers.iter() { - req = req.insert_header((header.clone(), value.clone())); + // Append directly to the awc request's HeaderMap (vs. the + // per-header `insert_header` builder pattern, which re-runs + // `TryIntoHeaderPair` validation each iteration). HeaderValue + // clones are refcount bumps; HeaderName clones are interned/cheap. + let dst = req.headers_mut(); + for (k, v) in info.headers.iter() { + dst.append(k.clone(), v.clone()); } req From eac73d7cdbabed787bc014d8d2b02d8b370c3d9e Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:04:45 +0300 Subject: [PATCH 06/17] Drop HeaderName::from_str on response forwarding --- crates/rproxy/src/server/proxy/http/proxy.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index e8439fe..8bfe2ea 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -5,7 +5,6 @@ use std::{ mem, ops::Add, pin::Pin, - str::FromStr, sync::{ Arc, atomic::{AtomicI64, AtomicUsize, Ordering}, @@ -389,13 +388,15 @@ where fn to_client_response(bknd_res: &ClientResponse) -> HttpResponseBuilder { let mut clnt_res = HttpResponse::build(bknd_res.status()); + // The backend's `HeaderName` instances are already validated by the + // http parser, so we forward them verbatim without round-tripping + // through `HeaderName::from_str` (which was showing up as + // surprisingly hot on engine_* call patterns). for (hkey, hval) in bknd_res.headers().iter() { if is_hop_by_hop_header(hkey) { continue; } - if let Ok(hkey) = header::HeaderName::from_str(hkey.as_str()) { - clnt_res.append_header((hkey, hval.clone())); - } + clnt_res.append_header((hkey.clone(), hval.clone())); } clnt_res From 098c7d52ea4f8f647c87821eba9383246f2001f5 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Thu, 4 Jun 2026 11:48:28 +0300 Subject: [PATCH 07/17] Set TCP_NODELAY on backend connections --- crates/rproxy/src/server/proxy/http/proxy.rs | 89 +++++++++++++++++--- 1 file changed, 78 insertions(+), 11 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 8bfe2ea..30f5a4b 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -475,14 +475,60 @@ where let req_id = info.req_id; let conn_id = info.conn_id; + // Buffer the client request body so we can forward it to the backend + // with a Content-Length header (instead of chunked Transfer-Encoding). + // Backends like op-rbuilder's authrpc reader can begin JSON parse as + // soon as the framed body arrives, rather than waiting for the final + // chunk delimiter. The trade-off is loss of upload pipelining; this is + // acceptable for loopback (localhost) backends where bandwidth is high + // and chunk framing overhead dominates. + let body = + match clnt_req_body.to_bytes_limited(this.shared.config().max_request_size()).await { + Ok(Ok(body)) => body, + + Ok(Err(err)) => { + warn!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %this.id, + thread_id = ?std::thread::current().id(), + backend_url = %this.backend.url, + latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), + error = ?err, + "Failed to read client request body", + ); + this.shared + .metrics + .http_proxy_failure_count + .get_or_create(&LabelsProxy { proxy: P::name() }) + .inc(); + return Ok(HttpResponse::BadRequest().body(format!("Bad request: {err:?}"))); + } + + Err(_) => { + warn!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %this.id, + thread_id = ?std::thread::current().id(), + backend_url = %this.backend.url, + latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), + "Client request body exceeds max size", + ); + this.shared + .metrics + .http_proxy_failure_count + .get_or_create(&LabelsProxy { proxy: P::name() }) + .inc(); + return Ok(HttpResponse::PayloadTooLarge().finish()); + } + }; + + let body_received_at = UtcDateTime::now(); + let bknd_req = this.backend.new_backend_request(&info); - let bknd_req_body = ProxyHttpRequestBody::new( - this.clone(), - info, - clnt_req_body, - this.shared.config().prealloacated_request_buffer_size(), - timestamp, - ); this.in_flight_backend.inc(); @@ -495,10 +541,11 @@ where thread_id = ?std::thread::current().id(), backend_url = %this.backend.url, latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - "Streaming http request to backend...", + body_size = body.len(), + "Sending http request to backend (Content-Length)...", ); - let bknd_res = match bknd_req.send_stream(bknd_req_body).await { + let bknd_res = match bknd_req.send_body(body.clone()).await { Ok(bknd_res) => bknd_res, Err(err) => { @@ -528,11 +575,14 @@ where thread_id = ?std::thread::current().id(), backend_url = %this.backend.url, latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - "Finished streaming http request to backend", + "Got http response from backend", ); this.in_flight_backend.dec(); + let req = ProxiedHttpRequest::new(info, body.to_vec(), timestamp, body_received_at); + this.postprocess_client_request(req); + Self::stream_to_client(this, req_id, conn_id, bknd_res) } @@ -1337,9 +1387,26 @@ where .unwrap() // safety: verified on start .to_string(); + // Build the inner TCP connector ourselves so we can flip + // TCP_NODELAY on after each connect + use actix_service::ServiceExt as _; + let tcp_nodelay = actix_tls::connect::Connector::new( + actix_tls::connect::Resolver::default(), + ) + .service() + .map(|conn: actix_tls::connect::Connection| { + let _ = conn.io_ref().set_nodelay(true); + conn + }); + let client = Client::builder() .add_default_header((header::HOST, host)) - .connector(Connector::new().conn_keep_alive(2 * timeout).limit(connections_limit)) + .connector( + Connector::new() + .connector(tcp_nodelay) + .conn_keep_alive(2 * timeout) + .limit(connections_limit), + ) .timeout(timeout) .finish(); From 3673175b14c39f6d830ce1af4d3ecfe6363fe823 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Thu, 4 Jun 2026 15:00:34 +0300 Subject: [PATCH 08/17] Drop request body buffering --- crates/rproxy/src/server/proxy/http/proxy.rs | 72 +++----------------- 1 file changed, 11 insertions(+), 61 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 30f5a4b..b47a2a1 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -475,60 +475,14 @@ where let req_id = info.req_id; let conn_id = info.conn_id; - // Buffer the client request body so we can forward it to the backend - // with a Content-Length header (instead of chunked Transfer-Encoding). - // Backends like op-rbuilder's authrpc reader can begin JSON parse as - // soon as the framed body arrives, rather than waiting for the final - // chunk delimiter. The trade-off is loss of upload pipelining; this is - // acceptable for loopback (localhost) backends where bandwidth is high - // and chunk framing overhead dominates. - let body = - match clnt_req_body.to_bytes_limited(this.shared.config().max_request_size()).await { - Ok(Ok(body)) => body, - - Ok(Err(err)) => { - warn!( - proxy = P::name(), - request_id = %req_id, - connection_id = %conn_id, - worker_id = %this.id, - thread_id = ?std::thread::current().id(), - backend_url = %this.backend.url, - latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - error = ?err, - "Failed to read client request body", - ); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); - return Ok(HttpResponse::BadRequest().body(format!("Bad request: {err:?}"))); - } - - Err(_) => { - warn!( - proxy = P::name(), - request_id = %req_id, - connection_id = %conn_id, - worker_id = %this.id, - thread_id = ?std::thread::current().id(), - backend_url = %this.backend.url, - latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - "Client request body exceeds max size", - ); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); - return Ok(HttpResponse::PayloadTooLarge().finish()); - } - }; - - let body_received_at = UtcDateTime::now(); - let bknd_req = this.backend.new_backend_request(&info); + let bknd_req_body = ProxyHttpRequestBody::new( + this.clone(), + info, + clnt_req_body, + this.shared.config().prealloacated_request_buffer_size(), + timestamp, + ); this.in_flight_backend.inc(); @@ -541,11 +495,10 @@ where thread_id = ?std::thread::current().id(), backend_url = %this.backend.url, latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - body_size = body.len(), - "Sending http request to backend (Content-Length)...", + "Streaming http request to backend...", ); - let bknd_res = match bknd_req.send_body(body.clone()).await { + let bknd_res = match bknd_req.send_stream(bknd_req_body).await { Ok(bknd_res) => bknd_res, Err(err) => { @@ -575,14 +528,11 @@ where thread_id = ?std::thread::current().id(), backend_url = %this.backend.url, latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - "Got http response from backend", + "Finished streaming http request to backend", ); this.in_flight_backend.dec(); - let req = ProxiedHttpRequest::new(info, body.to_vec(), timestamp, body_received_at); - this.postprocess_client_request(req); - Self::stream_to_client(this, req_id, conn_id, bknd_res) } @@ -1387,7 +1337,7 @@ where .unwrap() // safety: verified on start .to_string(); - // Build the inner TCP connector ourselves so we can flip + // Build the inner TCP connector ourselves so we can flip // TCP_NODELAY on after each connect use actix_service::ServiceExt as _; let tcp_nodelay = actix_tls::connect::Connector::new( From 4e59e106f469095522015b6e7756131552f7db1c Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:25:11 +0300 Subject: [PATCH 09/17] Set awc connection lifetime to infinite, tighten disconnect timeout --- crates/rproxy/src/server/proxy/http/proxy.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index b47a2a1..9f4798b 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1355,6 +1355,8 @@ where Connector::new() .connector(tcp_nodelay) .conn_keep_alive(2 * timeout) + .conn_lifetime(Duration::from_secs(86_400)) + .disconnect_timeout(Duration::from_millis(100)) .limit(connections_limit), ) .timeout(timeout) From 202791e4202303f1c39c2d76f50cbf90d414a41e Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:26:08 +0300 Subject: [PATCH 10/17] Enable TCP_KEEPALIVE on backend connections --- crates/rproxy/src/server/proxy/http/proxy.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 9f4798b..49180df 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1346,6 +1346,9 @@ where .service() .map(|conn: actix_tls::connect::Connection| { let _ = conn.io_ref().set_nodelay(true); + let _ = socket2::SockRef::from(conn.io_ref()).set_tcp_keepalive( + &socket2::TcpKeepalive::new().with_time(Duration::from_secs(60)), + ); conn }); From 104f5193d2348890bf8b0a26a51f0d91785f9d90 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:28:01 +0300 Subject: [PATCH 11/17] Drop HeaderMap clone, capture content-encoding only --- crates/rproxy/src/server/proxy/http/proxy.rs | 28 +++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 49180df..ee499f5 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -716,12 +716,13 @@ where let mut clnt_res = Self::to_client_response(&bknd_res); let preallocate = this.shared.config().prealloacated_response_buffer_size(); + let content_encoding = bknd_res.headers().get(header::CONTENT_ENCODING).cloned(); let bknd_res_body = ProxyHttpResponseBody::new( this, req_id, conn_id, status, - bknd_res.headers().clone(), + content_encoding, bknd_res.into_stream(), preallocate, timestamp, @@ -1428,11 +1429,13 @@ where BodySize::Sized(size) => size, // Body is always sized BodySize::None | BodySize::Stream => 0, }; + let content_encoding = + bknd_res.headers().get(header::CONTENT_ENCODING).cloned(); let info = ProxyHttpResponseInfo::new( clnt_req.info.req_id, clnt_req.info.conn_id, bknd_res.status(), - bknd_res.headers().clone(), + content_encoding, ); let mirr_res = ProxiedHttpResponse { info, @@ -1612,18 +1615,23 @@ pub(crate) struct ProxyHttpResponseInfo { req_id: Uuid, conn_id: Uuid, status: StatusCode, - headers: HeaderMap, // TODO: perhaps we don't need all headers, just select ones + content_encoding: Option, } impl ProxyHttpResponseInfo { - pub(crate) fn new(req_id: Uuid, conn_id: Uuid, status: StatusCode, headers: HeaderMap) -> Self { - Self { req_id, conn_id, status, headers } + pub(crate) fn new( + req_id: Uuid, + conn_id: Uuid, + status: StatusCode, + content_encoding: Option, + ) -> Self { + Self { req_id, conn_id, status, content_encoding } } fn content_encoding(&self) -> String { - self.headers - .get(header::CONTENT_ENCODING) - .map(|h| h.to_str().unwrap_or_default()) + self.content_encoding + .as_ref() + .and_then(|h| h.to_str().ok()) .map(|h| h.to_string()) .unwrap_or_default() } @@ -1806,7 +1814,7 @@ where req_id: Uuid, conn_id: Uuid, status: StatusCode, - headers: HeaderMap, + content_encoding: Option, body: S, preallocate: usize, timestamp: UtcDateTime, @@ -1818,7 +1826,7 @@ where start: timestamp, body: Vec::with_capacity(preallocate), max_size, - info: Some(ProxyHttpResponseInfo::new(req_id, conn_id, status, headers)), + info: Some(ProxyHttpResponseInfo::new(req_id, conn_id, status, content_encoding)), } } } From dc21efe12e0d95c3d1baf986a7b78f151ca79fc9 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:29:29 +0300 Subject: [PATCH 12/17] Defer postprocess_client_request off the critical path --- crates/rproxy/src/server/proxy/http/proxy.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index ee499f5..4561706 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -696,9 +696,17 @@ where this.in_flight_backend.dec(); - this.postprocess_client_request(req); - - Self::stream_to_client(this, req_id, conn_id, bknd_res) + // Initiate the response stream first so the client sees no extra + // bookkeeping latency on the critical path, then file the request + // into the in-flight map for the eventual response postprocessor. + // The insert must complete before the response body stream finishes + // (which is when `postprocess_backend_response` calls `remove_sync`); + // this is guaranteed because actix won't begin polling the streaming + // body until after this synchronous handler returns. + let this_clone = this.clone(); + let res = Self::stream_to_client(this, req_id, conn_id, bknd_res); + this_clone.postprocess_client_request(req); + res } fn stream_to_client( From 0bf93f2f2eccf9582f4bd9fc5d02044a085f70ff Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:31:52 +0300 Subject: [PATCH 13/17] Skip postprocess entirely for non-logged, non-mirrored requests --- crates/rproxy/src/server/proxy/http/proxy.rs | 34 ++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 4561706..dd77e3a 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -790,6 +790,40 @@ where mirroring_peers: Arc>>>, mut mirroring_peer_round_robin_index: usize, ) { + // Fast path: when nobody is logging the proxied request/response and + // there are no mirror peers configured, skip the (expensive) response + // decompress + parse and the (always-evaluated) `info!` formatting in + // `maybe_log_proxied_request_and_response`. We still parse the request + // for the jrpc method label so metrics keep their correct dimensions. + let log_off = + !inner.config().log_proxied_requests() && !inner.config().log_proxied_responses(); + let no_mirror = mirroring_peers.is_empty(); + if log_off && no_mirror { + if clnt_req.decompressed_size < clnt_req.size { + (clnt_req.decompressed_body, clnt_req.decompressed_size) = decompress( + clnt_req.body.clone(), + clnt_req.size, + clnt_req.info.content_encoding(), + ); + } + match serde_json::from_slice::(&clnt_req.decompressed_body) { + Ok(jrpc) => { + Self::emit_metrics_on_proxy_success(&jrpc, &clnt_req, &bknd_res, metrics); + } + Err(err) => { + warn!( + proxy = P::name(), + request_id = %clnt_req.info.req_id, + connection_id = %clnt_req.info.conn_id, + worker_id = %worker_id, + error = ?err, + "Failed to parse json-rpc request", + ); + } + } + return; + } + if clnt_req.decompressed_size < clnt_req.size { (clnt_req.decompressed_body, clnt_req.decompressed_size) = decompress(clnt_req.body.clone(), clnt_req.size, clnt_req.info.content_encoding()); From bcef4afa6f723c089e19e21548531a1b7a95c8a7 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:35:33 +0300 Subject: [PATCH 14/17] Reuse parsed JrpcRequestMetaMaybeBatch in finalise_proxying --- crates/rproxy/src/server/proxy/http/proxy.rs | 31 ++++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index dd77e3a..34f9486 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -618,7 +618,8 @@ where let (decompressed_body, decompressed_size) = decompress(body.clone(), size, info.content_encoding()); - match serde_json::from_slice::(&decompressed_body) { + let jrpc_meta = match serde_json::from_slice::(&decompressed_body) + { Ok(jrpc) => { if let Some(res) = this.shared.inner.should_intercept(&jrpc) { let json_req = if this.shared.config().log_proxied_requests() { @@ -644,6 +645,9 @@ where return res; } + // Stash the parsed jrpc so `finalise_proxying` can avoid + // re-parsing the same body. + Some(Arc::new(jrpc)) } Err(err) => { @@ -657,8 +661,9 @@ where error = ?err, "Failed to parse json-rpc request", ); + None } - } + }; let req = ProxiedHttpRequest { info, @@ -668,6 +673,7 @@ where decompressed_size, start: timestamp, end: UtcDateTime::now(), + jrpc_meta, }; let bknd_req = this.backend.new_backend_request(&req.info); @@ -799,6 +805,12 @@ where !inner.config().log_proxied_requests() && !inner.config().log_proxied_responses(); let no_mirror = mirroring_peers.is_empty(); if log_off && no_mirror { + // If the request body has already been parsed (intercept path) + // we can avoid a second decompress + parse. + if let Some(jrpc) = clnt_req.jrpc_meta.clone() { + Self::emit_metrics_on_proxy_success(&jrpc, &clnt_req, &bknd_res, metrics); + return; + } if clnt_req.decompressed_size < clnt_req.size { (clnt_req.decompressed_body, clnt_req.decompressed_size) = decompress( clnt_req.body.clone(), @@ -834,7 +846,15 @@ where decompress(bknd_res.body.clone(), bknd_res.size, bknd_res.info.content_encoding()); } - match serde_json::from_slice::(&clnt_req.decompressed_body) { + // Reuse the parsed jrpc body when the intercept path stashed it on + // the `ProxiedHttpRequest`, otherwise parse it now. + let parsed = if let Some(stashed) = clnt_req.jrpc_meta.clone() { + Ok(stashed) + } else { + serde_json::from_slice::(&clnt_req.decompressed_body) + .map(Arc::new) + }; + match parsed { Ok(jrpc) => { if inner.should_mirror(&jrpc, &clnt_req, &bknd_res) { let mirrors_count = match inner.config().mirroring_strategy() { @@ -1990,6 +2010,10 @@ pub(crate) struct ProxiedHttpRequest { decompressed_size: usize, start: UtcDateTime, end: UtcDateTime, + /// Optionally stashed jrpc meta parsed on the intercept path so that + /// `finalise_proxying` does not redo `serde_json::from_slice` on the + /// same body. `None` for the prod streaming path which never parses. + jrpc_meta: Option>, } impl ProxiedHttpRequest { @@ -2008,6 +2032,7 @@ impl ProxiedHttpRequest { decompressed_size: 0, start, end, + jrpc_meta: None, } } From 69132bdea22c14ba781192af1ddaf031a02291bb Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 19:03:22 +0300 Subject: [PATCH 15/17] Cap user-agent metric cache to prevent unbounded growth --- crates/rproxy/src/server/proxy/http/proxy.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 34f9486..6bacf15 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -445,10 +445,13 @@ where }) .clone(); counter.inc(); - // Best-effort insert; races with another task on the same - // worker thread shouldn't happen given !Send actix workers, - // but tolerate insert errors regardless. - let _ = this.client_info_cache.insert_sync(user_agent.to_string(), counter); + // soft cap to prevent unbounded growth from hostile UA values + if this.client_info_cache.len() < 100 { + // Best-effort insert; races with another task on the same + // worker thread shouldn't happen given !Send actix workers, + // but tolerate insert errors regardless. + let _ = this.client_info_cache.insert_sync(user_agent.to_string(), counter); + } } } From 5373d2c63a16b166f0fc33b7624a6cedd8568c5b Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 19:03:58 +0300 Subject: [PATCH 16/17] Comment disconnect_timeout rationale --- crates/rproxy/src/server/proxy/http/proxy.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 6bacf15..566e18a 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1425,6 +1425,8 @@ where .connector(tcp_nodelay) .conn_keep_alive(2 * timeout) .conn_lifetime(Duration::from_secs(86_400)) + // 100ms grace is fine for loopback (op-rbuilder); raise if backend + // is ever non-loopback (would risk truncating responses on shutdown). .disconnect_timeout(Duration::from_millis(100)) .limit(connections_limit), ) From d7d7f94cbf426e1c7675fe294f70a1a1ec17f0d9 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 21:16:40 +0300 Subject: [PATCH 17/17] Drop conn_lifetime override, let awc default reap idle backend conns --- crates/rproxy/src/server/proxy/http/proxy.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 566e18a..09ac1eb 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1424,7 +1424,6 @@ where Connector::new() .connector(tcp_nodelay) .conn_keep_alive(2 * timeout) - .conn_lifetime(Duration::from_secs(86_400)) // 100ms grace is fine for loopback (op-rbuilder); raise if backend // is ever non-loopback (would risk truncating responses on shutdown). .disconnect_timeout(Duration::from_millis(100))