Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for UDP_GRO #2385

Merged
merged 2 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Sources/CNIOLinux/include/CNIOLinux.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <sys/sysinfo.h>
#include <sys/socket.h>
#include <sched.h>
#include <stdbool.h>
#include <errno.h>
#include <pthread.h>
#include <netinet/ip.h>
Expand Down Expand Up @@ -107,6 +108,7 @@ size_t CNIOLinux_CMSG_SPACE(size_t);
extern const int CNIOLinux_SO_TIMESTAMP;
extern const int CNIOLinux_SO_RCVTIMEO;

int CNIOLinux_supports_udp_segment();
bool CNIOLinux_supports_udp_segment();
bool CNIOLinux_supports_udp_gro();
#endif
#endif
28 changes: 19 additions & 9 deletions Sources/CNIOLinux/shim.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,29 @@ size_t CNIOLinux_CMSG_SPACE(size_t payloadSizeBytes) {
const int CNIOLinux_SO_TIMESTAMP = SO_TIMESTAMP;
const int CNIOLinux_SO_RCVTIMEO = SO_RCVTIMEO;

int CNIOLinux_supports_udp_segment() {
#ifndef UDP_SEGMENT
return -1;
#else
bool supports_udp_sockopt(int opt, int value) {
int fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd == -1) {
return -1;
return false;
}

int gso_size = 512;
int rc = setsockopt(fd, IPPROTO_UDP, UDP_SEGMENT, &gso_size, sizeof(gso_size));
int rc = setsockopt(fd, IPPROTO_UDP, opt, &value, sizeof(value));
close(fd);
return rc;
return rc == 0;
}

bool CNIOLinux_supports_udp_segment() {
#ifndef UDP_SEGMENT
return false;
#else
return supports_udp_sockopt(UDP_SEGMENT, 512);
#endif
}

bool CNIOLinux_supports_udp_gro() {
#ifndef UDP_GRO
return false;
#else
return supports_udp_sockopt(UDP_GRO, 1);
#endif
}

Expand Down
6 changes: 5 additions & 1 deletion Sources/NIOCore/BSDSocketAPI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,14 @@ extension NIOBSDSocket.Option {

#if os(Linux)
extension NIOBSDSocket.Option {
// Note: UDP_SEGMENT is not available on all Linux platforms so the value is hardcoded.
// Note: UDP_SEGMENT and UDP_GRO are not available on all Linux platforms so values are
// hardcoded.

/// Use UDP segmentation offload (UDP_SEGMENT, or 'GSO'). Only available on Linux.
public static let udp_segment = NIOBSDSocket.Option(rawValue: 103)

/// Use UDP generic receive offload (GRO). Only available on Linux.
public static let udp_gro = NIOBSDSocket.Option(rawValue: 104)
}
#endif

Expand Down
16 changes: 16 additions & 0 deletions Sources/NIOCore/ChannelOption.swift
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,19 @@ extension ChannelOptions {
public init() { }
}

/// ``DatagramReceiveOffload`` sets the 'UDP_GRO' socket option which allows for datagrams to be accumulated
/// by the kernel (or in some cases, the NIC) and reduces traversals in the kernel's networking layer.
///
/// This option is currently only supported on Linux (5.10 and newer). Support can be checked
/// using ``System/supportsUDPReceiveOffload``.
///
/// - Note: users should ensure they use an appropriate receive buffer allocator when enabling this option.
/// The default allocator for datagram channels uses fixed sized buffers of 2048 bytes.
public struct DatagramReceiveOffload: ChannelOption, Sendable {
public typealias Value = Bool
public init() { }
}

/// When set to true IP level ECN information will be reported through `AddressedEnvelope.Metadata`
public struct ExplicitCongestionNotificationsOption: ChannelOption, Sendable {
public typealias Value = Bool
Expand Down Expand Up @@ -333,6 +346,9 @@ public struct ChannelOptions {
/// - seealso: `DatagramSegmentSize`
public static let datagramSegmentSize = Types.DatagramSegmentSize()

/// - seealso: `DatagramReceiveOffload`
public static let datagramReceiveOffload = Types.DatagramReceiveOffload()

/// - seealso: `ExplicitCongestionNotificationsOption`
public static let explicitCongestionNotification = Types.ExplicitCongestionNotificationsOption()

Expand Down
14 changes: 13 additions & 1 deletion Sources/NIOCore/Utilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,23 @@ extension System {
/// Returns true if the platform supports 'UDP_SEGMENT' (GSO).
///
/// The option can be enabled by setting the ``DatagramSegmentSize`` channel option.
public static let supportsUDPSegmentationOffload: Bool = CNIOLinux_supports_udp_segment() == 0
public static let supportsUDPSegmentationOffload: Bool = CNIOLinux_supports_udp_segment()
#else
/// Returns true if the platform supports 'UDP_SEGMENT' (GSO).
///
/// The option can be enabled by setting the ``DatagramSegmentSize`` channel option.
public static let supportsUDPSegmentationOffload: Bool = false
#endif

#if os(Linux)
/// Returns true if the platform supports 'UDP_GRO'.
///
/// The option can be enabled by setting the ``DatagramReceiveOffload`` channel option.
public static let supportsUDPReceiveOffload: Bool = CNIOLinux_supports_udp_gro()
#else
/// Returns true if the platform supports 'UDP_GRO'.
///
/// The option can be enabled by setting the ``DatagramReceiveOffload`` channel option.
public static let supportsUDPReceiveOffload: Bool = false
#endif
}
30 changes: 30 additions & 0 deletions Sources/NIOPosix/BSDSocketAPIPosix.swift
Original file line number Diff line number Diff line change
Expand Up @@ -297,5 +297,35 @@ extension NIOBSDSocket {
throw ChannelError.operationUnsupported
#endif
}

static func setUDPReceiveOffload(_ enabled: Bool, socket: NIOBSDSocket.Handle) throws {
#if os(Linux)
var isEnabled: CInt = enabled ? 1 : 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be var?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so: option_value in setsockopt takes an UnsafeRawPointer

try Self.setsockopt(socket: socket,
level: .udp,
option_name: .udp_gro,
option_value: &isEnabled,
option_len: socklen_t(MemoryLayout<CInt>.size))
#else
throw ChannelError.operationUnsupported
#endif
}

static func getUDPReceiveOffload(socket: NIOBSDSocket.Handle) throws -> Bool {
#if os(Linux)
var enabled: CInt = 0
var optionLength = socklen_t(MemoryLayout<CInt>.size)
try withUnsafeMutablePointer(to: &enabled) { enabledBytes in
try Self.getsockopt(socket: socket,
level: .udp,
option_name: .udp_gro,
option_value: enabledBytes,
option_len: &optionLength)
}
return enabled != 0
#else
throw ChannelError.operationUnsupported
#endif
}
}
#endif
8 changes: 8 additions & 0 deletions Sources/NIOPosix/BSDSocketAPIWindows.swift
Original file line number Diff line number Diff line change
Expand Up @@ -560,5 +560,13 @@ extension NIOBSDSocket {
static func getUDPSegmentSize(socket: NIOBSDSocket.Handle) throws -> CInt {
throw ChannelError.operationUnsupported
}

static func setUDPReceiveOffload(_ enabled: Bool, socket: NIOBSDSocket.Handle) throws {
throw ChannelError.operationUnsupported
}

static func getUDPReceiveOffload(socket: NIOBSDSocket.Handle) throws -> Bool {
throw ChannelError.operationUnsupported
}
}
#endif
14 changes: 14 additions & 0 deletions Sources/NIOPosix/Socket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,18 @@ typealias IOVector = iovec
try NIOBSDSocket.getUDPSegmentSize(socket: $0)
}
}

/// Sets the value for the 'UDP_GRO' socket option.
func setUDPReceiveOffload(_ enabled: Bool) throws {
try self.withUnsafeHandle {
try NIOBSDSocket.setUDPReceiveOffload(enabled, socket: $0)
}
}

/// Returns the value of the 'UDP_GRO' socket option.
func getUDPReceiveOffload() throws -> Bool {
return try self.withUnsafeHandle {
try NIOBSDSocket.getUDPReceiveOffload(socket: $0)
}
}
}
11 changes: 11 additions & 0 deletions Sources/NIOPosix/SocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,12 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
}
let segmentSize = value as! ChannelOptions.Types.DatagramSegmentSize.Value
try self.socket.setUDPSegmentSize(segmentSize)
case _ as ChannelOptions.Types.DatagramReceiveOffload:
guard System.supportsUDPReceiveOffload else {
throw ChannelError.operationUnsupported
}
let enable = value as! ChannelOptions.Types.DatagramReceiveOffload.Value
try self.socket.setUDPReceiveOffload(enable)
default:
try super.setOption0(option, value: value)
}
Expand Down Expand Up @@ -562,6 +568,11 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
throw ChannelError.operationUnsupported
}
return try self.socket.getUDPSegmentSize() as! Option.Value
case _ as ChannelOptions.Types.DatagramReceiveOffload:
guard System.supportsUDPReceiveOffload else {
throw ChannelError.operationUnsupported
}
return try self.socket.getUDPReceiveOffload() as! Option.Value
default:
return try super.getOption0(option)
}
Expand Down
7 changes: 7 additions & 0 deletions Tests/NIOPosixTests/DatagramChannelTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ extension DatagramChannelTests {
("testLargeVectorWriteWithGSO", testLargeVectorWriteWithGSO),
("testWriteBufferAtGSOSegmentCountLimit", testWriteBufferAtGSOSegmentCountLimit),
("testWriteBufferAboveGSOSegmentCountLimitShouldError", testWriteBufferAboveGSOSegmentCountLimitShouldError),
("testGROIsUnsupportedOnNonLinuxPlatforms", testGROIsUnsupportedOnNonLinuxPlatforms),
("testSetGROOption", testSetGROOption),
("testGetGROOption", testGetGROOption),
("testChannelCanReceiveLargeBufferWithGROUsingScalarReads", testChannelCanReceiveLargeBufferWithGROUsingScalarReads),
("testChannelCanReceiveLargeBufferWithGROUsingVectorReads", testChannelCanReceiveLargeBufferWithGROUsingVectorReads),
("testChannelCanReceiveMultipleLargeBuffersWithGROUsingScalarReads", testChannelCanReceiveMultipleLargeBuffersWithGROUsingScalarReads),
("testChannelCanReceiveMultipleLargeBuffersWithGROUsingVectorReads", testChannelCanReceiveMultipleLargeBuffersWithGROUsingVectorReads),
]
}
}
Expand Down
101 changes: 101 additions & 0 deletions Tests/NIOPosixTests/DatagramChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1308,4 +1308,105 @@ class DatagramChannelTests: XCTestCase {
XCTAssert($0 is IOError)
}
}

func testGROIsUnsupportedOnNonLinuxPlatforms() throws {
#if !os(Linux)
XCTAssertFalse(System.supportsUDPReceiveOffload)
#endif
}

func testSetGROOption() throws {
let didSet = self.firstChannel.setOption(ChannelOptions.datagramReceiveOffload, value: true)
if System.supportsUDPReceiveOffload {
XCTAssertNoThrow(try didSet.wait())
} else {
XCTAssertThrowsError(try didSet.wait()) { error in
XCTAssertEqual(error as? ChannelError, .operationUnsupported)
}
}
}

func testGetGROOption() throws {
let getOption = self.firstChannel.getOption(ChannelOptions.datagramReceiveOffload)
if System.supportsUDPReceiveOffload {
XCTAssertEqual(try getOption.wait(), false) // not-set

// Now set and check.
XCTAssertNoThrow(try self.firstChannel.setOption(ChannelOptions.datagramReceiveOffload, value: true).wait())
XCTAssertTrue(try self.firstChannel.getOption(ChannelOptions.datagramReceiveOffload).wait())
} else {
XCTAssertThrowsError(try getOption.wait()) { error in
XCTAssertEqual(error as? ChannelError, .operationUnsupported)
}
}
}

func testReceiveLargeBufferWithGRO(segments: Int, segmentSize: Int, writes: Int, vectorReads: Int? = nil) throws {
try XCTSkipUnless(System.supportsUDPSegmentationOffload, "UDP_SEGMENT (GSO) is not supported on this platform")
try XCTSkipUnless(System.supportsUDPReceiveOffload, "UDP_GRO is not supported on this platform")

/// Set GSO on the first channel.
XCTAssertNoThrow(try self.firstChannel.setOption(ChannelOptions.datagramSegmentSize, value: CInt(segmentSize)).wait())
/// Set GRO on the second channel.
XCTAssertNoThrow(try self.secondChannel.setOption(ChannelOptions.datagramReceiveOffload, value: true).wait())
/// The third channel has neither set.

// Enable on second channel
if let vectorReads = vectorReads {
XCTAssertNoThrow(try self.secondChannel.setOption(ChannelOptions.datagramVectorReadMessageCount, value: vectorReads).wait())
}

/// Increase the size of the read buffer for the second and third channels.
let fixed = FixedSizeRecvByteBufferAllocator(capacity: 1 << 16)
XCTAssertNoThrow(try self.secondChannel.setOption(ChannelOptions.recvAllocator, value: fixed).wait())
XCTAssertNoThrow(try self.thirdChannel.setOption(ChannelOptions.recvAllocator, value: fixed).wait())

// Write a large datagrams on the first channel. They should be split and then accumulated on the receive side.
// Form a large buffer to write from the first channel.
let buffer = self.firstChannel.allocator.buffer(repeating: 1, count: segmentSize * segments)

// Write to the channel with GRO enabled.
do {
let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer)
let promises = (0 ..< writes).map { _ in self.firstChannel.write(NIOAny(writeData)) }
self.firstChannel.flush()
XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(promises, on: self.firstChannel.eventLoop).wait())

// GRO is enabled so we expect a `writes` datagrams.
let datagrams = try self.secondChannel.waitForDatagrams(count: writes)
for datagram in datagrams {
XCTAssertEqual(datagram.data.readableBytes, segments * segmentSize)
}
}

// Write to the channel whithout GRO.
do {
let writeData = AddressedEnvelope(remoteAddress: self.thirdChannel.localAddress!, data: buffer)
let promises = (0 ..< writes).map { _ in self.firstChannel.write(NIOAny(writeData)) }
self.firstChannel.flush()
XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(promises, on: self.firstChannel.eventLoop).wait())

// GRO is not enabled so we expect a `writes * segments` datagrams.
let datagrams = try self.thirdChannel.waitForDatagrams(count: writes * segments)
for datagram in datagrams {
XCTAssertEqual(datagram.data.readableBytes, segmentSize)
}
}
}

func testChannelCanReceiveLargeBufferWithGROUsingScalarReads() throws {
try self.testReceiveLargeBufferWithGRO(segments: 10, segmentSize: 1000, writes: 1)
}

func testChannelCanReceiveLargeBufferWithGROUsingVectorReads() throws {
try self.testReceiveLargeBufferWithGRO(segments: 10, segmentSize: 1000, writes: 1, vectorReads: 4)
}

func testChannelCanReceiveMultipleLargeBuffersWithGROUsingScalarReads() throws {
try self.testReceiveLargeBufferWithGRO(segments: 10, segmentSize: 1000, writes: 4)
}

func testChannelCanReceiveMultipleLargeBuffersWithGROUsingVectorReads() throws {
try self.testReceiveLargeBufferWithGRO(segments: 10, segmentSize: 1000, writes: 4, vectorReads: 4)
}
}