Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# FrameRenderer#Frame is private[http2] internal API, constructor changed for coalesce frames optimization
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.framing.FrameRenderer#Frame.this")
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ private[http] object FrameEvent {
streamId: Int,
windowSizeIncrement: Int) extends StreamFrameEvent

final case class CompositeFrame(frames: immutable.Seq[FrameEvent]) extends FrameEvent

final case class PriorityFrame(
streamId: Int,
exclusiveFlag: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ private[http2] object FrameLogger {
case WindowUpdateFrame(streamId, windowSizeIncrement) =>
LogEntry(streamId, "WIND", s"+ $windowSizeIncrement")

case CompositeFrame(frames) =>
LogEntry(0, "COMP", frames.map(entryForFrame).map(display).mkString("; "))

case other: StreamFrameEvent =>
LogEntry(other.streamId, "UNKN", other.toString)
case other =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,21 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
log: LoggingAdapter,
handler: HttpRequest => Future[HttpResponse]): HttpRequest => Future[HttpResponse] = { request =>
try {
handler(request).recover {
case NonFatal(ex) => handleHandlerError(log, ex)
}(ExecutionContext.parasitic)
val response = handler(request)
// Fast path: if the handler returned an already-completed successful Future
// (common for gRPC unary handlers), skip the .recover allocation entirely.
// .recover always allocates a Recover PartialFunction + wrapper Future via transform,
// even when the original Future is already successful.
response.value match {
case Some(Success(_)) => response
case Some(Failure(ex)) if NonFatal(ex) =>
Future.successful(handleHandlerError(log, ex))
case Some(Failure(ex)) => throw ex
case None =>
response.recover {
case NonFatal(ex) => handleHandlerError(log, ex)
}(ExecutionContext.parasitic)
}
} catch {
case NonFatal(ex) => Future.successful(handleHandlerError(log, ex))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,17 +303,27 @@ private[http] object Http2Blueprint {
implicit ec: ExecutionContext): Flow[HttpRequest, HttpResponse, NotUsed] =
Flow[HttpRequest]
.mapAsyncUnordered(parallelism) { req =>
// The handler itself may do significant work so make sure to schedule it separately. This is especially important for HTTP/2 where it is expected that
// multiple requests are handled concurrently on the same connection. The complete stream including `mapAsyncUnordered` shares one GraphInterpreter, so
// that this extra indirection will guard the GraphInterpreter from being starved by user code.
Future {
val response = handler(req)

req.attribute(Http2.streamId) match {
case Some(streamIdHeader) => response.map(_.addAttribute(Http2.streamId, streamIdHeader)) // add stream id attribute when request had it
case None => response
}
}.flatten
// mapAsyncUnordered already schedules on the execution context,
// so call the handler directly without wrapping in Future { } to avoid
// an extra EC dispatch hop. This is significant for fast handlers
// (e.g. gRPC unary handlers returning Future.successful) where the
// extra hop would double the scheduling overhead.
val response = try handler(req)
catch { case scala.util.control.NonFatal(ex) => Future.failed(ex) }

req.attribute(Http2.streamId) match {
case Some(streamIdHeader) =>
// Fast path: if response is already completed, add attribute synchronously
response.value match {
case Some(scala.util.Success(resp)) =>
Future.successful(resp.addAttribute(Http2.streamId, streamIdHeader))
case Some(scala.util.Failure(ex)) =>
Future.failed(ex)
case None =>
response.map(_.addAttribute(Http2.streamId, streamIdHeader))
}
case None => response
}

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pjfanning Should I extract these commits one by one by one as seperated PR?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with leaving this as is. I'm hoping that other people will review this but if not, I'll ok this in a few days.

}

private[http2] def logParsingError(info: ErrorInfo, log: LoggingAdapter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,18 +348,30 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings,

def onPush(): Unit = {
val frame = grab(frameIn)
// Single pattern match: PingFrame first (no onDataFrameSeen),
// all other frames call onDataFrameSeen at the start.
frame match {
case _: PingFrame => // handle later
case _ => pingState.onDataFrameSeen()
}
frame match {
case PingFrame(true, data) =>
if (data != ConfigurablePing.Ping.data) {
// We only ever push static data, responding with anything else is wrong
pushGOAWAY(ErrorCode.PROTOCOL_ERROR, "Ping ack contained unexpected data")
} else {
pingState.onPingAck()
}
case PingFrame(false, data) =>
multiplexer.pushControlFrame(PingFrame(ack = true, data))

case WindowUpdateFrame(streamId, increment)
if streamId == 0 /* else fall through to StreamFrameEvent */ =>
pingState.onDataFrameSeen()
if (!multiplexer.updateConnectionLevelWindow(increment))
pushGOAWAY(ErrorCode.FLOW_CONTROL_ERROR,
"WINDOW_UPDATE would exceed maximum connection-level flow-control window size")
case p: PriorityFrame => multiplexer.updatePriority(p)
case p: PriorityFrame =>
pingState.onDataFrameSeen()
multiplexer.updatePriority(p)
case s: StreamFrameEvent =>
pingState.onDataFrameSeen()
if (!terminating)
handleStreamEvent(s)
else if (s.streamId <= lastIdBeforeTermination)
Expand All @@ -369,31 +381,24 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings,
multiplexer.pushControlFrame(RstStreamFrame(s.streamId, ErrorCode.REFUSED_STREAM))

case SettingsFrame(settings) =>
pingState.onDataFrameSeen()
if (settings.nonEmpty) debug(s"Got ${settings.length} settings!")

val settingsAppliedOk = applyRemoteSettings(settings)
if (settingsAppliedOk) {
multiplexer.pushControlFrame(SettingsAckFrame(settings))
}

case SettingsAckFrame(_) =>
case _: SettingsAckFrame =>
pingState.onDataFrameSeen()
// Currently, we only expect an ack for the initial settings frame, sent
// above in preStart. Since only some settings are supported, and those
// settings are non-modifiable and known at construction time, these settings
// are enforced from the start of the connection so there's no need to invoke
// `enforceSettings(initialLocalSettings)`

case PingFrame(true, data) =>
if (data != ConfigurablePing.Ping.data) {
// We only ever push static data, responding with anything else is wrong
pushGOAWAY(ErrorCode.PROTOCOL_ERROR, "Ping ack contained unexpected data")
} else {
pingState.onPingAck()
}
case PingFrame(false, data) =>
multiplexer.pushControlFrame(PingFrame(ack = true, data))

case e =>
pingState.onDataFrameSeen()
debug(s"Got unhandled event $e")
// ignore unknown frames
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,10 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage
else WaitingForNetworkToSendData
}
case PullFrameResult.SendFrameAndTrailer(frame, trailer) =>
send(frame)
controlFrameBuffer += trailer
WaitingForNetworkToSendControlFrames
pushFrameOut(CompositeFrame(frame :: trailer :: Nil))
connectionWindowLeft -= frame.payload.length
if (sendableOutstreams.isEmpty) Idle
else WaitingForNetworkToSendData
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,32 +114,51 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
def activeStreamCount(): Int = streamStates.size

/** Called by Http2ServerDemux to let the state machine handle StreamFrameEvents */
def handleStreamEvent(e: StreamFrameEvent): Unit =
updateState(e.streamId, _.handle(e), "handleStreamEvent", e.frameTypeName)
def handleStreamEvent(e: StreamFrameEvent): Unit = {
// Inline state transition to avoid _.handle(e) lambda allocation per frame.
// This is the hottest call site - invoked for every incoming HTTP/2 frame.
require(!stateMachineRunning, "State machine already running")
stateMachineRunning = true

val streamId = e.streamId
val oldState = streamFor(streamId)
val newState = oldState.handle(e)
commitStreamState(streamId, oldState, newState)
}

/** Called by Http2ServerDemux when a stream comes in from the user-handler */
def handleOutgoingCreated(stream: Http2SubStream): Unit = {
stream.initialHeaders.priorityInfo.foreach(multiplexer.updatePriority)
if (streamFor(stream.streamId) != Closed) {
val streamId = stream.streamId
val oldState = streamFor(streamId)
if (oldState ne Closed) {
multiplexer.pushControlFrame(stream.initialHeaders)

if (stream.initialHeaders.endStream) {
updateState(stream.streamId, _.handleOutgoingCreatedAndFinished(stream.correlationAttributes),
"handleOutgoingCreatedAndFinished")
} else {
val outStream = OutStream(stream)
updateState(stream.streamId, _.handleOutgoingCreated(outStream, stream.correlationAttributes),
"handleOutgoingCreated")
}
// Inline state transition to avoid lambda allocation per response
require(!stateMachineRunning, "State machine already running")
stateMachineRunning = true
val newState =
if (stream.initialHeaders.endStream)
oldState.handleOutgoingCreatedAndFinished(stream.correlationAttributes)
else {
val outStream = OutStream(stream)
oldState.handleOutgoingCreated(outStream, stream.correlationAttributes)
}
commitStreamState(streamId, oldState, newState)
} else
// stream was cancelled by peer before our response was ready
stream.data.foreach(_.runWith(Sink.cancelled)(subFusingMaterializer))

}

// Called by the outgoing stream multiplexer when that side of the stream is ended.
def handleOutgoingEnded(streamId: Int): Unit =
updateState(streamId, _.handleOutgoingEnded(), "handleOutgoingEnded")
def handleOutgoingEnded(streamId: Int): Unit = {
require(!stateMachineRunning, "State machine already running")
stateMachineRunning = true
val oldState = streamFor(streamId)
val newState = oldState.handleOutgoingEnded()
commitStreamState(streamId, oldState, newState)
}

def handleOutgoingFailed(streamId: Int, cause: Throwable): Unit =
updateState(streamId, _.handleOutgoingFailed(cause), "handleOutgoingFailed")
Expand All @@ -163,8 +182,36 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
streamStates.keys.foreach(streamId => updateState(streamId.toInt, handle, event, eventArg))

private def updateState(
streamId: Int, handle: StreamState => StreamState, event: String, eventArg: AnyRef = null): Unit =
updateStateAndReturn(streamId, x => (handle(x), ()), event, eventArg)
streamId: Int, handle: StreamState => StreamState, event: String, eventArg: AnyRef = null): Unit = {
require(!stateMachineRunning, "State machine already running")
stateMachineRunning = true

val oldState = streamFor(streamId)
val newState = handle(oldState)
commitStreamState(streamId, oldState, newState)
}

/** Commits a stream state transition with bookkeeping (map update, debug logging, deferred enqueue). */
private def commitStreamState(streamId: Int, oldState: StreamState, newState: StreamState): Unit = {
newState match {
case Closed =>
streamStates.remove(streamId)
if (streamStates.isEmpty) onAllStreamsClosed()
tryPullSubStreams()
case newState => streamStates.put(streamId, newState)
}

debug(
s"Incoming side of stream [$streamId] changed state: ${oldState.stateName} -> ${newState.stateName}")

stateMachineRunning = false
if (deferredStreamToEnqueue != -1) {
val deferredId = deferredStreamToEnqueue
deferredStreamToEnqueue = -1
if (streamStates.contains(deferredId))
multiplexer.enqueueOutStream(deferredId)
}
}

// Calling multiplexer.enqueueOutStream directly out of the state machine is not allowed, because it might try to
// reenter the state machine with `pullNextState`. This call defers enqueuing until the current state machine operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,29 @@ private[http2] final class RequestErrorFlow
override val shape: BidiShape[HttpResponse, HttpResponse, ParseRequestResult, HttpRequest] =
BidiShape(responseIn, responseOut, requestIn, requestOut)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandlers(requestIn, requestOut,
new InHandler with OutHandler {
override def onPush(): Unit = {
grab(requestIn) match {
case RequestParsing.OkRequest(request) => push(requestOut, request)
case notOk: RequestParsing.BadRequest =>
emit(responseOut,
HttpResponse(StatusCodes.BadRequest, entity = notOk.info.summary).addAttribute(Http2.streamId,
notOk.streamId))
pull(requestIn)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
// Response path: simple pass-through merged into GraphStageLogic
override def onPush(): Unit = push(responseOut, grab(responseIn))
override def onPull(): Unit = if (!hasBeenPulled(responseIn)) pull(responseIn)
setHandlers(responseIn, responseOut, this)

// Request path: parse result handling
setHandlers(requestIn, requestOut,
new InHandler with OutHandler {
override def onPush(): Unit = {
grab(requestIn) match {
case RequestParsing.OkRequest(request) => push(requestOut, request)
case notOk: RequestParsing.BadRequest =>
emit(responseOut,
HttpResponse(StatusCodes.BadRequest, entity = notOk.info.summary).addAttribute(Http2.streamId,
notOk.streamId))
pull(requestIn)
}
}
}

override def onPull(): Unit = pull(requestIn)
})
setHandlers(responseIn, responseOut,
new InHandler with OutHandler {
override def onPush(): Unit = push(responseOut, grab(responseIn))
override def onPull(): Unit = if (!hasBeenPulled(responseIn)) pull(responseIn)
})
override def onPull(): Unit = pull(requestIn)
})

}
}
}
Loading
Loading