diff --git a/Package.swift b/Package.swift index c4e4771aad..5f490442f3 100644 --- a/Package.swift +++ b/Package.swift @@ -197,6 +197,7 @@ let package = Package( name: "_NIOFileSystem", dependencies: [ "NIOCore", + "NIOPosix", "CNIOLinux", "CNIODarwin", swiftAtomics, @@ -488,6 +489,7 @@ let package = Package( name: "NIOFileSystemIntegrationTests", dependencies: [ "NIOCore", + "NIOPosix", "_NIOFileSystem", "NIOFoundationCompat", ], diff --git a/Sources/NIOFileSystem/DirectoryEntries.swift b/Sources/NIOFileSystem/DirectoryEntries.swift index f827e09119..08d6a983f4 100644 --- a/Sources/NIOFileSystem/DirectoryEntries.swift +++ b/Sources/NIOFileSystem/DirectoryEntries.swift @@ -16,6 +16,7 @@ import CNIODarwin import CNIOLinux import NIOConcurrencyHelpers +import NIOPosix @preconcurrency import SystemPackage /// An `AsyncSequence` of entries in a directory. @@ -151,16 +152,14 @@ extension BufferedStream where Element == [DirectoryEntry] { ) source.onTermination = { - guard let executor = protectedState.withLockedValue({ $0.executorForClosing() }) else { + guard let threadPool = protectedState.withLockedValue({ $0.threadPoolForClosing() }) else { return } - executor.execute { + threadPool.submit { _ in // always run, even if cancelled protectedState.withLockedValue { state in state.closeIfNecessary() } - } onCompletion: { _ in - // Ignore the result. } } @@ -189,16 +188,21 @@ private struct DirectoryEntryProducer { /// source it will either produce more or be scheduled to produce more. Stopping production /// is signalled via the stream's 'onTermination' handler. func produceMore() { - let executor = self.state.withLockedValue { state in + let threadPool = self.state.withLockedValue { state in state.produceMore() } - // No executor means we're done. - guard let executor = executor else { return } + // No thread pool means we're done. + guard let threadPool = threadPool else { return } - executor.execute { - try self.nextBatch() - } onCompletion: { result in + threadPool.submit { + let result: Result<[DirectoryEntry], Error> + switch $0 { + case .active: + result = Result { try self.nextBatch() } + case .cancelled: + result = .failure(CancellationError()) + } self.onNextBatchResult(result) } } @@ -261,16 +265,14 @@ private struct DirectoryEntryProducer { } private func close() { - guard let executor = self.state.withLockedValue({ $0.executorForClosing() }) else { + guard let threadPool = self.state.withLockedValue({ $0.threadPoolForClosing() }) else { return } - executor.execute { + threadPool.submit { _ in // always run, even if cancelled self.state.withLockedValue { state in state.closeIfNecessary() } - } onCompletion: { _ in - // Ignore. } } } @@ -283,7 +285,7 @@ private struct DirectoryEnumerator: Sendable { private enum State: @unchecked Sendable { case modifying case idle(SystemFileHandle.SendableView, recursive: Bool) - case open(IOExecutor, Source, [DirectoryEntry]) + case open(NIOThreadPool, Source, [DirectoryEntry]) case done } @@ -351,12 +353,12 @@ private struct DirectoryEnumerator: Sendable { self.path = handle.path } - internal func produceMore() -> IOExecutor? { + internal func produceMore() -> NIOThreadPool? { switch self.state { case let .idle(handle, _): - return handle.executor - case let .open(executor, _, _): - return executor + return handle.threadPool + case let .open(threadPool, _, _): + return threadPool case .done: return nil case .modifying: @@ -364,10 +366,10 @@ private struct DirectoryEnumerator: Sendable { } } - internal func executorForClosing() -> IOExecutor? { + internal func threadPoolForClosing() -> NIOThreadPool? { switch self.state { - case let .open(executor, _, _): - return executor + case let .open(threadPool, _, _): + return threadPool case .idle, .done: // Don't need to close in the idle state: we don't own the handle. return nil @@ -449,7 +451,7 @@ private struct DirectoryEnumerator: Sendable { } private mutating func processOpenState( - executor: IOExecutor, + threadPool: NIOThreadPool, dir: CInterop.DirPointer, entries: inout [DirectoryEntry], count: Int @@ -499,11 +501,11 @@ private struct DirectoryEnumerator: Sendable { } // We must have hit our 'count' limit. - return (.open(executor, .readdir(dir), entries), .yield(.success(entries))) + return (.open(threadPool, .readdir(dir), entries), .yield(.success(entries))) } private mutating func processOpenState( - executor: IOExecutor, + threadPool: NIOThreadPool, fts: CInterop.FTSPointer, entries: inout [DirectoryEntry], count: Int @@ -580,7 +582,7 @@ private struct DirectoryEnumerator: Sendable { } // We must have hit our 'count' limit. - return (.open(executor, .fts(fts), entries), .yield(.success(entries))) + return (.open(threadPool, .fts(fts), entries), .yield(.success(entries))) } private mutating func process(_ count: Int) -> ProcessResult { @@ -596,7 +598,7 @@ private struct DirectoryEnumerator: Sendable { switch result { case let .success(source): - self.state = .open(handle.executor, source, []) + self.state = .open(handle.threadPool, source, []) return .continue case let .failure(error): @@ -604,13 +606,13 @@ private struct DirectoryEnumerator: Sendable { return .yield(.failure(error)) } - case .open(let executor, let mode, var entries): + case .open(let threadPool, let mode, var entries): self.state = .modifying switch mode { case .readdir(let dir): let (state, result) = self.processOpenState( - executor: executor, + threadPool: threadPool, dir: dir, entries: &entries, count: count @@ -620,7 +622,7 @@ private struct DirectoryEnumerator: Sendable { case .fts(let fts): let (state, result) = self.processOpenState( - executor: executor, + threadPool: threadPool, fts: fts, entries: &entries, count: count diff --git a/Sources/NIOFileSystem/FileChunks.swift b/Sources/NIOFileSystem/FileChunks.swift index 9db55ce36a..4b87221410 100644 --- a/Sources/NIOFileSystem/FileChunks.swift +++ b/Sources/NIOFileSystem/FileChunks.swift @@ -15,6 +15,7 @@ #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) || os(Linux) || os(Android) import NIOConcurrencyHelpers import NIOCore +import NIOPosix @preconcurrency import SystemPackage /// An `AsyncSequence` of ordered chunks read from a file. @@ -140,16 +141,21 @@ private struct FileChunkProducer: Sendable { /// source it will either produce more or be scheduled to produce more. Stopping production /// is signalled via the stream's 'onTermination' handler. func produceMore() { - let executor = self.state.withLockedValue { state in + let threadPool = self.state.withLockedValue { state in state.shouldProduceMore() } // No executor means we're done. - guard let executor = executor else { return } - - executor.execute { - try self.readNextChunk() - } onCompletion: { result in + guard let threadPool = threadPool else { return } + + threadPool.submit { + let result: Result + switch $0 { + case .active: + result = Result { try self.readNextChunk() } + case .cancelled: + result = .failure(CancellationError()) + } self.onReadNextChunkResult(result) } } @@ -273,10 +279,10 @@ private struct ProducerState: Sendable { } } - mutating func shouldProduceMore() -> IOExecutor? { + mutating func shouldProduceMore() -> NIOThreadPool? { switch self.state { case let .producing(state): - return state.handle.executor + return state.handle.threadPool case .done: return nil } diff --git a/Sources/NIOFileSystem/FileSystem.swift b/Sources/NIOFileSystem/FileSystem.swift index 4799a3d9db..1062e46f6a 100644 --- a/Sources/NIOFileSystem/FileSystem.swift +++ b/Sources/NIOFileSystem/FileSystem.swift @@ -16,6 +16,7 @@ import Atomics import NIOCore +import NIOPosix @preconcurrency import SystemPackage #if canImport(Darwin) @@ -36,7 +37,7 @@ import Musl /// environment variable is set. /// /// If you require more granular control you can create a ``FileSystem`` with the required number -/// of threads by calling ``withFileSystem(numberOfThreads:_:)``. +/// of threads by calling ``withFileSystem(numberOfThreads:_:)`` or by using ``init(threadPool:)``. /// /// ### Errors /// @@ -53,23 +54,39 @@ public struct FileSystem: Sendable, FileSystemProtocol { /// Returns a shared global instance of the ``FileSystem``. /// /// The file system executes blocking work in a thread pool which defaults to having two - /// threads. This can be modified by `fileSystemThreadCountSuggestion` or by - /// setting the `NIO_SINGLETON_FILESYSTEM_THREAD_COUNT` environment variable. + /// threads. This can be modified by `blockingPoolThreadCountSuggestion` or by + /// setting the `NIO_SINGLETON_BLOCKING_POOL_THREAD_COUNT` environment variable. public static var shared: FileSystem { globalFileSystem } - private let executor: IOExecutor + private let threadPool: NIOThreadPool + private let ownsThreadPool: Bool fileprivate func shutdown() async { - await self.executor.drain() + if self.ownsThreadPool { + try? await self.threadPool.shutdownGracefully() + } } - fileprivate init(executor: IOExecutor) { - self.executor = executor + /// Creates a new ``FileSystem`` using the provided thread pool. + /// + /// - Parameter threadPool: A started thread pool to execute blocking system calls on. The + /// ``FileSystem`` doesn't take ownership of the thread pool and you remain responsible + /// for shutting it down when necessary. + public init(threadPool: NIOThreadPool) { + self.init(threadPool: threadPool, ownsThreadPool: false) + } + + fileprivate init(threadPool: NIOThreadPool, ownsThreadPool: Bool) { + self.threadPool = threadPool + self.ownsThreadPool = ownsThreadPool } fileprivate init(numberOfThreads: Int) async { - let executor = await IOExecutor.running(numberOfThreads: numberOfThreads) - self.init(executor: executor) + let threadPool = NIOThreadPool(numberOfThreads: numberOfThreads) + threadPool.start() + // Wait for the thread pool to start. + try? await threadPool.runIfActive { } + self.init(threadPool: threadPool, ownsThreadPool: true) } /// Open the file at `path` for reading. @@ -91,7 +108,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { forReadingAt path: FilePath, options: OpenOptions.Read ) async throws -> ReadFileHandle { - let handle = try await self.executor.execute { + let handle = try await self.threadPool.runIfActive { let handle = try self._openFile(forReadingAt: path, options: options).get() // Okay to transfer: we just created it and are now moving back to the callers task. return UnsafeTransfer(handle) @@ -121,7 +138,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { forWritingAt path: FilePath, options: OpenOptions.Write ) async throws -> WriteFileHandle { - let handle = try await self.executor.execute { + let handle = try await self.threadPool.runIfActive { let handle = try self._openFile(forWritingAt: path, options: options).get() // Okay to transfer: we just created it and are now moving back to the callers task. return UnsafeTransfer(handle) @@ -151,7 +168,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { forReadingAndWritingAt path: FilePath, options: OpenOptions.Write ) async throws -> ReadWriteFileHandle { - let handle = try await self.executor.execute { + let handle = try await self.threadPool.runIfActive { let handle = try self._openFile(forReadingAndWritingAt: path, options: options).get() // Okay to transfer: we just created it and are now moving back to the callers task. return UnsafeTransfer(handle) @@ -177,7 +194,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { atPath path: FilePath, options: OpenOptions.Directory ) async throws -> DirectoryFileHandle { - let handle = try await self.executor.execute { + let handle = try await self.threadPool.runIfActive { let handle = try self._openDirectory(at: path, options: options).get() // Okay to transfer: we just created it and are now moving back to the callers task. return UnsafeTransfer(handle) @@ -216,7 +233,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { withIntermediateDirectories createIntermediateDirectories: Bool, permissions: FilePermissions? ) async throws { - try await self.executor.execute { + try await self.threadPool.runIfActive { try self._createDirectory( at: path, withIntermediateDirectories: createIntermediateDirectories, @@ -246,7 +263,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { public func createTemporaryDirectory( template: FilePath ) async throws -> FilePath { - return try await self.executor.execute { + return try await self.threadPool.runIfActive { try self._createTemporaryDirectory(template: template).get() } } @@ -267,7 +284,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { forFileAt path: FilePath, infoAboutSymbolicLink: Bool ) async throws -> FileInfo? { - return try await self.executor.execute { + return try await self.threadPool.runIfActive { try self._info(forFileAt: path, infoAboutSymbolicLink: infoAboutSymbolicLink).get() } } @@ -389,7 +406,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { recursively removeItemRecursively: Bool ) async throws -> Int { // Try to remove the item: we might just get lucky. - let result = try await self.executor.execute { Libc.remove(path) } + let result = try await self.threadPool.runIfActive { Libc.remove(path) } switch result { case .success: @@ -474,7 +491,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { /// - sourcePath: The path to the item to move. /// - destinationPath: The path at which to place the item. public func moveItem(at sourcePath: FilePath, to destinationPath: FilePath) async throws { - let result = try await self.executor.execute { + let result = try await self.threadPool.runIfActive { try self._moveItem(at: sourcePath, to: destinationPath).get() } @@ -547,7 +564,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { at linkPath: FilePath, withDestination destinationPath: FilePath ) async throws { - return try await self.executor.execute { + return try await self.threadPool.runIfActive { try self._createSymbolicLink(at: linkPath, withDestination: destinationPath).get() } } @@ -578,7 +595,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { public func destinationOfSymbolicLink( at path: FilePath ) async throws -> FilePath { - return try await self.executor.execute { + return try await self.threadPool.runIfActive { try self._destinationOfSymbolicLink(at: path).get() } } @@ -592,7 +609,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { /// - Returns: The path to the current working directory. public var currentWorkingDirectory: FilePath { get async throws { - try await self.executor.execute { + try await self.threadPool.runIfActive { try Libc.getcwd().mapError { errno in FileSystemError.getcwd(errno: errno, location: .here()) }.get() @@ -614,7 +631,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { public var temporaryDirectory: FilePath { get async throws { #if canImport(Darwin) - return try await self.executor.execute { + return try await self.threadPool.runIfActive { return try Libc.constr(_CS_DARWIN_USER_TEMP_DIR).map { path in FilePath(path) }.mapError { errno in @@ -643,96 +660,13 @@ extension NIOSingletons { /// `NIO_SINGLETON_FILESYSTEM_THREAD_COUNT` is set or this value was set manually by the user. /// /// - note: This value must be set _before_ any singletons are used and must only be set once. + @available(*, deprecated, renamed: "blockingPoolThreadCountSuggestion") public static var fileSystemThreadCountSuggestion: Int { - set { - Self.userSetSingletonThreadCount(rawStorage: globalRawSuggestedFileSystemThreadCount, userValue: newValue) - } - - get { - return Self.getTrustworthyThreadCount( - rawStorage: globalRawSuggestedFileSystemThreadCount, - environmentVariable: "NIO_SINGLETON_FILESYSTEM_THREAD_COUNT" - ) - } - } - - // Copied from NIOCore/GlobalSingletons.swift - private static func userSetSingletonThreadCount(rawStorage: ManagedAtomic, userValue: Int) { - precondition(userValue > 0, "illegal value: needs to be strictly positive") - - // The user is trying to set it. We can only do this if the value is at 0 and we will set the - // negative value. So if the user wants `5`, we will set `-5`. Once it's used (set getter), it'll be upped - // to 5. - let (exchanged, _) = rawStorage.compareExchange(expected: 0, desired: -userValue, ordering: .relaxed) - guard exchanged else { - fatalError( - """ - Bug in user code: Global singleton suggested loop/thread count has been changed after \ - user or has been changed more than once. Either is an error, you must set this value very early \ - and only once. - """ - ) - } - } - - // Copied from NIOCore/GlobalSingletons.swift - private static func validateTrustedThreadCount(_ threadCount: Int) { - assert( - threadCount > 0, - "BUG IN NIO, please report: negative suggested loop/thread count: \(threadCount)" - ) - assert( - threadCount <= 1024, - "BUG IN NIO, please report: overly big suggested loop/thread count: \(threadCount)" - ) - } - - // Copied from NIOCore/GlobalSingletons.swift - private static func getTrustworthyThreadCount(rawStorage: ManagedAtomic, environmentVariable: String) -> Int { - let returnedValueUnchecked: Int - - let rawSuggestion = rawStorage.load(ordering: .relaxed) - switch rawSuggestion { - case 0: // == 0 - // Not set by user, not yet finalised, let's try to get it from the env var and fall back to - // `System.coreCount`. - let envVarString = getenv(environmentVariable).map { String(cString: $0) } - returnedValueUnchecked = envVarString.flatMap(Int.init) ?? System.coreCount - case .min..<0: // < 0 - // Untrusted and unchecked user value. Let's invert and then sanitise/check. - returnedValueUnchecked = -rawSuggestion - case 1 ... .max: // > 0 - // Trustworthy value that has been evaluated and sanitised before. - let returnValue = rawSuggestion - Self.validateTrustedThreadCount(returnValue) - return returnValue - default: - // Unreachable - preconditionFailure() - } - - // Can't have fewer than 1, don't want more than 1024. - let returnValue = max(1, min(1024, returnedValueUnchecked)) - Self.validateTrustedThreadCount(returnValue) - - // Store it for next time. - let (exchanged, _) = rawStorage.compareExchange( - expected: rawSuggestion, - desired: returnValue, - ordering: .relaxed - ) - if !exchanged { - // We lost the race, this must mean it has been concurrently set correctly so we can safely recurse - // and try again. - return Self.getTrustworthyThreadCount(rawStorage: rawStorage, environmentVariable: environmentVariable) - } - return returnValue + set { Self.blockingPoolThreadCountSuggestion = newValue } + get { Self.blockingPoolThreadCountSuggestion } } } -// DO NOT TOUCH THIS DIRECTLY, use `userSetSingletonThreadCount` and `getTrustworthyThreadCount`. -private let globalRawSuggestedFileSystemThreadCount = ManagedAtomic(0) - @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) private let globalFileSystem: FileSystem = { guard NIOSingletons.singletonsEnabledSuggestion else { @@ -743,9 +677,7 @@ private let globalFileSystem: FileSystem = { """ ) } - - let threadCount = NIOSingletons.fileSystemThreadCountSuggestion - return FileSystem(executor: .runningAsync(numberOfThreads: threadCount)) + return FileSystem(threadPool: NIOSingletons.posixBlockingThreadPool, ownsThreadPool: false) }() @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @@ -753,7 +685,7 @@ extension NIOSingletons { /// Returns a shared global instance of the ``FileSystem``. /// /// The file system executes blocking work in a thread pool which defaults to having two - /// threads. This can be modified by `fileSystemThreadCountSuggestion` or by + /// threads. This can be modified by `blockingPoolThreadCountSuggestion` or by /// setting the `NIO_SINGLETON_FILESYSTEM_THREAD_COUNT` environment variable. public static var fileSystem: FileSystem { globalFileSystem } } @@ -793,7 +725,7 @@ extension FileSystem { options: options.descriptorOptions, permissions: nil, transactionalIfPossible: false, - executor: self.executor + threadPool: self.threadPool ).map { ReadFileHandle(wrapping: $0) } @@ -810,7 +742,7 @@ extension FileSystem { options: options.descriptorOptions, permissions: options.permissionsForRegularFile, transactionalIfPossible: options.newFile?.transactionalCreation ?? false, - executor: self.executor + threadPool: self.threadPool ).map { WriteFileHandle(wrapping: $0) } @@ -827,7 +759,7 @@ extension FileSystem { options: options.descriptorOptions, permissions: options.permissionsForRegularFile, transactionalIfPossible: options.newFile?.transactionalCreation ?? false, - executor: self.executor + threadPool: self.threadPool ).map { ReadWriteFileHandle(wrapping: $0) } @@ -844,7 +776,7 @@ extension FileSystem { options: options.descriptorOptions, permissions: nil, transactionalIfPossible: false, - executor: self.executor + threadPool: self.threadPool ).map { DirectoryFileHandle(wrapping: $0) } @@ -1056,7 +988,7 @@ extension FileSystem { from sourcePath: FilePath, to destinationPath: FilePath ) async throws { - try await self.executor.execute { + try await self.threadPool.runIfActive { try self._copyRegularFile(from: sourcePath, to: destinationPath).get() } } @@ -1198,7 +1130,7 @@ extension FileSystem { from sourcePath: FilePath, to destinationPath: FilePath ) async throws { - try await self.executor.execute { + try await self.threadPool.runIfActive { try self._copySymbolicLink(from: sourcePath, to: destinationPath).get() } } @@ -1219,7 +1151,7 @@ extension FileSystem { file: String = #fileID, line: Int = #line ) async throws -> Int { - try await self.executor.execute { + try await self.threadPool.runIfActive { switch Libc.remove(path) { case .success: return 1 diff --git a/Sources/NIOFileSystem/Internal/Concurrency Primitives/IOExecutor.swift b/Sources/NIOFileSystem/Internal/Concurrency Primitives/IOExecutor.swift deleted file mode 100644 index dccac6bb2b..0000000000 --- a/Sources/NIOFileSystem/Internal/Concurrency Primitives/IOExecutor.swift +++ /dev/null @@ -1,423 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) || os(Linux) || os(Android) -import Atomics -import DequeModule -import Dispatch -import NIOConcurrencyHelpers - -@_spi(Testing) -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public final class IOExecutor: Sendable { - /// Used to generate IDs for work items. Don't use directly, - /// use ``generateWorkID()`` instead. - private let workID = ManagedAtomic(UInt64(0)) - /// The workers. - private let workers: [Worker] - - /// Generates a unique ID for a work item. This is used to identify the work - /// for cancellation. - private func generateWorkID() -> UInt64 { - // We only require uniqueness: relaxed is sufficient. - return self.workID.loadThenWrappingIncrement(ordering: .relaxed) - } - - /// Create a running executor with the given number of threads. - /// - /// - Precondition: `numberOfThreads` must be greater than zero. - /// - Returns: a running executor which does work on `numberOfThreads` threads. - @_spi(Testing) - public static func running(numberOfThreads: Int) async -> IOExecutor { - let executor = IOExecutor(numberOfThreads: numberOfThreads) - await executor.start() - return executor - } - - /// Create a running executor with the given number of threads. - /// - /// - Precondition: `numberOfThreads` must be greater than zero. - /// - Returns: a running executor which does work on `numberOfThreads` threads. - @_spi(Testing) - public static func runningAsync(numberOfThreads: Int) -> IOExecutor { - let executor = IOExecutor(numberOfThreads: numberOfThreads) - Task { await executor.start() } - return executor - } - - private init(numberOfThreads: Int) { - precondition(numberOfThreads > 0, "numberOfThreads must be greater than zero") - var workers = [Worker]() - workers.reserveCapacity(numberOfThreads) - for _ in 0..( - _ work: @Sendable @escaping () throws -> R - ) async throws -> R { - let workerIndex = self.pickWorker() - let workID = self.generateWorkID() - - return try await withTaskCancellationHandler { - return try await withUnsafeThrowingContinuation { continuation in - self.workers[workerIndex].enqueue(id: workID) { action in - switch action { - case .run: - continuation.resume(with: Result(catching: work)) - case .cancel: - continuation.resume(throwing: CancellationError()) - case .reject: - let error = FileSystemError( - code: .unavailable, - message: "The executor has been shutdown.", - cause: nil, - location: .here() - ) - continuation.resume(throwing: error) - } - } - } - } onCancel: { - self.workers[workerIndex].cancel(workID: workID) - } - } - - /// Executes work on a thread owned by the executor and notifies a completion - /// handler with the result. - /// - /// - Parameters: - /// - work: A closure to execute. - /// - onCompletion: A closure to notify with the result of the work. - @_spi(Testing) - public func execute( - work: @Sendable @escaping () throws -> R, - onCompletion: @Sendable @escaping (Result) -> Void - ) { - let workerIndex = self.pickWorker() - let workID = self.generateWorkID() - self.workers[workerIndex].enqueue(id: workID) { action in - switch action { - case .run: - onCompletion(Result(catching: work)) - case .cancel: - onCompletion(.failure(CancellationError())) - case .reject: - let error = FileSystemError( - code: .unavailable, - message: "The executor has been shutdown.", - cause: nil, - location: .here() - ) - onCompletion(.failure(error)) - } - } - } - - /// Stop accepting new tasks and wait for all enqueued work to finish. - /// - /// Any work submitted to the executor whilst it is draining or once it - /// has finished draining will not be executed. - @_spi(Testing) - public func drain() async { - await withTaskGroup(of: Void.self) { group in - for worker in self.workers { - group.addTask { - await worker.stop() - } - } - } - } - - /// Returns the index of the work to submit the next item of work on. - /// - /// If there are more than two workers then the "power of two random choices" - /// approach (https://www.eecs.harvard.edu/~michaelm/postscripts/tpds2001.pdf) is - /// used whereby two workers are selected at random and the one with the least - /// amount of work is chosen. - /// - /// If there are one or two workers then the one with the least work is - /// chosen. - private func pickWorker() -> Int { - // 'numberOfThreads' is guaranteed to be > 0. - switch self.workers.count { - case 1: - return 0 - case 2: - return self.indexOfLeastBusyWorkerAtIndices(0, 1) - default: - // For more than two threads use the 'power of two random choices'. - let i = self.workers.indices.randomElement()! - let j = self.workers.indices.randomElement()! - - if i == j { - return i - } else { - return self.indexOfLeastBusyWorkerAtIndices(i, j) - } - } - } - - private func indexOfLeastBusyWorkerAtIndices(_ i: Int, _ j: Int) -> Int { - let workOnI = self.workers[i].numberOfQueuedTasks - let workOnJ = self.workers[j].numberOfQueuedTasks - return workOnI < workOnJ ? i : j - } -} - -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -extension IOExecutor { - final class Worker: @unchecked Sendable { - /// An item of work executed by the worker. - typealias WorkItem = @Sendable (WorkAction) -> Void - - /// Whether the work should be executed or cancelled. - enum WorkAction { - case run - case cancel - case reject - } - - /// The state of the worker. - private enum State { - case notStarted - case starting(CheckedContinuation) - case active(Thread) - case draining(CheckedContinuation) - case stopped - - mutating func starting(continuation: CheckedContinuation) { - switch self { - case .notStarted: - self = .starting(continuation) - case .active, .starting, .draining, .stopped: - fatalError("\(#function) while worker was active/starting/draining/stopped") - } - } - - mutating func activate(_ thread: Thread) -> CheckedContinuation { - switch self { - case let .starting(continuation): - self = .active(thread) - return continuation - case .notStarted, .active, .draining, .stopped: - fatalError("\(#function) while worker was notStarted/active/draining/stopped") - } - } - - mutating func drained() -> CheckedContinuation { - switch self { - case let .draining(continuation): - self = .stopped - return continuation - case .notStarted, .starting, .active, .stopped: - fatalError("\(#function) while worker was notStarted/starting/active/stopped") - } - } - - var isStopped: Bool { - switch self { - case .stopped: - return true - case .notStarted, .starting, .active, .draining: - return true - } - } - } - - private let lock = NIOLock() - private var state: State - /// Items of work waiting to be executed and their ID. - private var workQueue: Deque<(UInt64, WorkItem)> - /// IDs of work items which have been marked as cancelled but not yet - /// processed. - private var cancelledTasks: [UInt64] - // TODO: make checking for cancellation cheaper by maintaining a heap of cancelled - // IDs and ensuring work is in order of increasing ID. With that we can check the ID - // of the popped work item against the top of the heap. - - /// Signalled when an item of work is added to the work queue. - private let semaphore: DispatchSemaphore - /// The number of items in the work queue. This is used by the executor to - /// make decisions on where to enqueue work so relaxed memory ordering is fine. - private let queuedWork = ManagedAtomic(0) - - internal init() { - self.state = .notStarted - self.semaphore = DispatchSemaphore(value: 0) - self.workQueue = [] - self.cancelledTasks = [] - } - - deinit { - assert(self.state.isStopped, "The IOExecutor MUST be shutdown before deinit") - } - } -} - -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -extension IOExecutor.Worker { - func start(name: String) async { - self.lock.lock() - await withCheckedContinuation { continuation in - self.state.starting(continuation: continuation) - self.lock.unlock() - - Thread.spawnAndRun(name: name, detachThread: false) { thread in - let continuation = self.lock.withLock { self.state.activate(thread) } - continuation.resume() - self.process() - } - } - } - - var numberOfQueuedTasks: Int { - return self.queuedWork.load(ordering: .relaxed) - } - - func enqueue(id: UInt64, _ item: @escaping WorkItem) { - let didEnqueue = self.lock.withLock { - self._enqueue(id: id, item) - } - - if didEnqueue { - self.queuedWork.wrappingIncrement(ordering: .relaxed) - self.semaphore.signal() - } else { - // Draining or stopped. - item(.reject) - } - } - - private func _enqueue(id: UInt64, _ item: @escaping WorkItem) -> Bool { - switch self.state { - case .notStarted, .starting, .active: - self.workQueue.append((id, item)) - return true - case .draining, .stopped: - // New work is rejected in these states. - return false - } - } - - func cancel(workID: UInt64) { - self.lock.withLock { - // The work will be cancelled when pulled from the work queue. This means - // there's a delay before the work is actually cancelled; we trade that off - // against the cost of scanning the work queue for an item to cancel and then - // removing it which is O(n) (which could be O(n^2) for bulk cancellation). - self.cancelledTasks.append(workID) - } - } - - func stop() async { - self.lock.lock() - switch self.state { - case .notStarted: - self.state = .stopped - while let work = self.workQueue.popFirst() { - work.1(.reject) - } - - case let .active(thread): - await withCheckedContinuation { continuation in - self.state = .draining(continuation) - self.lock.unlock() - - // Signal the semaphore: 'process()' waits on the semaphore for work so - // always expects the queue to be non-empty. This is the exception that - // indicates to 'process()' that it can stop. - self.semaphore.signal() - } - precondition(self.lock.withLock({ self.state.isStopped })) - // This won't block, we just drained the work queue so 'self.process()' will have - // returned - thread.join() - - case .starting, .draining: - fatalError("Worker is already starting/draining") - - case .stopped: - self.lock.unlock() - } - } - - private func process() { - enum Instruction { - case run(WorkItem) - case cancel(WorkItem) - case stopWorking(CheckedContinuation) - } - - while true { - // Wait for work to be signalled via the semaphore. It is signalled for every time an - // item of work is added to the queue. It is signalled an additional time when the worker - // is shutting down: in that case there is no corresponding work item in the queue so - // 'popFirst()' will return 'nil'; that's the signal to stop looping. - self.semaphore.wait() - - let instruction: Instruction = self.lock.withLock { - switch self.state { - case let .draining(continuation): - if self.workQueue.isEmpty { - self.state = .stopped - return .stopWorking(continuation) - } - // There's still work to do: continue as if active. - fallthrough - - case .active: - let (id, work) = self.workQueue.removeFirst() - if let index = self.cancelledTasks.firstIndex(of: id) { - self.cancelledTasks.remove(at: index) - return .cancel(work) - } else { - return .run(work) - } - - case .notStarted, .starting, .stopped: - fatalError("Impossible state") - } - } - - switch instruction { - case let .run(work): - work(.run) - case let .cancel(work): - work(.cancel) - case let .stopWorking(continuation): - continuation.resume() - return - } - } - } -} -#endif diff --git a/Sources/NIOFileSystem/Internal/Concurrency Primitives/Thread.swift b/Sources/NIOFileSystem/Internal/Concurrency Primitives/Thread.swift deleted file mode 100644 index 5e5eb58ab1..0000000000 --- a/Sources/NIOFileSystem/Internal/Concurrency Primitives/Thread.swift +++ /dev/null @@ -1,99 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) || os(Linux) || os(Android) -#if os(Linux) || os(FreeBSD) || os(Android) -import CNIOLinux -#endif - -protocol ThreadOps { - associatedtype ThreadHandle - associatedtype ThreadSpecificKey - associatedtype ThreadSpecificKeyDestructor - - static func run( - handle: inout ThreadHandle?, - args: Ref, - detachThread: Bool - ) - static func joinThread(_ thread: ThreadHandle) - static func threadName(_ thread: ThreadHandle) -> String? - static func compareThreads(_ lhs: ThreadHandle, _ rhs: ThreadHandle) -> Bool -} - -/// A Thread that executes some runnable block. -/// -/// All methods exposed are thread-safe. -final class Thread { - internal typealias ThreadBoxValue = (body: (Thread) -> Void, name: String?) - internal typealias ThreadBox = Ref - - private let desiredName: String? - - /// The thread handle used by this instance. - private let handle: ThreadOpsSystem.ThreadHandle - - /// Create a new instance - /// - /// - arguments: - /// - handle: The `ThreadOpsSystem.ThreadHandle` that is wrapped and used by the `Thread`. - internal init(handle: ThreadOpsSystem.ThreadHandle, desiredName: String?) { - self.handle = handle - self.desiredName = desiredName - } - - /// Execute the given body with the `pthread_t` that is used by this `Thread` as argument. - /// - /// - warning: Do not escape `pthread_t` from the closure for later use. - /// - /// - parameters: - /// - body: The closure that will accept the `pthread_t`. - /// - returns: The value returned by `body`. - internal func withUnsafeThreadHandle( - _ body: (ThreadOpsSystem.ThreadHandle) throws -> T - ) rethrows -> T { - return try body(self.handle) - } - - /// Get current name of the `Thread` or `nil` if not set. - var currentName: String? { - return ThreadOpsSystem.threadName(self.handle) - } - - func join() { - ThreadOpsSystem.joinThread(self.handle) - } - - /// Spawns and runs some task in a `Thread`. - /// - /// - arguments: - /// - name: The name of the `Thread` or `nil` if no specific name should be set. - /// - body: The function to execute within the spawned `Thread`. - /// - detach: Whether to detach the thread. If the thread is not detached it must be `join`ed. - static func spawnAndRun( - name: String? = nil, - detachThread: Bool = true, - body: @escaping (Thread) -> Void - ) { - var handle: ThreadOpsSystem.ThreadHandle? = nil - - // Store everything we want to pass into the c function in a Box so we - // can hand-over the reference. - let tuple: ThreadBoxValue = (body: body, name: name) - let box = ThreadBox(tuple) - - ThreadOpsSystem.run(handle: &handle, args: box, detachThread: detachThread) - } -} -#endif diff --git a/Sources/NIOFileSystem/Internal/Concurrency Primitives/ThreadPosix.swift b/Sources/NIOFileSystem/Internal/Concurrency Primitives/ThreadPosix.swift deleted file mode 100644 index e6421dfd48..0000000000 --- a/Sources/NIOFileSystem/Internal/Concurrency Primitives/ThreadPosix.swift +++ /dev/null @@ -1,156 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) || os(Linux) || os(Android) -#if canImport(Glibc) || canImport(Musl) -import CNIOLinux - -private let sys_pthread_getname_np = CNIOLinux_pthread_getname_np -private let sys_pthread_setname_np = CNIOLinux_pthread_setname_np -#if os(Android) -private typealias ThreadDestructor = @convention(c) (UnsafeMutableRawPointer) -> - UnsafeMutableRawPointer -#else -private typealias ThreadDestructor = @convention(c) (UnsafeMutableRawPointer?) -> - UnsafeMutableRawPointer? -#endif -#elseif canImport(Darwin) -import Darwin - -private let sys_pthread_getname_np = pthread_getname_np -// Emulate the same method signature as pthread_setname_np on Linux. -private func sys_pthread_setname_np(_ p: pthread_t, _ pointer: UnsafePointer) -> Int32 { - assert(pthread_equal(pthread_self(), p) != 0) - pthread_setname_np(pointer) - // Will never fail on macOS so just return 0 which will be used on linux to signal it not failed. - return 0 -} -private typealias ThreadDestructor = @convention(c) (UnsafeMutableRawPointer) -> - UnsafeMutableRawPointer? -#endif - -private func sysPthread_create( - handle: UnsafeMutablePointer, - destructor: @escaping ThreadDestructor, - args: UnsafeMutableRawPointer? -) -> CInt { - #if canImport(Darwin) - return pthread_create(handle, nil, destructor, args) - #elseif canImport(Glibc) || canImport(Musl) - #if canImport(Glibc) - var handleLinux = pthread_t() - #else - var handleLinux = pthread_t(bitPattern: 0) - #endif - let result = pthread_create( - &handleLinux, - nil, - destructor, - args - ) - handle.pointee = handleLinux - return result - #endif -} - -typealias ThreadOpsSystem = ThreadOpsPosix - -enum ThreadOpsPosix: ThreadOps { - typealias ThreadHandle = pthread_t - typealias ThreadSpecificKey = pthread_key_t - #if canImport(Darwin) - typealias ThreadSpecificKeyDestructor = @convention(c) (UnsafeMutableRawPointer) -> Void - #elseif canImport(Glibc) || canImport(Musl) - typealias ThreadSpecificKeyDestructor = @convention(c) (UnsafeMutableRawPointer?) -> Void - #endif - - static func threadName(_ thread: ThreadOpsSystem.ThreadHandle) -> String? { - // 64 bytes should be good enough as on Linux the limit is usually 16 - // and it's very unlikely a user will ever set something longer - // anyway. - var chars: [CChar] = Array(repeating: 0, count: 64) - return chars.withUnsafeMutableBufferPointer { ptr in - guard sys_pthread_getname_np(thread, ptr.baseAddress!, ptr.count) == 0 else { - return nil - } - - let buffer: UnsafeRawBufferPointer = - UnsafeRawBufferPointer(UnsafeBufferPointer(rebasing: ptr.prefix { $0 != 0 })) - return String(decoding: buffer, as: Unicode.UTF8.self) - } - } - - static func run( - handle: inout ThreadOpsSystem.ThreadHandle?, - args: Ref, - detachThread: Bool - ) { - let argv0 = Unmanaged.passRetained(args).toOpaque() - let res = sysPthread_create( - handle: &handle, - destructor: { - // Cast to UnsafeMutableRawPointer? and force unwrap to make the - // same code work on macOS and Linux. - let boxed = Unmanaged - .fromOpaque(($0 as UnsafeMutableRawPointer?)!) - .takeRetainedValue() - let (body, name) = (boxed.value.body, boxed.value.name) - let hThread: ThreadOpsSystem.ThreadHandle = pthread_self() - - if let name = name { - let maximumThreadNameLength: Int - #if canImport(Glibc) || canImport(Musl) - maximumThreadNameLength = 15 - #elseif canImport(Darwin) - maximumThreadNameLength = .max - #endif - name.prefix(maximumThreadNameLength).withCString { namePtr in - // this is non-critical so we ignore the result here, we've seen - // EPERM in containers. - _ = sys_pthread_setname_np(hThread, namePtr) - } - } - - body(Thread(handle: hThread, desiredName: name)) - - #if os(Android) - return UnsafeMutableRawPointer(bitPattern: 0xdeadbee)! - #else - return nil - #endif - }, - args: argv0 - ) - precondition(res == 0, "Unable to create thread: \(res)") - - if detachThread { - let detachError = pthread_detach(handle!) - precondition(detachError == 0, "pthread_detach failed with error \(detachError)") - } - - } - - static func joinThread(_ thread: ThreadOpsSystem.ThreadHandle) { - let err = pthread_join(thread, nil) - assert(err == 0, "pthread_join failed with \(err)") - } - - static func compareThreads( - _ lhs: ThreadOpsSystem.ThreadHandle, - _ rhs: ThreadOpsSystem.ThreadHandle - ) -> Bool { - return pthread_equal(lhs, rhs) != 0 - } -} -#endif diff --git a/Sources/NIOFileSystem/Internal/SystemFileHandle.swift b/Sources/NIOFileSystem/Internal/SystemFileHandle.swift index 597c902eb0..d080b39523 100644 --- a/Sources/NIOFileSystem/Internal/SystemFileHandle.swift +++ b/Sources/NIOFileSystem/Internal/SystemFileHandle.swift @@ -15,6 +15,7 @@ #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) || os(Linux) || os(Android) import NIOConcurrencyHelpers import NIOCore +import NIOPosix @preconcurrency import SystemPackage #if canImport(Darwin) @@ -31,7 +32,7 @@ import Musl @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) public final class SystemFileHandle { /// The executor on which to execute system calls. - internal var executor: IOExecutor { self.sendableView.executor } + internal var threadPool: NIOThreadPool { self.sendableView.threadPool } /// The path used to open this handle. internal var path: FilePath { self.sendableView.path } @@ -77,7 +78,7 @@ public final class SystemFileHandle { fileprivate let lifecycle: NIOLockedValueBox /// The executor on which to execute system calls. - internal let executor: IOExecutor + internal let threadPool: NIOThreadPool /// The path used to open this handle. internal let path: FilePath @@ -87,12 +88,12 @@ public final class SystemFileHandle { fileprivate init( lifecycle: Lifecycle, - executor: IOExecutor, + threadPool: NIOThreadPool, path: FilePath, materialization: Materialization? ) { self.lifecycle = NIOLockedValueBox(lifecycle) - self.executor = executor + self.threadPool = threadPool self.path = path self.materialization = materialization } @@ -110,11 +111,11 @@ public final class SystemFileHandle { takingOwnershipOf descriptor: FileDescriptor, path: FilePath, materialization: Materialization? = nil, - executor: IOExecutor + threadPool: NIOThreadPool ) { self.sendableView = SendableView( lifecycle: .open(descriptor), - executor: executor, + threadPool: threadPool, path: path, materialization: materialization ) @@ -193,37 +194,37 @@ extension SystemFileHandle: FileHandleProtocol { // currently using. public func info() async throws -> FileInfo { - return try await self.executor.execute { [sendableView] in + return try await self.threadPool.runIfActive { [sendableView] in try sendableView._info().get() } } public func replacePermissions(_ permissions: FilePermissions) async throws { - return try await self.executor.execute { [sendableView] in + return try await self.threadPool.runIfActive { [sendableView] in try sendableView._replacePermissions(permissions) } } public func addPermissions(_ permissions: FilePermissions) async throws -> FilePermissions { - return try await self.executor.execute { [sendableView] in + return try await self.threadPool.runIfActive { [sendableView] in try sendableView._addPermissions(permissions) } } public func removePermissions(_ permissions: FilePermissions) async throws -> FilePermissions { - return try await self.executor.execute { [sendableView] in + return try await self.threadPool.runIfActive { [sendableView] in try sendableView._removePermissions(permissions) } } public func attributeNames() async throws -> [String] { - return try await self.executor.execute { [sendableView] in + return try await self.threadPool.runIfActive { [sendableView] in try sendableView._attributeNames() } } public func valueForAttribute(_ name: String) async throws -> [UInt8] { - return try await self.executor.execute { [sendableView] in + return try await self.threadPool.runIfActive { [sendableView] in try sendableView._valueForAttribute(name) } } @@ -232,19 +233,19 @@ extension SystemFileHandle: FileHandleProtocol { _ bytes: some (Sendable & RandomAccessCollection), attribute name: String ) async throws { - return try await self.executor.execute { [sendableView] in + return try await self.threadPool.runIfActive { [sendableView] in try sendableView._updateValueForAttribute(bytes, attribute: name) } } public func removeValueForAttribute(_ name: String) async throws { - return try await self.executor.execute { [sendableView] in + return try await self.threadPool.runIfActive { [sendableView] in try sendableView._removeValueForAttribute(name) } } public func synchronize() async throws { - return try await self.executor.execute { [sendableView] in + return try await self.threadPool.runIfActive { [sendableView] in try sendableView._synchronize() } } @@ -252,7 +253,7 @@ extension SystemFileHandle: FileHandleProtocol { public func withUnsafeDescriptor( _ execute: @Sendable @escaping (FileDescriptor) throws -> R ) async throws -> R { - try await self.executor.execute { [sendableView] in + try await self.threadPool.runIfActive { [sendableView] in try sendableView._withUnsafeDescriptor { return try execute($0) } onUnavailable: { @@ -343,13 +344,13 @@ extension SystemFileHandle: FileHandleProtocol { } public func close() async throws { - try await self.executor.execute { [sendableView] in + try await self.threadPool.runIfActive { [sendableView] in try sendableView._close(materialize: true).get() } } public func close(makeChangesVisible: Bool) async throws { - try await self.executor.execute { [sendableView] in + try await self.threadPool.runIfActive { [sendableView] in try sendableView._close(materialize: makeChangesVisible).get() } } @@ -864,7 +865,7 @@ extension SystemFileHandle: ReadableFileHandleProtocol { fromAbsoluteOffset offset: Int64, length: ByteCount ) async throws -> ByteBuffer { - return try await self.executor.execute { [sendableView] in + return try await self.threadPool.runIfActive { [sendableView] in return try sendableView._withUnsafeDescriptor { descriptor in try descriptor.readChunk( fromAbsoluteOffset: offset, @@ -930,7 +931,7 @@ extension SystemFileHandle: WritableFileHandleProtocol { contentsOf bytes: some (Sequence & Sendable), toAbsoluteOffset offset: Int64 ) async throws -> Int64 { - return try await self.executor.execute { [sendableView] in + return try await self.threadPool.runIfActive { [sendableView] in return try sendableView._withUnsafeDescriptor { descriptor in try descriptor.write(contentsOf: bytes, toAbsoluteOffset: offset) .flatMapError { error in @@ -979,7 +980,7 @@ extension SystemFileHandle: WritableFileHandleProtocol { } public func resize(to size: ByteCount) async throws { - try await self.executor.execute { [sendableView] in + try await self.threadPool.runIfActive { [sendableView] in try sendableView._resize(to: size).get() } } @@ -1022,7 +1023,7 @@ extension SystemFileHandle: DirectoryFileHandleProtocol { options: OpenOptions.Read ) async throws -> SystemFileHandle { let opts = options.descriptorOptions.union(.nonBlocking) - let handle = try await self.executor.execute { [sendableView] in + let handle = try await self.threadPool.runIfActive { [sendableView] in let handle = try sendableView._open( atPath: path, mode: .readOnly, @@ -1041,7 +1042,7 @@ extension SystemFileHandle: DirectoryFileHandleProtocol { ) async throws -> SystemFileHandle { let perms = options.permissionsForRegularFile let opts = options.descriptorOptions.union(.nonBlocking) - let handle = try await self.executor.execute { [sendableView] in + let handle = try await self.threadPool.runIfActive { [sendableView] in let handle = try sendableView._open( atPath: path, mode: .readWrite, @@ -1061,7 +1062,7 @@ extension SystemFileHandle: DirectoryFileHandleProtocol { ) async throws -> SystemFileHandle { let perms = options.permissionsForRegularFile let opts = options.descriptorOptions.union(.nonBlocking) - let handle = try await self.executor.execute { [sendableView] in + let handle = try await self.threadPool.runIfActive { [sendableView] in let handle = try sendableView._open( atPath: path, mode: .writeOnly, @@ -1080,7 +1081,7 @@ extension SystemFileHandle: DirectoryFileHandleProtocol { options: OpenOptions.Directory ) async throws -> SystemFileHandle { let opts = options.descriptorOptions.union(.nonBlocking) - let handle = try await self.executor.execute { [sendableView] in + let handle = try await self.threadPool.runIfActive { [sendableView] in let handle = try sendableView._open( atPath: path, mode: .readOnly, @@ -1112,7 +1113,7 @@ extension SystemFileHandle.SendableView { options: options, permissions: permissions, transactionalIfPossible: transactional, - executor: self.executor + threadPool: self.threadPool ) } else if self.path.isAbsolute { // The parent path is absolute and the provided path is relative; combine them. @@ -1122,7 +1123,7 @@ extension SystemFileHandle.SendableView { options: options, permissions: permissions, transactionalIfPossible: transactional, - executor: self.executor + threadPool: self.threadPool ) } @@ -1143,7 +1144,7 @@ extension SystemFileHandle.SendableView { SystemFileHandle( takingOwnershipOf: newDescriptor, path: self.path.appending(path.components).lexicallyNormalized(), - executor: self.executor + threadPool: self.threadPool ) }.mapError { errno in .open("openat", error: errno, path: self.path, location: .here()) @@ -1170,7 +1171,7 @@ extension SystemFileHandle { options: FileDescriptor.OpenOptions, permissions: FilePermissions?, transactionalIfPossible transactional: Bool, - executor: IOExecutor + threadPool: NIOThreadPool ) -> Result { let isWritable = (mode == .writeOnly || mode == .readWrite) let exclusiveCreate = options.contains(.exclusiveCreate) @@ -1194,7 +1195,7 @@ extension SystemFileHandle { mode: mode, options: options, permissions: permissions, - executor: executor, + threadPool: threadPool, useTemporaryFileIfPossible: temporaryHardLink ) } else { @@ -1203,7 +1204,7 @@ extension SystemFileHandle { mode: mode, options: options, permissions: permissions, - executor: executor + threadPool: threadPool ) } } @@ -1213,7 +1214,7 @@ extension SystemFileHandle { mode: FileDescriptor.AccessMode, options: FileDescriptor.OpenOptions, permissions: FilePermissions?, - executor: IOExecutor + threadPool: NIOThreadPool ) -> Result { return Result { try FileDescriptor.open( @@ -1226,7 +1227,7 @@ extension SystemFileHandle { SystemFileHandle( takingOwnershipOf: descriptor, path: path, - executor: executor + threadPool: threadPool ) }.mapError { errno in FileSystemError.open("open", error: errno, path: path, location: .here()) @@ -1238,7 +1239,7 @@ extension SystemFileHandle { mode: FileDescriptor.AccessMode, options originalOptions: FileDescriptor.OpenOptions, permissions: FilePermissions?, - executor: IOExecutor, + threadPool: NIOThreadPool, useTemporaryFileIfPossible: Bool = true ) -> Result { let openedPath: FilePath @@ -1351,7 +1352,7 @@ extension SystemFileHandle { takingOwnershipOf: descriptor, path: openedPath, materialization: materialization, - executor: executor + threadPool: threadPool ) return .success(handle) @@ -1365,7 +1366,7 @@ extension SystemFileHandle { mode: mode, options: originalOptions, permissions: permissions, - executor: executor, + threadPool: threadPool, useTemporaryFileIfPossible: false ) } diff --git a/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift b/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift index 01beefcb70..9a5e845d6e 100644 --- a/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift +++ b/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift @@ -14,6 +14,7 @@ #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) || os(Linux) || os(Android) import NIOCore +import NIOPosix @_spi(Testing) import _NIOFileSystem import NIOFoundationCompat import XCTest @@ -83,24 +84,24 @@ final class FileHandleTests: XCTestCase { options: options, permissions: permissions ) - let executor = await IOExecutor.running(numberOfThreads: 1) - let handle = SystemFileHandle(takingOwnershipOf: descriptor, path: path, executor: executor) + let handle = SystemFileHandle( + takingOwnershipOf: descriptor, + path: path, + threadPool: .singleton + ) do { try await execute(handle) if autoClose { try? await handle.close() } - await executor.drain() } catch let skip as XCTSkip { try? await handle.close() - await executor.drain() throw skip } catch { XCTFail("Test threw error: '\(error)'") // Always close on error. try await handle.close() - await executor.drain() } } diff --git a/Tests/NIOFileSystemTests/FileHandleTests.swift b/Tests/NIOFileSystemTests/FileHandleTests.swift index cb889e16fd..22ba37a32c 100644 --- a/Tests/NIOFileSystemTests/FileHandleTests.swift +++ b/Tests/NIOFileSystemTests/FileHandleTests.swift @@ -14,6 +14,7 @@ #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) || os(Linux) || os(Android) @_spi(Testing) import _NIOFileSystem +import NIOPosix import XCTest #if ENABLE_MOCKING @@ -26,12 +27,12 @@ internal final class FileHandleTests: XCTestCase { #if ENABLE_MOCKING // The executor is required to create the handle, we won't do any async work if we're // mocking as the driver requires synchronous code (as it uses thread local storage). - let executor = await IOExecutor.running(numberOfThreads: 1) - await executor.drain() + let threadPool = NIOThreadPool(numberOfThreads: 1) + try! await threadPool.shutdownGracefully() // Not a real descriptor. let descriptor = FileDescriptor(rawValue: -1) - let handle = SystemFileHandle(takingOwnershipOf: descriptor, path: path, executor: executor) + let handle = SystemFileHandle(takingOwnershipOf: descriptor, path: path, threadPool: threadPool) defer { // Not a 'real' descriptor so just detach to avoid "leaking" the descriptor and // trapping in the deinit of handle. diff --git a/Tests/NIOFileSystemTests/Internal/Concurrency Primitives/IOExecutorTests.swift b/Tests/NIOFileSystemTests/Internal/Concurrency Primitives/IOExecutorTests.swift deleted file mode 100644 index d9cb596967..0000000000 --- a/Tests/NIOFileSystemTests/Internal/Concurrency Primitives/IOExecutorTests.swift +++ /dev/null @@ -1,176 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) || os(Linux) || os(Android) -@_spi(Testing) import _NIOFileSystem -import XCTest - -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -internal final class IOExecutorTests: XCTestCase { - func testExecuteAsync() async throws { - try await withExecutor { executor in - let n = try await executor.execute { - return fibonacci(13) - } - XCTAssertEqual(n, 233) - } - } - - func testExecuteWithCallback() async throws { - try await withExecutor { executor in - await withCheckedContinuation { continuation in - executor.execute { - fibonacci(13) - } onCompletion: { result in - switch result { - case let .success(n): - XCTAssertEqual(n, 233) - case let .failure(error): - XCTFail("Unexpected error: \(error)") - } - continuation.resume() - } - } - } - } - - func testExecuteManyWorkItemsOn2Threads() async throws { - // There are (slightly) different code paths depending on the number - // of threads which determine which executor to pick so run this test - // on 2 and 4 threads to exercise those paths. - try await self.testExecuteManyWorkItems(threadCount: 2) - } - - func testExecuteManyWorkItemsOn4Threads() async throws { - // There are (slightly) different code paths depending on the number - // of threads which determine which executor to pick so run this test - // on 2 and 4 threads to exercise those paths. - try await self.testExecuteManyWorkItems(threadCount: 4) - } - - func testExecuteManyWorkItems(threadCount threads: Int) async throws { - try await withExecutor(numberOfThreads: threads) { executor in - try await withThrowingTaskGroup(of: Int.self) { group in - let values = Array(0..<10_000) - - for value in values { - group.addTask { - try await executor.execute { return value } - } - } - - // Wait for the values. - let processed = try await group.reduce(into: [], { $0.append($1) }) - - // They should be the same but may be in a different order. - XCTAssertEqual(processed.sorted(), values) - } - } - } - - func testExecuteCancellation() async throws { - try await withExecutor { executor in - await withThrowingTaskGroup(of: Void.self) { group in - group.cancelAll() - group.addTask { - try await executor.execute { - XCTFail("Should be cancelled before executed") - } - } - - await XCTAssertThrowsErrorAsync { - try await group.waitForAll() - } onError: { error in - XCTAssert(error is CancellationError) - } - } - } - } - - func testDrainWhenEmpty() async throws { - let executor = await IOExecutor.running(numberOfThreads: 1) - await executor.drain() - } - - func testExecuteAfterDraining() async throws { - let executor = await IOExecutor.running(numberOfThreads: 1) - await executor.drain() - - // Callback should fire with an error. - executor.execute { - XCTFail("Work unexpectedly ran in drained pool.") - } onCompletion: { result in - switch result { - case .success: - XCTFail("Work unexpectedly ran in drained pool.") - case let .failure(error): - if let fsError = error as? FileSystemError { - XCTAssertEqual(fsError.code, .unavailable) - } else { - XCTFail("Unexpected error: \(error)") - } - } - } - - // Should throw immediately. - await XCTAssertThrowsErrorAsync { - try await executor.execute { - XCTFail("Work unexpectedly ran in drained pool.") - } - } onError: { error in - if let fsError = error as? FileSystemError { - XCTAssertEqual(fsError.code, .unavailable) - } else { - XCTFail("Unexpected error: \(error)") - } - } - } - - func testWorkWhenStartingAsync() async throws { - try await withThrowingTaskGroup(of: Int.self) { group in - let executor = IOExecutor.runningAsync(numberOfThreads: 4) - for _ in 0..<1000 { - group.addTask { - try await executor.execute { fibonacci(8) } - } - } - - try await group.waitForAll() - await executor.drain() - } - } -} - -private func fibonacci(_ n: Int) -> Int { - if n < 2 { - return n - } else { - return fibonacci(n - 1) + fibonacci(n - 2) - } -} - -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -private func withExecutor( - numberOfThreads: Int = 1, - execute: (IOExecutor) async throws -> Void -) async throws { - let executor = await IOExecutor.running(numberOfThreads: numberOfThreads) - do { - try await execute(executor) - await executor.drain() - } catch { - await executor.drain() - } -} -#endif