feat(streaming): expose server-streaming RPCs as SSE via Accept negotiation#52
feat(streaming): expose server-streaming RPCs as SSE via Accept negotiation#52polaz wants to merge 8 commits into
Conversation
- Negotiate Server-Sent Events via `Accept: text/event-stream`; NDJSON stays the default, so existing clients are unaffected - Frame SSE through axum's Sse/Event with a configurable keep-alive (streaming.sse_keep_alive_secs, default 15s) to survive LB/nginx idle read timeouts - Deliver a gRPC error mid-stream as an explicit terminal frame (event: error / final JSON line) instead of truncating the body - Share one SerializeOptions constructor between unary and streaming so a message serializes identically regardless of RPC kind Closes #51
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: ASSERTIVE Plan: Pro Plus Run ID: 📒 Files selected for processing (1)
📝 WalkthroughSummary by CodeRabbit
WalkthroughAdds Server-Sent Events output for server-streaming gRPC-to-REST transcoding via ChangesSSE streaming via Accept negotiation
Sequence Diagram(s)sequenceDiagram
participant Client
participant streaming_handler
participant wants_sse
participant gRPC upstream
participant sse_response/ndjson_response
Client->>streaming_handler: HTTP request (Accept header)
streaming_handler->>wants_sse: inspect Accept header
wants_sse-->>streaming_handler: true / false
streaming_handler->>gRPC upstream: forward streaming RPC
loop per message
gRPC upstream-->>streaming_handler: streamed message
streaming_handler->>sse_response/ndjson_response: serialize via message_to_json_string
sse_response/ndjson_response-->>Client: data event (SSE) or JSON line (NDJSON)
end
alt gRPC error mid-stream
gRPC upstream-->>streaming_handler: Status error
streaming_handler->>sse_response/ndjson_response: stream_error_json terminal frame
sse_response/ndjson_response-->>Client: event: stream-error (SSE) or error JSON line (NDJSON)
end
streaming_handler-->>Client: clean stream close
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/lib.rs (1)
149-159:⚠️ Potential issue | 🟠 MajorReject a zero SSE keep-alive interval before building state.
streaming.sse_keep_alive_secs: 0currently flows into the SSE keep-alive timer without validation. With a zero duration, axum's keep-alive mechanism would trigger immediately and repeatedly during idle periods, creating excessive keep-alive messages instead of the intended periodic heartbeat. Fail fast duringrouter()construction by validating thatsse_keep_alive_secs > 0before buildingProxyState, rather than allowing the misconfiguration to surface as a busy/unusable stream at runtime.Proposed fix
let service_name = self.config.service.name.clone(); let metrics_namespace = service_name.replace('-', "_"); + let sse_keep_alive_secs = self.config.streaming.sse_keep_alive_secs; + if sse_keep_alive_secs == 0 { + return Err(anyhow::anyhow!( + "streaming.sse_keep_alive_secs must be greater than 0" + )); + } let state = ProxyState { service_name: service_name.clone(), grpc_upstream, grpc_channel, @@ forwarded_headers: self.config.forwarded_headers.clone(), metrics_namespace, metrics_classes: self.config.metrics_classes.clone(), - sse_keep_alive_secs: self.config.streaming.sse_keep_alive_secs, + sse_keep_alive_secs, };🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/lib.rs` around lines 149 - 159, Add validation in the router() method to reject zero values for sse_keep_alive_secs before constructing ProxyState. Before the ProxyState initialization block that assigns sse_keep_alive_secs from self.config.streaming.sse_keep_alive_secs, add a check that ensures the value is greater than zero and return an error if it is not, preventing the zero duration from being passed to the SSE keep-alive timer mechanism.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/transcode/mod.rs`:
- Around line 212-222: The `wants_sse` function currently uses HeaderMap::get()
which only reads the first Accept header value and does not parse quality
factors. Update the function to use HeaderMap::get_all() to handle multiple
Accept headers and parse the quality factor (q parameter) from each media type,
rejecting the request when q is 0 or less (as per RFC 7231). Additionally, add
regression tests to verify that repeated Accept headers are properly handled and
that text/event-stream with q=0 is correctly rejected.
- Around line 304-308: The Response::builder() method chain in the streaming
response handler is manually setting the transfer-encoding header to chunked,
which is unnecessary and violates HTTP/2 specifications. Axum and Hyper
automatically handle the appropriate body framing for streaming responses, so
remove the .header("transfer-encoding", "chunked") line from the
Response::builder() chain. This will allow Hyper to manage the correct framing
behavior based on the HTTP protocol version being used.
---
Outside diff comments:
In `@src/lib.rs`:
- Around line 149-159: Add validation in the router() method to reject zero
values for sse_keep_alive_secs before constructing ProxyState. Before the
ProxyState initialization block that assigns sse_keep_alive_secs from
self.config.streaming.sse_keep_alive_secs, add a check that ensures the value is
greater than zero and return an error if it is not, preventing the zero duration
from being passed to the SSE keep-alive timer mechanism.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 1482efca-3932-491c-b8b6-6ea12e09389e
📒 Files selected for processing (6)
README.mdsrc/config.rssrc/lib.rssrc/transcode/error.rssrc/transcode/mod.rstests/embedded.rs
Cover RFC 7231 q=0 rejection and multi-line Accept headers for the SSE content negotiation. Both fail against the current single-header, q-unaware wants_sse and pass once it parses get_all() + quality factors.
wants_sse now scans every Accept header line via get_all() instead of only the first, and respects the RFC 7231 quality factor so text/event-stream;q=0 no longer selects the SSE path.
|
| Filename | Overview |
|---|---|
| src/transcode/mod.rs | Core of the PR: adds wants_sse/accept_range_selects_sse for content negotiation, json_frames scan-based stream that makes error frames terminal, and ndjson_response/sse_response builders. Logic is sound; one minor inconsistency: serialization error frames omit the code field that gRPC-status error frames carry. |
| src/config.rs | Adds StreamingConfig with sse_keep_alive_secs (default 15), validate() method that rejects a zero keep-alive, and a test covering that rejection. Clean implementation. |
| src/lib.rs | Threads sse_keep_alive_secs into ProxyState and calls config.validate() at the start of router() to enforce invariants on the embedded (programmatic) path. |
| src/transcode/error.rs | Widens grpc_code_name visibility to pub(crate) so stream_error_json in mod.rs can reuse it — minimal, correct change. |
| tests/embedded.rs | Adds streaming: Default::default() to the manually-constructed ProxyConfig so the embedded path compiles and passes validate(). |
| README.md | Documents SSE content negotiation, stream-error event name distinction, and sse_keep_alive_secs config knob. Accurate and complete. |
Reviews (3): Last reviewed commit: "docs(readme): trim streaming feature bul..." | Re-trigger Greptile
Drop the hand-set transfer-encoding: chunked header. hyper selects chunked on HTTP/1.1 and DATA frames on HTTP/2 from the protocol version; a manual transfer-encoding is redundant on h1 and illegal on h2.
A zero sse_keep_alive_secs makes axum's SSE keep-alive timer fire continuously instead of as a periodic heartbeat. Validate it at config load and at router build (covering the embedded path) so the misconfiguration fails fast rather than producing a busy stream.
Assert that an error mid-stream is the last frame in both NDJSON and SSE (later messages dropped), and that SSE labels it event: stream-error rather than the reserved error type. Both fail until the stream stops after an error and the SSE event is renamed.
- Stop the stream after the first error so an error frame is always the last thing a client sees; a serialization failure and a gRPC status now share terminal semantics instead of the former silently continuing past a serialization-error frame - Emit the terminal SSE error as event: stream-error rather than the reserved error type, which the browser EventSource dispatches only for transport failures and would never deliver to a custom handler
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@README.md`:
- Line 19: Simplify the Server-streaming RPC feature bullet at line 19 to a
brief, scannable headline such as "Server-streaming RPC with NDJSON or
Server-Sent Events" instead of the current ~270-character description that
bundles negotiation semantics, configuration options, error handling, and API
distinctions together. The detailed explanation of keep-alive configuration,
error frame delivery, and SSE event handling already exists in the configuration
section at lines 84-91, so those technical details can safely be omitted from
the feature list to improve readability.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 6c0eac09-c858-4f0d-b5b5-cda0ea311afa
📒 Files selected for processing (4)
README.mdsrc/config.rssrc/lib.rssrc/transcode/mod.rs
Move the terminal-error-frame and SSE stream-error/onerror detail from the dense feature bullet into the streaming config section, leaving the features list scannable.
Summary
Server-streaming RPCs were exposed only as NDJSON (
application/x-ndjson), which the browserEventSourceAPI cannot consume, and a gRPC error mid-stream truncated the HTTP body with no explicit signal. This adds Server-Sent Events output via content negotiation, keeping NDJSON as the backward-compatible default.Acceptnegotiation:Accept: text/event-stream→ SSE; anything else → current NDJSON (no behavior change for existing clients).axum::response::sse(Sse/Event/KeepAlive), not hand-rolleddata:strings.streaming.sse_keep_alive_secs, default 15s) so idle streams survive LB / nginx read timeouts.Statuserror mid-stream is emitted as an explicit final element in both formats (event: errorfor SSE, a final{"error","message","code"}line for NDJSON), then the stream closes cleanly instead of truncating.SerializeOptions: unary and streaming now use one constructor, so a message serializes identically regardless of RPC kind.Testing
cargo nextest run→ 126/126 passing (5 new unit tests:Acceptdecision in 3 shapes, SSE/NDJSON single-message serialize incl. 64-bit stringification, error-frame format).cargo clippy --all-targetsclean;cargo doc --no-depsclean for the new code.EventSourcenot exercised.Notes (out of scope)
DynamicMessage::new) — path/query binding for streaming RPCs is a separate concern.204 No Contentfor empty unary responses (e.g. DELETE →google.protobuf.Empty) intentionally left untouched to avoid changing unary behavior here.Closes #51