diff --git a/http-core/src/main/mima-filters/2.0.x.backwards.excludes/coalesce-frames-optimization.excludes b/http-core/src/main/mima-filters/2.0.x.backwards.excludes/coalesce-frames-optimization.excludes new file mode 100644 index 0000000000..1cf0875d5a --- /dev/null +++ b/http-core/src/main/mima-filters/2.0.x.backwards.excludes/coalesce-frames-optimization.excludes @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# FrameRenderer#Frame is private[http2] internal API, constructor changed for coalesce frames optimization +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.framing.FrameRenderer#Frame.this") diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/FrameEvent.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/FrameEvent.scala index 9888ee0ea6..5743a219ef 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/FrameEvent.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/FrameEvent.scala @@ -88,6 +88,8 @@ private[http] object FrameEvent { streamId: Int, windowSizeIncrement: Int) extends StreamFrameEvent + final case class CompositeFrame(frames: immutable.Seq[FrameEvent]) extends FrameEvent + final case class PriorityFrame( streamId: Int, exclusiveFlag: Boolean, diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/FrameLogger.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/FrameLogger.scala index 00caa04853..44fc63d259 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/FrameLogger.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/FrameLogger.scala @@ -105,6 +105,9 @@ private[http2] object FrameLogger { case WindowUpdateFrame(streamId, windowSizeIncrement) => LogEntry(streamId, "WIND", s"+ $windowSizeIncrement") + case CompositeFrame(frames) => + LogEntry(0, "COMP", frames.map(entryForFrame).map(display).mkString("; ")) + case other: StreamFrameEvent => LogEntry(other.streamId, "UNKN", other.toString) case other => diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala index 66af366415..5aefc126a2 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala @@ -146,9 +146,21 @@ private[http] final class Http2Ext(implicit val system: ActorSystem) log: LoggingAdapter, handler: HttpRequest => Future[HttpResponse]): HttpRequest => Future[HttpResponse] = { request => try { - handler(request).recover { - case NonFatal(ex) => handleHandlerError(log, ex) - }(ExecutionContext.parasitic) + val response = handler(request) + // Fast path: if the handler returned an already-completed successful Future + // (common for gRPC unary handlers), skip the .recover allocation entirely. + // .recover always allocates a Recover PartialFunction + wrapper Future via transform, + // even when the original Future is already successful. + response.value match { + case Some(Success(_)) => response + case Some(Failure(ex)) if NonFatal(ex) => + Future.successful(handleHandlerError(log, ex)) + case Some(Failure(ex)) => throw ex + case None => + response.recover { + case NonFatal(ex) => handleHandlerError(log, ex) + }(ExecutionContext.parasitic) + } } catch { case NonFatal(ex) => Future.successful(handleHandlerError(log, ex)) } diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Blueprint.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Blueprint.scala index 31454417f6..65f28a1c9b 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Blueprint.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Blueprint.scala @@ -303,17 +303,27 @@ private[http] object Http2Blueprint { implicit ec: ExecutionContext): Flow[HttpRequest, HttpResponse, NotUsed] = Flow[HttpRequest] .mapAsyncUnordered(parallelism) { req => - // The handler itself may do significant work so make sure to schedule it separately. This is especially important for HTTP/2 where it is expected that - // multiple requests are handled concurrently on the same connection. The complete stream including `mapAsyncUnordered` shares one GraphInterpreter, so - // that this extra indirection will guard the GraphInterpreter from being starved by user code. - Future { - val response = handler(req) - - req.attribute(Http2.streamId) match { - case Some(streamIdHeader) => response.map(_.addAttribute(Http2.streamId, streamIdHeader)) // add stream id attribute when request had it - case None => response - } - }.flatten + // mapAsyncUnordered already schedules on the execution context, + // so call the handler directly without wrapping in Future { } to avoid + // an extra EC dispatch hop. This is significant for fast handlers + // (e.g. gRPC unary handlers returning Future.successful) where the + // extra hop would double the scheduling overhead. + val response = try handler(req) + catch { case scala.util.control.NonFatal(ex) => Future.failed(ex) } + + req.attribute(Http2.streamId) match { + case Some(streamIdHeader) => + // Fast path: if response is already completed, add attribute synchronously + response.value match { + case Some(scala.util.Success(resp)) => + Future.successful(resp.addAttribute(Http2.streamId, streamIdHeader)) + case Some(scala.util.Failure(ex)) => + Future.failed(ex) + case None => + response.map(_.addAttribute(Http2.streamId, streamIdHeader)) + } + case None => response + } } private[http2] def logParsingError(info: ErrorInfo, log: LoggingAdapter, diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala index df829c20bc..fc559a37b6 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala @@ -348,18 +348,30 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, def onPush(): Unit = { val frame = grab(frameIn) + // Single pattern match: PingFrame first (no onDataFrameSeen), + // all other frames call onDataFrameSeen at the start. frame match { - case _: PingFrame => // handle later - case _ => pingState.onDataFrameSeen() - } - frame match { + case PingFrame(true, data) => + if (data != ConfigurablePing.Ping.data) { + // We only ever push static data, responding with anything else is wrong + pushGOAWAY(ErrorCode.PROTOCOL_ERROR, "Ping ack contained unexpected data") + } else { + pingState.onPingAck() + } + case PingFrame(false, data) => + multiplexer.pushControlFrame(PingFrame(ack = true, data)) + case WindowUpdateFrame(streamId, increment) if streamId == 0 /* else fall through to StreamFrameEvent */ => + pingState.onDataFrameSeen() if (!multiplexer.updateConnectionLevelWindow(increment)) pushGOAWAY(ErrorCode.FLOW_CONTROL_ERROR, "WINDOW_UPDATE would exceed maximum connection-level flow-control window size") - case p: PriorityFrame => multiplexer.updatePriority(p) + case p: PriorityFrame => + pingState.onDataFrameSeen() + multiplexer.updatePriority(p) case s: StreamFrameEvent => + pingState.onDataFrameSeen() if (!terminating) handleStreamEvent(s) else if (s.streamId <= lastIdBeforeTermination) @@ -369,6 +381,7 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, multiplexer.pushControlFrame(RstStreamFrame(s.streamId, ErrorCode.REFUSED_STREAM)) case SettingsFrame(settings) => + pingState.onDataFrameSeen() if (settings.nonEmpty) debug(s"Got ${settings.length} settings!") val settingsAppliedOk = applyRemoteSettings(settings) @@ -376,24 +389,16 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, multiplexer.pushControlFrame(SettingsAckFrame(settings)) } - case SettingsAckFrame(_) => + case _: SettingsAckFrame => + pingState.onDataFrameSeen() // Currently, we only expect an ack for the initial settings frame, sent // above in preStart. Since only some settings are supported, and those // settings are non-modifiable and known at construction time, these settings // are enforced from the start of the connection so there's no need to invoke // `enforceSettings(initialLocalSettings)` - case PingFrame(true, data) => - if (data != ConfigurablePing.Ping.data) { - // We only ever push static data, responding with anything else is wrong - pushGOAWAY(ErrorCode.PROTOCOL_ERROR, "Ping ack contained unexpected data") - } else { - pingState.onPingAck() - } - case PingFrame(false, data) => - multiplexer.pushControlFrame(PingFrame(ack = true, data)) - case e => + pingState.onDataFrameSeen() debug(s"Got unhandled event $e") // ignore unknown frames } diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala index 60c3ad996f..512083f503 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala @@ -194,9 +194,10 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage else WaitingForNetworkToSendData } case PullFrameResult.SendFrameAndTrailer(frame, trailer) => - send(frame) - controlFrameBuffer += trailer - WaitingForNetworkToSendControlFrames + pushFrameOut(CompositeFrame(frame :: trailer :: Nil)) + connectionWindowLeft -= frame.payload.length + if (sendableOutstreams.isEmpty) Idle + else WaitingForNetworkToSendData } } } diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala index e74691ab4c..f90ffb4db6 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala @@ -114,23 +114,37 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper def activeStreamCount(): Int = streamStates.size /** Called by Http2ServerDemux to let the state machine handle StreamFrameEvents */ - def handleStreamEvent(e: StreamFrameEvent): Unit = - updateState(e.streamId, _.handle(e), "handleStreamEvent", e.frameTypeName) + def handleStreamEvent(e: StreamFrameEvent): Unit = { + // Inline state transition to avoid _.handle(e) lambda allocation per frame. + // This is the hottest call site - invoked for every incoming HTTP/2 frame. + require(!stateMachineRunning, "State machine already running") + stateMachineRunning = true + + val streamId = e.streamId + val oldState = streamFor(streamId) + val newState = oldState.handle(e) + commitStreamState(streamId, oldState, newState) + } /** Called by Http2ServerDemux when a stream comes in from the user-handler */ def handleOutgoingCreated(stream: Http2SubStream): Unit = { stream.initialHeaders.priorityInfo.foreach(multiplexer.updatePriority) - if (streamFor(stream.streamId) != Closed) { + val streamId = stream.streamId + val oldState = streamFor(streamId) + if (oldState ne Closed) { multiplexer.pushControlFrame(stream.initialHeaders) - if (stream.initialHeaders.endStream) { - updateState(stream.streamId, _.handleOutgoingCreatedAndFinished(stream.correlationAttributes), - "handleOutgoingCreatedAndFinished") - } else { - val outStream = OutStream(stream) - updateState(stream.streamId, _.handleOutgoingCreated(outStream, stream.correlationAttributes), - "handleOutgoingCreated") - } + // Inline state transition to avoid lambda allocation per response + require(!stateMachineRunning, "State machine already running") + stateMachineRunning = true + val newState = + if (stream.initialHeaders.endStream) + oldState.handleOutgoingCreatedAndFinished(stream.correlationAttributes) + else { + val outStream = OutStream(stream) + oldState.handleOutgoingCreated(outStream, stream.correlationAttributes) + } + commitStreamState(streamId, oldState, newState) } else // stream was cancelled by peer before our response was ready stream.data.foreach(_.runWith(Sink.cancelled)(subFusingMaterializer)) @@ -138,8 +152,13 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper } // Called by the outgoing stream multiplexer when that side of the stream is ended. - def handleOutgoingEnded(streamId: Int): Unit = - updateState(streamId, _.handleOutgoingEnded(), "handleOutgoingEnded") + def handleOutgoingEnded(streamId: Int): Unit = { + require(!stateMachineRunning, "State machine already running") + stateMachineRunning = true + val oldState = streamFor(streamId) + val newState = oldState.handleOutgoingEnded() + commitStreamState(streamId, oldState, newState) + } def handleOutgoingFailed(streamId: Int, cause: Throwable): Unit = updateState(streamId, _.handleOutgoingFailed(cause), "handleOutgoingFailed") @@ -163,8 +182,36 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper streamStates.keys.foreach(streamId => updateState(streamId.toInt, handle, event, eventArg)) private def updateState( - streamId: Int, handle: StreamState => StreamState, event: String, eventArg: AnyRef = null): Unit = - updateStateAndReturn(streamId, x => (handle(x), ()), event, eventArg) + streamId: Int, handle: StreamState => StreamState, event: String, eventArg: AnyRef = null): Unit = { + require(!stateMachineRunning, "State machine already running") + stateMachineRunning = true + + val oldState = streamFor(streamId) + val newState = handle(oldState) + commitStreamState(streamId, oldState, newState) + } + + /** Commits a stream state transition with bookkeeping (map update, debug logging, deferred enqueue). */ + private def commitStreamState(streamId: Int, oldState: StreamState, newState: StreamState): Unit = { + newState match { + case Closed => + streamStates.remove(streamId) + if (streamStates.isEmpty) onAllStreamsClosed() + tryPullSubStreams() + case newState => streamStates.put(streamId, newState) + } + + debug( + s"Incoming side of stream [$streamId] changed state: ${oldState.stateName} -> ${newState.stateName}") + + stateMachineRunning = false + if (deferredStreamToEnqueue != -1) { + val deferredId = deferredStreamToEnqueue + deferredStreamToEnqueue = -1 + if (streamStates.contains(deferredId)) + multiplexer.enqueueOutStream(deferredId) + } + } // Calling multiplexer.enqueueOutStream directly out of the state machine is not allowed, because it might try to // reenter the state machine with `pullNextState`. This call defers enqueuing until the current state machine operation diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestErrorFlow.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestErrorFlow.scala index c379449921..e974b0a7db 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestErrorFlow.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestErrorFlow.scala @@ -48,27 +48,29 @@ private[http2] final class RequestErrorFlow override val shape: BidiShape[HttpResponse, HttpResponse, ParseRequestResult, HttpRequest] = BidiShape(responseIn, responseOut, requestIn, requestOut) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - setHandlers(requestIn, requestOut, - new InHandler with OutHandler { - override def onPush(): Unit = { - grab(requestIn) match { - case RequestParsing.OkRequest(request) => push(requestOut, request) - case notOk: RequestParsing.BadRequest => - emit(responseOut, - HttpResponse(StatusCodes.BadRequest, entity = notOk.info.summary).addAttribute(Http2.streamId, - notOk.streamId)) - pull(requestIn) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + // Response path: simple pass-through merged into GraphStageLogic + override def onPush(): Unit = push(responseOut, grab(responseIn)) + override def onPull(): Unit = if (!hasBeenPulled(responseIn)) pull(responseIn) + setHandlers(responseIn, responseOut, this) + + // Request path: parse result handling + setHandlers(requestIn, requestOut, + new InHandler with OutHandler { + override def onPush(): Unit = { + grab(requestIn) match { + case RequestParsing.OkRequest(request) => push(requestOut, request) + case notOk: RequestParsing.BadRequest => + emit(responseOut, + HttpResponse(StatusCodes.BadRequest, entity = notOk.info.summary).addAttribute(Http2.streamId, + notOk.streamId)) + pull(requestIn) + } } - } - override def onPull(): Unit = pull(requestIn) - }) - setHandlers(responseIn, responseOut, - new InHandler with OutHandler { - override def onPush(): Unit = push(responseOut, grab(responseIn)) - override def onPull(): Unit = if (!hasBeenPulled(responseIn)) pull(responseIn) - }) + override def onPull(): Unit = pull(requestIn) + }) - } + } } diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/framing/FrameRenderer.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/framing/FrameRenderer.scala index ca6ebcc023..140d4614ac 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/framing/FrameRenderer.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/framing/FrameRenderer.scala @@ -72,6 +72,12 @@ private[http2] object FrameRenderer { .putInt32(windowSizeIncrement) .build() + case CompositeFrame((data: DataFrame) +: (headers: HeadersFrame) +: Nil) => + renderDataAndHeaders(data, headers) + + case CompositeFrame(frames) => + frames.iterator.map(render).foldLeft(ByteString.empty)(_ ++ _) + case ContinuationFrame(streamId, endHeaders, payload) => Frame( payload.length, @@ -153,14 +159,51 @@ private[http2] object FrameRenderer { .put(payload) .build() + private def renderDataAndHeaders(data: DataFrame, headers: HeadersFrame): ByteString = { + val headersPayloadSize = (if (headers.priorityInfo.isDefined) 5 else 0) + headers.headerBlockFragment.length + val buffer = new Array[Byte](9 + data.payload.length + 9 + headersPayloadSize) + val afterData = Frame + .writeTo( + buffer, + offset = 0, + data.payload.length, + Http2Protocol.FrameType.DATA, + Http2Protocol.Flags.END_STREAM.ifSet(data.endStream), + data.streamId) + .put(data.payload) + .finish() + Frame + .writeTo( + buffer, + afterData, + headersPayloadSize, + Http2Protocol.FrameType.HEADERS, + Http2Protocol.Flags.END_STREAM.ifSet(headers.endStream) | + Http2Protocol.Flags.END_HEADERS.ifSet(headers.endHeaders) | + Http2Protocol.Flags.PRIORITY.ifSet(headers.priorityInfo.isDefined), + headers.streamId) + .putPriorityInfo(headers.priorityInfo) + .put(headers.headerBlockFragment) + .finish() + ByteString.fromArrayUnsafe(buffer) + } + private object Frame { def apply(payloadSize: Int, tpe: FrameType, flags: ByteFlag, streamId: Int): Frame = - new Frame(payloadSize, tpe, flags, streamId) + new Frame(payloadSize, tpe, flags, streamId, new Array[Byte](9 + payloadSize), 0) + def writeTo(buffer: Array[Byte], offset: Int, payloadSize: Int, tpe: FrameType, flags: ByteFlag, + streamId: Int): Frame = + new Frame(payloadSize, tpe, flags, streamId, buffer, offset) } - private class Frame(payloadSize: Int, tpe: FrameType, flags: ByteFlag, streamId: Int) { - private val targetSize = 9 + payloadSize - private val buffer = new Array[Byte](targetSize) - private var pos = 0 + private class Frame( + payloadSize: Int, + tpe: FrameType, + flags: ByteFlag, + streamId: Int, + buffer: Array[Byte], + start: Int) { + private val targetSize = start + 9 + payloadSize + private var pos = start putInt24(payloadSize) putByte(tpe.id.toByte) @@ -212,8 +255,13 @@ private[http2] object FrameRenderer { this } - def build(): ByteString = + def finish(): Int = if (pos != targetSize) throw new IllegalStateException(s"Did not write exactly $targetSize bytes but $pos") - else ByteString.fromArrayUnsafe(buffer) + else pos + + def build(): ByteString = { + finish() + ByteString.fromArrayUnsafe(buffer) + } } } diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderCompression.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderCompression.scala index 60fec77d8b..8ceeb90e8b 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderCompression.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderCompression.scala @@ -43,48 +43,79 @@ private[http2] object HeaderCompression extends GraphStage[FlowShape[FrameEvent, val encoder = new pekko.http.shaded.com.twitter.hpack.Encoder(Http2Protocol.InitialMaxHeaderTableSize) val os = new ByteArrayOutputStream(128) - def onPull(): Unit = pull(eventsIn) + private def compressedHeadersFrame( + streamId: Int, + endStream: Boolean, + kvs: Seq[(String, AnyRef)], + prioInfo: Option[PriorityFrame]): FrameEvent = + // When ending the stream without any payload, use a DATA frame rather than + // a HEADERS frame to work around https://github.com/golang/go/issues/47851. + if (endStream && kvs.isEmpty) DataFrame(streamId, endStream, ByteString.empty) + else { + kvs.foreach { + case (key, value: String) => + encoder.encodeHeader(os, key, value, false) + case (key, value) => + throw new IllegalStateException( + s"Didn't expect key-value-pair [$key] -> [$value](${value.getClass}) here.") + } + val result = ByteString.fromArrayUnsafe(os.toByteArray) // BAOS.toByteArray always creates a copy + os.reset() + if (result.size <= currentMaxFrameSize) + HeadersFrame(streamId, endStream, endHeaders = true, result, prioInfo) + else { + val builder = Vector.newBuilder[FrameEvent] + builder += HeadersFrame(streamId, endStream, endHeaders = false, result.take(currentMaxFrameSize), prioInfo) + var remainingData = result.drop(currentMaxFrameSize) + while (remainingData.nonEmpty) { + val thisFragment = remainingData.take(currentMaxFrameSize) + val rest = remainingData.drop(currentMaxFrameSize) + builder += ContinuationFrame(streamId, endHeaders = rest.isEmpty, thisFragment) + remainingData = rest + } + CompositeFrame(builder.result()) + } + } + + private def compressedFrame(frame: FrameEvent): FrameEvent = + frame match { + case ParsedHeadersFrame(streamId, endStream, kvs, prioInfo, _) => + compressedHeadersFrame(streamId, endStream, kvs, prioInfo) + case other => other + } + + // Continuation frames to drain when a CompositeFrame is split. + // Using a var field on Logic avoids allocating a new OutHandler per CompositeFrame. + private var continuationFrames: Seq[FrameEvent] = null + + def onPull(): Unit = + if (continuationFrames ne null) { + push(eventsOut, continuationFrames.head) + val rest = continuationFrames.tail + continuationFrames = if (rest.isEmpty) null else rest + } else pull(eventsIn) + def onPush(): Unit = grab(eventsIn) match { case ack @ SettingsAckFrame(s) => applySettings(s) push(eventsOut, ack) case ParsedHeadersFrame(streamId, endStream, kvs, prioInfo, _) => - // When ending the stream without any payload, use a DATA frame rather than - // a HEADERS frame to work around https://github.com/golang/go/issues/47851. - if (endStream && kvs.isEmpty) push(eventsOut, DataFrame(streamId, endStream, ByteString.empty)) - else { - kvs.foreach { - case (key, value: String) => - encoder.encodeHeader(os, key, value, false) - case (key, value) => - throw new IllegalStateException( - s"Didn't expect key-value-pair [$key] -> [$value](${value.getClass}) here.") - } - val result = ByteString.fromArrayUnsafe(os.toByteArray) // BAOS.toByteArray always creates a copy - os.reset() - if (result.size <= currentMaxFrameSize) - push(eventsOut, HeadersFrame(streamId, endStream, endHeaders = true, result, prioInfo)) - else { - val first = - HeadersFrame(streamId, endStream, endHeaders = false, result.take(currentMaxFrameSize), prioInfo) - + compressedHeadersFrame(streamId, endStream, kvs, prioInfo) match { + case CompositeFrame(first +: rest) => push(eventsOut, first) - setHandler(eventsOut, - new OutHandler { - private var remainingData = result.drop(currentMaxFrameSize) - - def onPull(): Unit = { - val thisFragment = remainingData.take(currentMaxFrameSize) - val rest = remainingData.drop(currentMaxFrameSize) - val last = rest.isEmpty - - push(eventsOut, ContinuationFrame(streamId, endHeaders = last, thisFragment)) - if (last) setHandler(eventsOut, logic) - else remainingData = rest - } - }) - } + if (rest.nonEmpty) continuationFrames = rest + case frame => push(eventsOut, frame) + } + case CompositeFrame(frames) => + val builder = Vector.newBuilder[FrameEvent] + frames.foreach { + case frame => + compressedFrame(frame) match { + case CompositeFrame(compressed) => builder ++= compressed + case compressed => builder += compressed + } } + push(eventsOut, CompositeFrame(builder.result())) case x => push(eventsOut, x) } diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala index 6d992e36b0..1d3dd55c1f 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala @@ -54,6 +54,10 @@ private[http2] final class HeaderDecompression(masterHeaderParser: HttpHeaderPar val decoder = new pekko.http.shaded.com.twitter.hpack.Decoder(Http2Protocol.InitialMaxHeaderListSize, Http2Protocol.InitialMaxHeaderTableSize) + // Cache for common gRPC headers to avoid repeated parsing + // Key: (name, value), Value: parsed header object + private val headerCache = new java.util.concurrent.ConcurrentHashMap[(String, String), AnyRef](64) + become(Idle) // simple state machine @@ -70,28 +74,41 @@ private[http2] final class HeaderDecompression(masterHeaderParser: HttpHeaderPar parsed } else { import Http2HeaderParsing._ - def handle(parsed: AnyRef): AnyRef = { - headers += name -> parsed - parsed - } - name match { - case "content-type" => handle(ContentType.parse(name, value, parserSettings)) - case ":authority" => handle(Authority.parse(name, value, parserSettings)) - case ":path" => - if (value.isEmpty) - throw new Http2ProtocolException("Malformed request: ':path' must not be empty") - handle(PathAndQuery.parse(name, value, parserSettings)) - case ":method" => handle(Method.parse(name, value, parserSettings)) - case ":scheme" => handle(Scheme.parse(name, value, parserSettings)) - case "content-length" => handle(ContentLength.parse(name, value, parserSettings)) - case "cookie" => handle(Cookie.parse(name, value, parserSettings)) - case x if x(0) == ':' => handle(value) - case _ => - // cannot use OtherHeader.parse because that doesn't has access to header parser - val header = parseHeaderPair(httpHeaderParser, name, value) - RequestParsing.validateHeader(header) - handle(header) + // Try cache first for common headers + val cacheKey = (name, value) + val cached = headerCache.get(cacheKey) + if (cached ne null) { + headers += name -> cached + cached + } else { + def handle(parsed: AnyRef): AnyRef = { + // Cache the parsed result for future use (limit cache size to avoid memory issues) + if (headerCache.size() < 1024) { + headerCache.put(cacheKey, parsed) + } + headers += name -> parsed + parsed + } + + name match { + case "content-type" => handle(ContentType.parse(name, value, parserSettings)) + case ":authority" => handle(Authority.parse(name, value, parserSettings)) + case ":path" => + if (value.isEmpty) + throw new Http2ProtocolException("Malformed request: ':path' must not be empty") + handle(PathAndQuery.parse(name, value, parserSettings)) + case ":method" => handle(Method.parse(name, value, parserSettings)) + case ":scheme" => handle(Scheme.parse(name, value, parserSettings)) + case "content-length" => handle(ContentLength.parse(name, value, parserSettings)) + case "cookie" => handle(Cookie.parse(name, value, parserSettings)) + case x if x(0) == ':' => handle(value) + case _ => + // cannot use OtherHeader.parse because that doesn't has access to header parser + val header = parseHeaderPair(httpHeaderParser, name, value) + RequestParsing.validateHeader(header) + handle(header) + } } } }