Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/src/main/paradox/client-side/websocket-support.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ the lifetime of the connection. Therefore a WebSocket connection is modelled as
@apidoc[Flow[Message, Message, Mat]] to or a @apidoc[Flow[Message, Message, Mat]] that you connect a @apidoc[Source[Message, Mat]] and
a @apidoc[Sink[Message, Mat]] to.

@@@ note
Client-side WebSocket compression is not currently implemented. Apache Pekko HTTP can negotiate
`permessage-deflate` compression for server-side WebSocket connections when a client requests it.
@@@

A WebSocket request starts with a regular HTTP request which contains an `Upgrade` header (and possibly
other regular HTTP request properties), so in addition to the flow of messages there also is an initial response
from the server, this is modelled with @apidoc[WebSocketUpgradeResponse].
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/paradox/release-notes/releases-2.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ by early adopters. This is experimental. This release should not be used in prod
* Dependency versions have been updated
* Jackson3 is supported in a new pekko-http-jackson3 lib
* Changed Java DSL methods that return Scala Durations to return Java Durations ([PR788](https://github.com/apache/pekko-http/pull/788))
* Server-side WebSocket connections can now negotiate RFC 7692 `permessage-deflate` compression when requested by the client.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2.0.0-M1 is already released. Just revert this and we will form 2.0.0-M2 release notes when that release is ready, hopefully in a month or 2.

34 changes: 34 additions & 0 deletions docs/src/main/paradox/server-side/websocket-support.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,40 @@ In case you need to keep inactive connections alive, you can either tweak your i

<a id="keep-alive-ping"></a>

## WebSocket compression

Apache Pekko HTTP can negotiate the RFC 7692 `permessage-deflate` WebSocket extension for server-side WebSocket
connections. Compression is enabled by default, but is only used when the client requests it with the
`Sec-WebSocket-Extensions: permessage-deflate` header during the WebSocket handshake.

You can disable WebSocket compression globally for the server:

```
pekko.http.server.websocket.compression.enabled = false
```

The server exposes additional settings for the negotiated extension under
`pekko.http.server.websocket.compression.permessage-deflate`, including `compression-level`,
`preferred-client-window-size`, `allow-server-no-context`, and `preferred-client-no-context`. See the
@ref[configuration reference](../configuration.md) for the complete list of settings and defaults.

If compression is enabled globally, a route can still decline compression for a single accepted WebSocket by using the
`handleMessages` or `handleMessagesWith` overload with `compressionEnabled = false`.

@@@ note
The `server_no_context_takeover` and `client_no_context_takeover` extension parameters affect whether compression
dictionaries are retained across messages. Retaining context generally improves compression ratio, while disabling
context takeover can reduce cross-message information retention and memory lifetime. The defaults keep context takeover
enabled unless explicitly configured otherwise.

Pekko HTTP uses the JDK `Deflater` and `Inflater` implementation for `permessage-deflate`. The JDK API
does not expose zlib `windowBits` or `memLevel`, so Pekko HTTP does not accept `server_max_window_bits`
values below `15` and does not provide server window-size or memory-level settings. A client may still
request `client_max_window_bits`; when it does, Pekko HTTP can include the configured
`preferred-client-window-size` in the handshake response to ask the client to use that window size for
client-to-server messages.
@@@

## Automatic keep-alive Ping support

For long running websocket connections it may be beneficial to enable automatic heartbeat using `Ping` frames.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Add server-side WebSocket compression support and per-upgrade compression control.
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.http.impl.engine.ws.Handshake#Server.buildResponse")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.model.ws.WebSocketUpgrade.handleMessagesWith")
38 changes: 38 additions & 0 deletions http-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,44 @@ pekko.http {

# Enable verbose debug logging for all ingoing and outgoing frames
log-frames = false

compression {
# Whether the server should support WebSocket compression using the RFC 7692
# permessage-deflate extension. Compression is negotiated during the
# WebSocket handshake and is only used when the client requests it.
enabled = true

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do adaptive compression?


# Maximum size of a decompressed WebSocket message. If this value is
# exceeded while inflating a compressed message, the connection is closed
# with a WebSocket protocol error.
# Set to 0 to disable this limit.
max-allocation = 64k

permessage-deflate {
# Pekko HTTP uses the JDK Deflater/Inflater implementation for
# permessage-deflate. The JDK API does not expose zlib windowBits or
# memLevel, so clients cannot negotiate a server_max_window_bits value
# below 15. client_max_window_bits can still be negotiated through
# preferred-client-window-size.

# DEFLATE compression level used for server-to-client messages. Valid
# values are 0-9, where 0 uses no compression and 9 favors compression
# ratio over CPU usage.
compression-level = 6

# The client_max_window_bits value Pekko HTTP should request for
# client-to-server messages when the client sends client_max_window_bits
# without an explicit value. Valid values are 8-15.
preferred-client-window-size = 15

# Whether a client may request server_no_context_takeover.
allow-server-no-context = false

# Whether Pekko HTTP should request client_no_context_takeover when the
# client indicates that it supports this parameter.
preferred-client-no-context = false
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ private[http] object HttpHeaderParser {
"content-type",
"expect",
"host",
"sec-websocket-extensions",
"sec-websocket-key",
"sec-websocket-protocol",
"sec-websocket-version",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.immutable.Seq
import pekko.event.LoggingAdapter
import pekko.http.impl.util._
import pekko.http.impl.engine.server.UpgradeToOtherProtocolResponseHeader
import pekko.http.impl.settings.WebSocketSettingsImpl
import pekko.http.scaladsl.model.headers._
import pekko.http.scaladsl.model.ws.Message
import pekko.http.scaladsl.model._
Expand Down Expand Up @@ -122,26 +123,47 @@ private[http] object Handshake {
case OptionVal.Some(p) => p.protocols
case _ => Nil
}
val clientRequestedExtensions = headers.collect {
case extensions: `Sec-WebSocket-Extensions` => extensions.extensions
}.flatten
val perMessageDeflate =
PerMessageDeflate.negotiate(
clientRequestedExtensions,
settings.asInstanceOf[WebSocketSettingsImpl].compression)

val header = new UpgradeToWebSocketLowLevel {
def requestedProtocols: Seq[String] = clientSupportedSubprotocols

def handle(
handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]],
subprotocol: Option[String]): HttpResponse = {
subprotocol: Option[String],
compressionEnabled: Boolean): HttpResponse = {
require(
subprotocol.forall(chosen => clientSupportedSubprotocols.contains(chosen)),
s"Tried to choose invalid subprotocol '$subprotocol' which wasn't offered by the client: [${requestedProtocols.mkString(", ")}]")
buildResponse(key.get, handler, subprotocol, settings, log)
val acceptedPerMessageDeflate = if (compressionEnabled) perMessageDeflate else None
buildResponse(key.get, handler, subprotocol, acceptedPerMessageDeflate, settings, log)
}

def handleFrames(
handlerFlow: Graph[FlowShape[FrameEvent, FrameEvent], Any], subprotocol: Option[String]): HttpResponse =
handle(Left(handlerFlow), subprotocol)
handle(Left(handlerFlow), subprotocol, compressionEnabled = true)

override private[http] def handleFrames(
handlerFlow: Graph[FlowShape[FrameEvent, FrameEvent], Any],
subprotocol: Option[String],
compressionEnabled: Boolean): HttpResponse =
handle(Left(handlerFlow), subprotocol, compressionEnabled)

override def handleMessages(handlerFlow: Graph[FlowShape[Message, Message], Any],
subprotocol: Option[String] = None): HttpResponse =
handle(Right(handlerFlow), subprotocol)
handle(Right(handlerFlow), subprotocol, compressionEnabled = true)

override def handleMessages(
handlerFlow: Graph[FlowShape[Message, Message], Any],
subprotocol: Option[String],
compressionEnabled: Boolean): HttpResponse =
handle(Right(handlerFlow), subprotocol, compressionEnabled)
}
OptionVal.Some(header)
} else OptionVal.None
Expand Down Expand Up @@ -169,11 +191,16 @@ private[http] object Handshake {
*/
def buildResponse(key: `Sec-WebSocket-Key`,
handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]],
subprotocol: Option[String], settings: WebSocketSettings, log: LoggingAdapter): HttpResponse = {
subprotocol: Option[String],
perMessageDeflate: Option[PerMessageDeflate.Negotiated],
settings: WebSocketSettings,
log: LoggingAdapter): HttpResponse = {
val frameHandler = handler match {
case Left(frameHandler) => frameHandler
case Left(frameHandler) =>
perMessageDeflate.map(_.frameEventBidiFlow(settings.randomFactory).join(frameHandler)).getOrElse(frameHandler)
case Right(messageHandler) =>
WebSocket.stack(serverSide = true, settings, log = log).join(messageHandler)
WebSocket.stack(serverSide = true, settings, perMessageDeflate = perMessageDeflate, log = log)
.join(messageHandler)
}

HttpResponse(
Expand All @@ -182,7 +209,9 @@ private[http] object Handshake {
List(
UpgradeHeader,
ConnectionUpgradeHeader,
`Sec-WebSocket-Accept`.forKey(key),
`Sec-WebSocket-Accept`.forKey(key)) :::
perMessageDeflate.map(p => `Sec-WebSocket-Extensions`(Seq(p.responseExtension))).toList :::
List(
UpgradeToOtherProtocolResponseHeader(WebSocket.framing.join(frameHandler))))
}
}
Expand Down
Loading