From 1e9a8845bacaa8b360324bc80408cc9fb70be875 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Thu, 4 Jun 2026 11:48:28 +0300 Subject: [PATCH 1/2] Set TCP_NODELAY on backend connections --- crates/rproxy/src/server/proxy/http/proxy.rs | 89 +++++++++++++++++--- 1 file changed, 78 insertions(+), 11 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 131b928..ee17ec2 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -435,14 +435,60 @@ where let req_id = info.req_id; let conn_id = info.conn_id; + // Buffer the client request body so we can forward it to the backend + // with a Content-Length header (instead of chunked Transfer-Encoding). + // Backends like op-rbuilder's authrpc reader can begin JSON parse as + // soon as the framed body arrives, rather than waiting for the final + // chunk delimiter. The trade-off is loss of upload pipelining; this is + // acceptable for loopback (localhost) backends where bandwidth is high + // and chunk framing overhead dominates. + let body = + match clnt_req_body.to_bytes_limited(this.shared.config().max_request_size()).await { + Ok(Ok(body)) => body, + + Ok(Err(err)) => { + warn!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %this.id, + thread_id = ?std::thread::current().id(), + backend_url = %this.backend.url, + latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), + error = ?err, + "Failed to read client request body", + ); + this.shared + .metrics + .http_proxy_failure_count + .get_or_create(&LabelsProxy { proxy: P::name() }) + .inc(); + return Ok(HttpResponse::BadRequest().body(format!("Bad request: {err:?}"))); + } + + Err(_) => { + warn!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %this.id, + thread_id = ?std::thread::current().id(), + backend_url = %this.backend.url, + latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), + "Client request body exceeds max size", + ); + this.shared + .metrics + .http_proxy_failure_count + .get_or_create(&LabelsProxy { proxy: P::name() }) + .inc(); + return Ok(HttpResponse::PayloadTooLarge().finish()); + } + }; + + let body_received_at = UtcDateTime::now(); + let bknd_req = this.backend.new_backend_request(&info); - let bknd_req_body = ProxyHttpRequestBody::new( - this.clone(), - info, - clnt_req_body, - this.shared.config().prealloacated_request_buffer_size(), - timestamp, - ); this.shared .metrics @@ -459,10 +505,11 @@ where thread_id = ?std::thread::current().id(), backend_url = %this.backend.url, latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - "Streaming http request to backend...", + body_size = body.len(), + "Sending http request to backend (Content-Length)...", ); - let bknd_res = match bknd_req.send_stream(bknd_req_body).await { + let bknd_res = match bknd_req.send_body(body.clone()).await { Ok(bknd_res) => bknd_res, Err(err) => { @@ -500,7 +547,7 @@ where thread_id = ?std::thread::current().id(), backend_url = %this.backend.url, latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - "Finished streaming http request to backend", + "Got http response from backend", ); this.shared @@ -509,6 +556,9 @@ where .get_or_create(&LabelsProxy { proxy: P::name() }) .dec(); + let req = ProxiedHttpRequest::new(info, body.to_vec(), timestamp, body_received_at); + this.postprocess_client_request(req); + Self::stream_to_client(this, req_id, conn_id, bknd_res) } @@ -1353,9 +1403,26 @@ where .unwrap() // safety: verified on start .to_string(); + // Build the inner TCP connector ourselves so we can flip + // TCP_NODELAY on after each connect + use actix_service::ServiceExt as _; + let tcp_nodelay = actix_tls::connect::Connector::new( + actix_tls::connect::Resolver::default(), + ) + .service() + .map(|conn: actix_tls::connect::Connection| { + let _ = conn.io_ref().set_nodelay(true); + conn + }); + let client = Client::builder() .add_default_header((header::HOST, host)) - .connector(Connector::new().conn_keep_alive(2 * timeout).limit(connections_limit)) + .connector( + Connector::new() + .connector(tcp_nodelay) + .conn_keep_alive(2 * timeout) + .limit(connections_limit), + ) .timeout(timeout) .finish(); From 2c6f4758db9fa0318720464924b61c12d5590d47 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Thu, 4 Jun 2026 15:00:34 +0300 Subject: [PATCH 2/2] Drop request body buffering --- crates/rproxy/src/server/proxy/http/proxy.rs | 72 +++----------------- 1 file changed, 11 insertions(+), 61 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index ee17ec2..3ad30af 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -435,60 +435,14 @@ where let req_id = info.req_id; let conn_id = info.conn_id; - // Buffer the client request body so we can forward it to the backend - // with a Content-Length header (instead of chunked Transfer-Encoding). - // Backends like op-rbuilder's authrpc reader can begin JSON parse as - // soon as the framed body arrives, rather than waiting for the final - // chunk delimiter. The trade-off is loss of upload pipelining; this is - // acceptable for loopback (localhost) backends where bandwidth is high - // and chunk framing overhead dominates. - let body = - match clnt_req_body.to_bytes_limited(this.shared.config().max_request_size()).await { - Ok(Ok(body)) => body, - - Ok(Err(err)) => { - warn!( - proxy = P::name(), - request_id = %req_id, - connection_id = %conn_id, - worker_id = %this.id, - thread_id = ?std::thread::current().id(), - backend_url = %this.backend.url, - latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - error = ?err, - "Failed to read client request body", - ); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); - return Ok(HttpResponse::BadRequest().body(format!("Bad request: {err:?}"))); - } - - Err(_) => { - warn!( - proxy = P::name(), - request_id = %req_id, - connection_id = %conn_id, - worker_id = %this.id, - thread_id = ?std::thread::current().id(), - backend_url = %this.backend.url, - latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - "Client request body exceeds max size", - ); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); - return Ok(HttpResponse::PayloadTooLarge().finish()); - } - }; - - let body_received_at = UtcDateTime::now(); - let bknd_req = this.backend.new_backend_request(&info); + let bknd_req_body = ProxyHttpRequestBody::new( + this.clone(), + info, + clnt_req_body, + this.shared.config().prealloacated_request_buffer_size(), + timestamp, + ); this.shared .metrics @@ -505,11 +459,10 @@ where thread_id = ?std::thread::current().id(), backend_url = %this.backend.url, latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - body_size = body.len(), - "Sending http request to backend (Content-Length)...", + "Streaming http request to backend...", ); - let bknd_res = match bknd_req.send_body(body.clone()).await { + let bknd_res = match bknd_req.send_stream(bknd_req_body).await { Ok(bknd_res) => bknd_res, Err(err) => { @@ -547,7 +500,7 @@ where thread_id = ?std::thread::current().id(), backend_url = %this.backend.url, latency_total = (UtcDateTime::now() - timestamp).as_seconds_f64(), - "Got http response from backend", + "Finished streaming http request to backend", ); this.shared @@ -556,9 +509,6 @@ where .get_or_create(&LabelsProxy { proxy: P::name() }) .dec(); - let req = ProxiedHttpRequest::new(info, body.to_vec(), timestamp, body_received_at); - this.postprocess_client_request(req); - Self::stream_to_client(this, req_id, conn_id, bknd_res) } @@ -1403,7 +1353,7 @@ where .unwrap() // safety: verified on start .to_string(); - // Build the inner TCP connector ourselves so we can flip + // Build the inner TCP connector ourselves so we can flip // TCP_NODELAY on after each connect use actix_service::ServiceExt as _; let tcp_nodelay = actix_tls::connect::Connector::new(