From 4c77c34a4050bb37f01d1e6df764b0768645c6ef Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 15:58:57 +0300 Subject: [PATCH 01/17] Drop String alloc in is_hop_by_hop_header --- crates/rproxy/src/utils/utils_http.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/rproxy/src/utils/utils_http.rs b/crates/rproxy/src/utils/utils_http.rs index 3bbba50..2bffd9a 100644 --- a/crates/rproxy/src/utils/utils_http.rs +++ b/crates/rproxy/src/utils/utils_http.rs @@ -1,8 +1,15 @@ // is_hop_by_hop_header ------------------------------------------------ -pub(crate) fn is_hop_by_hop_header(name: &actix_web::http::header::HeaderName) -> bool { +use actix_web::http::header; + +pub(crate) fn is_hop_by_hop_header(name: &header::HeaderName) -> bool { + // Fast path: compare against known HeaderName constants (no allocation). + // The original implementation only filtered these four (per its ASCII + // lowercase match): connection, host, keep-alive, transfer-encoding. matches!( - name.as_str().to_ascii_lowercase().as_str(), - "connection" | "host" | "keep-alive" | "transfer-encoding" - ) + name, + &header::CONNECTION | + &header::HOST | + &header::TRANSFER_ENCODING + ) || name.as_str().eq_ignore_ascii_case("keep-alive") } From 0da807613743174468815a97cb05b8b8bf96e0d7 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:01:16 +0300 Subject: [PATCH 02/17] Cache in-flight metric handles on ProxyHttp --- crates/rproxy/src/server/proxy/http/proxy.rs | 122 +++++++------------ 1 file changed, 43 insertions(+), 79 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 131b928..fd79100 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -40,6 +40,7 @@ use bytes::Bytes; use futures::TryStreamExt; use futures_core::Stream; use pin_project::pin_project; +use prometheus_client::metrics::{counter::Counter, gauge::Gauge}; use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; use tokio::sync::broadcast; @@ -79,6 +80,13 @@ where backend: ProxyHttpBackendEndpoint, requests: HashMap, postprocessor: actix::Addr>, + + // Per-worker cached metric handles. These are cheap to clone (each + // wraps an `Arc` internally) and bypass the per-request + // `Family::get_or_create` lookup on the hot path. + in_flight_client: Gauge, + in_flight_backend: Gauge, + proxy_failure_count: Counter, } impl ProxyHttp @@ -135,7 +143,24 @@ where } .start(); - Self { id, shared, backend, requests: HashMap::default(), postprocessor } + let labels_proxy = LabelsProxy { proxy: P::name() }; + let in_flight_client = + shared.metrics.http_in_flight_requests_client.get_or_create(&labels_proxy).clone(); + let in_flight_backend = + shared.metrics.http_in_flight_requests_backend.get_or_create(&labels_proxy).clone(); + let proxy_failure_count = + shared.metrics.http_proxy_failure_count.get_or_create(&labels_proxy).clone(); + + Self { + id, + shared, + backend, + requests: HashMap::default(), + postprocessor, + in_flight_client, + in_flight_backend, + proxy_failure_count, + } } pub(crate) async fn run( @@ -407,10 +432,8 @@ where .inc(); } - metrics - .http_in_flight_requests_client - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + let in_flight_client = this.in_flight_client.clone(); + in_flight_client.inc(); let res = if this.shared.inner.might_intercept() { Self::send_to_backend_and_maybe_intercept(this, info, clnt_req_body, timestamp).await @@ -418,10 +441,7 @@ where Self::stream_to_backend(this, info, clnt_req_body, timestamp).await }; - metrics - .http_in_flight_requests_client - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + in_flight_client.dec(); res } @@ -444,11 +464,7 @@ where timestamp, ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.inc(); #[cfg(debug_assertions)] debug!( @@ -477,16 +493,8 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.dec(); + this.proxy_failure_count.inc(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } }; @@ -503,11 +511,7 @@ where "Finished streaming http request to backend", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.in_flight_backend.dec(); Self::stream_to_client(this, req_id, conn_id, bknd_res) } @@ -532,11 +536,7 @@ where "Sending http request to backend...", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.inc(); let body = match clnt_req_body.to_bytes_limited(this.shared.config().max_request_size()).await { @@ -554,16 +554,8 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.dec(); + this.proxy_failure_count.inc(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } @@ -583,25 +575,13 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.dec(); + this.proxy_failure_count.inc(); return Ok(HttpResponse::PayloadTooLarge().finish()); } }; - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.in_flight_backend.dec(); #[cfg(debug_assertions)] debug!( @@ -672,11 +652,7 @@ where let bknd_req = this.backend.new_backend_request(&req.info); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.inc(); let bknd_res = match bknd_req.send_body(req.body.clone()).await { Ok(bknd_res) => bknd_res, @@ -692,25 +668,13 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.proxy_failure_count.inc(); + this.in_flight_backend.dec(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } }; - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.in_flight_backend.dec(); this.postprocess_client_request(req); From 761da33165736564f65941f8a84f1c8e93bf54ae Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:02:09 +0300 Subject: [PATCH 03/17] Cache user-agent metric handle per worker --- crates/rproxy/src/server/proxy/http/proxy.rs | 33 +++++++++++++++----- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index fd79100..6ca5cae 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -87,6 +87,11 @@ where in_flight_client: Gauge, in_flight_backend: Gauge, proxy_failure_count: Counter, + + // Per-worker cache of (user_agent -> Counter) so that we only pay the + // `Family::get_or_create` cost the first time a UA is seen on this + // worker. Lookup is `&str`-keyed (no allocation on hit). + client_info_cache: HashMap, } impl ProxyHttp @@ -160,6 +165,7 @@ where in_flight_client, in_flight_backend, proxy_failure_count, + client_info_cache: HashMap::default(), } } @@ -423,13 +429,26 @@ where !user_agent.is_empty() && let Ok(user_agent) = user_agent.to_str() { - metrics - .client_info - .get_or_create(&LabelsProxyClientInfo { - proxy: P::name(), - user_agent: user_agent.to_string(), - }) - .inc(); + // Hot path: read-only lookup keyed by &str (no allocation). + // Cold path: allocate the owned String key and resolve the + // counter from the metrics family exactly once per worker per + // distinct UA. + if let Some(entry) = this.client_info_cache.read_sync(user_agent, |_, c| c.clone()) { + entry.inc(); + } else { + let counter = metrics + .client_info + .get_or_create(&LabelsProxyClientInfo { + proxy: P::name(), + user_agent: user_agent.to_string(), + }) + .clone(); + counter.inc(); + // Best-effort insert; races with another task on the same + // worker thread shouldn't happen given !Send actix workers, + // but tolerate insert errors regardless. + let _ = this.client_info_cache.insert_sync(user_agent.to_string(), counter); + } } let in_flight_client = this.in_flight_client.clone(); From 6a1c041caeebae16bade966130039d387ee11449 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:03:05 +0300 Subject: [PATCH 04/17] Bind connection_info() once per request --- crates/rproxy/src/server/proxy/http/proxy.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 6ca5cae..42f552e 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1471,6 +1471,10 @@ pub(crate) struct ProxyHttpRequestInfo { impl ProxyHttpRequestInfo { pub(crate) fn new(req: &HttpRequest, guard: Option<&ConnectionGuard>) -> Self { + // Bind connection_info() once — actix recomputes/borrows on each + // call and the lookup showed up as a hot spot under load. + let ci = req.connection_info(); + // copy over only non hop-by-hop headers let mut headers = HeaderMap::new(); for (header, value) in req.headers().iter() { @@ -1480,7 +1484,7 @@ impl ProxyHttpRequestInfo { } // append remote ip to x-forwarded-for - if let Some(peer_addr) = req.connection_info().peer_addr() { + if let Some(peer_addr) = ci.peer_addr() { let mut forwarded_for = String::new(); if let Some(ff) = req.headers().get(header::X_FORWARDED_FOR) && let Ok(ff) = ff.to_str() @@ -1495,17 +1499,17 @@ impl ProxyHttpRequestInfo { } // set x-forwarded-proto if it's not already set - if !req.connection_info().scheme().is_empty() && + if !ci.scheme().is_empty() && req.headers().get(header::X_FORWARDED_PROTO).is_none() && - let Ok(forwarded_proto) = HeaderValue::from_str(req.connection_info().scheme()) + let Ok(forwarded_proto) = HeaderValue::from_str(ci.scheme()) { headers.insert(header::X_FORWARDED_PROTO, forwarded_proto); } // set x-forwarded-host if it's not already set - if !req.connection_info().host().is_empty() && + if !ci.host().is_empty() && req.headers().get(header::X_FORWARDED_HOST).is_none() && - let Ok(forwarded_host) = HeaderValue::from_str(req.connection_info().host()) + let Ok(forwarded_host) = HeaderValue::from_str(ci.host()) { headers.insert(header::X_FORWARDED_HOST, forwarded_host); } @@ -1515,9 +1519,9 @@ impl ProxyHttpRequestInfo { let remote_addr = match guard { Some(guard) => match guard.remote_addr.clone() { Some(remote_addr) => Some(remote_addr), - None => req.connection_info().peer_addr().map(String::from), + None => ci.peer_addr().map(String::from), }, - None => req.connection_info().peer_addr().map(String::from), + None => ci.peer_addr().map(String::from), }; let path = match req.path() { From 346b44c975e73b1f1dd5bc727d0786bc61430c9f Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:03:52 +0300 Subject: [PATCH 05/17] Append headers without per-iter revalidation --- crates/rproxy/src/server/proxy/http/proxy.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 42f552e..e8439fe 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1351,8 +1351,13 @@ where let mut req = self.client.request(info.method.clone(), url.as_str()).no_decompress(); - for (header, value) in info.headers.iter() { - req = req.insert_header((header.clone(), value.clone())); + // Append directly to the awc request's HeaderMap (vs. the + // per-header `insert_header` builder pattern, which re-runs + // `TryIntoHeaderPair` validation each iteration). HeaderValue + // clones are refcount bumps; HeaderName clones are interned/cheap. + let dst = req.headers_mut(); + for (k, v) in info.headers.iter() { + dst.append(k.clone(), v.clone()); } req From eac73d7cdbabed787bc014d8d2b02d8b370c3d9e Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:04:45 +0300 Subject: [PATCH 06/17] Drop HeaderName::from_str on response forwarding --- crates/rproxy/src/server/proxy/http/proxy.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index e8439fe..8bfe2ea 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -5,7 +5,6 @@ use std::{ mem, ops::Add, pin::Pin, - str::FromStr, sync::{ Arc, atomic::{AtomicI64, AtomicUsize, Ordering}, @@ -389,13 +388,15 @@ where fn to_client_response(bknd_res: &ClientResponse) -> HttpResponseBuilder { let mut clnt_res = HttpResponse::build(bknd_res.status()); + // The backend's `HeaderName` instances are already validated by the + // http parser, so we forward them verbatim without round-tripping + // through `HeaderName::from_str` (which was showing up as + // surprisingly hot on engine_* call patterns). for (hkey, hval) in bknd_res.headers().iter() { if is_hop_by_hop_header(hkey) { continue; } - if let Ok(hkey) = header::HeaderName::from_str(hkey.as_str()) { - clnt_res.append_header((hkey, hval.clone())); - } + clnt_res.append_header((hkey.clone(), hval.clone())); } clnt_res From 098c7d52ea4f8f647c87821eba9383246f2001f5 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Thu, 4 Jun 2026 11:48:28 +0300 Subject: [PATCH 07/17] Set TCP_NODELAY on backend connections --- crates/rproxy/src/server/proxy/http/proxy.rs | 89 +++++++++++++++++--- 1 file changed, 78 insertions(+), 11 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 8bfe2ea..30f5a4b 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -475,14 +475,60 @@ where let req_id = info.req_id; let conn_id = info.conn_id; + // Buffer the client request body so we can forward it to the backend + // with a Content-Length header (instead of chunked Transfer-Encoding). + // Backends like op-rbuilder's authrpc reader can begin JSON parse as + // soon as the framed body arrives, rather than waiting for the final + // chunk delimiter. The trade-off is loss of upload pipelining; this is + // acceptable for loopback (localhost) backends where bandwidth is high + // and chunk framing overhead dominates. + let body = + match clnt_req_body.to_bytes_limited(this.shared.config().max_request_size()).await { + Ok(Ok(body)) => body, + + Ok(Err(err)) => { + warn!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %this.id, + thread_id = ?std::thread::current().id(), + backend_url = %this.backend.url, + latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), + error = ?err, + "Failed to read client request body", + ); + this.shared + .metrics + .http_proxy_failure_count + .get_or_create(&LabelsProxy { proxy: P::name() }) + .inc(); + return Ok(HttpResponse::BadRequest().body(format!("Bad request: {err:?}"))); + } + + Err(_) => { + warn!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %this.id, + thread_id = ?std::thread::current().id(), + backend_url = %this.backend.url, + latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), + "Client request body exceeds max size", + ); + this.shared + .metrics + .http_proxy_failure_count + .get_or_create(&LabelsProxy { proxy: P::name() }) + .inc(); + return Ok(HttpResponse::PayloadTooLarge().finish()); + } + }; + + let body_received_at = UtcDateTime::now(); + let bknd_req = this.backend.new_backend_request(&info); - let bknd_req_body = ProxyHttpRequestBody::new( - this.clone(), - info, - clnt_req_body, - this.shared.config().prealloacated_request_buffer_size(), - timestamp, - ); this.in_flight_backend.inc(); @@ -495,10 +541,11 @@ where thread_id = ?std::thread::current().id(), backend_url = %this.backend.url, latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - "Streaming http request to backend...", + body_size = body.len(), + "Sending http request to backend (Content-Length)...", ); - let bknd_res = match bknd_req.send_stream(bknd_req_body).await { + let bknd_res = match bknd_req.send_body(body.clone()).await { Ok(bknd_res) => bknd_res, Err(err) => { @@ -528,11 +575,14 @@ where thread_id = ?std::thread::current().id(), backend_url = %this.backend.url, latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - "Finished streaming http request to backend", + "Got http response from backend", ); this.in_flight_backend.dec(); + let req = ProxiedHttpRequest::new(info, body.to_vec(), timestamp, body_received_at); + this.postprocess_client_request(req); + Self::stream_to_client(this, req_id, conn_id, bknd_res) } @@ -1337,9 +1387,26 @@ where .unwrap() // safety: verified on start .to_string(); + // Build the inner TCP connector ourselves so we can flip + // TCP_NODELAY on after each connect + use actix_service::ServiceExt as _; + let tcp_nodelay = actix_tls::connect::Connector::new( + actix_tls::connect::Resolver::default(), + ) + .service() + .map(|conn: actix_tls::connect::Connection| { + let _ = conn.io_ref().set_nodelay(true); + conn + }); + let client = Client::builder() .add_default_header((header::HOST, host)) - .connector(Connector::new().conn_keep_alive(2 * timeout).limit(connections_limit)) + .connector( + Connector::new() + .connector(tcp_nodelay) + .conn_keep_alive(2 * timeout) + .limit(connections_limit), + ) .timeout(timeout) .finish(); From 3673175b14c39f6d830ce1af4d3ecfe6363fe823 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Thu, 4 Jun 2026 15:00:34 +0300 Subject: [PATCH 08/17] Drop request body buffering --- crates/rproxy/src/server/proxy/http/proxy.rs | 72 +++----------------- 1 file changed, 11 insertions(+), 61 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 30f5a4b..b47a2a1 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -475,60 +475,14 @@ where let req_id = info.req_id; let conn_id = info.conn_id; - // Buffer the client request body so we can forward it to the backend - // with a Content-Length header (instead of chunked Transfer-Encoding). - // Backends like op-rbuilder's authrpc reader can begin JSON parse as - // soon as the framed body arrives, rather than waiting for the final - // chunk delimiter. The trade-off is loss of upload pipelining; this is - // acceptable for loopback (localhost) backends where bandwidth is high - // and chunk framing overhead dominates. - let body = - match clnt_req_body.to_bytes_limited(this.shared.config().max_request_size()).await { - Ok(Ok(body)) => body, - - Ok(Err(err)) => { - warn!( - proxy = P::name(), - request_id = %req_id, - connection_id = %conn_id, - worker_id = %this.id, - thread_id = ?std::thread::current().id(), - backend_url = %this.backend.url, - latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - error = ?err, - "Failed to read client request body", - ); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); - return Ok(HttpResponse::BadRequest().body(format!("Bad request: {err:?}"))); - } - - Err(_) => { - warn!( - proxy = P::name(), - request_id = %req_id, - connection_id = %conn_id, - worker_id = %this.id, - thread_id = ?std::thread::current().id(), - backend_url = %this.backend.url, - latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - "Client request body exceeds max size", - ); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); - return Ok(HttpResponse::PayloadTooLarge().finish()); - } - }; - - let body_received_at = UtcDateTime::now(); - let bknd_req = this.backend.new_backend_request(&info); + let bknd_req_body = ProxyHttpRequestBody::new( + this.clone(), + info, + clnt_req_body, + this.shared.config().prealloacated_request_buffer_size(), + timestamp, + ); this.in_flight_backend.inc(); @@ -541,11 +495,10 @@ where thread_id = ?std::thread::current().id(), backend_url = %this.backend.url, latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - body_size = body.len(), - "Sending http request to backend (Content-Length)...", + "Streaming http request to backend...", ); - let bknd_res = match bknd_req.send_body(body.clone()).await { + let bknd_res = match bknd_req.send_stream(bknd_req_body).await { Ok(bknd_res) => bknd_res, Err(err) => { @@ -575,14 +528,11 @@ where thread_id = ?std::thread::current().id(), backend_url = %this.backend.url, latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - "Got http response from backend", + "Finished streaming http request to backend", ); this.in_flight_backend.dec(); - let req = ProxiedHttpRequest::new(info, body.to_vec(), timestamp, body_received_at); - this.postprocess_client_request(req); - Self::stream_to_client(this, req_id, conn_id, bknd_res) } @@ -1387,7 +1337,7 @@ where .unwrap() // safety: verified on start .to_string(); - // Build the inner TCP connector ourselves so we can flip + // Build the inner TCP connector ourselves so we can flip // TCP_NODELAY on after each connect use actix_service::ServiceExt as _; let tcp_nodelay = actix_tls::connect::Connector::new( From 4e59e106f469095522015b6e7756131552f7db1c Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:25:11 +0300 Subject: [PATCH 09/17] Set awc connection lifetime to infinite, tighten disconnect timeout --- crates/rproxy/src/server/proxy/http/proxy.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index b47a2a1..9f4798b 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1355,6 +1355,8 @@ where Connector::new() .connector(tcp_nodelay) .conn_keep_alive(2 * timeout) + .conn_lifetime(Duration::from_secs(86_400)) + .disconnect_timeout(Duration::from_millis(100)) .limit(connections_limit), ) .timeout(timeout) From 202791e4202303f1c39c2d76f50cbf90d414a41e Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:26:08 +0300 Subject: [PATCH 10/17] Enable TCP_KEEPALIVE on backend connections --- crates/rproxy/src/server/proxy/http/proxy.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 9f4798b..49180df 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1346,6 +1346,9 @@ where .service() .map(|conn: actix_tls::connect::Connection| { let _ = conn.io_ref().set_nodelay(true); + let _ = socket2::SockRef::from(conn.io_ref()).set_tcp_keepalive( + &socket2::TcpKeepalive::new().with_time(Duration::from_secs(60)), + ); conn }); From 104f5193d2348890bf8b0a26a51f0d91785f9d90 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:28:01 +0300 Subject: [PATCH 11/17] Drop HeaderMap clone, capture content-encoding only --- crates/rproxy/src/server/proxy/http/proxy.rs | 28 +++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 49180df..ee499f5 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -716,12 +716,13 @@ where let mut clnt_res = Self::to_client_response(&bknd_res); let preallocate = this.shared.config().prealloacated_response_buffer_size(); + let content_encoding = bknd_res.headers().get(header::CONTENT_ENCODING).cloned(); let bknd_res_body = ProxyHttpResponseBody::new( this, req_id, conn_id, status, - bknd_res.headers().clone(), + content_encoding, bknd_res.into_stream(), preallocate, timestamp, @@ -1428,11 +1429,13 @@ where BodySize::Sized(size) => size, // Body is always sized BodySize::None | BodySize::Stream => 0, }; + let content_encoding = + bknd_res.headers().get(header::CONTENT_ENCODING).cloned(); let info = ProxyHttpResponseInfo::new( clnt_req.info.req_id, clnt_req.info.conn_id, bknd_res.status(), - bknd_res.headers().clone(), + content_encoding, ); let mirr_res = ProxiedHttpResponse { info, @@ -1612,18 +1615,23 @@ pub(crate) struct ProxyHttpResponseInfo { req_id: Uuid, conn_id: Uuid, status: StatusCode, - headers: HeaderMap, // TODO: perhaps we don't need all headers, just select ones + content_encoding: Option, } impl ProxyHttpResponseInfo { - pub(crate) fn new(req_id: Uuid, conn_id: Uuid, status: StatusCode, headers: HeaderMap) -> Self { - Self { req_id, conn_id, status, headers } + pub(crate) fn new( + req_id: Uuid, + conn_id: Uuid, + status: StatusCode, + content_encoding: Option, + ) -> Self { + Self { req_id, conn_id, status, content_encoding } } fn content_encoding(&self) -> String { - self.headers - .get(header::CONTENT_ENCODING) - .map(|h| h.to_str().unwrap_or_default()) + self.content_encoding + .as_ref() + .and_then(|h| h.to_str().ok()) .map(|h| h.to_string()) .unwrap_or_default() } @@ -1806,7 +1814,7 @@ where req_id: Uuid, conn_id: Uuid, status: StatusCode, - headers: HeaderMap, + content_encoding: Option, body: S, preallocate: usize, timestamp: UtcDateTime, @@ -1818,7 +1826,7 @@ where start: timestamp, body: Vec::with_capacity(preallocate), max_size, - info: Some(ProxyHttpResponseInfo::new(req_id, conn_id, status, headers)), + info: Some(ProxyHttpResponseInfo::new(req_id, conn_id, status, content_encoding)), } } } From dc21efe12e0d95c3d1baf986a7b78f151ca79fc9 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:29:29 +0300 Subject: [PATCH 12/17] Defer postprocess_client_request off the critical path --- crates/rproxy/src/server/proxy/http/proxy.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index ee499f5..4561706 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -696,9 +696,17 @@ where this.in_flight_backend.dec(); - this.postprocess_client_request(req); - - Self::stream_to_client(this, req_id, conn_id, bknd_res) + // Initiate the response stream first so the client sees no extra + // bookkeeping latency on the critical path, then file the request + // into the in-flight map for the eventual response postprocessor. + // The insert must complete before the response body stream finishes + // (which is when `postprocess_backend_response` calls `remove_sync`); + // this is guaranteed because actix won't begin polling the streaming + // body until after this synchronous handler returns. + let this_clone = this.clone(); + let res = Self::stream_to_client(this, req_id, conn_id, bknd_res); + this_clone.postprocess_client_request(req); + res } fn stream_to_client( From 0bf93f2f2eccf9582f4bd9fc5d02044a085f70ff Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:31:52 +0300 Subject: [PATCH 13/17] Skip postprocess entirely for non-logged, non-mirrored requests --- crates/rproxy/src/server/proxy/http/proxy.rs | 34 ++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 4561706..dd77e3a 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -790,6 +790,40 @@ where mirroring_peers: Arc>>>, mut mirroring_peer_round_robin_index: usize, ) { + // Fast path: when nobody is logging the proxied request/response and + // there are no mirror peers configured, skip the (expensive) response + // decompress + parse and the (always-evaluated) `info!` formatting in + // `maybe_log_proxied_request_and_response`. We still parse the request + // for the jrpc method label so metrics keep their correct dimensions. + let log_off = + !inner.config().log_proxied_requests() && !inner.config().log_proxied_responses(); + let no_mirror = mirroring_peers.is_empty(); + if log_off && no_mirror { + if clnt_req.decompressed_size < clnt_req.size { + (clnt_req.decompressed_body, clnt_req.decompressed_size) = decompress( + clnt_req.body.clone(), + clnt_req.size, + clnt_req.info.content_encoding(), + ); + } + match serde_json::from_slice::(&clnt_req.decompressed_body) { + Ok(jrpc) => { + Self::emit_metrics_on_proxy_success(&jrpc, &clnt_req, &bknd_res, metrics); + } + Err(err) => { + warn!( + proxy = P::name(), + request_id = %clnt_req.info.req_id, + connection_id = %clnt_req.info.conn_id, + worker_id = %worker_id, + error = ?err, + "Failed to parse json-rpc request", + ); + } + } + return; + } + if clnt_req.decompressed_size < clnt_req.size { (clnt_req.decompressed_body, clnt_req.decompressed_size) = decompress(clnt_req.body.clone(), clnt_req.size, clnt_req.info.content_encoding()); From bcef4afa6f723c089e19e21548531a1b7a95c8a7 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:35:33 +0300 Subject: [PATCH 14/17] Reuse parsed JrpcRequestMetaMaybeBatch in finalise_proxying --- crates/rproxy/src/server/proxy/http/proxy.rs | 31 ++++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index dd77e3a..34f9486 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -618,7 +618,8 @@ where let (decompressed_body, decompressed_size) = decompress(body.clone(), size, info.content_encoding()); - match serde_json::from_slice::(&decompressed_body) { + let jrpc_meta = match serde_json::from_slice::(&decompressed_body) + { Ok(jrpc) => { if let Some(res) = this.shared.inner.should_intercept(&jrpc) { let json_req = if this.shared.config().log_proxied_requests() { @@ -644,6 +645,9 @@ where return res; } + // Stash the parsed jrpc so `finalise_proxying` can avoid + // re-parsing the same body. + Some(Arc::new(jrpc)) } Err(err) => { @@ -657,8 +661,9 @@ where error = ?err, "Failed to parse json-rpc request", ); + None } - } + }; let req = ProxiedHttpRequest { info, @@ -668,6 +673,7 @@ where decompressed_size, start: timestamp, end: UtcDateTime::now(), + jrpc_meta, }; let bknd_req = this.backend.new_backend_request(&req.info); @@ -799,6 +805,12 @@ where !inner.config().log_proxied_requests() && !inner.config().log_proxied_responses(); let no_mirror = mirroring_peers.is_empty(); if log_off && no_mirror { + // If the request body has already been parsed (intercept path) + // we can avoid a second decompress + parse. + if let Some(jrpc) = clnt_req.jrpc_meta.clone() { + Self::emit_metrics_on_proxy_success(&jrpc, &clnt_req, &bknd_res, metrics); + return; + } if clnt_req.decompressed_size < clnt_req.size { (clnt_req.decompressed_body, clnt_req.decompressed_size) = decompress( clnt_req.body.clone(), @@ -834,7 +846,15 @@ where decompress(bknd_res.body.clone(), bknd_res.size, bknd_res.info.content_encoding()); } - match serde_json::from_slice::(&clnt_req.decompressed_body) { + // Reuse the parsed jrpc body when the intercept path stashed it on + // the `ProxiedHttpRequest`, otherwise parse it now. + let parsed = if let Some(stashed) = clnt_req.jrpc_meta.clone() { + Ok(stashed) + } else { + serde_json::from_slice::(&clnt_req.decompressed_body) + .map(Arc::new) + }; + match parsed { Ok(jrpc) => { if inner.should_mirror(&jrpc, &clnt_req, &bknd_res) { let mirrors_count = match inner.config().mirroring_strategy() { @@ -1990,6 +2010,10 @@ pub(crate) struct ProxiedHttpRequest { decompressed_size: usize, start: UtcDateTime, end: UtcDateTime, + /// Optionally stashed jrpc meta parsed on the intercept path so that + /// `finalise_proxying` does not redo `serde_json::from_slice` on the + /// same body. `None` for the prod streaming path which never parses. + jrpc_meta: Option>, } impl ProxiedHttpRequest { @@ -2008,6 +2032,7 @@ impl ProxiedHttpRequest { decompressed_size: 0, start, end, + jrpc_meta: None, } } From c094e054b631c2ada2015c85ea284928e384c72c Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 17:40:47 +0300 Subject: [PATCH 15/17] Refcount body chunks instead of memcpy --- crates/rproxy/src/server/proxy/http/proxy.rs | 77 ++++++++++++++------ 1 file changed, 54 insertions(+), 23 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 34f9486..748daff 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1711,7 +1711,8 @@ where info: Option, start: UtcDateTime, - body: Vec, + body: Vec, + body_len: usize, max_size: usize, #[pin] @@ -1723,11 +1724,15 @@ where C: ConfigProxyHttp, P: ProxyHttpInner, { + /// `_preallocate` is retained for API stability but is no longer used + /// to pre-size a flat buffer — chunks are now refcounted (`Bytes`) + /// and stored in a `Vec`, so there's no flat capacity to + /// reserve. fn new( proxy: web::Data>, info: ProxyHttpRequestInfo, body: S, - preallocate: usize, + _preallocate: usize, timestamp: UtcDateTime, ) -> Self { let max_size = proxy.shared.config().max_request_size(); @@ -1736,7 +1741,8 @@ where info: Some(info), stream: body, start: timestamp, - body: Vec::with_capacity(preallocate), + body: Vec::new(), + body_len: 0, max_size, } } @@ -1776,14 +1782,16 @@ where thread_id = ?std::thread::current().id(), latency_total = (UtcDateTime::now() - *this.start).as_seconds_f64(), chunk_size = chunk.len(), - total_size = this.body.len() + chunk.len(), + total_size = *this.body_len + chunk.len(), "Polled chunk of http request body", ); } - if this.body.len() + chunk.len() > *this.max_size { + if *this.body_len + chunk.len() > *this.max_size { return Poll::Ready(Some(Err(E::from(PayloadError::Overflow)))); } - this.body.extend_from_slice(&chunk); + // Refcount bump only — no memcpy of the chunk payload. + *this.body_len += chunk.len(); + this.body.push(chunk.clone()); Poll::Ready(Some(Ok(chunk))) } @@ -1805,7 +1813,7 @@ where worker_id = %this.proxy.id, thread_id = ?std::thread::current().id(), latency_total = (UtcDateTime::now() - *this.start).as_seconds_f64(), - total_size = this.body.len(), + total_size = *this.body_len, error = ?err, "Error while polling http request body", ); @@ -1831,13 +1839,14 @@ where worker_id = %this.proxy.id, thread_id = ?std::thread::current().id(), latency_total = (UtcDateTime::now() - *this.start).as_seconds_f64(), - total_size = this.body.len(), + total_size = *this.body_len, "Done polling http request body", ); } let end = UtcDateTime::now(); if let Some(info) = mem::take(this.info) { - let req = ProxiedHttpRequest::new(info, mem::take(this.body), *this.start, end); + let body = concat_bytes(mem::take(this.body)); + let req = ProxiedHttpRequest::new(info, body, *this.start, end); this.proxy.postprocess_client_request(req); } Poll::Ready(None) @@ -1858,7 +1867,8 @@ where info: Option, start: UtcDateTime, - body: Vec, + body: Vec, + body_len: usize, max_size: usize, #[pin] @@ -1870,6 +1880,10 @@ where C: ConfigProxyHttp, P: ProxyHttpInner, { + /// `_preallocate` is retained for API stability but is no longer used + /// to pre-size a flat buffer — chunks are now refcounted (`Bytes`) + /// and stored in a `Vec`, so there's no flat capacity to + /// reserve. #[allow(clippy::too_many_arguments)] fn new( proxy: web::Data>, @@ -1878,7 +1892,7 @@ where status: StatusCode, content_encoding: Option, body: S, - preallocate: usize, + _preallocate: usize, timestamp: UtcDateTime, ) -> Self { let max_size = proxy.shared.config().max_response_size(); @@ -1886,7 +1900,8 @@ where proxy, stream: body, start: timestamp, - body: Vec::with_capacity(preallocate), + body: Vec::new(), + body_len: 0, max_size, info: Some(ProxyHttpResponseInfo::new(req_id, conn_id, status, content_encoding)), } @@ -1927,14 +1942,16 @@ where thread_id = ?std::thread::current().id(), latency_total = (UtcDateTime::now() - *this.start).as_seconds_f64(), chunk_size = chunk.len(), - total_size = this.body.len() + chunk.len(), + total_size = *this.body_len + chunk.len(), "Polled chunk of http response body", ); } - if this.body.len() + chunk.len() > *this.max_size { + if *this.body_len + chunk.len() > *this.max_size { return Poll::Ready(Some(Err(E::from(PayloadError::Overflow)))); } - this.body.extend_from_slice(&chunk); + // Refcount bump only — no memcpy of the chunk payload. + *this.body_len += chunk.len(); + this.body.push(chunk.clone()); Poll::Ready(Some(Ok(chunk))) } @@ -1956,7 +1973,7 @@ where worker_id = %this.proxy.id, thread_id = ?std::thread::current().id(), latency_total = (UtcDateTime::now() - *this.start).as_seconds_f64(), - total_size = this.body.len(), + total_size = *this.body_len, error = ?err, "Error while polling http response body", ); @@ -1982,14 +1999,14 @@ where worker_id = %this.proxy.id, thread_id = ?std::thread::current().id(), latency_total = (UtcDateTime::now() - *this.start).as_seconds_f64(), - total_size = this.body.len(), + total_size = *this.body_len, "Done polling http response body", ); } let end = UtcDateTime::now(); if let Some(info) = mem::take(this.info) { - let res = - ProxiedHttpResponse::new(info, mem::take(this.body), *this.start, end); + let body = concat_bytes(mem::take(this.body)); + let res = ProxiedHttpResponse::new(info, body, *this.start, end); this.proxy.postprocess_backend_response(res); } Poll::Ready(None) @@ -1998,6 +2015,20 @@ where } } +/// Coalesce a vector of refcounted chunks into a single `Bytes`. +/// +/// - Empty: returns `Bytes::new()`. +/// - Single chunk: returns the lone `Bytes` directly (zero-copy). +/// - Multi-chunk: concatenates into one contiguous allocation. +#[inline] +fn concat_bytes(mut chunks: Vec) -> Bytes { + match chunks.len() { + 0 => Bytes::new(), + 1 => chunks.pop().unwrap(), + _ => Bytes::from(chunks.concat()), + } +} + // ProxiedHttpRequest -------------------------------------------------- #[derive(Clone, actix::Message)] @@ -2019,14 +2050,14 @@ pub(crate) struct ProxiedHttpRequest { impl ProxiedHttpRequest { pub(crate) fn new( info: ProxyHttpRequestInfo, - body: Vec, + body: Bytes, start: UtcDateTime, end: UtcDateTime, ) -> Self { let size = body.len(); Self { info, - body: Bytes::from(body), + body, size, decompressed_body: Bytes::new(), decompressed_size: 0, @@ -2069,14 +2100,14 @@ pub(crate) struct ProxiedHttpResponse { impl ProxiedHttpResponse { pub(crate) fn new( info: ProxyHttpResponseInfo, - body: Vec, + body: Bytes, start: UtcDateTime, end: UtcDateTime, ) -> Self { let size = body.len(); Self { info, - body: Bytes::from(body), + body, size, decompressed_body: Bytes::new(), decompressed_size: 0, From 13b66688ffbdb829d6f4647584c62cc70df5fb30 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:49:04 +0300 Subject: [PATCH 16/17] Drop ProxyHttp.requests HashMap, thread request through response --- crates/rproxy/src/server/proxy/http/proxy.rs | 149 +++++++++++++------ 1 file changed, 100 insertions(+), 49 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 748daff..caf249a 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -42,7 +42,7 @@ use pin_project::pin_project; use prometheus_client::metrics::{counter::Counter, gauge::Gauge}; use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, oneshot}; use tracing::{debug, error, info, warn}; use url::Url; use uuid::Uuid; @@ -77,7 +77,6 @@ where shared: ProxyHttpSharedState, backend: ProxyHttpBackendEndpoint, - requests: HashMap, postprocessor: actix::Addr>, // Per-worker cached metric handles. These are cheap to clone (each @@ -159,7 +158,6 @@ where id, shared, backend, - requests: HashMap::default(), postprocessor, in_flight_client, in_flight_backend, @@ -476,12 +474,17 @@ where let conn_id = info.conn_id; let bknd_req = this.backend.new_backend_request(&info); + // Oneshot hands the completed ProxiedHttpRequest from the + // request body's terminal poll to the response body's terminal + // poll, replacing the previous `proxy.requests` HashMap lookup. + let (req_tx, req_rx) = oneshot::channel::(); let bknd_req_body = ProxyHttpRequestBody::new( this.clone(), info, clnt_req_body, this.shared.config().prealloacated_request_buffer_size(), timestamp, + req_tx, ); this.in_flight_backend.inc(); @@ -533,7 +536,7 @@ where this.in_flight_backend.dec(); - Self::stream_to_client(this, req_id, conn_id, bknd_res) + Self::stream_to_client(this, req_id, conn_id, bknd_res, req_rx) } async fn send_to_backend_and_maybe_intercept( @@ -702,17 +705,16 @@ where this.in_flight_backend.dec(); - // Initiate the response stream first so the client sees no extra - // bookkeeping latency on the critical path, then file the request - // into the in-flight map for the eventual response postprocessor. - // The insert must complete before the response body stream finishes - // (which is when `postprocess_backend_response` calls `remove_sync`); - // this is guaranteed because actix won't begin polling the streaming - // body until after this synchronous handler returns. - let this_clone = this.clone(); - let res = Self::stream_to_client(this, req_id, conn_id, bknd_res); - this_clone.postprocess_client_request(req); - res + // Hand the already-buffered request to the response body via + // oneshot. The response body's terminal poll receives it and + // dispatches the combo to the postprocessor. `tx.send` is a + // trivial slot-store (no HashMap insert, no contention), so we + // no longer need PR 1's stream-first-then-file ordering trick — + // the bookkeeping cost is gone entirely. + let (req_tx, req_rx) = oneshot::channel::(); + let _ = req_tx.send(req); + + Self::stream_to_client(this, req_id, conn_id, bknd_res, req_rx) } fn stream_to_client( @@ -720,6 +722,7 @@ where req_id: Uuid, conn_id: Uuid, bknd_res: ClientResponse, + req_rx: oneshot::Receiver, ) -> Result where S: Stream> + Unpin + 'static, @@ -740,6 +743,7 @@ where bknd_res.into_stream(), preallocate, timestamp, + req_rx, ); #[cfg(debug_assertions)] @@ -755,38 +759,6 @@ where Ok(clnt_res.streaming(bknd_res_body)) } - fn postprocess_client_request(&self, req: ProxiedHttpRequest) { - let id = req.info.req_id; - let conn_id = req.info.conn_id; - - if self.requests.insert_sync(id, req).is_err() { - error!( - proxy = P::name(), - request_id = %id, - connection_id = %conn_id, - worker_id = %self.id, - "Duplicate request id", - ); - }; - } - - fn postprocess_backend_response(&self, bknd_res: ProxiedHttpResponse) { - let Some((_, clnt_req)) = self.requests.remove_sync(&bknd_res.info.req_id) else { - error!( - proxy = P::name(), - request_id = %bknd_res.info.req_id, - connection_id = %bknd_res.info.conn_id, - worker_id = %self.id, - "Proxied http response for unmatching request", - ); - return - }; - - // hand over to postprocessor asynchronously so that we can return the - // response to the client as early as possible - self.postprocessor.do_send(ProxiedHttpCombo { req: clnt_req, res: bknd_res }); - } - fn finalise_proxying( mut clnt_req: ProxiedHttpRequest, mut bknd_res: ProxiedHttpResponse, @@ -1715,6 +1687,12 @@ where body_len: usize, max_size: usize, + /// Oneshot for handing the completed `ProxiedHttpRequest` off to the + /// response stream (which combines it with the response and forwards + /// the pair to the postprocessor). Replaces the previous + /// `ProxyHttp.requests` HashMap. + req_tx: Option>, + #[pin] stream: S, } @@ -1734,6 +1712,7 @@ where body: S, _preallocate: usize, timestamp: UtcDateTime, + req_tx: oneshot::Sender, ) -> Self { let max_size = proxy.shared.config().max_request_size(); Self { @@ -1744,6 +1723,7 @@ where body: Vec::new(), body_len: 0, max_size, + req_tx: Some(req_tx), } } } @@ -1847,7 +1827,14 @@ where if let Some(info) = mem::take(this.info) { let body = concat_bytes(mem::take(this.body)); let req = ProxiedHttpRequest::new(info, body, *this.start, end); - this.proxy.postprocess_client_request(req); + if let Some(tx) = this.req_tx.take() { + // Receiver lives on the matching response body. If + // the client disconnected before reading the + // response, the receiver is dropped and we silently + // discard the postprocess hand-off — there is no + // matching response to combine with anyway. + let _ = tx.send(req); + } } Poll::Ready(None) } @@ -1871,6 +1858,12 @@ where body_len: usize, max_size: usize, + /// Oneshot for receiving the matching `ProxiedHttpRequest` from the + /// request body stream. Combined with the completed response and + /// forwarded to the postprocessor on terminal poll. Replaces the + /// previous `ProxyHttp.requests` HashMap lookup. + req_rx: Option>, + #[pin] stream: S, } @@ -1894,6 +1887,7 @@ where body: S, _preallocate: usize, timestamp: UtcDateTime, + req_rx: oneshot::Receiver, ) -> Self { let max_size = proxy.shared.config().max_response_size(); Self { @@ -1904,6 +1898,7 @@ where body_len: 0, max_size, info: Some(ProxyHttpResponseInfo::new(req_id, conn_id, status, content_encoding)), + req_rx: Some(req_rx), } } } @@ -2007,7 +2002,63 @@ where if let Some(info) = mem::take(this.info) { let body = concat_bytes(mem::take(this.body)); let res = ProxiedHttpResponse::new(info, body, *this.start, end); - this.proxy.postprocess_backend_response(res); + // Combine with the matching request and dispatch to + // the postprocessor. The request body's terminal poll + // sends on the oneshot — usually before the response + // completes (backend processing time dominates), so + // `try_recv` typically resolves immediately. On a + // small/fast response that races ahead of the request + // body completion, fall back to awaiting on a spawned + // task. Closed/cancelled => no request to combine with; + // log and drop the postprocessing for this exchange. + let postprocessor = this.proxy.postprocessor.clone(); + let worker_id = this.proxy.id; + match this.req_rx.take() { + Some(mut rx) => match rx.try_recv() { + Ok(req) => { + postprocessor.do_send(ProxiedHttpCombo { req, res }); + } + Err(oneshot::error::TryRecvError::Empty) => { + let req_id = res.info.req_id; + let conn_id = res.info.conn_id; + actix::spawn(async move { + match rx.await { + Ok(req) => { + postprocessor + .do_send(ProxiedHttpCombo { req, res }); + } + Err(_) => { + error!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %worker_id, + "Proxied http response for unmatching request (oneshot closed)", + ); + } + } + }); + } + Err(oneshot::error::TryRecvError::Closed) => { + error!( + proxy = P::name(), + request_id = %res.info.req_id, + connection_id = %res.info.conn_id, + worker_id = %worker_id, + "Proxied http response for unmatching request (oneshot closed)", + ); + } + }, + None => { + error!( + proxy = P::name(), + request_id = %res.info.req_id, + connection_id = %res.info.conn_id, + worker_id = %worker_id, + "Proxied http response without a request receiver", + ); + } + } } Poll::Ready(None) } From be5e85467c57896681cc59b136d618ec99fcaa5e Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 19:05:50 +0300 Subject: [PATCH 17/17] Observe oneshot spawn-fallback in postprocess pairing --- crates/rproxy/src/server/metrics.rs | 10 ++++++++++ crates/rproxy/src/server/proxy/http/proxy.rs | 15 +++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/crates/rproxy/src/server/metrics.rs b/crates/rproxy/src/server/metrics.rs index 638846d..ba167ad 100644 --- a/crates/rproxy/src/server/metrics.rs +++ b/crates/rproxy/src/server/metrics.rs @@ -47,6 +47,8 @@ pub(crate) struct Metrics { pub(crate) http_proxy_success_count: Family, pub(crate) http_proxy_failure_count: Family, + pub(crate) proxy_oneshot_spawn_fallback_count: Family, + pub(crate) http_request_size: Family, pub(crate) http_response_size: Family, @@ -91,6 +93,8 @@ impl Metrics { http_proxy_success_count: Family::default(), http_proxy_failure_count: Family::default(), + proxy_oneshot_spawn_fallback_count: Family::default(), + http_request_size: Family::default(), http_response_size: Family::default(), @@ -191,6 +195,12 @@ impl Metrics { this.http_proxy_failure_count.clone(), ); + this.registry.register( + "proxy_oneshot_spawn_fallback_count", + "count of postprocess pairings that fell back to spawned await on the request oneshot", + this.proxy_oneshot_spawn_fallback_count.clone(), + ); + this.registry.register_with_unit( "http_request_size", "sizes of incoming http requests", diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index caf249a..5062f87 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -85,6 +85,7 @@ where in_flight_client: Gauge, in_flight_backend: Gauge, proxy_failure_count: Counter, + oneshot_spawn_fallback_count: Counter, // Per-worker cache of (user_agent -> Counter) so that we only pay the // `Family::get_or_create` cost the first time a UA is seen on this @@ -153,6 +154,11 @@ where shared.metrics.http_in_flight_requests_backend.get_or_create(&labels_proxy).clone(); let proxy_failure_count = shared.metrics.http_proxy_failure_count.get_or_create(&labels_proxy).clone(); + let oneshot_spawn_fallback_count = shared + .metrics + .proxy_oneshot_spawn_fallback_count + .get_or_create(&labels_proxy) + .clone(); Self { id, @@ -162,6 +168,7 @@ where in_flight_client, in_flight_backend, proxy_failure_count, + oneshot_spawn_fallback_count, client_info_cache: HashMap::default(), } } @@ -2021,6 +2028,14 @@ where Err(oneshot::error::TryRecvError::Empty) => { let req_id = res.info.req_id; let conn_id = res.info.conn_id; + this.proxy.oneshot_spawn_fallback_count.inc(); + warn!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %worker_id, + "Proxied http response terminal poll raced ahead of request; spawning oneshot await fallback", + ); actix::spawn(async move { match rx.await { Ok(req) => {