From 280fffa62bd0feed6326c5d95ec02b53f8062d99 Mon Sep 17 00:00:00 2001 From: Rick Newton-Rogers Date: Fri, 3 Mar 2023 10:41:27 +0000 Subject: [PATCH 1/3] Add no-op NIOHTTP2ConnectionDemultiplexer Motivation: This change prepares for future work to embed the connection demultiplexer within the `HTTP2ChannelHandler` which will improve performance by removing the need for passing expensive `InboundUserEvent`s. Modifications: This first step defines a new protocol (`NIOHTTP2ConnectionDemultiplexer`) which will later be used to abstract over the embedded and legacy separate demultiplexer code but at the moment should not change the logic. Result: Behaviour should be unchanged. --- ...annelHandler+ConnectionDemultiplexer.swift | 133 ++++++++++++++++++ Sources/NIOHTTP2/HTTP2ChannelHandler.swift | 41 ++++-- Sources/NIOHTTP2/HTTP2UserEvents.swift | 10 +- Sources/NIOHTTP2/InboundEventBuffer.swift | 16 ++- 4 files changed, 183 insertions(+), 17 deletions(-) create mode 100644 Sources/NIOHTTP2/HTTP2ChannelHandler+ConnectionDemultiplexer.swift diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler+ConnectionDemultiplexer.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler+ConnectionDemultiplexer.swift new file mode 100644 index 00000000..0d7b0a56 --- /dev/null +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler+ConnectionDemultiplexer.swift @@ -0,0 +1,133 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +/// Demultiplexes inbound HTTP/2 frames on a connection into HTTP/2 streams. +internal protocol NIOHTTP2ConnectionDemultiplexer { + /// An HTTP/2 frame has been received from the remote peer. + func receivedFrame(_ frame: HTTP2Frame) + + /// A stream error was thrown when trying to send an outbound frame. + func streamError(streamID: HTTP2StreamID, error: Error) + + /// A new HTTP/2 stream was created with the given ID. + func streamCreated(_ id: HTTP2StreamID, localInitialWindowSize: UInt32?, remoteInitialWindowSize: UInt32?) + + /// An HTTP/2 stream with the given ID was closed. + func streamClosed(_ id: HTTP2StreamID, reason: HTTP2ErrorCode?) + + /// The flow control windows of the HTTP/2 stream changed. + func streamWindowUpdated(_ id: HTTP2StreamID, inboundWindowSize: Int?, outboundWindowSize: Int?) + + /// The initial stream window for all streams changed by the given amount. + func initialStreamWindowChanged(by delta: Int) +} + +extension NIOHTTP2Handler { + /// Abstracts over the integrated stream multiplexing (new) and the chained channel handler (legacy) multiplexing approaches. + /// + /// We use an enum for this purpose since we can't use a generic (for API compatibility reasons) and it allows us to avoid the cost of using an existential. + internal enum ConnectionDemultiplexer: NIOHTTP2ConnectionDemultiplexer { + case legacy(LegacyHTTP2ConnectionDemultiplexer) + case new + + func receivedFrame(_ frame: HTTP2Frame) { + switch self { + case .legacy(let demultiplexer): + demultiplexer.receivedFrame(frame) + case .new: + fatalError("Not yet implemented.") + } + } + + func streamError(streamID: HTTP2StreamID, error: Error) { + switch self { + case .legacy(let demultiplexer): + demultiplexer.streamError(streamID: streamID, error: error) + case .new: + fatalError("Not yet implemented.") + } + } + + func streamCreated(_ id: HTTP2StreamID, localInitialWindowSize: UInt32?, remoteInitialWindowSize: UInt32?) { + switch self { + case .legacy(let demultiplexer): + demultiplexer.streamCreated(id, localInitialWindowSize: localInitialWindowSize, remoteInitialWindowSize: remoteInitialWindowSize) + case .new: + fatalError("Not yet implemented.") + } + } + + func streamClosed(_ id: HTTP2StreamID, reason: HTTP2ErrorCode?) { + switch self { + case .legacy(let demultiplexer): + demultiplexer.streamClosed(id, reason: reason) + case .new: + fatalError("Not yet implemented.") + } + } + + func streamWindowUpdated(_ id: HTTP2StreamID, inboundWindowSize: Int?, outboundWindowSize: Int?) { + switch self { + case .legacy(let demultiplexer): + demultiplexer.streamWindowUpdated(id, inboundWindowSize: inboundWindowSize, outboundWindowSize: outboundWindowSize) + case .new: + fatalError("Not yet implemented.") + } + } + + func initialStreamWindowChanged(by delta: Int) { + switch self { + case .legacy(let demultiplexer): + demultiplexer.initialStreamWindowChanged(by: delta) + case .new: + fatalError("Not yet implemented.") + } + } + } +} + +/// Provides a 'demultiplexer' interface for legacy compatibility. +/// +/// This doesn't actually do any demultiplexing but communicates with the `HTTP2StreamChannel` which does - mostly via `UserInboundEvent`s. +internal struct LegacyHTTP2ConnectionDemultiplexer { + let context: ChannelHandlerContext +} + +extension LegacyHTTP2ConnectionDemultiplexer: NIOHTTP2ConnectionDemultiplexer { + func receivedFrame(_ frame: HTTP2Frame) { + self.context.fireChannelRead(NIOAny(frame)) + } + + func streamError(streamID: HTTP2StreamID, error: Error) { + self.context.fireErrorCaught(NIOHTTP2Errors.streamError(streamID: streamID, baseError: error)) + } + + func streamCreated(_ id: HTTP2StreamID, localInitialWindowSize: UInt32?, remoteInitialWindowSize: UInt32?) { + self.context.fireUserInboundEventTriggered(NIOHTTP2StreamCreatedEvent(streamID: id, localInitialWindowSize: localInitialWindowSize, remoteInitialWindowSize: remoteInitialWindowSize)) + } + + func streamClosed(_ id: HTTP2StreamID, reason: HTTP2ErrorCode?) { + self.context.fireUserInboundEventTriggered(StreamClosedEvent(streamID: id, reason: reason)) + } + + func streamWindowUpdated(_ id: HTTP2StreamID, inboundWindowSize: Int?, outboundWindowSize: Int?) { + self.context.fireUserInboundEventTriggered(NIOHTTP2WindowUpdatedEvent(streamID: id, inboundWindowSize: inboundWindowSize, outboundWindowSize: outboundWindowSize)) + } + + func initialStreamWindowChanged(by delta: Int) { + self.context.fireUserInboundEventTriggered(NIOHTTP2BulkStreamWindowChangeEvent(delta: delta)) + } +} diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift index 5017494c..07fffe6b 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift @@ -99,6 +99,10 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { /// trigger them. private let tolerateImpossibleStateTransitionsInDebugMode: Bool + /// The delegate for demultiplexing streams. Set on `handlerAdded` and `nil`-ed out on + /// `channelInactive` and `handlerRemoved`. + private var demultiplexer: ConnectionDemultiplexer? + /// The mode for this parser to operate in: client or server. public enum ParserMode { /// Client mode @@ -195,6 +199,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { self.frameDecoder = HTTP2FrameDecoder(allocator: context.channel.allocator, expectClientMagic: self.mode == .server) self.frameEncoder = HTTP2FrameEncoder(allocator: context.channel.allocator) self.writeBuffer = context.channel.allocator.buffer(capacity: 128) + self.demultiplexer = .legacy(LegacyHTTP2ConnectionDemultiplexer(context: context)) if context.channel.isActive { // We jump immediately to activated here, as channelActive has probably already passed. @@ -209,6 +214,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { public func handlerRemoved(context: ChannelHandlerContext) { // Any frames we're buffering need to be dropped. self.outboundBuffer.invalidateBuffer() + self.demultiplexer = nil } public func channelActive(context: ChannelHandlerContext) { @@ -277,6 +283,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { // This is the easy one. We were active, now we aren't. self.activationState = .inactive self.channelClosed = true + self.demultiplexer = nil context.fireChannelInactive() case .activating: // Huh, we got channelInactive during activation. We need to maintain @@ -443,7 +450,7 @@ extension NIOHTTP2Handler { switch result.result { case .succeed: // Frame is good, we can pass it on. - context.fireChannelRead(self.wrapInboundOut(frame)) + self.demultiplexer?.receivedFrame(frame) returnValue = .continue case .ignoreFrame: // Frame is good but no action needs to be taken. @@ -491,7 +498,16 @@ extension NIOHTTP2Handler { /// Emit any pending user events. private func processPendingUserEvents(context: ChannelHandlerContext) { for event in self.inboundEventBuffer { - context.fireUserInboundEventTriggered(event) + switch event { + case .streamCreated(let event): + self.demultiplexer?.streamCreated(event.streamID, localInitialWindowSize: event.localInitialWindowSize, remoteInitialWindowSize: event.remoteInitialWindowSize) + case .initialStreamWindowChanged(let event): + self.demultiplexer?.initialStreamWindowChanged(by: event.delta) + case .streamWindowUpdated(let event): + self.demultiplexer?.streamWindowUpdated(event.streamID, inboundWindowSize: event.inboundWindowSize, outboundWindowSize: event.outboundWindowSize) + case .streamClosed(let event): + self.demultiplexer?.streamClosed(event.streamID, reason: event.reason) + } } } @@ -650,7 +666,7 @@ extension NIOHTTP2Handler { /// A stream error was hit while attempting to send a frame. private func outboundStreamErrorTriggered(context: ChannelHandlerContext, promise: EventLoopPromise?, streamID: HTTP2StreamID, underlyingError: Error) { promise?.fail(underlyingError) - context.fireErrorCaught(NIOHTTP2Errors.streamError(streamID: streamID, baseError: underlyingError)) + self.demultiplexer?.streamError(streamID: streamID, error: underlyingError) } } @@ -665,29 +681,30 @@ extension NIOHTTP2Handler { switch stateChange { case .streamClosed(let streamClosedData): self.outboundBuffer.connectionWindowSize = streamClosedData.localConnectionWindowSize - self.inboundEventBuffer.pendingUserEvent(StreamClosedEvent(streamID: streamClosedData.streamID, reason: streamClosedData.reason)) - self.inboundEventBuffer.pendingUserEvent(NIOHTTP2WindowUpdatedEvent(streamID: .rootStream, inboundWindowSize: streamClosedData.remoteConnectionWindowSize, outboundWindowSize: streamClosedData.localConnectionWindowSize)) + self.inboundEventBuffer.pendingUserEvent(.streamClosed(StreamClosedEvent(streamID: streamClosedData.streamID, reason: streamClosedData.reason))) + self.inboundEventBuffer.pendingUserEvent(.streamWindowUpdated(NIOHTTP2WindowUpdatedEvent(streamID: .rootStream, inboundWindowSize: streamClosedData.remoteConnectionWindowSize, outboundWindowSize: streamClosedData.localConnectionWindowSize))) let droppedPromises = self.outboundBuffer.streamClosed(streamClosedData.streamID) self.failDroppedPromises(droppedPromises, streamID: streamClosedData.streamID, errorCode: streamClosedData.reason ?? .cancel) case .streamCreated(let streamCreatedData): self.outboundBuffer.streamCreated(streamCreatedData.streamID, initialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init) ?? 0) - self.inboundEventBuffer.pendingUserEvent(NIOHTTP2StreamCreatedEvent(streamID: streamCreatedData.streamID, - localInitialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init), - remoteInitialWindowSize: streamCreatedData.remoteStreamWindowSize.map(UInt32.init))) + self.inboundEventBuffer.pendingUserEvent( + .streamCreated(NIOHTTP2StreamCreatedEvent(streamID: streamCreatedData.streamID, + localInitialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init), + remoteInitialWindowSize: streamCreatedData.remoteStreamWindowSize.map(UInt32.init)))) case .bulkStreamClosure(let streamClosureData): for droppedStream in streamClosureData.closedStreams { - self.inboundEventBuffer.pendingUserEvent(StreamClosedEvent(streamID: droppedStream, reason: .cancel)) + self.inboundEventBuffer.pendingUserEvent(.streamClosed(StreamClosedEvent(streamID: droppedStream, reason: .cancel))) let droppedPromises = self.outboundBuffer.streamClosed(droppedStream) self.failDroppedPromises(droppedPromises, streamID: droppedStream, errorCode: .cancel) } case .flowControlChange(let change): self.outboundBuffer.connectionWindowSize = change.localConnectionWindowSize - self.inboundEventBuffer.pendingUserEvent(NIOHTTP2WindowUpdatedEvent(streamID: .rootStream, inboundWindowSize: change.remoteConnectionWindowSize, outboundWindowSize: change.localConnectionWindowSize)) + self.inboundEventBuffer.pendingUserEvent(.streamWindowUpdated(NIOHTTP2WindowUpdatedEvent(streamID: .rootStream, inboundWindowSize: change.remoteConnectionWindowSize, outboundWindowSize: change.localConnectionWindowSize))) if let streamSize = change.localStreamWindowSize { self.outboundBuffer.updateStreamWindow(streamSize.streamID, newSize: streamSize.localStreamWindowSize.map(Int32.init) ?? 0) - self.inboundEventBuffer.pendingUserEvent(NIOHTTP2WindowUpdatedEvent(streamID: streamSize.streamID, inboundWindowSize: streamSize.remoteStreamWindowSize, outboundWindowSize: streamSize.localStreamWindowSize)) + self.inboundEventBuffer.pendingUserEvent(.streamWindowUpdated(NIOHTTP2WindowUpdatedEvent(streamID: streamSize.streamID, inboundWindowSize: streamSize.remoteStreamWindowSize, outboundWindowSize: streamSize.localStreamWindowSize))) } case .streamCreatedAndClosed(let cAndCData): self.outboundBuffer.streamCreated(cAndCData.streamID, initialWindowSize: 0) @@ -706,7 +723,7 @@ extension NIOHTTP2Handler { } case .localSettingsChanged(let settingsChange): if settingsChange.streamWindowSizeChange != 0 { - self.inboundEventBuffer.pendingUserEvent(NIOHTTP2BulkStreamWindowChangeEvent(delta: settingsChange.streamWindowSizeChange)) + self.inboundEventBuffer.pendingUserEvent(.initialStreamWindowChanged(NIOHTTP2BulkStreamWindowChangeEvent(delta: settingsChange.streamWindowSizeChange))) } if let newMaxFrameSize = settingsChange.newMaxFrameSize { self.frameDecoder.maxFrameSize = newMaxFrameSize diff --git a/Sources/NIOHTTP2/HTTP2UserEvents.swift b/Sources/NIOHTTP2/HTTP2UserEvents.swift index 8c79efb1..b80cd6ff 100644 --- a/Sources/NIOHTTP2/HTTP2UserEvents.swift +++ b/Sources/NIOHTTP2/HTTP2UserEvents.swift @@ -95,12 +95,18 @@ public struct NIOHTTP2StreamCreatedEvent { public let localInitialWindowSize: UInt32? /// The initial remote stream window size. May be nil if this stream may never have data received on it. - public let remoteInitialWidowSize: UInt32? + @available(*, deprecated, renamed: "remoteInitialWindowSize") + public var remoteInitialWidowSize: UInt32? { + self.remoteInitialWindowSize + } + + /// The initial remote stream window size. May be nil if this stream has never had data received on it. + public let remoteInitialWindowSize: UInt32? public init(streamID: HTTP2StreamID, localInitialWindowSize: UInt32?, remoteInitialWindowSize: UInt32?) { self.streamID = streamID self.localInitialWindowSize = localInitialWindowSize - self.remoteInitialWidowSize = remoteInitialWindowSize + self.remoteInitialWindowSize = remoteInitialWindowSize } } diff --git a/Sources/NIOHTTP2/InboundEventBuffer.swift b/Sources/NIOHTTP2/InboundEventBuffer.swift index 420125c6..c49ccb6b 100644 --- a/Sources/NIOHTTP2/InboundEventBuffer.swift +++ b/Sources/NIOHTTP2/InboundEventBuffer.swift @@ -24,17 +24,27 @@ import NIOCore /// have always delivered all pending user events before we deliver a frame. /// Deliberately not threadsafe or `Sendable`. class InboundEventBuffer { - fileprivate var buffer: CircularBuffer = CircularBuffer(initialCapacity: 8) + fileprivate var buffer: CircularBuffer = CircularBuffer(initialCapacity: 8) - func pendingUserEvent(_ event: Any) { + func pendingUserEvent(_ event: BufferedHTTP2UserEvent) { self.buffer.append(event) } + + /// Wraps user event types. + /// + /// This allows us to buffer and pass around the events without making use of a generic. + enum BufferedHTTP2UserEvent { + case streamCreated(NIOHTTP2StreamCreatedEvent) + case streamClosed(StreamClosedEvent) + case streamWindowUpdated(NIOHTTP2WindowUpdatedEvent) + case initialStreamWindowChanged(NIOHTTP2BulkStreamWindowChangeEvent) + } } // MARK:- Sequence conformance extension InboundEventBuffer: Sequence { - typealias Element = Any + typealias Element = BufferedHTTP2UserEvent func makeIterator() -> InboundEventBufferIterator { return InboundEventBufferIterator(self) From bfe93cea51b69c516331c78d532ec9316b20a946 Mon Sep 17 00:00:00 2001 From: Rick Newton-Rogers Date: Fri, 3 Mar 2023 13:48:43 +0000 Subject: [PATCH 2/3] review comments * no longer `nil` out demultiplexer on channelInactive * restructure calling `pendingUserEvent` & processing events --- Sources/NIOHTTP2/HTTP2ChannelHandler.swift | 46 ++++++++++++---------- Sources/NIOHTTP2/InboundEventBuffer.swift | 18 +++++++-- 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift index 07fffe6b..e6af31f3 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift @@ -99,8 +99,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { /// trigger them. private let tolerateImpossibleStateTransitionsInDebugMode: Bool - /// The delegate for demultiplexing streams. Set on `handlerAdded` and `nil`-ed out on - /// `channelInactive` and `handlerRemoved`. + /// The delegate for demultiplexing streams. Set on `handlerAdded` and `nil`-ed out on `handlerRemoved`. private var demultiplexer: ConnectionDemultiplexer? /// The mode for this parser to operate in: client or server. @@ -283,7 +282,6 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { // This is the easy one. We were active, now we aren't. self.activationState = .inactive self.channelClosed = true - self.demultiplexer = nil context.fireChannelInactive() case .activating: // Huh, we got channelInactive during activation. We need to maintain @@ -498,16 +496,7 @@ extension NIOHTTP2Handler { /// Emit any pending user events. private func processPendingUserEvents(context: ChannelHandlerContext) { for event in self.inboundEventBuffer { - switch event { - case .streamCreated(let event): - self.demultiplexer?.streamCreated(event.streamID, localInitialWindowSize: event.localInitialWindowSize, remoteInitialWindowSize: event.remoteInitialWindowSize) - case .initialStreamWindowChanged(let event): - self.demultiplexer?.initialStreamWindowChanged(by: event.delta) - case .streamWindowUpdated(let event): - self.demultiplexer?.streamWindowUpdated(event.streamID, inboundWindowSize: event.inboundWindowSize, outboundWindowSize: event.outboundWindowSize) - case .streamClosed(let event): - self.demultiplexer?.streamClosed(event.streamID, reason: event.reason) - } + self.demultiplexer?.process(event: event) } } @@ -521,6 +510,21 @@ extension NIOHTTP2Handler { } } +extension NIOHTTP2Handler.ConnectionDemultiplexer { + func process(event: InboundEventBuffer.BufferedHTTP2UserEvent) { + switch event { + case .streamCreated(let event): + self.streamCreated(event.streamID, localInitialWindowSize: event.localInitialWindowSize, remoteInitialWindowSize: event.remoteInitialWindowSize) + case .initialStreamWindowChanged(let event): + self.initialStreamWindowChanged(by: event.delta) + case .streamWindowUpdated(let event): + self.streamWindowUpdated(event.streamID, inboundWindowSize: event.inboundWindowSize, outboundWindowSize: event.outboundWindowSize) + case .streamClosed(let event): + self.streamClosed(event.streamID, reason: event.reason) + } + } +} + /// Outbound frame handling. extension NIOHTTP2Handler { @@ -681,30 +685,30 @@ extension NIOHTTP2Handler { switch stateChange { case .streamClosed(let streamClosedData): self.outboundBuffer.connectionWindowSize = streamClosedData.localConnectionWindowSize - self.inboundEventBuffer.pendingUserEvent(.streamClosed(StreamClosedEvent(streamID: streamClosedData.streamID, reason: streamClosedData.reason))) - self.inboundEventBuffer.pendingUserEvent(.streamWindowUpdated(NIOHTTP2WindowUpdatedEvent(streamID: .rootStream, inboundWindowSize: streamClosedData.remoteConnectionWindowSize, outboundWindowSize: streamClosedData.localConnectionWindowSize))) + self.inboundEventBuffer.pendingUserEvent(StreamClosedEvent(streamID: streamClosedData.streamID, reason: streamClosedData.reason)) + self.inboundEventBuffer.pendingUserEvent(NIOHTTP2WindowUpdatedEvent(streamID: .rootStream, inboundWindowSize: streamClosedData.remoteConnectionWindowSize, outboundWindowSize: streamClosedData.localConnectionWindowSize)) let droppedPromises = self.outboundBuffer.streamClosed(streamClosedData.streamID) self.failDroppedPromises(droppedPromises, streamID: streamClosedData.streamID, errorCode: streamClosedData.reason ?? .cancel) case .streamCreated(let streamCreatedData): self.outboundBuffer.streamCreated(streamCreatedData.streamID, initialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init) ?? 0) self.inboundEventBuffer.pendingUserEvent( - .streamCreated(NIOHTTP2StreamCreatedEvent(streamID: streamCreatedData.streamID, + NIOHTTP2StreamCreatedEvent(streamID: streamCreatedData.streamID, localInitialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init), - remoteInitialWindowSize: streamCreatedData.remoteStreamWindowSize.map(UInt32.init)))) + remoteInitialWindowSize: streamCreatedData.remoteStreamWindowSize.map(UInt32.init))) case .bulkStreamClosure(let streamClosureData): for droppedStream in streamClosureData.closedStreams { - self.inboundEventBuffer.pendingUserEvent(.streamClosed(StreamClosedEvent(streamID: droppedStream, reason: .cancel))) + self.inboundEventBuffer.pendingUserEvent(StreamClosedEvent(streamID: droppedStream, reason: .cancel)) let droppedPromises = self.outboundBuffer.streamClosed(droppedStream) self.failDroppedPromises(droppedPromises, streamID: droppedStream, errorCode: .cancel) } case .flowControlChange(let change): self.outboundBuffer.connectionWindowSize = change.localConnectionWindowSize - self.inboundEventBuffer.pendingUserEvent(.streamWindowUpdated(NIOHTTP2WindowUpdatedEvent(streamID: .rootStream, inboundWindowSize: change.remoteConnectionWindowSize, outboundWindowSize: change.localConnectionWindowSize))) + self.inboundEventBuffer.pendingUserEvent(NIOHTTP2WindowUpdatedEvent(streamID: .rootStream, inboundWindowSize: change.remoteConnectionWindowSize, outboundWindowSize: change.localConnectionWindowSize)) if let streamSize = change.localStreamWindowSize { self.outboundBuffer.updateStreamWindow(streamSize.streamID, newSize: streamSize.localStreamWindowSize.map(Int32.init) ?? 0) - self.inboundEventBuffer.pendingUserEvent(.streamWindowUpdated(NIOHTTP2WindowUpdatedEvent(streamID: streamSize.streamID, inboundWindowSize: streamSize.remoteStreamWindowSize, outboundWindowSize: streamSize.localStreamWindowSize))) + self.inboundEventBuffer.pendingUserEvent(NIOHTTP2WindowUpdatedEvent(streamID: streamSize.streamID, inboundWindowSize: streamSize.remoteStreamWindowSize, outboundWindowSize: streamSize.localStreamWindowSize)) } case .streamCreatedAndClosed(let cAndCData): self.outboundBuffer.streamCreated(cAndCData.streamID, initialWindowSize: 0) @@ -723,7 +727,7 @@ extension NIOHTTP2Handler { } case .localSettingsChanged(let settingsChange): if settingsChange.streamWindowSizeChange != 0 { - self.inboundEventBuffer.pendingUserEvent(.initialStreamWindowChanged(NIOHTTP2BulkStreamWindowChangeEvent(delta: settingsChange.streamWindowSizeChange))) + self.inboundEventBuffer.pendingUserEvent(NIOHTTP2BulkStreamWindowChangeEvent(delta: settingsChange.streamWindowSizeChange)) } if let newMaxFrameSize = settingsChange.newMaxFrameSize { self.frameDecoder.maxFrameSize = newMaxFrameSize diff --git a/Sources/NIOHTTP2/InboundEventBuffer.swift b/Sources/NIOHTTP2/InboundEventBuffer.swift index c49ccb6b..80193cf3 100644 --- a/Sources/NIOHTTP2/InboundEventBuffer.swift +++ b/Sources/NIOHTTP2/InboundEventBuffer.swift @@ -26,13 +26,25 @@ import NIOCore class InboundEventBuffer { fileprivate var buffer: CircularBuffer = CircularBuffer(initialCapacity: 8) - func pendingUserEvent(_ event: BufferedHTTP2UserEvent) { - self.buffer.append(event) + func pendingUserEvent(_ event: NIOHTTP2StreamCreatedEvent) { + self.buffer.append(.streamCreated(event)) + } + + func pendingUserEvent(_ event: StreamClosedEvent) { + self.buffer.append(.streamClosed(event)) + } + + func pendingUserEvent(_ event: NIOHTTP2WindowUpdatedEvent) { + self.buffer.append(.streamWindowUpdated(event)) + } + + func pendingUserEvent(_ event: NIOHTTP2BulkStreamWindowChangeEvent) { + self.buffer.append(.initialStreamWindowChanged(event)) } /// Wraps user event types. /// - /// This allows us to buffer and pass around the events without making use of a generic. + /// This allows us to buffer and pass around the events without making use of an existential. enum BufferedHTTP2UserEvent { case streamCreated(NIOHTTP2StreamCreatedEvent) case streamClosed(StreamClosedEvent) From d51e9d7d1cc7f6a8b64c0fc77ffb0902a7ebd4d1 Mon Sep 17 00:00:00 2001 From: Rick Newton-Rogers Date: Mon, 6 Mar 2023 10:56:10 +0000 Subject: [PATCH 3/3] demultiplexer funcs use events, not components NIOHTTP2ConnectionDemultiplexer stream events now use event types directly rather than individual parameters which map to the event fields. --- ...annelHandler+ConnectionDemultiplexer.swift | 40 +++++++++---------- Sources/NIOHTTP2/HTTP2ChannelHandler.swift | 12 +++--- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler+ConnectionDemultiplexer.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler+ConnectionDemultiplexer.swift index 0d7b0a56..c47a28c6 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler+ConnectionDemultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler+ConnectionDemultiplexer.swift @@ -23,16 +23,16 @@ internal protocol NIOHTTP2ConnectionDemultiplexer { func streamError(streamID: HTTP2StreamID, error: Error) /// A new HTTP/2 stream was created with the given ID. - func streamCreated(_ id: HTTP2StreamID, localInitialWindowSize: UInt32?, remoteInitialWindowSize: UInt32?) + func streamCreated(event: NIOHTTP2StreamCreatedEvent) /// An HTTP/2 stream with the given ID was closed. - func streamClosed(_ id: HTTP2StreamID, reason: HTTP2ErrorCode?) + func streamClosed(event: StreamClosedEvent) /// The flow control windows of the HTTP/2 stream changed. - func streamWindowUpdated(_ id: HTTP2StreamID, inboundWindowSize: Int?, outboundWindowSize: Int?) + func streamWindowUpdated(event: NIOHTTP2WindowUpdatedEvent) /// The initial stream window for all streams changed by the given amount. - func initialStreamWindowChanged(by delta: Int) + func initialStreamWindowChanged(event: NIOHTTP2BulkStreamWindowChangeEvent) } extension NIOHTTP2Handler { @@ -61,37 +61,37 @@ extension NIOHTTP2Handler { } } - func streamCreated(_ id: HTTP2StreamID, localInitialWindowSize: UInt32?, remoteInitialWindowSize: UInt32?) { + func streamCreated(event: NIOHTTP2StreamCreatedEvent) { switch self { case .legacy(let demultiplexer): - demultiplexer.streamCreated(id, localInitialWindowSize: localInitialWindowSize, remoteInitialWindowSize: remoteInitialWindowSize) + demultiplexer.streamCreated(event: event) case .new: fatalError("Not yet implemented.") } } - func streamClosed(_ id: HTTP2StreamID, reason: HTTP2ErrorCode?) { + func streamClosed(event: StreamClosedEvent) { switch self { case .legacy(let demultiplexer): - demultiplexer.streamClosed(id, reason: reason) + demultiplexer.streamClosed(event: event) case .new: fatalError("Not yet implemented.") } } - func streamWindowUpdated(_ id: HTTP2StreamID, inboundWindowSize: Int?, outboundWindowSize: Int?) { + func streamWindowUpdated(event: NIOHTTP2WindowUpdatedEvent) { switch self { case .legacy(let demultiplexer): - demultiplexer.streamWindowUpdated(id, inboundWindowSize: inboundWindowSize, outboundWindowSize: outboundWindowSize) + demultiplexer.streamWindowUpdated(event: event) case .new: fatalError("Not yet implemented.") } } - func initialStreamWindowChanged(by delta: Int) { + func initialStreamWindowChanged(event: NIOHTTP2BulkStreamWindowChangeEvent) { switch self { case .legacy(let demultiplexer): - demultiplexer.initialStreamWindowChanged(by: delta) + demultiplexer.initialStreamWindowChanged(event: event) case .new: fatalError("Not yet implemented.") } @@ -115,19 +115,19 @@ extension LegacyHTTP2ConnectionDemultiplexer: NIOHTTP2ConnectionDemultiplexer { self.context.fireErrorCaught(NIOHTTP2Errors.streamError(streamID: streamID, baseError: error)) } - func streamCreated(_ id: HTTP2StreamID, localInitialWindowSize: UInt32?, remoteInitialWindowSize: UInt32?) { - self.context.fireUserInboundEventTriggered(NIOHTTP2StreamCreatedEvent(streamID: id, localInitialWindowSize: localInitialWindowSize, remoteInitialWindowSize: remoteInitialWindowSize)) + func streamCreated(event: NIOHTTP2StreamCreatedEvent) { + self.context.fireUserInboundEventTriggered(event) } - func streamClosed(_ id: HTTP2StreamID, reason: HTTP2ErrorCode?) { - self.context.fireUserInboundEventTriggered(StreamClosedEvent(streamID: id, reason: reason)) + func streamClosed(event: StreamClosedEvent) { + self.context.fireUserInboundEventTriggered(event) } - func streamWindowUpdated(_ id: HTTP2StreamID, inboundWindowSize: Int?, outboundWindowSize: Int?) { - self.context.fireUserInboundEventTriggered(NIOHTTP2WindowUpdatedEvent(streamID: id, inboundWindowSize: inboundWindowSize, outboundWindowSize: outboundWindowSize)) + func streamWindowUpdated(event: NIOHTTP2WindowUpdatedEvent) { + self.context.fireUserInboundEventTriggered(event) } - func initialStreamWindowChanged(by delta: Int) { - self.context.fireUserInboundEventTriggered(NIOHTTP2BulkStreamWindowChangeEvent(delta: delta)) + func initialStreamWindowChanged(event: NIOHTTP2BulkStreamWindowChangeEvent) { + self.context.fireUserInboundEventTriggered(event) } } diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift index e6af31f3..1e67ba34 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift @@ -514,13 +514,13 @@ extension NIOHTTP2Handler.ConnectionDemultiplexer { func process(event: InboundEventBuffer.BufferedHTTP2UserEvent) { switch event { case .streamCreated(let event): - self.streamCreated(event.streamID, localInitialWindowSize: event.localInitialWindowSize, remoteInitialWindowSize: event.remoteInitialWindowSize) + self.streamCreated(event: event) case .initialStreamWindowChanged(let event): - self.initialStreamWindowChanged(by: event.delta) + self.initialStreamWindowChanged(event: event) case .streamWindowUpdated(let event): - self.streamWindowUpdated(event.streamID, inboundWindowSize: event.inboundWindowSize, outboundWindowSize: event.outboundWindowSize) + self.streamWindowUpdated(event: event) case .streamClosed(let event): - self.streamClosed(event.streamID, reason: event.reason) + self.streamClosed(event: event) } } } @@ -694,8 +694,8 @@ extension NIOHTTP2Handler { self.outboundBuffer.streamCreated(streamCreatedData.streamID, initialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init) ?? 0) self.inboundEventBuffer.pendingUserEvent( NIOHTTP2StreamCreatedEvent(streamID: streamCreatedData.streamID, - localInitialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init), - remoteInitialWindowSize: streamCreatedData.remoteStreamWindowSize.map(UInt32.init))) + localInitialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init), + remoteInitialWindowSize: streamCreatedData.remoteStreamWindowSize.map(UInt32.init))) case .bulkStreamClosure(let streamClosureData): for droppedStream in streamClosureData.closedStreams { self.inboundEventBuffer.pendingUserEvent(StreamClosedEvent(streamID: droppedStream, reason: .cancel))