From 853dff354fb9cf3b43aca60d4de6b1b6d43bcd94 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sun, 21 Jun 2026 13:42:39 +0300 Subject: [PATCH 1/8] feat(streaming): expose server-streaming RPCs as SSE - Negotiate Server-Sent Events via `Accept: text/event-stream`; NDJSON stays the default, so existing clients are unaffected - Frame SSE through axum's Sse/Event with a configurable keep-alive (streaming.sse_keep_alive_secs, default 15s) to survive LB/nginx idle read timeouts - Deliver a gRPC error mid-stream as an explicit terminal frame (event: error / final JSON line) instead of truncating the body - Share one SerializeOptions constructor between unary and streaming so a message serializes identically regardless of RPC kind Closes #51 --- README.md | 10 +- src/config.rs | 30 +++++ src/lib.rs | 4 + src/transcode/error.rs | 2 +- src/transcode/mod.rs | 249 +++++++++++++++++++++++++++++++++++------ tests/embedded.rs | 1 + 6 files changed, 262 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 1e38706..9140dd5 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Works with **any** gRPC service via proto descriptor files. No code generation, - **Full request mapping**: path params, query parameters (typed + repeated + nested), and `body` (`*` / named field / none) - **`response_body`** to return a single response subfield, and **`additional_bindings`** for multiple routes per RPC - **Auto-generated OpenAPI** documentation from proto messages, served at `/openapi.json` -- **Server-streaming** RPC → chunked HTTP responses +- **Server-streaming** RPC → NDJSON by default, or Server-Sent Events when the client sends `Accept: text/event-stream` (configurable keep-alive); a gRPC error mid-stream is delivered as an explicit terminal frame in both formats - **gRPC → HTTP status mapping** following the standard `google.rpc.Code` table - **Header forwarding** from HTTP requests to gRPC metadata (configurable allow-list) - **Context propagation**: W3C trace-context (`traceparent` forwarded or synthesized) and client deadlines (`grpc-timeout`) carried across the REST↔gRPC boundary @@ -81,6 +81,14 @@ maintenance: enabled: false message: "Service is under maintenance. Please try again later." +# Optional: server-streaming response behavior. +# Streaming RPCs return NDJSON by default; clients sending +# `Accept: text/event-stream` get Server-Sent Events instead. +streaming: + # SSE keep-alive interval (seconds). Comment frames keep idle streams alive + # through load balancers / nginx read timeouts. Default: 15. + sse_keep_alive_secs: 15 + # Rate limiting (Shield) shield: enabled: true diff --git a/src/config.rs b/src/config.rs index 9f31004..9e36f8c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -69,6 +69,10 @@ pub struct ProxyConfig { /// Headers to forward from HTTP to gRPC metadata. #[serde(default = "default_forwarded_headers")] pub forwarded_headers: Vec, + + /// Server-streaming response behavior. + #[serde(default)] + pub streaming: StreamingConfig, } fn default_forwarded_headers() -> Vec { @@ -85,6 +89,32 @@ fn default_forwarded_headers() -> Vec { ] } +/// Server-streaming response behavior. +/// +/// Server-streaming RPCs are exposed as NDJSON by default and as Server-Sent +/// Events when the client sends `Accept: text/event-stream`. The keep-alive +/// interval applies only to the SSE path. +#[derive(Debug, Clone, Deserialize)] +pub struct StreamingConfig { + /// SSE keep-alive interval in seconds. Comment frames are emitted on idle + /// streams to keep intermediaries (load balancers, nginx) from closing the + /// connection on read timeout. Default: 15. + #[serde(default = "default_sse_keep_alive_secs")] + pub sse_keep_alive_secs: u64, +} + +fn default_sse_keep_alive_secs() -> u64 { + 15 +} + +impl Default for StreamingConfig { + fn default() -> Self { + Self { + sse_keep_alive_secs: default_sse_keep_alive_secs(), + } + } +} + /// Upstream gRPC service configuration. #[derive(Debug, Clone, Deserialize)] pub struct UpstreamConfig { diff --git a/src/lib.rs b/src/lib.rs index d981eb7..e125af3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,6 +66,8 @@ pub struct ProxyState { pub metrics_namespace: String, /// Path class patterns for metrics. pub metrics_classes: Vec, + /// SSE keep-alive interval (seconds) for server-streaming responses. + pub sse_keep_alive_secs: u64, } /// Universal proxy server. @@ -154,6 +156,7 @@ impl ProxyServer { forwarded_headers: self.config.forwarded_headers.clone(), metrics_namespace, metrics_classes: self.config.metrics_classes.clone(), + sse_keep_alive_secs: self.config.streaming.sse_keep_alive_secs, }; let cors = self.build_cors(); @@ -461,6 +464,7 @@ upstream: forwarded_headers: vec![], metrics_namespace: "test".into(), metrics_classes: vec![], + sse_keep_alive_secs: 15, }; let check = |path: &str| -> bool { diff --git a/src/transcode/error.rs b/src/transcode/error.rs index 064982a..0bc6860 100644 --- a/src/transcode/error.rs +++ b/src/transcode/error.rs @@ -44,7 +44,7 @@ pub fn status_to_response(status: tonic::Status) -> Response { } /// Human-readable gRPC code name for JSON error responses. -fn grpc_code_name(code: tonic::Code) -> &'static str { +pub(crate) fn grpc_code_name(code: tonic::Code) -> &'static str { match code { tonic::Code::Ok => "OK", tonic::Code::Cancelled => "CANCELLED", diff --git a/src/transcode/mod.rs b/src/transcode/mod.rs index feb4b7f..98171d7 100644 --- a/src/transcode/mod.rs +++ b/src/transcode/mod.rs @@ -13,6 +13,7 @@ pub mod request; use axum::extract::{Path, RawQuery, State}; use axum::http::{HeaderMap, StatusCode}; +use axum::response::sse::{Event, KeepAlive, Sse}; use axum::response::{IntoResponse, Response}; use axum::routing::{delete, get, patch, post, put, MethodRouter}; use axum::{Json, Router}; @@ -31,6 +32,8 @@ pub trait TranscodeState: Clone + Send + Sync + 'static { fn grpc_channel(&self) -> tonic::transport::Channel; /// Headers to forward from HTTP to gRPC metadata. fn forwarded_headers(&self) -> &[String]; + /// SSE keep-alive interval (seconds) for server-streaming responses. + fn sse_keep_alive_secs(&self) -> u64; } impl TranscodeState for crate::ProxyState { @@ -40,6 +43,9 @@ impl TranscodeState for crate::ProxyState { fn forwarded_headers(&self) -> &[String] { &self.forwarded_headers } + fn sse_keep_alive_secs(&self) -> u64 { + self.sse_keep_alive_secs + } } /// Route entry extracted from proto HTTP annotations. @@ -173,7 +179,55 @@ pub fn routes(pool: &DescriptorPool, aliases: &[AliasConfig]) router } -/// Handler for server-streaming RPCs (NDJSON response). +/// JSON serialization options shared by the unary and streaming response paths, +/// so a given message serializes identically regardless of RPC kind. +fn response_serialize_options() -> SerializeOptions { + SerializeOptions::new() + .skip_default_fields(false) + .stringify_64_bit_integers(true) +} + +/// Serialize one streamed gRPC message to a compact JSON string. +fn message_to_json_string(msg: &DynamicMessage, opts: &SerializeOptions) -> Result { + let value = msg + .serialize_with_options(serde_json::value::Serializer, opts) + .map_err(|e| e.to_string())?; + serde_json::to_string(&value).map_err(|e| e.to_string()) +} + +/// Terminal error frame for a stream that failed mid-flight. Shared by the +/// NDJSON and SSE paths so a client sees the same shape in either format. +fn stream_error_json(status: &tonic::Status) -> serde_json::Value { + serde_json::json!({ + "error": error::grpc_code_name(status.code()), + "message": status.message(), + "code": status.code() as i32, + }) +} + +/// Whether the client negotiated a Server-Sent Events response via `Accept`. +/// +/// Matches `text/event-stream` in any position of a comma-separated `Accept` +/// list, ignoring media-type parameters (`;q=...`) and case. +fn wants_sse(headers: &HeaderMap) -> bool { + headers + .get(axum::http::header::ACCEPT) + .and_then(|v| v.to_str().ok()) + .is_some_and(|accept| { + accept.split(',').any(|part| { + part.split(';') + .next() + .is_some_and(|media| media.trim().eq_ignore_ascii_case("text/event-stream")) + }) + }) +} + +/// Handler for server-streaming RPCs. +/// +/// Returns Server-Sent Events when the client sends `Accept: text/event-stream`, +/// otherwise newline-delimited JSON (NDJSON). In both formats a gRPC error +/// mid-stream is delivered as an explicit terminal frame before the stream is +/// closed cleanly, rather than truncating the HTTP body. async fn streaming_handler( State(proxy_state): State, headers: HeaderMap, @@ -206,45 +260,85 @@ async fn streaming_handler( .into_response(); } + let use_sse = wants_sse(&headers); + match grpc_client .server_streaming(grpc_request, grpc_path, grpc_codec) .await { Ok(response) => { let stream = response.into_inner(); - let serialize_opts = SerializeOptions::new() - .skip_default_fields(false) - .stringify_64_bit_integers(true); - - let byte_stream = stream.map(move |result| match result { - Ok(msg) => { - match msg.serialize_with_options(serde_json::value::Serializer, &serialize_opts) - { - Ok(json_value) => { - let mut bytes = serde_json::to_vec(&json_value).unwrap_or_default(); - bytes.push(b'\n'); - Ok::(axum::body::Bytes::from(bytes)) - } - Err(e) => Err(std::io::Error::other(format!("serialization error: {e}"))), - } - } - Err(status) => Err(std::io::Error::other(format!( - "gRPC stream error: {status}" - ))), - }); - - let body = axum::body::Body::from_stream(byte_stream); - Response::builder() - .status(StatusCode::OK) - .header("content-type", "application/x-ndjson") - .header("transfer-encoding", "chunked") - .body(body) - .unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response()) + if use_sse { + sse_response(stream, proxy_state.sse_keep_alive_secs()) + } else { + ndjson_response(stream) + } } Err(status) => error::status_to_response(status), } } +/// Build an NDJSON (`application/x-ndjson`) streaming response. +fn ndjson_response(stream: St) -> Response +where + St: futures::Stream> + Send + 'static, +{ + let opts = response_serialize_options(); + let byte_stream = stream.map(move |result| { + let mut line = match result { + Ok(msg) => match message_to_json_string(&msg, &opts) { + Ok(s) => s, + Err(e) => serde_json::json!({ + "error": "INTERNAL", + "message": format!("serialization error: {e}"), + }) + .to_string(), + }, + Err(status) => stream_error_json(&status).to_string(), + }; + line.push('\n'); + Ok::(axum::body::Bytes::from(line)) + }); + + let body = axum::body::Body::from_stream(byte_stream); + Response::builder() + .status(StatusCode::OK) + .header("content-type", "application/x-ndjson") + .header("transfer-encoding", "chunked") + .body(body) + .unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response()) +} + +/// Build a Server-Sent Events (`text/event-stream`) streaming response. +fn sse_response(stream: St, keep_alive_secs: u64) -> Response +where + St: futures::Stream> + Send + 'static, +{ + let opts = response_serialize_options(); + let event_stream = stream.map(move |result| { + let event = match result { + Ok(msg) => match message_to_json_string(&msg, &opts) { + Ok(s) => Event::default().data(s), + Err(e) => Event::default().event("error").data( + serde_json::json!({ + "error": "INTERNAL", + "message": format!("serialization error: {e}"), + }) + .to_string(), + ), + }, + Err(status) => Event::default() + .event("error") + .data(stream_error_json(&status).to_string()), + }; + Ok::(event) + }); + + Sse::new(event_stream) + .keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(keep_alive_secs))) + .into_response() +} + /// Generic transcoding handler. async fn transcode_handler( State(proxy_state): State, @@ -354,9 +448,7 @@ async fn transcode_handler( match grpc_client.unary(grpc_request, grpc_path, grpc_codec).await { Ok(response) => { let response_msg = response.into_inner(); - let serialize_opts = SerializeOptions::new() - .skip_default_fields(false) - .stringify_64_bit_integers(true); + let serialize_opts = response_serialize_options(); match response_msg .serialize_with_options(serde_json::value::Serializer, &serialize_opts) { @@ -864,4 +956,97 @@ mod tests { .route(&nested, get(|| async { "ok" })) .route(&catch_all, get(|| async { "ok" })); } + + /// Build a single-message descriptor pool with one `Item { name, count }` + /// message, used to exercise the streaming serialization helpers. + fn item_message() -> DynamicMessage { + use prost_reflect::prost::Message; + use prost_reflect::prost_types::{ + field_descriptor_proto::{Label, Type}, + DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet, + }; + + let item = DescriptorProto { + name: Some("Item".to_string()), + field: vec![ + FieldDescriptorProto { + name: Some("name".to_string()), + number: Some(1), + label: Some(Label::Optional as i32), + r#type: Some(Type::String as i32), + ..Default::default() + }, + FieldDescriptorProto { + name: Some("count".to_string()), + number: Some(2), + label: Some(Label::Optional as i32), + r#type: Some(Type::Int64 as i32), + ..Default::default() + }, + ], + ..Default::default() + }; + let file = FileDescriptorProto { + name: Some("item.proto".to_string()), + package: Some("test.v1".to_string()), + message_type: vec![item], + syntax: Some("proto3".to_string()), + ..Default::default() + }; + let mut bytes = Vec::new(); + FileDescriptorSet { file: vec![file] } + .encode(&mut bytes) + .unwrap(); + let pool = DescriptorPool::decode(bytes.as_slice()).unwrap(); + let desc = pool.get_message_by_name("test.v1.Item").unwrap(); + + let mut msg = DynamicMessage::new(desc); + msg.set_field_by_name("name", prost_reflect::Value::String("alice".to_string())); + msg.set_field_by_name("count", prost_reflect::Value::I64(42)); + msg + } + + #[test] + fn wants_sse_detects_event_stream_accept() { + let mut headers = HeaderMap::new(); + headers.insert("accept", "text/event-stream".parse().unwrap()); + assert!(wants_sse(&headers)); + } + + #[test] + fn wants_sse_matches_within_list_and_ignores_params() { + let mut headers = HeaderMap::new(); + headers.insert( + "accept", + "application/json, text/event-stream;q=0.9".parse().unwrap(), + ); + assert!(wants_sse(&headers)); + } + + #[test] + fn wants_sse_false_for_json_and_missing() { + let mut headers = HeaderMap::new(); + headers.insert("accept", "application/json".parse().unwrap()); + assert!(!wants_sse(&headers)); + assert!(!wants_sse(&HeaderMap::new())); + } + + #[test] + fn message_to_json_string_stringifies_64bit() { + let opts = response_serialize_options(); + let json = message_to_json_string(&item_message(), &opts).unwrap(); + let value: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(value["name"], "alice"); + // 64-bit integers are stringified to survive JS number precision limits. + assert_eq!(value["count"], "42"); + } + + #[test] + fn stream_error_json_carries_grpc_code_name() { + let status = tonic::Status::permission_denied("nope"); + let value = stream_error_json(&status); + assert_eq!(value["error"], "PERMISSION_DENIED"); + assert_eq!(value["message"], "nope"); + assert_eq!(value["code"], tonic::Code::PermissionDenied as i32); + } } diff --git a/tests/embedded.rs b/tests/embedded.rs index 0f88510..1b3553f 100644 --- a/tests/embedded.rs +++ b/tests/embedded.rs @@ -42,6 +42,7 @@ fn embedded_config_is_constructible() { // bypasses the serde defaults, so set every header you need here (or load // the config via from_file / from_yaml_str, where the default list applies). forwarded_headers: vec!["authorization".into()], + streaming: Default::default(), }; // The server accepts a programmatically-built config (the embedded path). let _server = ProxyServer::from_config(config); From 87fa7f55e89e5958c1860ee816fede70eeb4bcec Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sun, 21 Jun 2026 14:40:22 +0300 Subject: [PATCH 2/8] test(streaming): add regression tests for Accept negotiation Cover RFC 7231 q=0 rejection and multi-line Accept headers for the SSE content negotiation. Both fail against the current single-header, q-unaware wants_sse and pass once it parses get_all() + quality factors. --- src/transcode/mod.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/transcode/mod.rs b/src/transcode/mod.rs index 98171d7..54a2989 100644 --- a/src/transcode/mod.rs +++ b/src/transcode/mod.rs @@ -1031,6 +1031,25 @@ mod tests { assert!(!wants_sse(&HeaderMap::new())); } + #[test] + fn wants_sse_rejects_explicit_q_zero() { + // RFC 7231 §5.3.1: `q=0` means the media type is explicitly NOT + // acceptable, so it must not select the SSE path. + let mut headers = HeaderMap::new(); + headers.insert("accept", "text/event-stream;q=0".parse().unwrap()); + assert!(!wants_sse(&headers)); + } + + #[test] + fn wants_sse_honors_second_accept_header_line() { + // A client may send multiple `Accept` header lines; the negotiation + // must consider all of them, not just the first. + let mut headers = HeaderMap::new(); + headers.append("accept", "application/json".parse().unwrap()); + headers.append("accept", "text/event-stream".parse().unwrap()); + assert!(wants_sse(&headers)); + } + #[test] fn message_to_json_string_stringifies_64bit() { let opts = response_serialize_options(); From 2b19a838534dc42f0bacd22582dc6e29d1336f73 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sun, 21 Jun 2026 14:40:57 +0300 Subject: [PATCH 3/8] fix(streaming): parse all Accept headers and quality factors for SSE wants_sse now scans every Accept header line via get_all() instead of only the first, and respects the RFC 7231 quality factor so text/event-stream;q=0 no longer selects the SSE path. --- src/transcode/mod.rs | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/src/transcode/mod.rs b/src/transcode/mod.rs index 54a2989..af12ce7 100644 --- a/src/transcode/mod.rs +++ b/src/transcode/mod.rs @@ -207,19 +207,38 @@ fn stream_error_json(status: &tonic::Status) -> serde_json::Value { /// Whether the client negotiated a Server-Sent Events response via `Accept`. /// -/// Matches `text/event-stream` in any position of a comma-separated `Accept` -/// list, ignoring media-type parameters (`;q=...`) and case. +/// Considers every `Accept` header line (a client may send more than one) and +/// every comma-separated media range within each. Matches `text/event-stream` +/// case-insensitively and honors the quality factor: per RFC 7231 §5.3.1 a +/// `q=0` weight means the type is explicitly not acceptable, so it does not +/// select the SSE path. fn wants_sse(headers: &HeaderMap) -> bool { headers - .get(axum::http::header::ACCEPT) - .and_then(|v| v.to_str().ok()) - .is_some_and(|accept| { - accept.split(',').any(|part| { - part.split(';') - .next() - .is_some_and(|media| media.trim().eq_ignore_ascii_case("text/event-stream")) - }) - }) + .get_all(axum::http::header::ACCEPT) + .iter() + .filter_map(|v| v.to_str().ok()) + .flat_map(|accept| accept.split(',')) + .any(accept_range_selects_sse) +} + +/// Whether a single `Accept` media range selects `text/event-stream` with a +/// non-zero quality factor. +fn accept_range_selects_sse(range: &str) -> bool { + let mut parts = range.split(';'); + let media = parts.next().unwrap_or("").trim(); + if !media.eq_ignore_ascii_case("text/event-stream") { + return false; + } + // Default weight is 1.0; only an explicit `q=0` (or unparseable-as-positive) + // disqualifies the match. A malformed weight falls back to acceptable. + for param in parts { + let mut kv = param.splitn(2, '='); + if kv.next().unwrap_or("").trim().eq_ignore_ascii_case("q") { + let q: f32 = kv.next().unwrap_or("").trim().parse().unwrap_or(1.0); + return q > 0.0; + } + } + true } /// Handler for server-streaming RPCs. From b782fe77215f52e0f6451ad84c1a65be1a678849 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sun, 21 Jun 2026 14:42:12 +0300 Subject: [PATCH 4/8] fix(streaming): let hyper choose NDJSON body framing Drop the hand-set transfer-encoding: chunked header. hyper selects chunked on HTTP/1.1 and DATA frames on HTTP/2 from the protocol version; a manual transfer-encoding is redundant on h1 and illegal on h2. --- src/transcode/mod.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/transcode/mod.rs b/src/transcode/mod.rs index af12ce7..e3b3d1c 100644 --- a/src/transcode/mod.rs +++ b/src/transcode/mod.rs @@ -320,10 +320,12 @@ where }); let body = axum::body::Body::from_stream(byte_stream); + // Body framing (chunked on HTTP/1.1, DATA frames on HTTP/2) is chosen by + // hyper from the protocol version; setting transfer-encoding by hand would + // be redundant on HTTP/1.1 and illegal on HTTP/2. Response::builder() .status(StatusCode::OK) .header("content-type", "application/x-ndjson") - .header("transfer-encoding", "chunked") .body(body) .unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response()) } @@ -1079,6 +1081,20 @@ mod tests { assert_eq!(value["count"], "42"); } + #[test] + fn ndjson_response_omits_manual_transfer_encoding() { + // hyper picks the framing per protocol version; a hand-set + // transfer-encoding would be illegal on HTTP/2. + let resp = ndjson_response(futures::stream::empty::< + Result, + >()); + assert_eq!( + resp.headers().get("content-type").unwrap(), + "application/x-ndjson" + ); + assert!(resp.headers().get("transfer-encoding").is_none()); + } + #[test] fn stream_error_json_carries_grpc_code_name() { let status = tonic::Status::permission_denied("nope"); From 165ff624872bf3e7c3580824b95de3358607f256 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sun, 21 Jun 2026 14:42:22 +0300 Subject: [PATCH 5/8] fix(config): reject zero SSE keep-alive interval A zero sse_keep_alive_secs makes axum's SSE keep-alive timer fire continuously instead of as a periodic heartbeat. Validate it at config load and at router build (covering the embedded path) so the misconfiguration fails fast rather than producing a busy stream. --- src/config.rs | 31 ++++++++++++++++++++++++++++++- src/lib.rs | 3 +++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index 9e36f8c..4c7edb6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -539,7 +539,21 @@ impl ProxyConfig { /// Useful for embedding the proxy: load a baked-in config (e.g. via /// `include_str!`) without touching the filesystem. pub fn from_yaml_str(yaml: &str) -> anyhow::Result { - Ok(serde_yaml::from_str(yaml)?) + let config: Self = serde_yaml::from_str(yaml)?; + config.validate()?; + Ok(config) + } + + /// Validate cross-field constraints that the type system can't express. + /// + /// Called automatically by [`from_yaml_str`](Self::from_yaml_str); call it + /// directly when building a [`ProxyConfig`] programmatically so the same + /// invariants are enforced on the embedded path. + pub fn validate(&self) -> anyhow::Result<()> { + if self.streaming.sse_keep_alive_secs == 0 { + anyhow::bail!("streaming.sse_keep_alive_secs must be greater than 0"); + } + Ok(()) } /// Parse rate string like "20/min" → requests per window. @@ -566,11 +580,26 @@ upstream: assert_eq!(config.upstream.default, "grpc://localhost:4180"); assert_eq!(config.listen.http, "0.0.0.0:8080"); assert_eq!(config.service.name, "structured-proxy"); + assert_eq!(config.streaming.sse_keep_alive_secs, 15); assert!(config.descriptors.is_empty()); assert!(config.auth.is_none()); assert!(config.shield.is_none()); } + #[test] + fn test_zero_sse_keep_alive_is_rejected() { + // A zero keep-alive would make axum's SSE timer fire continuously + // instead of acting as a periodic heartbeat — reject it at load time. + let yaml = r#" +upstream: + default: "grpc://localhost:4180" +streaming: + sse_keep_alive_secs: 0 +"#; + let err = ProxyConfig::from_yaml_str(yaml).unwrap_err(); + assert!(err.to_string().contains("sse_keep_alive_secs")); + } + #[test] fn test_full_config_deserialize() { let yaml = r#" diff --git a/src/lib.rs b/src/lib.rs index e125af3..04ede9a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -134,6 +134,9 @@ impl ProxyServer { /// Build the axum router with all endpoints. pub fn router(&self) -> anyhow::Result { + // Enforce cross-field invariants on the embedded path too, where the + // config is built directly instead of through `from_yaml_str`. + self.config.validate()?; let pool = self.load_descriptors()?; let grpc_upstream = self.config.upstream.default.clone(); From 99a53e2ab9995035853fe05ddc4d6c5a66f27e33 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sun, 21 Jun 2026 14:46:04 +0300 Subject: [PATCH 6/8] test(streaming): cover terminal error frames and SSE event name Assert that an error mid-stream is the last frame in both NDJSON and SSE (later messages dropped), and that SSE labels it event: stream-error rather than the reserved error type. Both fail until the stream stops after an error and the SSE event is renamed. --- src/transcode/mod.rs | 55 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 51 insertions(+), 4 deletions(-) diff --git a/src/transcode/mod.rs b/src/transcode/mod.rs index e3b3d1c..51025bd 100644 --- a/src/transcode/mod.rs +++ b/src/transcode/mod.rs @@ -978,9 +978,15 @@ mod tests { .route(&catch_all, get(|| async { "ok" })); } - /// Build a single-message descriptor pool with one `Item { name, count }` - /// message, used to exercise the streaming serialization helpers. + /// `Item { name: "alice", count: 42 }` — default fixture for the + /// serialization helpers. fn item_message() -> DynamicMessage { + item_message_named("alice", 42) + } + + /// Build an `Item { name, count }` message from a freshly-decoded + /// descriptor pool, used to exercise the streaming serialization helpers. + fn item_message_named(name: &str, count: i64) -> DynamicMessage { use prost_reflect::prost::Message; use prost_reflect::prost_types::{ field_descriptor_proto::{Label, Type}, @@ -1022,11 +1028,52 @@ mod tests { let desc = pool.get_message_by_name("test.v1.Item").unwrap(); let mut msg = DynamicMessage::new(desc); - msg.set_field_by_name("name", prost_reflect::Value::String("alice".to_string())); - msg.set_field_by_name("count", prost_reflect::Value::I64(42)); + msg.set_field_by_name("name", prost_reflect::Value::String(name.to_string())); + msg.set_field_by_name("count", prost_reflect::Value::I64(count)); msg } + /// Collect a streaming response body into a single UTF-8 string. + async fn collect_body(resp: Response) -> String { + let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + String::from_utf8(bytes.to_vec()).unwrap() + } + + #[tokio::test] + async fn ndjson_error_frame_is_terminal() { + // A gRPC error mid-stream must be the LAST frame: messages the upstream + // would yield after the error are dropped, so the error line is an + // unambiguous end-of-stream signal rather than a mid-stream marker. + let items = vec![ + Ok(item_message_named("alice", 1)), + Err(tonic::Status::internal("boom")), + Ok(item_message_named("bob", 2)), + ]; + let body = collect_body(ndjson_response(futures::stream::iter(items))).await; + let lines: Vec<&str> = body.lines().collect(); + assert_eq!(lines.len(), 2, "stream must stop after the error frame"); + assert!(lines[0].contains("alice")); + assert!(lines[1].contains("INTERNAL") && lines[1].contains("boom")); + assert!(!body.contains("bob"), "post-error message must be dropped"); + } + + #[tokio::test] + async fn sse_error_uses_distinct_event_name() { + // The terminal error is sent as `event: stream-error`, not the reserved + // `error` type that collides with the browser EventSource onerror. + let items = vec![ + Ok(item_message_named("alice", 1)), + Err(tonic::Status::permission_denied("nope")), + Ok(item_message_named("bob", 2)), + ]; + let body = collect_body(sse_response(futures::stream::iter(items), 15)).await; + assert!(body.contains("stream-error")); + assert!(body.contains("PERMISSION_DENIED")); + assert!(!body.contains("bob"), "post-error message must be dropped"); + } + #[test] fn wants_sse_detects_event_stream_accept() { let mut headers = HeaderMap::new(); From b310175c06b424c767ded9c672542b1d04f2f827 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sun, 21 Jun 2026 14:46:58 +0300 Subject: [PATCH 7/8] fix(streaming): make error frames terminal and rename SSE error event - Stop the stream after the first error so an error frame is always the last thing a client sees; a serialization failure and a gRPC status now share terminal semantics instead of the former silently continuing past a serialization-error frame - Emit the terminal SSE error as event: stream-error rather than the reserved error type, which the browser EventSource dispatches only for transport failures and would never deliver to a custom handler --- README.md | 2 +- src/transcode/mod.rs | 86 ++++++++++++++++++++++++++++++-------------- 2 files changed, 60 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 9140dd5..d189ab5 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Works with **any** gRPC service via proto descriptor files. No code generation, - **Full request mapping**: path params, query parameters (typed + repeated + nested), and `body` (`*` / named field / none) - **`response_body`** to return a single response subfield, and **`additional_bindings`** for multiple routes per RPC - **Auto-generated OpenAPI** documentation from proto messages, served at `/openapi.json` -- **Server-streaming** RPC → NDJSON by default, or Server-Sent Events when the client sends `Accept: text/event-stream` (configurable keep-alive); a gRPC error mid-stream is delivered as an explicit terminal frame in both formats +- **Server-streaming** RPC → NDJSON by default, or Server-Sent Events when the client sends `Accept: text/event-stream` (configurable keep-alive); an error mid-stream is delivered as an explicit terminal frame in both formats (SSE uses the `stream-error` event type, consumed via `addEventListener("stream-error", ...)` — distinct from the browser `EventSource` `onerror`, which fires only on transport failures) - **gRPC → HTTP status mapping** following the standard `google.rpc.Code` table - **Header forwarding** from HTTP requests to gRPC metadata (configurable allow-list) - **Context propagation**: W3C trace-context (`traceparent` forwarded or synthesized) and client deadlines (`grpc-timeout`) carried across the REST↔gRPC boundary diff --git a/src/transcode/mod.rs b/src/transcode/mod.rs index 51025bd..3545dd6 100644 --- a/src/transcode/mod.rs +++ b/src/transcode/mod.rs @@ -297,23 +297,64 @@ async fn streaming_handler( } } -/// Build an NDJSON (`application/x-ndjson`) streaming response. -fn ndjson_response(stream: St) -> Response +/// One JSON frame of a streaming response, already serialized. +/// +/// `Error` is terminal: [`json_frames`] stops the stream right after yielding +/// it, so an error frame is always the last thing a client sees regardless of +/// whether it came from a gRPC status or a serialization failure. +enum StreamFrame { + Data(String), + Error(String), +} + +/// Turn a gRPC message stream into a stream of serialized JSON frames, stopping +/// after the first error so error frames are unambiguously terminal. +/// +/// Both a gRPC `Status` and a per-message serialization failure become a +/// terminal [`StreamFrame::Error`]; downstream messages the upstream might +/// still emit are dropped rather than streamed past the error. +fn json_frames(stream: St) -> impl futures::Stream + Send + 'static where St: futures::Stream> + Send + 'static, { let opts = response_serialize_options(); - let byte_stream = stream.map(move |result| { - let mut line = match result { + stream.scan(false, move |stopped, result| { + if *stopped { + return futures::future::ready(None); + } + let frame = match result { Ok(msg) => match message_to_json_string(&msg, &opts) { - Ok(s) => s, - Err(e) => serde_json::json!({ - "error": "INTERNAL", - "message": format!("serialization error: {e}"), - }) - .to_string(), + Ok(s) => StreamFrame::Data(s), + Err(e) => { + *stopped = true; + StreamFrame::Error( + serde_json::json!({ + "error": "INTERNAL", + "message": format!("serialization error: {e}"), + }) + .to_string(), + ) + } }, - Err(status) => stream_error_json(&status).to_string(), + Err(status) => { + *stopped = true; + StreamFrame::Error(stream_error_json(&status).to_string()) + } + }; + futures::future::ready(Some(frame)) + }) +} + +/// Build an NDJSON (`application/x-ndjson`) streaming response. +fn ndjson_response(stream: St) -> Response +where + St: futures::Stream> + Send + 'static, +{ + // Data and error frames are both JSON lines; an error is distinguished by + // its `error` field and by being the final line (see `json_frames`). + let byte_stream = json_frames(stream).map(|frame| { + let mut line = match frame { + StreamFrame::Data(s) | StreamFrame::Error(s) => s, }; line.push('\n'); Ok::(axum::body::Bytes::from(line)) @@ -335,22 +376,13 @@ fn sse_response(stream: St, keep_alive_secs: u64) -> Response where St: futures::Stream> + Send + 'static, { - let opts = response_serialize_options(); - let event_stream = stream.map(move |result| { - let event = match result { - Ok(msg) => match message_to_json_string(&msg, &opts) { - Ok(s) => Event::default().data(s), - Err(e) => Event::default().event("error").data( - serde_json::json!({ - "error": "INTERNAL", - "message": format!("serialization error: {e}"), - }) - .to_string(), - ), - }, - Err(status) => Event::default() - .event("error") - .data(stream_error_json(&status).to_string()), + // Terminal errors use the `stream-error` event type, not the reserved + // `error` type that the browser EventSource dispatches for transport + // failures — clients listen for it via addEventListener("stream-error"). + let event_stream = json_frames(stream).map(|frame| { + let event = match frame { + StreamFrame::Data(s) => Event::default().data(s), + StreamFrame::Error(s) => Event::default().event("stream-error").data(s), }; Ok::(event) }); From f8285f02dec70f7dccdc678c07c31d7e395f69b0 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sun, 21 Jun 2026 15:04:21 +0300 Subject: [PATCH 8/8] docs(readme): trim streaming feature bullet to a headline Move the terminal-error-frame and SSE stream-error/onerror detail from the dense feature bullet into the streaming config section, leaving the features list scannable. --- README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index d189ab5..00fa239 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Works with **any** gRPC service via proto descriptor files. No code generation, - **Full request mapping**: path params, query parameters (typed + repeated + nested), and `body` (`*` / named field / none) - **`response_body`** to return a single response subfield, and **`additional_bindings`** for multiple routes per RPC - **Auto-generated OpenAPI** documentation from proto messages, served at `/openapi.json` -- **Server-streaming** RPC → NDJSON by default, or Server-Sent Events when the client sends `Accept: text/event-stream` (configurable keep-alive); an error mid-stream is delivered as an explicit terminal frame in both formats (SSE uses the `stream-error` event type, consumed via `addEventListener("stream-error", ...)` — distinct from the browser `EventSource` `onerror`, which fires only on transport failures) +- **Server-streaming** RPC → NDJSON by default, or Server-Sent Events via `Accept: text/event-stream` negotiation - **gRPC → HTTP status mapping** following the standard `google.rpc.Code` table - **Header forwarding** from HTTP requests to gRPC metadata (configurable allow-list) - **Context propagation**: W3C trace-context (`traceparent` forwarded or synthesized) and client deadlines (`grpc-timeout`) carried across the REST↔gRPC boundary @@ -83,7 +83,11 @@ maintenance: # Optional: server-streaming response behavior. # Streaming RPCs return NDJSON by default; clients sending -# `Accept: text/event-stream` get Server-Sent Events instead. +# `Accept: text/event-stream` get Server-Sent Events instead. An error +# mid-stream is delivered as an explicit terminal frame in both formats. For +# SSE it uses the `stream-error` event type (consumed via +# `addEventListener("stream-error", ...)`), distinct from the browser +# `EventSource` `onerror`, which fires only on transport failures. streaming: # SSE keep-alive interval (seconds). Comment frames keep idle streams alive # through load balancers / nginx read timeouts. Default: 15.