Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use NIOThreadPool in NIOFileSystem #2692

Merged
merged 2 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ let package = Package(
name: "_NIOFileSystem",
dependencies: [
"NIOCore",
"NIOPosix",
"CNIOLinux",
"CNIODarwin",
swiftAtomics,
Expand Down Expand Up @@ -488,6 +489,7 @@ let package = Package(
name: "NIOFileSystemIntegrationTests",
dependencies: [
"NIOCore",
"NIOPosix",
"_NIOFileSystem",
"NIOFoundationCompat",
],
Expand Down
62 changes: 32 additions & 30 deletions Sources/NIOFileSystem/DirectoryEntries.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import CNIODarwin
import CNIOLinux
import NIOConcurrencyHelpers
import NIOPosix
@preconcurrency import SystemPackage

/// An `AsyncSequence` of entries in a directory.
Expand Down Expand Up @@ -151,16 +152,14 @@ extension BufferedStream where Element == [DirectoryEntry] {
)

source.onTermination = {
guard let executor = protectedState.withLockedValue({ $0.executorForClosing() }) else {
guard let threadPool = protectedState.withLockedValue({ $0.threadPoolForClosing() }) else {
return
}

executor.execute {
threadPool.submit { _ in // always run, even if cancelled
protectedState.withLockedValue { state in
state.closeIfNecessary()
}
} onCompletion: { _ in
// Ignore the result.
}
}

Expand Down Expand Up @@ -189,16 +188,21 @@ private struct DirectoryEntryProducer {
/// source it will either produce more or be scheduled to produce more. Stopping production
/// is signalled via the stream's 'onTermination' handler.
func produceMore() {
let executor = self.state.withLockedValue { state in
let threadPool = self.state.withLockedValue { state in
state.produceMore()
}

// No executor means we're done.
guard let executor = executor else { return }
// No thread pool means we're done.
guard let threadPool = threadPool else { return }

executor.execute {
try self.nextBatch()
} onCompletion: { result in
threadPool.submit {
let result: Result<[DirectoryEntry], Error>
switch $0 {
case .active:
result = Result { try self.nextBatch() }
case .cancelled:
result = .failure(CancellationError())
}
self.onNextBatchResult(result)
}
}
Expand Down Expand Up @@ -261,16 +265,14 @@ private struct DirectoryEntryProducer {
}

private func close() {
guard let executor = self.state.withLockedValue({ $0.executorForClosing() }) else {
guard let threadPool = self.state.withLockedValue({ $0.threadPoolForClosing() }) else {
return
}

executor.execute {
threadPool.submit { _ in // always run, even if cancelled
self.state.withLockedValue { state in
state.closeIfNecessary()
}
} onCompletion: { _ in
// Ignore.
}
}
}
Expand All @@ -283,7 +285,7 @@ private struct DirectoryEnumerator: Sendable {
private enum State: @unchecked Sendable {
case modifying
case idle(SystemFileHandle.SendableView, recursive: Bool)
case open(IOExecutor, Source, [DirectoryEntry])
case open(NIOThreadPool, Source, [DirectoryEntry])
case done
}

Expand Down Expand Up @@ -351,23 +353,23 @@ private struct DirectoryEnumerator: Sendable {
self.path = handle.path
}

internal func produceMore() -> IOExecutor? {
internal func produceMore() -> NIOThreadPool? {
switch self.state {
case let .idle(handle, _):
return handle.executor
case let .open(executor, _, _):
return executor
return handle.threadPool
case let .open(threadPool, _, _):
return threadPool
case .done:
return nil
case .modifying:
fatalError()
}
}

internal func executorForClosing() -> IOExecutor? {
internal func threadPoolForClosing() -> NIOThreadPool? {
switch self.state {
case let .open(executor, _, _):
return executor
case let .open(threadPool, _, _):
return threadPool
case .idle, .done:
// Don't need to close in the idle state: we don't own the handle.
return nil
Expand Down Expand Up @@ -449,7 +451,7 @@ private struct DirectoryEnumerator: Sendable {
}

private mutating func processOpenState(
executor: IOExecutor,
threadPool: NIOThreadPool,
dir: CInterop.DirPointer,
entries: inout [DirectoryEntry],
count: Int
Expand Down Expand Up @@ -499,11 +501,11 @@ private struct DirectoryEnumerator: Sendable {
}

// We must have hit our 'count' limit.
return (.open(executor, .readdir(dir), entries), .yield(.success(entries)))
return (.open(threadPool, .readdir(dir), entries), .yield(.success(entries)))
}

private mutating func processOpenState(
executor: IOExecutor,
threadPool: NIOThreadPool,
fts: CInterop.FTSPointer,
entries: inout [DirectoryEntry],
count: Int
Expand Down Expand Up @@ -580,7 +582,7 @@ private struct DirectoryEnumerator: Sendable {
}

// We must have hit our 'count' limit.
return (.open(executor, .fts(fts), entries), .yield(.success(entries)))
return (.open(threadPool, .fts(fts), entries), .yield(.success(entries)))
}

private mutating func process(_ count: Int) -> ProcessResult {
Expand All @@ -596,21 +598,21 @@ private struct DirectoryEnumerator: Sendable {

switch result {
case let .success(source):
self.state = .open(handle.executor, source, [])
self.state = .open(handle.threadPool, source, [])
return .continue

case let .failure(error):
self.state = .done
return .yield(.failure(error))
}

case .open(let executor, let mode, var entries):
case .open(let threadPool, let mode, var entries):
self.state = .modifying

switch mode {
case .readdir(let dir):
let (state, result) = self.processOpenState(
executor: executor,
threadPool: threadPool,
dir: dir,
entries: &entries,
count: count
Expand All @@ -620,7 +622,7 @@ private struct DirectoryEnumerator: Sendable {

case .fts(let fts):
let (state, result) = self.processOpenState(
executor: executor,
threadPool: threadPool,
fts: fts,
entries: &entries,
count: count
Expand Down
22 changes: 14 additions & 8 deletions Sources/NIOFileSystem/FileChunks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) || os(Linux) || os(Android)
import NIOConcurrencyHelpers
import NIOCore
import NIOPosix
@preconcurrency import SystemPackage

/// An `AsyncSequence` of ordered chunks read from a file.
Expand Down Expand Up @@ -140,16 +141,21 @@ private struct FileChunkProducer: Sendable {
/// source it will either produce more or be scheduled to produce more. Stopping production
/// is signalled via the stream's 'onTermination' handler.
func produceMore() {
let executor = self.state.withLockedValue { state in
let threadPool = self.state.withLockedValue { state in
state.shouldProduceMore()
}

// No executor means we're done.
guard let executor = executor else { return }

executor.execute {
try self.readNextChunk()
} onCompletion: { result in
guard let threadPool = threadPool else { return }

threadPool.submit {
let result: Result<ByteBuffer, Error>
switch $0 {
case .active:
result = Result { try self.readNextChunk() }
case .cancelled:
result = .failure(CancellationError())
}
self.onReadNextChunkResult(result)
}
}
Expand Down Expand Up @@ -273,10 +279,10 @@ private struct ProducerState: Sendable {
}
}

mutating func shouldProduceMore() -> IOExecutor? {
mutating func shouldProduceMore() -> NIOThreadPool? {
switch self.state {
case let .producing(state):
return state.handle.executor
return state.handle.threadPool
case .done:
return nil
}
Expand Down
Loading