Is AK1001 a valid rule? #65

Closed
dimabarbul opened this issue Feb 11, 2024 · 1 comment · Fixed by #74

Comments

@dimabarbul

Version Information
Version of Akka.NET? 1.5.16
Which Akka.NET Modules? Akka.Analyzers

Describe the bug
The documentation for rule AK1001 says that, to pass the sender to the PipeTo method, a local variable should be used, because PipeTo will be executed later. I have serious doubts that this is the case.

Here is the code that is provided in the documentation:

public sealed class MyActor : UntypedActor{

    protected override void OnReceive(object message){
        async Task<int> LocalFunction(){
            await Task.Delay(10);
            return message.ToString().Length;
        }

        // potentially unsafe use of Context.Sender
        LocalFunction().PipeTo(Sender); 
    }
}

AFAIK, here is what happens in the line LocalFunction().PipeTo(Sender);:

  1. We execute the local function. It is marked as async, which means that a state machine will be used.
    So, what happens: control passes to the function, which creates a new instance of the state machine class, starts it, and returns a task that can be used to get information about the work being done. I'll stress here: the code will NOT wait until the job is done; it just starts the job and returns an instance of the Task class so the caller can track the job. All of this is done synchronously, on the very same thread we started on.
  2. Then this Task instance is passed to the PipeTo method as the first argument (PipeTo is an extension method), and the second argument is the value returned by the Sender getter. I'll repeat: the getter is executed right here, BEFORE PipeTo is called, because in order to call PipeTo we must know which arguments we are passing to it. All of this happens synchronously, on the same thread we started on. We are still synchronously processing the current message, and Akka will not change the sender because we are not done yet. The Sender getter returns a reference to the sender object (it is a reference type), and that reference is what PipeTo uses internally. So even if the Sender getter later returns a different object, PipeTo will still use the reference we gave it.
  3. PipeTo is an async method, so again a state machine instance is created and started, and a task is returned. We then continue
    execution on the same thread as before. If we read this.Sender (or Context.Sender) again after this line, we get the same value as the one we passed to PipeTo.
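
The three steps above can be sketched without Akka at all. This is a minimal illustration in plain C#, not Akka.NET code: `currentSender`, `ForwardAsync`, and `LengthAsync` are hypothetical stand-ins for the `Sender` property, `PipeTo`, and the local function. It only shows that an argument is evaluated at the call site, before the async method's continuation runs.

```csharp
using System;
using System.Threading.Tasks;

public static class Demo
{
    // Hypothetical stand-in for the actor's Sender property: it changes between messages.
    private static string currentSender = "probe1";

    // Hypothetical stand-in for PipeTo: the recipient arrives as an ordinary argument.
    private static async Task<string> ForwardAsync(Task<int> work, string recipient)
    {
        int result = await work;            // the work completes later
        return $"{result} -> {recipient}";  // still the value passed at call time
    }

    // Hypothetical stand-in for the local function in the documentation sample.
    private static async Task<int> LengthAsync(string message)
    {
        await Task.Delay(50);
        return message.Length;
    }

    public static async Task<string[]> RunAsync()
    {
        currentSender = "probe1";
        // currentSender is read here, synchronously, before ForwardAsync is entered.
        Task<string> first = ForwardAsync(LengthAsync("first"), currentSender);

        // The "sender" changes while the first task is still running.
        currentSender = "probe2";
        Task<string> second = ForwardAsync(LengthAsync("second"), currentSender);

        // Task.WhenAll returns the results in the order the tasks were passed in.
        return await Task.WhenAll(first, second);
    }

    public static async Task Main()
    {
        foreach (string line in await RunAsync())
            Console.WriteLine(line);
        // prints "5 -> probe1" then "6 -> probe2"
    }
}
```

The first result stays paired with "probe1" even though `currentSender` was reassigned before the awaited work finished, which is the same reasoning as in step 2.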

To Reproduce

The unit test I use to verify the behavior is as follows:

using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.Xunit2;

namespace UnitTests;

public class UnitTest1 : TestKit
{
    [Fact]
    public void Test1()
    {
        IActorRef sut = this.Sys.ActorOf(Props.Create(() => new MyActor()));
        TestProbe testProbe1 = this.CreateTestProbe();
        TestProbe testProbe2 = this.CreateTestProbe();

        sut.Tell("first", testProbe1.Ref);
        sut.Tell("second", testProbe2.Ref);

        testProbe1.ExpectMsg(5);
        testProbe2.ExpectMsg(6);
    }

    public sealed class MyActor : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            async Task<int> LocalFunction()
            {
                Console.WriteLine($"Begin {nameof(LocalFunction)} for message \"{message}\"");
                await Task.Delay(1000);
                Console.WriteLine($"End {nameof(LocalFunction)} for message \"{message}\"");
                return message.ToString().Length;
            }

            Console.WriteLine($"Received \"{message}\", this.Sender={this.Sender}, Context.Sender={Context.Sender}");
#pragma warning disable AK1001
            LocalFunction().PipeTo(this.Sender);
#pragma warning restore AK1001
        }
    }
}

The output is as follows:

Received "first", this.Sender=[akka://test/system/testActor2#247961004], Context.Sender=[akka://test/system/testActor2#247961004]
Begin LocalFunction for message "first"
Received "second", this.Sender=[akka://test/system/testActor3#1017927033], Context.Sender=[akka://test/system/testActor3#1017927033]
Begin LocalFunction for message "second"
End LocalFunction for message "first"
End LocalFunction for message "second"

By the time the local function called for "first" ends, the "second" message has already been processed, so the sender should have changed to testProbe2. Yet the first result is correctly delivered to testProbe1 and the test passes.

I tried running another variant of this test, making sure that the local function for "first" ends in the middle of processing "second":

public class UnitTest1 : TestKit
{
    [Fact]
    public void Test2()
    {
        IActorRef sut = this.Sys.ActorOf(Props.Create(() => new MyActor2()));
        TestProbe testProbe1 = this.CreateTestProbe();
        TestProbe testProbe2 = this.CreateTestProbe();

        sut.Tell("first", testProbe1.Ref);
        sut.Tell("second", testProbe2.Ref);

        testProbe1.ExpectMsg(5);
        testProbe2.ExpectMsg(6);
    }

    public sealed class MyActor2 : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            async Task<int> LocalFunction()
            {
                Console.WriteLine($"Begin {nameof(LocalFunction)} for message \"{message}\"");
                await Task.Delay(100);
                Console.WriteLine($"End {nameof(LocalFunction)} for message \"{message}\"");
                return message.ToString().Length;
            }

            // wait some time to make local function for "first" end while
            // "second" is being processed
            Console.WriteLine($"Received \"{message}\", this.Sender={this.Sender}, Context.Sender={Context.Sender}, waiting");
            Thread.Sleep(100);
            Console.WriteLine($"Continuing processing \"{message}\", this.Sender={this.Sender}, Context.Sender={Context.Sender}");
#pragma warning disable AK1001
            LocalFunction().PipeTo(this.Sender);
#pragma warning restore AK1001
        }
    }
}

The output for the second test:

Received "first", this.Sender=[akka://test/system/testActor2#719986033], Context.Sender=[akka://test/system/testActor2#719986033], waiting
Continuing processing "first", this.Sender=[akka://test/system/testActor2#719986033], Context.Sender=[akka://test/system/testActor2#719986033]
Begin LocalFunction for message "first"
Received "second", this.Sender=[akka://test/system/testActor3#1932466648], Context.Sender=[akka://test/system/testActor3#1932466648], waiting
End LocalFunction for message "first"
Continuing processing "second", this.Sender=[akka://test/system/testActor3#1932466648], Context.Sender=[akka://test/system/testActor3#1932466648]
Begin LocalFunction for message "second"
End LocalFunction for message "second"

Even in this case the test passes and the first result is successfully delivered to testProbe1.

I might be missing something here, but I don't really see what.

Expected behavior
It would be great if someone could provide code that demonstrates a scenario in which code violating AK1001 does not work as expected.

Actual behavior
The sample code works fine.

Environment
Linux, .NET 6.0.417.

Unit test containing both scenarios
using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.Xunit2;

namespace UnitTests;

public class UnitTest1 : TestKit
{
    [Fact]
    public void Test1()
    {
        IActorRef sut = this.Sys.ActorOf(Props.Create(() => new MyActor()));
        TestProbe testProbe1 = this.CreateTestProbe();
        TestProbe testProbe2 = this.CreateTestProbe();

        sut.Tell("first", testProbe1.Ref);
        sut.Tell("second", testProbe2.Ref);

        testProbe1.ExpectMsg(5);
        testProbe2.ExpectMsg(6);
    }

    [Fact]
    public void Test2()
    {
        IActorRef sut = this.Sys.ActorOf(Props.Create(() => new MyActor2()));
        TestProbe testProbe1 = this.CreateTestProbe();
        TestProbe testProbe2 = this.CreateTestProbe();

        sut.Tell("first", testProbe1.Ref);
        sut.Tell("second", testProbe2.Ref);

        testProbe1.ExpectMsg(5);
        testProbe2.ExpectMsg(6);
    }

    public sealed class MyActor : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            async Task<int> LocalFunction()
            {
                Console.WriteLine($"Begin {nameof(LocalFunction)} for message \"{message}\"");
                await Task.Delay(1000);
                Console.WriteLine($"End {nameof(LocalFunction)} for message \"{message}\"");
                return message.ToString().Length;
            }

            Console.WriteLine($"Received \"{message}\", this.Sender={this.Sender}, Context.Sender={Context.Sender}");
#pragma warning disable AK1001
            LocalFunction().PipeTo(this.Sender);
#pragma warning restore AK1001
        }
    }

    public sealed class MyActor2 : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            async Task<int> LocalFunction()
            {
                Console.WriteLine($"Begin {nameof(LocalFunction)} for message \"{message}\"");
                await Task.Delay(100);
                Console.WriteLine($"End {nameof(LocalFunction)} for message \"{message}\"");
                return message.ToString().Length;
            }

            // wait some time to make local function for "first" end while
            // "second" is being processed
            Console.WriteLine($"Received \"{message}\", this.Sender={this.Sender}, Context.Sender={Context.Sender}, waiting");
            Thread.Sleep(100);
            Console.WriteLine($"Continuing processing \"{message}\", this.Sender={this.Sender}, Context.Sender={Context.Sender}");
#pragma warning disable AK1001
            LocalFunction().PipeTo(this.Sender);
#pragma warning restore AK1001
        }
    }
}
@Aaronontheweb
Member

@dimabarbul you are right, it may not be a valid rule - I wrote all of those changes to PipeTo a while back for the express purpose of trying to prevent this error through Akka.NET's own infrastructure (and to improve traceability, which they also helped with).

So yeah, maybe we need to get rid of AK1001.
