Skip to content

Commit

Permalink
Alternate immediate & scheduled tasks pickup, to avoid starvation
Browse files Browse the repository at this point in the history
  • Loading branch information
gmilos committed Feb 13, 2024
1 parent 5751d70 commit 1b7ad7d
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 47 deletions.
114 changes: 67 additions & 47 deletions Sources/NIOPosix/SelectableEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,72 @@ Further information:
}
}
}

private func runLoop() -> NIODeadline? {
// We need to ensure we process all tasks, even if a task added another task again
while true {
// TODO: Better locking
let nextReadyDeadline = self._tasksLock.withLock { () -> NIODeadline? in
var moreImmediateTasksToConsider = !self._immediateTasks.isEmpty
var moreScheduledTasksToConsider = !self._scheduledTasks.isEmpty

guard moreImmediateTasksToConsider || moreScheduledTasksToConsider else {
// We will not continue to loop here. We need to be woken if a new task is enqueued.
self._pendingTaskPop = false

// Reset nextReadyDeadline to nil which means we will do a blocking select.
return nil
}

// We only fetch the time one time as this may be expensive and is generally good enough as if we miss anything we will just do a non-blocking select again anyway.
let now: NIODeadline = .now()
var nextScheduledTaskDeadline = now

while self.tasksCopy.count < self.tasksCopy.capacity &&
(moreImmediateTasksToConsider || moreScheduledTasksToConsider) {
// We pick one item from self._immediateTasks & self._scheduledTask per iteration of the loop.
// This prevents one task queue starving the other.
if moreImmediateTasksToConsider, let task = self._immediateTasks.popFirst() {
self.tasksCopy.append(task)
} else {
moreImmediateTasksToConsider = false
}

if moreScheduledTasksToConsider, let task = self._scheduledTasks.peek() {
if task.readyTime.readyIn(now) <= .nanoseconds(0) {
self._scheduledTasks.pop()
self.tasksCopy.append(.function(task.task))
} else {
nextScheduledTaskDeadline = task.readyTime
moreScheduledTasksToConsider = false
}
} else {
moreScheduledTasksToConsider = false
}
}

if self.tasksCopy.isEmpty {
// Rare, but it's possible to find no tasks to execute if all scheduled tasks are expiring in the future.
self._pendingTaskPop = false
}

// nextScheduledTaskDeadline is the overall next deadline, but iff there are no more immediate tasks left.
return moreImmediateTasksToConsider ? now : nextScheduledTaskDeadline
}

// all pending tasks are set to occur in the future, so we can stop looping.
if self.tasksCopy.isEmpty {
return nextReadyDeadline
}

// Execute all the tasks that were submitted
for task in self.tasksCopy {
run(task)
}
// Drop everything (but keep the capacity) so we can fill it again on the next iteration.
self.tasksCopy.removeAll(keepingCapacity: true)
}
}

/// Start processing I/O and tasks for this `SelectableEventLoop`. This method will continue running (and so block) until the `SelectableEventLoop` is closed.
internal func run() throws {
Expand Down Expand Up @@ -530,53 +596,7 @@ Further information:
}
}
}

// We need to ensure we process all tasks, even if a task added another task again
while true {
// TODO: Better locking
self._tasksLock.withLock { () -> Void in
if !self._immediateTasks.isEmpty {
while self.tasksCopy.count < self.tasksCopy.capacity, let task = self._immediateTasks.popFirst() {
self.tasksCopy.append(task)
}
} else
if !self._scheduledTasks.isEmpty {
// We only fetch the time one time as this may be expensive and is generally good enough as if we miss anything we will just do a non-blocking select again anyway.
let now: NIODeadline = .now()

// Make a copy of the tasks so we can execute these while not holding the lock anymore
while self.tasksCopy.count < self.tasksCopy.capacity, let task = self._scheduledTasks.peek() {
if task.readyTime.readyIn(now) <= .nanoseconds(0) {
self._scheduledTasks.pop()
self.tasksCopy.append(.function(task.task))
} else {
nextReadyDeadline = task.readyTime
break
}
}
} else {
// Reset nextreadyDeadline to nil which means we will do a blocking select.
nextReadyDeadline = nil
}

if self.tasksCopy.isEmpty {
// We will not continue to loop here. We need to be woken if a new task is enqueued.
self._pendingTaskPop = false
}
}

// all pending tasks are set to occur in the future, so we can stop looping.
if self.tasksCopy.isEmpty {
break
}

// Execute all the tasks that were submitted
for task in self.tasksCopy {
run(task)
}
// Drop everything (but keep the capacity) so we can fill it again on the next iteration.
self.tasksCopy.removeAll(keepingCapacity: true)
}
nextReadyDeadline = runLoop()
}

// This EventLoop was closed so also close the underlying selector.
Expand Down
28 changes: 28 additions & 0 deletions Tests/NIOPosixTests/EventLoopTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1660,6 +1660,34 @@ public final class EventLoopTest : XCTestCase {
XCTAssert(error is DummyError)
}
}

// Test for possible starvation discussed here: https://github.com/apple/swift-nio/pull/2645#discussion_r1486747118
func testNonStarvation() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let eventLoop = group.next()
var stop = false // no additional synchronisation needed, since only one thread is used
var reExecuteTask: (() -> Void)!
reExecuteTask = {
if !stop {
eventLoop.execute(reExecuteTask)
}
}
eventLoop.execute {
// SelectableEventLoop runs batches of up to 4096.
// Submit significantly over that for good measure.
(0..<10000).forEach { _ in
eventLoop.execute(reExecuteTask)
}
}
let stopTask = eventLoop.scheduleTask(in: .microseconds(10)) {
stop = true
}
try stopTask.futureResult.wait()
}
}

fileprivate class EventLoopWithPreSucceededFuture: EventLoop {
Expand Down

0 comments on commit 1b7ad7d

Please sign in to comment.