Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add no-op NIOHTTP2ConnectionDemultiplexer #377

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler+ConnectionDemultiplexer.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore

/// Demultiplexes inbound HTTP/2 frames on a connection into HTTP/2 streams.
internal protocol NIOHTTP2ConnectionDemultiplexer {
/// An HTTP/2 frame has been received from the remote peer.
func receivedFrame(_ frame: HTTP2Frame)

/// A stream error was thrown when trying to send an outbound frame.
func streamError(streamID: HTTP2StreamID, error: Error)

/// A new HTTP/2 stream was created with the given ID.
func streamCreated(event: NIOHTTP2StreamCreatedEvent)

/// An HTTP/2 stream with the given ID was closed.
func streamClosed(event: StreamClosedEvent)

/// The flow control windows of the HTTP/2 stream changed.
func streamWindowUpdated(event: NIOHTTP2WindowUpdatedEvent)

/// The initial stream window for all streams changed by the given amount.
func initialStreamWindowChanged(event: NIOHTTP2BulkStreamWindowChangeEvent)
}

extension NIOHTTP2Handler {
/// Abstracts over the integrated stream multiplexing (new) and the chained channel handler (legacy) multiplexing approaches.
///
/// We use an enum for this purpose since we can't use a generic (for API compatibility reasons) and it allows us to avoid the cost of using an existential.
internal enum ConnectionDemultiplexer: NIOHTTP2ConnectionDemultiplexer {
case legacy(LegacyHTTP2ConnectionDemultiplexer)
case new

func receivedFrame(_ frame: HTTP2Frame) {
switch self {
case .legacy(let demultiplexer):
demultiplexer.receivedFrame(frame)
case .new:
fatalError("Not yet implemented.")
}
}

func streamError(streamID: HTTP2StreamID, error: Error) {
switch self {
case .legacy(let demultiplexer):
demultiplexer.streamError(streamID: streamID, error: error)
case .new:
fatalError("Not yet implemented.")
}
}

func streamCreated(event: NIOHTTP2StreamCreatedEvent) {
switch self {
case .legacy(let demultiplexer):
demultiplexer.streamCreated(event: event)
case .new:
fatalError("Not yet implemented.")
}
}

func streamClosed(event: StreamClosedEvent) {
switch self {
case .legacy(let demultiplexer):
demultiplexer.streamClosed(event: event)
case .new:
fatalError("Not yet implemented.")
}
}

func streamWindowUpdated(event: NIOHTTP2WindowUpdatedEvent) {
switch self {
case .legacy(let demultiplexer):
demultiplexer.streamWindowUpdated(event: event)
case .new:
fatalError("Not yet implemented.")
}
}

func initialStreamWindowChanged(event: NIOHTTP2BulkStreamWindowChangeEvent) {
switch self {
case .legacy(let demultiplexer):
demultiplexer.initialStreamWindowChanged(event: event)
case .new:
fatalError("Not yet implemented.")
}
}
}
}

/// Provides a 'demultiplexer' interface for legacy compatibility.
///
/// This doesn't actually do any demultiplexing but communicates with the `HTTP2StreamChannel` which does - mostly via `UserInboundEvent`s.
internal struct LegacyHTTP2ConnectionDemultiplexer {
let context: ChannelHandlerContext
}

extension LegacyHTTP2ConnectionDemultiplexer: NIOHTTP2ConnectionDemultiplexer {
func receivedFrame(_ frame: HTTP2Frame) {
self.context.fireChannelRead(NIOAny(frame))
}

func streamError(streamID: HTTP2StreamID, error: Error) {
self.context.fireErrorCaught(NIOHTTP2Errors.streamError(streamID: streamID, baseError: error))
}

func streamCreated(event: NIOHTTP2StreamCreatedEvent) {
self.context.fireUserInboundEventTriggered(event)
}

func streamClosed(event: StreamClosedEvent) {
self.context.fireUserInboundEventTriggered(event)
}

func streamWindowUpdated(event: NIOHTTP2WindowUpdatedEvent) {
self.context.fireUserInboundEventTriggered(event)
}

func initialStreamWindowChanged(event: NIOHTTP2BulkStreamWindowChangeEvent) {
self.context.fireUserInboundEventTriggered(event)
}
}
33 changes: 27 additions & 6 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
/// trigger them.
private let tolerateImpossibleStateTransitionsInDebugMode: Bool

/// The delegate for demultiplexing streams. Set on `handlerAdded` and `nil`-ed out on `handlerRemoved`.
private var demultiplexer: ConnectionDemultiplexer?

/// The mode for this parser to operate in: client or server.
public enum ParserMode {
/// Client mode
Expand Down Expand Up @@ -195,6 +198,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
self.frameDecoder = HTTP2FrameDecoder(allocator: context.channel.allocator, expectClientMagic: self.mode == .server)
self.frameEncoder = HTTP2FrameEncoder(allocator: context.channel.allocator)
self.writeBuffer = context.channel.allocator.buffer(capacity: 128)
self.demultiplexer = .legacy(LegacyHTTP2ConnectionDemultiplexer(context: context))

if context.channel.isActive {
// We jump immediately to activated here, as channelActive has probably already passed.
Expand All @@ -209,6 +213,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
public func handlerRemoved(context: ChannelHandlerContext) {
// Any frames we're buffering need to be dropped.
self.outboundBuffer.invalidateBuffer()
self.demultiplexer = nil
}

public func channelActive(context: ChannelHandlerContext) {
Expand Down Expand Up @@ -443,7 +448,7 @@ extension NIOHTTP2Handler {
switch result.result {
case .succeed:
// Frame is good, we can pass it on.
context.fireChannelRead(self.wrapInboundOut(frame))
self.demultiplexer?.receivedFrame(frame)
returnValue = .continue
case .ignoreFrame:
// Frame is good but no action needs to be taken.
Expand Down Expand Up @@ -491,7 +496,7 @@ extension NIOHTTP2Handler {
/// Emit any pending user events.
private func processPendingUserEvents(context: ChannelHandlerContext) {
for event in self.inboundEventBuffer {
context.fireUserInboundEventTriggered(event)
self.demultiplexer?.process(event: event)
}
}

Expand All @@ -505,6 +510,21 @@ extension NIOHTTP2Handler {
}
}

extension NIOHTTP2Handler.ConnectionDemultiplexer {
func process(event: InboundEventBuffer.BufferedHTTP2UserEvent) {
switch event {
case .streamCreated(let event):
self.streamCreated(event: event)
case .initialStreamWindowChanged(let event):
self.initialStreamWindowChanged(event: event)
case .streamWindowUpdated(let event):
self.streamWindowUpdated(event: event)
case .streamClosed(let event):
self.streamClosed(event: event)
}
}
}


/// Outbound frame handling.
extension NIOHTTP2Handler {
Expand Down Expand Up @@ -650,7 +670,7 @@ extension NIOHTTP2Handler {
/// A stream error was hit while attempting to send a frame.
private func outboundStreamErrorTriggered(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?, streamID: HTTP2StreamID, underlyingError: Error) {
promise?.fail(underlyingError)
context.fireErrorCaught(NIOHTTP2Errors.streamError(streamID: streamID, baseError: underlyingError))
self.demultiplexer?.streamError(streamID: streamID, error: underlyingError)
}
}

Expand All @@ -672,9 +692,10 @@ extension NIOHTTP2Handler {
self.failDroppedPromises(droppedPromises, streamID: streamClosedData.streamID, errorCode: streamClosedData.reason ?? .cancel)
case .streamCreated(let streamCreatedData):
self.outboundBuffer.streamCreated(streamCreatedData.streamID, initialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init) ?? 0)
self.inboundEventBuffer.pendingUserEvent(NIOHTTP2StreamCreatedEvent(streamID: streamCreatedData.streamID,
localInitialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init),
remoteInitialWindowSize: streamCreatedData.remoteStreamWindowSize.map(UInt32.init)))
self.inboundEventBuffer.pendingUserEvent(
NIOHTTP2StreamCreatedEvent(streamID: streamCreatedData.streamID,
localInitialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init),
remoteInitialWindowSize: streamCreatedData.remoteStreamWindowSize.map(UInt32.init)))
case .bulkStreamClosure(let streamClosureData):
for droppedStream in streamClosureData.closedStreams {
self.inboundEventBuffer.pendingUserEvent(StreamClosedEvent(streamID: droppedStream, reason: .cancel))
Expand Down
10 changes: 8 additions & 2 deletions Sources/NIOHTTP2/HTTP2UserEvents.swift
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,18 @@ public struct NIOHTTP2StreamCreatedEvent {
public let localInitialWindowSize: UInt32?

/// The initial remote stream window size. May be nil if this stream may never have data received on it.
public let remoteInitialWidowSize: UInt32?
@available(*, deprecated, renamed: "remoteInitialWindowSize")
public var remoteInitialWidowSize: UInt32? {
self.remoteInitialWindowSize
}

/// The initial remote stream window size. May be nil if this stream has never had data received on it.
public let remoteInitialWindowSize: UInt32?

public init(streamID: HTTP2StreamID, localInitialWindowSize: UInt32?, remoteInitialWindowSize: UInt32?) {
self.streamID = streamID
self.localInitialWindowSize = localInitialWindowSize
self.remoteInitialWidowSize = remoteInitialWindowSize
self.remoteInitialWindowSize = remoteInitialWindowSize
}
}

Expand Down
30 changes: 26 additions & 4 deletions Sources/NIOHTTP2/InboundEventBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,39 @@ import NIOCore
/// have always delivered all pending user events before we deliver a frame.
/// Deliberately not threadsafe or `Sendable`.
class InboundEventBuffer {
fileprivate var buffer: CircularBuffer<Any> = CircularBuffer(initialCapacity: 8)
fileprivate var buffer: CircularBuffer<BufferedHTTP2UserEvent> = CircularBuffer(initialCapacity: 8)

func pendingUserEvent(_ event: Any) {
self.buffer.append(event)
func pendingUserEvent(_ event: NIOHTTP2StreamCreatedEvent) {
self.buffer.append(.streamCreated(event))
}

func pendingUserEvent(_ event: StreamClosedEvent) {
self.buffer.append(.streamClosed(event))
}

func pendingUserEvent(_ event: NIOHTTP2WindowUpdatedEvent) {
self.buffer.append(.streamWindowUpdated(event))
}

func pendingUserEvent(_ event: NIOHTTP2BulkStreamWindowChangeEvent) {
self.buffer.append(.initialStreamWindowChanged(event))
}

/// Wraps user event types.
///
/// This allows us to buffer and pass around the events without making use of an existential.
enum BufferedHTTP2UserEvent {
case streamCreated(NIOHTTP2StreamCreatedEvent)
case streamClosed(StreamClosedEvent)
case streamWindowUpdated(NIOHTTP2WindowUpdatedEvent)
case initialStreamWindowChanged(NIOHTTP2BulkStreamWindowChangeEvent)
}
}


// MARK:- Sequence conformance
extension InboundEventBuffer: Sequence {
typealias Element = Any
typealias Element = BufferedHTTP2UserEvent

func makeIterator() -> InboundEventBufferIterator {
return InboundEventBufferIterator(self)
Expand Down