diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler+ConnectionDemultiplexer.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler+ConnectionDemultiplexer.swift new file mode 100644 index 00000000..c47a28c6 --- /dev/null +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler+ConnectionDemultiplexer.swift @@ -0,0 +1,133 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +/// Demultiplexes inbound HTTP/2 frames on a connection into HTTP/2 streams. +internal protocol NIOHTTP2ConnectionDemultiplexer { + /// An HTTP/2 frame has been received from the remote peer. + func receivedFrame(_ frame: HTTP2Frame) + + /// A stream error was thrown when trying to send an outbound frame. + func streamError(streamID: HTTP2StreamID, error: Error) + + /// A new HTTP/2 stream was created with the given ID. + func streamCreated(event: NIOHTTP2StreamCreatedEvent) + + /// An HTTP/2 stream with the given ID was closed. + func streamClosed(event: StreamClosedEvent) + + /// The flow control windows of the HTTP/2 stream changed. + func streamWindowUpdated(event: NIOHTTP2WindowUpdatedEvent) + + /// The initial stream window for all streams changed by the given amount. + func initialStreamWindowChanged(event: NIOHTTP2BulkStreamWindowChangeEvent) +} + +extension NIOHTTP2Handler { + /// Abstracts over the integrated stream multiplexing (new) and the chained channel handler (legacy) multiplexing approaches. + /// + /// We use an enum for this purpose since we can't use a generic (for API compatibility reasons) and it allows us to avoid the cost of using an existential. + internal enum ConnectionDemultiplexer: NIOHTTP2ConnectionDemultiplexer { + case legacy(LegacyHTTP2ConnectionDemultiplexer) + case new + + func receivedFrame(_ frame: HTTP2Frame) { + switch self { + case .legacy(let demultiplexer): + demultiplexer.receivedFrame(frame) + case .new: + fatalError("Not yet implemented.") + } + } + + func streamError(streamID: HTTP2StreamID, error: Error) { + switch self { + case .legacy(let demultiplexer): + demultiplexer.streamError(streamID: streamID, error: error) + case .new: + fatalError("Not yet implemented.") + } + } + + func streamCreated(event: NIOHTTP2StreamCreatedEvent) { + switch self { + case .legacy(let demultiplexer): + demultiplexer.streamCreated(event: event) + case .new: + fatalError("Not yet implemented.") + } + } + + func streamClosed(event: StreamClosedEvent) { + switch self { + case .legacy(let demultiplexer): + demultiplexer.streamClosed(event: event) + case .new: + fatalError("Not yet implemented.") + } + } + + func streamWindowUpdated(event: NIOHTTP2WindowUpdatedEvent) { + switch self { + case .legacy(let demultiplexer): + demultiplexer.streamWindowUpdated(event: event) + case .new: + fatalError("Not yet implemented.") + } + } + + func initialStreamWindowChanged(event: NIOHTTP2BulkStreamWindowChangeEvent) { + switch self { + case .legacy(let demultiplexer): + demultiplexer.initialStreamWindowChanged(event: event) + case .new: + fatalError("Not yet implemented.") + } + } + } +} + +/// Provides a 'demultiplexer' interface for legacy compatibility. +/// +/// This doesn't actually do any demultiplexing but communicates with the `HTTP2StreamChannel` which does - mostly via `UserInboundEvent`s. +internal struct LegacyHTTP2ConnectionDemultiplexer { + let context: ChannelHandlerContext +} + +extension LegacyHTTP2ConnectionDemultiplexer: NIOHTTP2ConnectionDemultiplexer { + func receivedFrame(_ frame: HTTP2Frame) { + self.context.fireChannelRead(NIOAny(frame)) + } + + func streamError(streamID: HTTP2StreamID, error: Error) { + self.context.fireErrorCaught(NIOHTTP2Errors.streamError(streamID: streamID, baseError: error)) + } + + func streamCreated(event: NIOHTTP2StreamCreatedEvent) { + self.context.fireUserInboundEventTriggered(event) + } + + func streamClosed(event: StreamClosedEvent) { + self.context.fireUserInboundEventTriggered(event) + } + + func streamWindowUpdated(event: NIOHTTP2WindowUpdatedEvent) { + self.context.fireUserInboundEventTriggered(event) + } + + func initialStreamWindowChanged(event: NIOHTTP2BulkStreamWindowChangeEvent) { + self.context.fireUserInboundEventTriggered(event) + } +} diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift index 5017494c..1e67ba34 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift @@ -99,6 +99,9 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { /// trigger them. private let tolerateImpossibleStateTransitionsInDebugMode: Bool + /// The delegate for demultiplexing streams. Set on `handlerAdded` and `nil`-ed out on `handlerRemoved`. + private var demultiplexer: ConnectionDemultiplexer? + /// The mode for this parser to operate in: client or server. public enum ParserMode { /// Client mode @@ -195,6 +198,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { self.frameDecoder = HTTP2FrameDecoder(allocator: context.channel.allocator, expectClientMagic: self.mode == .server) self.frameEncoder = HTTP2FrameEncoder(allocator: context.channel.allocator) self.writeBuffer = context.channel.allocator.buffer(capacity: 128) + self.demultiplexer = .legacy(LegacyHTTP2ConnectionDemultiplexer(context: context)) if context.channel.isActive { // We jump immediately to activated here, as channelActive has probably already passed. @@ -209,6 +213,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { public func handlerRemoved(context: ChannelHandlerContext) { // Any frames we're buffering need to be dropped. self.outboundBuffer.invalidateBuffer() + self.demultiplexer = nil } public func channelActive(context: ChannelHandlerContext) { @@ -443,7 +448,7 @@ extension NIOHTTP2Handler { switch result.result { case .succeed: // Frame is good, we can pass it on. - context.fireChannelRead(self.wrapInboundOut(frame)) + self.demultiplexer?.receivedFrame(frame) returnValue = .continue case .ignoreFrame: // Frame is good but no action needs to be taken. @@ -491,7 +496,7 @@ extension NIOHTTP2Handler { /// Emit any pending user events. private func processPendingUserEvents(context: ChannelHandlerContext) { for event in self.inboundEventBuffer { - context.fireUserInboundEventTriggered(event) + self.demultiplexer?.process(event: event) } } @@ -505,6 +510,21 @@ extension NIOHTTP2Handler { } } +extension NIOHTTP2Handler.ConnectionDemultiplexer { + func process(event: InboundEventBuffer.BufferedHTTP2UserEvent) { + switch event { + case .streamCreated(let event): + self.streamCreated(event: event) + case .initialStreamWindowChanged(let event): + self.initialStreamWindowChanged(event: event) + case .streamWindowUpdated(let event): + self.streamWindowUpdated(event: event) + case .streamClosed(let event): + self.streamClosed(event: event) + } + } +} + /// Outbound frame handling. extension NIOHTTP2Handler { @@ -650,7 +670,7 @@ extension NIOHTTP2Handler { /// A stream error was hit while attempting to send a frame. private func outboundStreamErrorTriggered(context: ChannelHandlerContext, promise: EventLoopPromise?, streamID: HTTP2StreamID, underlyingError: Error) { promise?.fail(underlyingError) - context.fireErrorCaught(NIOHTTP2Errors.streamError(streamID: streamID, baseError: underlyingError)) + self.demultiplexer?.streamError(streamID: streamID, error: underlyingError) } } @@ -672,9 +692,10 @@ extension NIOHTTP2Handler { self.failDroppedPromises(droppedPromises, streamID: streamClosedData.streamID, errorCode: streamClosedData.reason ?? .cancel) case .streamCreated(let streamCreatedData): self.outboundBuffer.streamCreated(streamCreatedData.streamID, initialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init) ?? 0) - self.inboundEventBuffer.pendingUserEvent(NIOHTTP2StreamCreatedEvent(streamID: streamCreatedData.streamID, - localInitialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init), - remoteInitialWindowSize: streamCreatedData.remoteStreamWindowSize.map(UInt32.init))) + self.inboundEventBuffer.pendingUserEvent( + NIOHTTP2StreamCreatedEvent(streamID: streamCreatedData.streamID, + localInitialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init), + remoteInitialWindowSize: streamCreatedData.remoteStreamWindowSize.map(UInt32.init))) case .bulkStreamClosure(let streamClosureData): for droppedStream in streamClosureData.closedStreams { self.inboundEventBuffer.pendingUserEvent(StreamClosedEvent(streamID: droppedStream, reason: .cancel)) diff --git a/Sources/NIOHTTP2/HTTP2UserEvents.swift b/Sources/NIOHTTP2/HTTP2UserEvents.swift index 8c79efb1..b80cd6ff 100644 --- a/Sources/NIOHTTP2/HTTP2UserEvents.swift +++ b/Sources/NIOHTTP2/HTTP2UserEvents.swift @@ -95,12 +95,18 @@ public struct NIOHTTP2StreamCreatedEvent { public let localInitialWindowSize: UInt32? /// The initial remote stream window size. May be nil if this stream may never have data received on it. - public let remoteInitialWidowSize: UInt32? + @available(*, deprecated, renamed: "remoteInitialWindowSize") + public var remoteInitialWidowSize: UInt32? { + self.remoteInitialWindowSize + } + + /// The initial remote stream window size. May be nil if this stream has never had data received on it. + public let remoteInitialWindowSize: UInt32? public init(streamID: HTTP2StreamID, localInitialWindowSize: UInt32?, remoteInitialWindowSize: UInt32?) { self.streamID = streamID self.localInitialWindowSize = localInitialWindowSize - self.remoteInitialWidowSize = remoteInitialWindowSize + self.remoteInitialWindowSize = remoteInitialWindowSize } } diff --git a/Sources/NIOHTTP2/InboundEventBuffer.swift b/Sources/NIOHTTP2/InboundEventBuffer.swift index 420125c6..80193cf3 100644 --- a/Sources/NIOHTTP2/InboundEventBuffer.swift +++ b/Sources/NIOHTTP2/InboundEventBuffer.swift @@ -24,17 +24,39 @@ import NIOCore /// have always delivered all pending user events before we deliver a frame. /// Deliberately not threadsafe or `Sendable`. class InboundEventBuffer { - fileprivate var buffer: CircularBuffer = CircularBuffer(initialCapacity: 8) + fileprivate var buffer: CircularBuffer = CircularBuffer(initialCapacity: 8) - func pendingUserEvent(_ event: Any) { - self.buffer.append(event) + func pendingUserEvent(_ event: NIOHTTP2StreamCreatedEvent) { + self.buffer.append(.streamCreated(event)) + } + + func pendingUserEvent(_ event: StreamClosedEvent) { + self.buffer.append(.streamClosed(event)) + } + + func pendingUserEvent(_ event: NIOHTTP2WindowUpdatedEvent) { + self.buffer.append(.streamWindowUpdated(event)) + } + + func pendingUserEvent(_ event: NIOHTTP2BulkStreamWindowChangeEvent) { + self.buffer.append(.initialStreamWindowChanged(event)) + } + + /// Wraps user event types. + /// + /// This allows us to buffer and pass around the events without making use of an existential. + enum BufferedHTTP2UserEvent { + case streamCreated(NIOHTTP2StreamCreatedEvent) + case streamClosed(StreamClosedEvent) + case streamWindowUpdated(NIOHTTP2WindowUpdatedEvent) + case initialStreamWindowChanged(NIOHTTP2BulkStreamWindowChangeEvent) } } // MARK:- Sequence conformance extension InboundEventBuffer: Sequence { - typealias Element = Any + typealias Element = BufferedHTTP2UserEvent func makeIterator() -> InboundEventBufferIterator { return InboundEventBufferIterator(self)