Skip to content

Commit

Permalink
Backport Akka.NET v1.4.51 performance fixes (#6821)
Browse files Browse the repository at this point in the history
* Do not use expression based options for backoff props (#6805)

* Do not use expression based props for long lived streams (#6807)

* Do not use expression based props for long lived streams

* Update ActorMaterializerImpl.cs

---------

Co-authored-by: Aaron Stannard <aaron@petabridge.com>

---------

Co-authored-by: Drew <laingas@gmail.com>
  • Loading branch information
Aaronontheweb and to11mtm authored Jun 28, 2023
1 parent 819af0f commit 17a9c77
Show file tree
Hide file tree
Showing 12 changed files with 29 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ public Children(IImmutableSet<IActorRef> refs)
/// <param name="haveShutdown">TBD</param>
/// <returns>TBD</returns>
public static Props Props(ActorMaterializerSettings settings, AtomicBoolean haveShutdown)
=> Actor.Props.Create(() => new StreamSupervisor(settings, haveShutdown)).WithDeploy(Deploy.Local);
=> Actor.Props.Create<StreamSupervisor>(settings, haveShutdown).WithDeploy(Deploy.Local);

/// <summary>
/// TBD
Expand All @@ -631,6 +631,7 @@ public static Props Props(ActorMaterializerSettings settings, AtomicBoolean have
/// </summary>
/// <param name="settings">TBD</param>
/// <param name="haveShutdown">TBD</param>
/// If this changes you must also change StreamSupervisor.Props as well!
public StreamSupervisor(ActorMaterializerSettings settings, AtomicBoolean haveShutdown)
{
Settings = settings;
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Streams/Implementation/ActorRefSourceActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static Props Props(int bufferSize, OverflowStrategy overflowStrategy, Act
throw new NotSupportedException("Backpressure overflow strategy not supported");

var maxFixedBufferSize = settings.MaxFixedBufferSize;
return Actor.Props.Create(() => new ActorRefSourceActor<T>(bufferSize, overflowStrategy, maxFixedBufferSize));
return Actor.Props.Create<ActorRefSourceActor<T>>(bufferSize, overflowStrategy, maxFixedBufferSize);
}

/// <summary>
Expand All @@ -58,6 +58,7 @@ public static Props Props(int bufferSize, OverflowStrategy overflowStrategy, Act
/// <param name="bufferSize">TBD</param>
/// <param name="overflowStrategy">TBD</param>
/// <param name="maxFixedBufferSize">TBD</param>
/// If this changes you must also change <see cref="ActorRefSourceActor{T}.Props"/> as well!
public ActorRefSourceActor(int bufferSize, OverflowStrategy overflowStrategy, int maxFixedBufferSize)
{
BufferSize = bufferSize;
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Streams/Implementation/FanOut.cs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ internal static class Unzip
/// <param name="settings">TBD</param>
/// <returns>TBD</returns>
public static Props Props<T>(ActorMaterializerSettings settings)
=> Actor.Props.Create(() => new Unzip<T>(settings, 2)).WithDeploy(Deploy.Local);
=> Actor.Props.Create<Unzip<T>>(settings, 2).WithDeploy(Deploy.Local);
}

/// <summary>
Expand All @@ -740,6 +740,7 @@ internal sealed class Unzip<T> : FanOut<T>
/// This exception is thrown when the elements in <see cref="Akka.Streams.Implementation.FanOut{T}.PrimaryInputs"/>
/// are of an unknown type.
/// </exception>>
/// If this gets changed you must change <see cref="Akka.Streams.Implementation.FanOut.Unzip{T}"/> as well!
public Unzip(ActorMaterializerSettings settings, int outputCount = 2) : base(settings, outputCount)
{
OutputBunch.MarkAllOutputs();
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Streams/Implementation/FanoutProcessorImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ internal sealed class FanoutProcessorImpl<T, TStreamBuffer> : ActorProcessorImpl
/// <param name="onTerminated">TBD</param>
/// <returns>TBD</returns>
public static Props Props(ActorMaterializerSettings settings, Action onTerminated = null)
=> Actor.Props.Create(() => new FanoutProcessorImpl<T, TStreamBuffer>(settings, onTerminated)).WithDeploy(Deploy.Local);
=> Actor.Props.Create<FanoutProcessorImpl<T, TStreamBuffer>>(settings, onTerminated).WithDeploy(Deploy.Local);

/// <summary>
/// TBD
Expand All @@ -240,6 +240,7 @@ public static Props Props(ActorMaterializerSettings settings, Action onTerminate
/// </summary>
/// <param name="settings">TBD</param>
/// <param name="onTerminated">TBD</param>
/// If this gets changed you must change <see cref="FanoutProcessorImpl{T,TStreamBuffer}.Props"/> as well!
public FanoutProcessorImpl(ActorMaterializerSettings settings, Action onTerminated) : base(settings)
{
PrimaryOutputs = new FanoutOutputs<T, TStreamBuffer>(settings.MaxInputBufferSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,8 @@ private void Complete()
/// </summary>
/// <param name="shell">TBD</param>
/// <returns>TBD</returns>
public static Props Props(GraphInterpreterShell shell) => Actor.Props.Create(() => new ActorGraphInterpreter(shell)).WithDeploy(Deploy.Local);
public static Props Props(GraphInterpreterShell shell) => Actor.Props
.Create<ActorGraphInterpreter>(shell).WithDeploy(Deploy.Local);

private ISet<GraphInterpreterShell> _activeInterpreters = new HashSet<GraphInterpreterShell>();
private readonly Queue<GraphInterpreterShell> _newShells = new();
Expand All @@ -1346,6 +1347,7 @@ private void Complete()
/// TBD
/// </summary>
/// <param name="shell">TBD</param>
/// If this ctor gets changed you -must- change <see cref="ActorGraphInterpreter.Props"/> as well!
public ActorGraphInterpreter(GraphInterpreterShell shell)
{
_initial = shell;
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Streams/Implementation/IO/FilePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static Props Props(FileInfo f, TaskCompletionSource<IOResult> completionP
if (maxBuffer < initialBuffer)
throw new ArgumentException($"maxBuffer must be >= initialBuffer (was {maxBuffer})", nameof(maxBuffer));

return Actor.Props.Create(() => new FilePublisher(f, completionPromise, chunkSize, startPosition, maxBuffer))
return Actor.Props.Create<FilePublisher>( f, completionPromise, chunkSize, startPosition, maxBuffer)
.WithDeploy(Deploy.Local);
}

Expand Down Expand Up @@ -86,6 +86,7 @@ private struct Continue : IDeadLetterSuppression
/// <param name="chunkSize">TBD</param>
/// <param name="startPosition">TBD</param>
/// <param name="maxBuffer">TBD</param>
/// If this changes you must also change <see cref="FilePublisher.Props"/> as well!
public FilePublisher(FileInfo f, TaskCompletionSource<IOResult> completionPromise, int chunkSize, long startPosition, int maxBuffer)
{
_f = f;
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static Props Props(
if (startPosition < 0)
throw new ArgumentException($"startPosition must be >= 0 (was {startPosition})", nameof(startPosition));

return Actor.Props.Create(() => new FileSubscriber(f, completionPromise, bufferSize, startPosition, fileMode, autoFlush, flushCommand))
return Actor.Props.Create<FileSubscriber>(f, completionPromise, bufferSize, startPosition, fileMode, autoFlush, flushCommand)
.WithDeploy(Deploy.Local);
}

Expand All @@ -72,6 +72,7 @@ public static Props Props(
/// <param name="fileMode">TBD</param>
/// <param name="autoFlush"></param>
/// <param name="flushSignaler"></param>
/// If this changes you must change <see cref="FileSubscriber.Props"/> as well!
public FileSubscriber(
FileInfo f,
TaskCompletionSource<IOResult> completionPromise,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static Props Props(Stream inputstream, TaskCompletionSource<IOResult> com
if (chunkSize <= 0)
throw new ArgumentException($"chunkSize must be > 0 was {chunkSize}", nameof(chunkSize));

return Actor.Props.Create(()=> new InputStreamPublisher(inputstream, completionSource, chunkSize)).WithDeploy(Deploy.Local);
return Actor.Props.Create<InputStreamPublisher>(inputstream, completionSource, chunkSize).WithDeploy(Deploy.Local);
}

private struct Continue : IDeadLetterSuppression
Expand All @@ -58,6 +58,7 @@ private struct Continue : IDeadLetterSuppression
/// <param name="inputstream">TBD</param>
/// <param name="completionSource">TBD</param>
/// <param name="chunkSize">TBD</param>
/// If this gets changed you must change <see cref="InputStreamPublisher.Props"/> as well!
public InputStreamPublisher(Stream inputstream, TaskCompletionSource<IOResult> completionSource, int chunkSize)
{
_inputstream = inputstream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public static Props Props(Stream os, TaskCompletionSource<IOResult> completionPr
throw new ArgumentException("Buffer size must be > 0");

return
Actor.Props.Create(() => new OutputStreamSubscriber(os, completionPromise, bufferSize, autoFlush))
Actor.Props.Create<OutputStreamSubscriber>(os, completionPromise, bufferSize, autoFlush)
.WithDeploy(Deploy.Local);
}

Expand All @@ -53,6 +53,7 @@ public static Props Props(Stream os, TaskCompletionSource<IOResult> completionPr
/// <param name="completionPromise">TBD</param>
/// <param name="bufferSize">TBD</param>
/// <param name="autoFlush">TBD</param>
/// If this gets changed you must change <see cref="OutputStreamSubscriber.Props"/> as well!
public OutputStreamSubscriber(Stream outputStream, TaskCompletionSource<IOResult> completionPromise, int bufferSize, bool autoFlush)
{
_outputStream = outputStream;
Expand Down
4 changes: 4 additions & 0 deletions src/core/Akka/Pattern/BackoffOnRestartSupervisor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ internal sealed class BackoffOnRestartSupervisor : BackoffSupervisorBase
private readonly OneForOneStrategy _strategy;
private readonly ILoggingAdapter _log = Context.GetLogger();

/// <devremarks>
/// If the arguments here change, you -must- change the invocation in <see cref="BackoffOptions"/> accordingly!
/// Expression based props are too slow for many scenarios, so we must drop compile time safety for that sake.
/// </devremarks>
public BackoffOnRestartSupervisor(
Props childProps,
string childName,
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka/Pattern/BackoffOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,9 @@ internal override Props Props
switch (_backoffType)
{
case RestartImpliesFailure _:
return Props.Create(() => new BackoffOnRestartSupervisor(_childProps, _childName, _minBackoff, _maxBackoff, _reset, _randomFactor, _strategy, _replyWhileStopped, _finalStopMessage));
return Props.Create<BackoffOnRestartSupervisor>(_childProps, _childName, _minBackoff, _maxBackoff, _reset, _randomFactor, _strategy, _replyWhileStopped, _finalStopMessage);
case StopImpliesFailure _:
return Props.Create(() => new BackoffSupervisor(_childProps, _childName, _minBackoff, _maxBackoff, _reset, _randomFactor, _strategy, _replyWhileStopped, _finalStopMessage));
return Props.Create<BackoffSupervisor>(_childProps, _childName, _minBackoff, _maxBackoff, _reset, _randomFactor, _strategy, _replyWhileStopped, _finalStopMessage);
default:
return Props.Empty;
}
Expand Down
4 changes: 4 additions & 0 deletions src/core/Akka/Pattern/BackoffSupervisor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public BackoffSupervisor(
{
}

/// <summary>
/// If the arguments here change, you -must- change the invocation in <see cref="BackoffOptions"/> accordingly!
/// Expression based props are too slow for many scenarios, so we must drop compile time safety for that sake.
/// </summary>
public BackoffSupervisor(
Props childProps,
string childName,
Expand Down

0 comments on commit 17a9c77

Please sign in to comment.