Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix thread pool hang #56346

Merged
merged 4 commits into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -595,9 +595,7 @@ internal override int Read(Span<byte> buffer)
byte[] rentedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
try
{
Task<int> t = ReadAsync(new Memory<byte>(rentedBuffer, 0, buffer.Length)).AsTask();
((IAsyncResult)t).AsyncWaitHandle.WaitOne();
int readLength = t.GetAwaiter().GetResult();
int readLength = ReadAsync(new Memory<byte>(rentedBuffer, 0, buffer.Length)).AsTask().GetAwaiter().GetResult();
rentedBuffer.AsSpan(0, readLength).CopyTo(buffer);
return readLength;
}
Expand All @@ -612,9 +610,7 @@ internal override void Write(ReadOnlySpan<byte> buffer)
ThrowIfDisposed();

// TODO: optimize this.
Task t = WriteAsync(buffer.ToArray()).AsTask();
((IAsyncResult)t).AsyncWaitHandle.WaitOne();
t.GetAwaiter().GetResult();
WriteAsync(buffer.ToArray()).AsTask().GetAwaiter().GetResult();
}

// MsQuic doesn't support explicit flushing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public short MinThreadsGoal
get
{
_threadAdjustmentLock.VerifyIsLocked();
return Math.Min(_separated.numThreadsGoal, TargetThreadsGoalForBlockingAdjustment);
return Math.Min(_separated.counts.NumThreadsGoal, TargetThreadsGoalForBlockingAdjustment);
}
}

Expand Down Expand Up @@ -44,7 +44,7 @@ public bool NotifyThreadBlocked()
Debug.Assert(_numBlockedThreads > 0);

if (_pendingBlockingAdjustment != PendingBlockingAdjustment.WithDelayIfNecessary &&
_separated.numThreadsGoal < TargetThreadsGoalForBlockingAdjustment)
_separated.counts.NumThreadsGoal < TargetThreadsGoalForBlockingAdjustment)
{
if (_pendingBlockingAdjustment == PendingBlockingAdjustment.None)
{
Expand Down Expand Up @@ -79,7 +79,7 @@ public void NotifyThreadUnblocked()

if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately &&
_numThreadsAddedDueToBlocking > 0 &&
_separated.numThreadsGoal > TargetThreadsGoalForBlockingAdjustment)
_separated.counts.NumThreadsGoal > TargetThreadsGoalForBlockingAdjustment)
{
wakeGateThread = true;
_pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately;
Expand Down Expand Up @@ -126,7 +126,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo
addWorker = false;

short targetThreadsGoal = TargetThreadsGoalForBlockingAdjustment;
short numThreadsGoal = _separated.numThreadsGoal;
ThreadCounts counts = _separated.counts;
short numThreadsGoal = counts.NumThreadsGoal;
if (numThreadsGoal == targetThreadsGoal)
{
return 0;
Expand All @@ -144,7 +145,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo

short toSubtract = Math.Min((short)(numThreadsGoal - targetThreadsGoal), _numThreadsAddedDueToBlocking);
_numThreadsAddedDueToBlocking -= toSubtract;
_separated.numThreadsGoal = numThreadsGoal -= toSubtract;
numThreadsGoal -= toSubtract;
_separated.counts.InterlockedSetNumThreadsGoal(numThreadsGoal);
HillClimbing.ThreadPoolHillClimber.ForceChange(
numThreadsGoal,
HillClimbing.StateOrTransition.CooperativeBlocking);
Expand All @@ -158,7 +160,6 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo
{
// Calculate how many threads can be added without a delay. Threads that were already created but may be just
// waiting for work can be released for work without a delay, but creating a new thread may need a delay.
ThreadCounts counts = _separated.counts;
short maxThreadsGoalWithoutDelay =
Math.Max(configuredMaxThreadsWithoutDelay, Math.Min(counts.NumExistingThreads, _maxThreads));
short targetThreadsGoalWithoutDelay = Math.Min(targetThreadsGoal, maxThreadsGoalWithoutDelay);
Expand Down Expand Up @@ -225,7 +226,7 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo
} while (false);

_numThreadsAddedDueToBlocking += (short)(newNumThreadsGoal - numThreadsGoal);
_separated.numThreadsGoal = newNumThreadsGoal;
counts = _separated.counts.InterlockedSetNumThreadsGoal(newNumThreadsGoal);
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.CooperativeBlocking);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,20 +126,31 @@ private static void GateThreadStart()
// of the number of existing threads, is compared with the goal. There may be alternative
// solutions, for now this is only to maintain consistency in behavior.
ThreadCounts counts = threadPoolInstance._separated.counts;
if (counts.NumProcessingWork < threadPoolInstance._maxThreads &&
counts.NumProcessingWork >= threadPoolInstance._separated.numThreadsGoal)
while (
counts.NumProcessingWork < threadPoolInstance._maxThreads &&
counts.NumProcessingWork >= counts.NumThreadsGoal)
{
if (debuggerBreakOnWorkStarvation)
{
Debugger.Break();
}

ThreadCounts newCounts = counts;
short newNumThreadsGoal = (short)(counts.NumProcessingWork + 1);
threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal;
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.Starvation);
addWorker = true;
newCounts.NumThreadsGoal = newNumThreadsGoal;

ThreadCounts countsBeforeUpdate =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (countsBeforeUpdate == counts)
{
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.Starvation);
addWorker = true;
break;
}

counts = countsBeforeUpdate;
}
}
finally
Expand Down Expand Up @@ -183,7 +194,7 @@ private static bool SufficientDelaySinceLastDequeue(PortableThreadPool threadPoo
}
else
{
minimumDelay = (uint)threadPoolInstance._separated.numThreadsGoal * DequeueDelayThresholdMs;
minimumDelay = (uint)threadPoolInstance._separated.counts.NumThreadsGoal * DequeueDelayThresholdMs;
}

return delay > minimumDelay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ private struct ThreadCounts
// SOS's ThreadPool command depends on this layout
private const byte NumProcessingWorkShift = 0;
private const byte NumExistingThreadsShift = 16;
private const byte NumThreadsGoalShift = 32;

private uint _data; // SOS's ThreadPool command depends on this name
private ulong _data; // SOS's ThreadPool command depends on this name

private ThreadCounts(uint data) => _data = data;
private ThreadCounts(ulong data) => _data = data;

private short GetInt16Value(byte shift) => (short)(_data >> shift);
private void SetInt16Value(short value, byte shift) =>
_data = (_data & ~((uint)ushort.MaxValue << shift)) | ((uint)(ushort)value << shift);
_data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)(ushort)value << shift);

/// <summary>
/// Number of threads processing work items.
Expand All @@ -43,7 +44,7 @@ public void SubtractNumProcessingWork(short value)
Debug.Assert(value >= 0);
Debug.Assert(value <= NumProcessingWork);

_data -= (uint)(ushort)value << NumProcessingWorkShift;
_data -= (ulong)(ushort)value << NumProcessingWorkShift;
}

public void InterlockedDecrementNumProcessingWork()
Expand Down Expand Up @@ -72,19 +73,61 @@ public void SubtractNumExistingThreads(short value)
Debug.Assert(value >= 0);
Debug.Assert(value <= NumExistingThreads);

_data -= (uint)(ushort)value << NumExistingThreadsShift;
_data -= (ulong)(ushort)value << NumExistingThreadsShift;
}

/// <summary>
/// Max possible thread pool threads we want to have.
/// </summary>
public short NumThreadsGoal
{
get => GetInt16Value(NumThreadsGoalShift);
set
{
Debug.Assert(value > 0);
SetInt16Value(value, NumThreadsGoalShift);
}
}

public ThreadCounts InterlockedSetNumThreadsGoal(short value)
{
ThreadPoolInstance._threadAdjustmentLock.VerifyIsLocked();

ThreadCounts counts = this;
while (true)
{
ThreadCounts newCounts = counts;
newCounts.NumThreadsGoal = value;

ThreadCounts countsBeforeUpdate = InterlockedCompareExchange(newCounts, counts);
if (countsBeforeUpdate == counts)
{
return newCounts;
}

counts = countsBeforeUpdate;
}
}

public ThreadCounts VolatileRead() => new ThreadCounts(Volatile.Read(ref _data));

public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCounts oldCounts) =>
new ThreadCounts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data));
public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCounts oldCounts)
{
#if DEBUG
if (newCounts.NumThreadsGoal != oldCounts.NumThreadsGoal)
{
ThreadPoolInstance._threadAdjustmentLock.VerifyIsLocked();
}
#endif

return new ThreadCounts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data));
}

public static bool operator ==(ThreadCounts lhs, ThreadCounts rhs) => lhs._data == rhs._data;
public static bool operator !=(ThreadCounts lhs, ThreadCounts rhs) => lhs._data != rhs._data;

public override bool Equals([NotNullWhen(true)] object? obj) => obj is ThreadCounts other && _data == other._data;
public override int GetHashCode() => (int)_data;
public override int GetHashCode() => (int)_data + (int)(_data >> 32);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,19 @@ private static void WorkerThreadStart()
ThreadCounts newCounts = counts;
newCounts.SubtractNumExistingThreads(1);
short newNumExistingThreads = (short)(numExistingThreads - 1);

ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
short newNumThreadsGoal =
Math.Max(
threadPoolInstance.MinThreadsGoal,
Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
newCounts.NumThreadsGoal = newNumThreadsGoal;

ThreadCounts oldCounts =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
{
short newNumThreadsGoal =
Math.Max(
threadPoolInstance.MinThreadsGoal,
Math.Min(newNumExistingThreads, threadPoolInstance._separated.numThreadsGoal));
if (threadPoolInstance._separated.numThreadsGoal != newNumThreadsGoal)
{
threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal;
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.ThreadTimedOut);
}

HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.ThreadTimedOut);
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
Expand Down Expand Up @@ -181,7 +178,7 @@ internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance
while (true)
{
numProcessingWork = counts.NumProcessingWork;
if (numProcessingWork >= threadPoolInstance._separated.numThreadsGoal)
if (numProcessingWork >= counts.NumThreadsGoal)
{
return;
}
Expand Down Expand Up @@ -256,7 +253,7 @@ internal static bool ShouldStopProcessingWorkNow(PortableThreadPool threadPoolIn
// code from which this implementation was ported, which turns a processing thread into a retired thread
// and checks for pending requests like RemoveWorkingWorker. In this implementation there are
// no retired threads, so only the count of threads processing work is considered.
if (counts.NumProcessingWork <= threadPoolInstance._separated.numThreadsGoal)
if (counts.NumProcessingWork <= counts.NumThreadsGoal)
{
return false;
}
Expand Down
Loading