Skip to content
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Works with **any** gRPC service via proto descriptor files. No code generation,
- **Full request mapping**: path params, query parameters (typed + repeated + nested), and `body` (`*` / named field / none)
- **`response_body`** to return a single response subfield, and **`additional_bindings`** for multiple routes per RPC
- **Auto-generated OpenAPI** documentation from proto messages, served at `/openapi.json`
- **Server-streaming** RPC → chunked HTTP responses
- **Server-streaming** RPC → NDJSON by default, or Server-Sent Events via `Accept: text/event-stream` negotiation
- **gRPC → HTTP status mapping** following the standard `google.rpc.Code` table
- **Header forwarding** from HTTP requests to gRPC metadata (configurable allow-list)
- **Context propagation**: W3C trace-context (`traceparent` forwarded or synthesized) and client deadlines (`grpc-timeout`) carried across the REST↔gRPC boundary
Expand Down Expand Up @@ -81,6 +81,18 @@ maintenance:
enabled: false
message: "Service is under maintenance. Please try again later."

# Optional: server-streaming response behavior.
# Streaming RPCs return NDJSON by default; clients sending
# `Accept: text/event-stream` get Server-Sent Events instead. An error
# mid-stream is delivered as an explicit terminal frame in both formats. For
# SSE it uses the `stream-error` event type (consumed via
# `addEventListener("stream-error", ...)`), distinct from the browser
# `EventSource` `onerror`, which fires only on transport failures.
streaming:
# SSE keep-alive interval (seconds). Comment frames keep idle streams alive
# through load balancers / nginx read timeouts. Default: 15.
sse_keep_alive_secs: 15

# Rate limiting (Shield)
shield:
enabled: true
Expand Down
61 changes: 60 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ pub struct ProxyConfig {
/// Headers to forward from HTTP to gRPC metadata.
#[serde(default = "default_forwarded_headers")]
pub forwarded_headers: Vec<String>,

/// Server-streaming response behavior.
#[serde(default)]
pub streaming: StreamingConfig,
}

fn default_forwarded_headers() -> Vec<String> {
Expand All @@ -85,6 +89,32 @@ fn default_forwarded_headers() -> Vec<String> {
]
}

/// Server-streaming response behavior.
///
/// Server-streaming RPCs are exposed as NDJSON by default and as Server-Sent
/// Events when the client sends `Accept: text/event-stream`. The keep-alive
/// interval applies only to the SSE path.
#[derive(Debug, Clone, Deserialize)]
pub struct StreamingConfig {
Comment thread
polaz marked this conversation as resolved.
/// SSE keep-alive interval in seconds. Comment frames are emitted on idle
/// streams to keep intermediaries (load balancers, nginx) from closing the
/// connection on read timeout. Default: 15.
#[serde(default = "default_sse_keep_alive_secs")]
pub sse_keep_alive_secs: u64,
}

fn default_sse_keep_alive_secs() -> u64 {
15
}

impl Default for StreamingConfig {
fn default() -> Self {
Self {
sse_keep_alive_secs: default_sse_keep_alive_secs(),
}
}
}

/// Upstream gRPC service configuration.
#[derive(Debug, Clone, Deserialize)]
pub struct UpstreamConfig {
Expand Down Expand Up @@ -509,7 +539,21 @@ impl ProxyConfig {
/// Useful for embedding the proxy: load a baked-in config (e.g. via
/// `include_str!`) without touching the filesystem.
pub fn from_yaml_str(yaml: &str) -> anyhow::Result<Self> {
Ok(serde_yaml::from_str(yaml)?)
let config: Self = serde_yaml::from_str(yaml)?;
config.validate()?;
Ok(config)
}

/// Validate cross-field constraints that the type system can't express.
///
/// Called automatically by [`from_yaml_str`](Self::from_yaml_str); call it
/// directly when building a [`ProxyConfig`] programmatically so the same
/// invariants are enforced on the embedded path.
pub fn validate(&self) -> anyhow::Result<()> {
if self.streaming.sse_keep_alive_secs == 0 {
anyhow::bail!("streaming.sse_keep_alive_secs must be greater than 0");
}
Ok(())
}

/// Parse rate string like "20/min" → requests per window.
Expand All @@ -536,11 +580,26 @@ upstream:
assert_eq!(config.upstream.default, "grpc://localhost:4180");
assert_eq!(config.listen.http, "0.0.0.0:8080");
assert_eq!(config.service.name, "structured-proxy");
assert_eq!(config.streaming.sse_keep_alive_secs, 15);
assert!(config.descriptors.is_empty());
assert!(config.auth.is_none());
assert!(config.shield.is_none());
}

#[test]
fn test_zero_sse_keep_alive_is_rejected() {
// A zero keep-alive would make axum's SSE timer fire continuously
// instead of acting as a periodic heartbeat — reject it at load time.
let yaml = r#"
upstream:
default: "grpc://localhost:4180"
streaming:
sse_keep_alive_secs: 0
"#;
let err = ProxyConfig::from_yaml_str(yaml).unwrap_err();
assert!(err.to_string().contains("sse_keep_alive_secs"));
}

#[test]
fn test_full_config_deserialize() {
let yaml = r#"
Expand Down
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub struct ProxyState {
pub metrics_namespace: String,
/// Path class patterns for metrics.
pub metrics_classes: Vec<config::MetricsClassConfig>,
/// SSE keep-alive interval (seconds) for server-streaming responses.
pub sse_keep_alive_secs: u64,
}

/// Universal proxy server.
Expand Down Expand Up @@ -132,6 +134,9 @@ impl ProxyServer {

/// Build the axum router with all endpoints.
pub fn router(&self) -> anyhow::Result<Router> {
// Enforce cross-field invariants on the embedded path too, where the
// config is built directly instead of through `from_yaml_str`.
self.config.validate()?;
let pool = self.load_descriptors()?;

let grpc_upstream = self.config.upstream.default.clone();
Expand All @@ -154,6 +159,7 @@ impl ProxyServer {
forwarded_headers: self.config.forwarded_headers.clone(),
metrics_namespace,
metrics_classes: self.config.metrics_classes.clone(),
sse_keep_alive_secs: self.config.streaming.sse_keep_alive_secs,
};

let cors = self.build_cors();
Expand Down Expand Up @@ -461,6 +467,7 @@ upstream:
forwarded_headers: vec![],
metrics_namespace: "test".into(),
metrics_classes: vec![],
sse_keep_alive_secs: 15,
};

let check = |path: &str| -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/transcode/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn status_to_response(status: tonic::Status) -> Response {
}

/// Human-readable gRPC code name for JSON error responses.
fn grpc_code_name(code: tonic::Code) -> &'static str {
pub(crate) fn grpc_code_name(code: tonic::Code) -> &'static str {
match code {
tonic::Code::Ok => "OK",
tonic::Code::Cancelled => "CANCELLED",
Expand Down
Loading