Skip to content

feat(streaming): expose server-streaming RPCs as SSE via Accept negotiation#52

Open
polaz wants to merge 8 commits into
mainfrom
feat/#51-sse-streaming
Open

feat(streaming): expose server-streaming RPCs as SSE via Accept negotiation#52
polaz wants to merge 8 commits into
mainfrom
feat/#51-sse-streaming

Conversation

@polaz

@polaz polaz commented Jun 21, 2026

Copy link
Copy Markdown
Member

Summary

Server-streaming RPCs were exposed only as NDJSON (application/x-ndjson), which the browser EventSource API cannot consume, and a gRPC error mid-stream truncated the HTTP body with no explicit signal. This adds Server-Sent Events output via content negotiation, keeping NDJSON as the backward-compatible default.

  • Accept negotiation: Accept: text/event-stream → SSE; anything else → current NDJSON (no behavior change for existing clients).
  • SSE framing via axum::response::sse (Sse / Event / KeepAlive), not hand-rolled data: strings.
  • Keep-alive for SSE, interval configurable (streaming.sse_keep_alive_secs, default 15s) so idle streams survive LB / nginx read timeouts.
  • Terminal error frame: a gRPC Status error mid-stream is emitted as an explicit final element in both formats (event: error for SSE, a final {"error","message","code"} line for NDJSON), then the stream closes cleanly instead of truncating.
  • Shared SerializeOptions: unary and streaming now use one constructor, so a message serializes identically regardless of RPC kind.

Testing

  • cargo nextest run → 126/126 passing (5 new unit tests: Accept decision in 3 shapes, SSE/NDJSON single-message serialize incl. 64-bit stringification, error-frame format).
  • cargo clippy --all-targets clean; cargo doc --no-deps clean for the new code.
  • Not verified live: full SSE round-trip against a running gRPC upstream (no streaming mock with a real upstream in-repo); helpers are unit-covered, end-to-end EventSource not exercised.

Notes (out of scope)

  • Streaming handler still sends an empty upstream request (DynamicMessage::new) — path/query binding for streaming RPCs is a separate concern.
  • 204 No Content for empty unary responses (e.g. DELETE → google.protobuf.Empty) intentionally left untouched to avoid changing unary behavior here.

Closes #51

- Negotiate Server-Sent Events via `Accept: text/event-stream`; NDJSON
  stays the default, so existing clients are unaffected
- Frame SSE through axum's Sse/Event with a configurable keep-alive
  (streaming.sse_keep_alive_secs, default 15s) to survive LB/nginx
  idle read timeouts
- Deliver a gRPC error mid-stream as an explicit terminal frame
  (event: error / final JSON line) instead of truncating the body
- Share one SerializeOptions constructor between unary and streaming
  so a message serializes identically regardless of RPC kind

Closes #51
@coderabbitai

coderabbitai Bot commented Jun 21, 2026

Copy link
Copy Markdown

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: cc7a1643-84e8-4fe6-834c-16583d40b846

📥 Commits

Reviewing files that changed from the base of the PR and between b310175 and f8285f0.

📒 Files selected for processing (1)
  • README.md

📝 Walkthrough

Summary by CodeRabbit

  • New Features
    • Server-streaming responses now support Server-Sent Events (SSE) when clients send Accept: text/event-stream; defaults to NDJSON otherwise.
    • Mid-stream failures are delivered as an explicit terminal error event/frame in the streaming response.
    • Added configurable SSE keep-alive interval via streaming.sse_keep_alive_secs (default: 15 seconds).
  • Configuration
    • Configuration validation rejects streaming.sse_keep_alive_secs set to 0.
  • Documentation
    • Updated README with NDJSON vs SSE behavior and keep-alive details.
  • Tests
    • Extended tests for defaults, validation, SSE header selection, and streaming error payload shape.

Walkthrough

Adds Server-Sent Events output for server-streaming gRPC-to-REST transcoding via Accept: text/event-stream negotiation, keeping NDJSON as the default. Introduces StreamingConfig with a configurable SSE keep-alive interval, validates the configuration, wires it through ProxyState and TranscodeState, and emits explicit terminal error frames on mid-stream gRPC failures for both formats.

Changes

SSE streaming via Accept negotiation

Layer / File(s) Summary
StreamingConfig, validation, and state propagation
src/config.rs, src/lib.rs, src/transcode/mod.rs
StreamingConfig struct with sse_keep_alive_secs: u64 (default 15) is added to ProxyConfig; validate() method rejects zero values; the value is propagated through ProxyServer into ProxyState and exposed via the new TranscodeState::sse_keep_alive_secs() trait method.
Shared JSON helpers, wants_sse, and helper visibility
src/transcode/mod.rs, src/transcode/error.rs
Adds response_serialize_options, message_to_json_string, stream_error_json helpers for consistent unary/streaming JSON serialization; introduces wants_sse to inspect the Accept header and decide between SSE and NDJSON; widens grpc_code_name to pub(crate) for error payload use; adds axum::response::sse imports.
Server-streaming handler branching and response rendering
src/transcode/mod.rs
streaming_handler now branches on wants_sse into dedicated ndjson_response and sse_response paths; adds StreamFrame model and json_frames transformer to stop after the first gRPC error or serialization failure; emits terminal error frames as event: stream-error (SSE) or final JSON line (NDJSON); configures SSE keep-alive interval from TranscodeState.
Unary response shared options and test fixture updates
src/transcode/mod.rs, src/lib.rs
Unary response path adopts response_serialize_options instead of constructing SerializeOptions inline; test_maintenance_exempt_matching updates its ProxyState literal to include the sse_keep_alive_secs field.
Unit tests, embedded config, and README docs
src/transcode/mod.rs, tests/embedded.rs, README.md
New unit tests cover wants_sse matching rules, message_to_json_string int64 stringification, and stream_error_json payload shape; embedded_config_is_constructible updated to include the streaming field; README features list and configuration example document SSE/NDJSON negotiation and the keep-alive knob.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant streaming_handler
  participant wants_sse
  participant gRPC upstream
  participant sse_response/ndjson_response

  Client->>streaming_handler: HTTP request (Accept header)
  streaming_handler->>wants_sse: inspect Accept header
  wants_sse-->>streaming_handler: true / false
  streaming_handler->>gRPC upstream: forward streaming RPC
  loop per message
    gRPC upstream-->>streaming_handler: streamed message
    streaming_handler->>sse_response/ndjson_response: serialize via message_to_json_string
    sse_response/ndjson_response-->>Client: data event (SSE) or JSON line (NDJSON)
  end
  alt gRPC error mid-stream
    gRPC upstream-->>streaming_handler: Status error
    streaming_handler->>sse_response/ndjson_response: stream_error_json terminal frame
    sse_response/ndjson_response-->>Client: event: stream-error (SSE) or error JSON line (NDJSON)
  end
  streaming_handler-->>Client: clean stream close
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • structured-world/structured-proxy#46: Both PRs modify src/config.rs around ProxyConfig::from_yaml_str and update tests/embedded.rs to keep embedded ProxyConfig construction valid after config struct changes.
  • structured-world/structured-proxy#44: Both PRs modify src/config.rs's ProxyConfig declaration; this PR adds the streaming field and validate() method while that PR adds #[non_exhaustive].
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main change: exposing server-streaming RPCs as SSE via Accept negotiation, which is the primary feature added in this PR.
Description check ✅ Passed The description thoroughly explains the problem, solution, and implementation details related to the changeset, covering SSE support, error handling, and configuration.
Linked Issues check ✅ Passed All code changes meet the primary objectives from #51: Accept negotiation for SSE, configurable keep-alive, terminal error frames, shared SerializeOptions, and comprehensive unit test coverage with README documentation.
Out of Scope Changes check ✅ Passed All changes directly support the PR objectives; visibility change to grpc_code_name is narrowly scoped to enable error serialization, and no unrelated modifications are present.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/#51-sse-streaming

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/lib.rs (1)

149-159: ⚠️ Potential issue | 🟠 Major

Reject a zero SSE keep-alive interval before building state.

streaming.sse_keep_alive_secs: 0 currently flows into the SSE keep-alive timer without validation. With a zero duration, axum's keep-alive mechanism would trigger immediately and repeatedly during idle periods, creating excessive keep-alive messages instead of the intended periodic heartbeat. Fail fast during router() construction by validating that sse_keep_alive_secs > 0 before building ProxyState, rather than allowing the misconfiguration to surface as a busy/unusable stream at runtime.

Proposed fix
         let service_name = self.config.service.name.clone();
         let metrics_namespace = service_name.replace('-', "_");
+        let sse_keep_alive_secs = self.config.streaming.sse_keep_alive_secs;
+        if sse_keep_alive_secs == 0 {
+            return Err(anyhow::anyhow!(
+                "streaming.sse_keep_alive_secs must be greater than 0"
+            ));
+        }
 
         let state = ProxyState {
             service_name: service_name.clone(),
             grpc_upstream,
             grpc_channel,
@@
             forwarded_headers: self.config.forwarded_headers.clone(),
             metrics_namespace,
             metrics_classes: self.config.metrics_classes.clone(),
-            sse_keep_alive_secs: self.config.streaming.sse_keep_alive_secs,
+            sse_keep_alive_secs,
         };
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/lib.rs` around lines 149 - 159, Add validation in the router() method to
reject zero values for sse_keep_alive_secs before constructing ProxyState.
Before the ProxyState initialization block that assigns sse_keep_alive_secs from
self.config.streaming.sse_keep_alive_secs, add a check that ensures the value is
greater than zero and return an error if it is not, preventing the zero duration
from being passed to the SSE keep-alive timer mechanism.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/transcode/mod.rs`:
- Around line 212-222: The `wants_sse` function currently uses HeaderMap::get()
which only reads the first Accept header value and does not parse quality
factors. Update the function to use HeaderMap::get_all() to handle multiple
Accept headers and parse the quality factor (q parameter) from each media type,
rejecting the request when q is 0 or less (as per RFC 7231). Additionally, add
regression tests to verify that repeated Accept headers are properly handled and
that text/event-stream with q=0 is correctly rejected.
- Around line 304-308: The Response::builder() method chain in the streaming
response handler is manually setting the transfer-encoding header to chunked,
which is unnecessary and violates HTTP/2 specifications. Axum and Hyper
automatically handle the appropriate body framing for streaming responses, so
remove the .header("transfer-encoding", "chunked") line from the
Response::builder() chain. This will allow Hyper to manage the correct framing
behavior based on the HTTP protocol version being used.

---

Outside diff comments:
In `@src/lib.rs`:
- Around line 149-159: Add validation in the router() method to reject zero
values for sse_keep_alive_secs before constructing ProxyState. Before the
ProxyState initialization block that assigns sse_keep_alive_secs from
self.config.streaming.sse_keep_alive_secs, add a check that ensures the value is
greater than zero and return an error if it is not, preventing the zero duration
from being passed to the SSE keep-alive timer mechanism.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: 1482efca-3932-491c-b8b6-6ea12e09389e

📥 Commits

Reviewing files that changed from the base of the PR and between 78d8ab2 and 853dff3.

📒 Files selected for processing (6)
  • README.md
  • src/config.rs
  • src/lib.rs
  • src/transcode/error.rs
  • src/transcode/mod.rs
  • tests/embedded.rs

Comment thread src/transcode/mod.rs Outdated
Comment thread src/transcode/mod.rs
polaz added 2 commits June 21, 2026 14:40
Cover RFC 7231 q=0 rejection and multi-line Accept headers for the
SSE content negotiation. Both fail against the current single-header,
q-unaware wants_sse and pass once it parses get_all() + quality factors.
wants_sse now scans every Accept header line via get_all() instead of
only the first, and respects the RFC 7231 quality factor so
text/event-stream;q=0 no longer selects the SSE path.
@greptile-apps

greptile-apps Bot commented Jun 21, 2026

Copy link
Copy Markdown

Greptile Summary

This PR adds Accept: text/event-stream content negotiation to server-streaming RPCs, enabling browser EventSource consumption alongside the existing NDJSON default. It also addresses three pre-existing deficiencies: gRPC errors mid-stream now emit an explicit terminal frame instead of truncating the body, serialization errors are similarly terminal (via a scan-based adapter that drops subsequent upstream messages), and SSE uses the stream-error event type to avoid colliding with the browser EventSource onerror transport callback.

  • Content negotiation: wants_sse inspects all Accept header lines, splits on ,, and honors q=0 as "not acceptable" per RFC 7231 §5.3.1.
  • Terminal error frames: json_frames uses futures::StreamExt::scan with a stopped flag so the first error (gRPC status or serialization failure) is yielded and the stream then terminates; downstream messages are dropped.
  • Config guard: StreamingConfig.sse_keep_alive_secs = 0 is rejected at load time via ProxyConfig::validate(), called both from from_yaml_str and from router() for the embedded path.

Confidence Score: 5/5

Safe to merge; the streaming and config changes are well-contained and the previous review concerns were properly addressed.

The Accept negotiation logic, the scan-based error-termination, the keep-alive guard, and the SSE event-name fix are all implemented correctly and covered by unit tests. The only note is that serialization error frames omit the code integer field included in gRPC-status error frames, creating a minor structural inconsistency in the terminal frame shape, but this does not affect correctness of the stream protocol.

src/transcode/mod.rs — specifically the inline serialization error JSON in json_frames, which produces a slightly different shape than stream_error_json.

Important Files Changed

Filename Overview
src/transcode/mod.rs Core of the PR: adds wants_sse/accept_range_selects_sse for content negotiation, json_frames scan-based stream that makes error frames terminal, and ndjson_response/sse_response builders. Logic is sound; one minor inconsistency: serialization error frames omit the code field that gRPC-status error frames carry.
src/config.rs Adds StreamingConfig with sse_keep_alive_secs (default 15), validate() method that rejects a zero keep-alive, and a test covering that rejection. Clean implementation.
src/lib.rs Threads sse_keep_alive_secs into ProxyState and calls config.validate() at the start of router() to enforce invariants on the embedded (programmatic) path.
src/transcode/error.rs Widens grpc_code_name visibility to pub(crate) so stream_error_json in mod.rs can reuse it — minimal, correct change.
tests/embedded.rs Adds streaming: Default::default() to the manually-constructed ProxyConfig so the embedded path compiles and passes validate().
README.md Documents SSE content negotiation, stream-error event name distinction, and sse_keep_alive_secs config knob. Accurate and complete.

Reviews (3): Last reviewed commit: "docs(readme): trim streaming feature bul..." | Re-trigger Greptile

Comment thread src/transcode/mod.rs
Comment thread src/transcode/mod.rs Outdated
Comment thread src/transcode/mod.rs Outdated
Comment thread src/config.rs
polaz added 4 commits June 21, 2026 14:42
Drop the hand-set transfer-encoding: chunked header. hyper selects
chunked on HTTP/1.1 and DATA frames on HTTP/2 from the protocol
version; a manual transfer-encoding is redundant on h1 and illegal
on h2.
A zero sse_keep_alive_secs makes axum's SSE keep-alive timer fire
continuously instead of as a periodic heartbeat. Validate it at
config load and at router build (covering the embedded path) so the
misconfiguration fails fast rather than producing a busy stream.
Assert that an error mid-stream is the last frame in both NDJSON and
SSE (later messages dropped), and that SSE labels it event: stream-error
rather than the reserved error type. Both fail until the stream stops
after an error and the SSE event is renamed.
- Stop the stream after the first error so an error frame is always the
  last thing a client sees; a serialization failure and a gRPC status
  now share terminal semantics instead of the former silently continuing
  past a serialization-error frame
- Emit the terminal SSE error as event: stream-error rather than the
  reserved error type, which the browser EventSource dispatches only for
  transport failures and would never deliver to a custom handler

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@README.md`:
- Line 19: Simplify the Server-streaming RPC feature bullet at line 19 to a
brief, scannable headline such as "Server-streaming RPC with NDJSON or
Server-Sent Events" instead of the current ~270-character description that
bundles negotiation semantics, configuration options, error handling, and API
distinctions together. The detailed explanation of keep-alive configuration,
error frame delivery, and SSE event handling already exists in the configuration
section at lines 84-91, so those technical details can safely be omitted from
the feature list to improve readability.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: 6c0eac09-c858-4f0d-b5b5-cda0ea311afa

📥 Commits

Reviewing files that changed from the base of the PR and between 853dff3 and b310175.

📒 Files selected for processing (4)
  • README.md
  • src/config.rs
  • src/lib.rs
  • src/transcode/mod.rs

Comment thread README.md Outdated
Move the terminal-error-frame and SSE stream-error/onerror detail from
the dense feature bullet into the streaming config section, leaving the
features list scannable.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(streaming): expose server-streaming RPCs as SSE via Accept negotiation

1 participant