Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ClusterSingletonProxy fails to reacquire singleton actor #7315

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Akka.TestKit;
using Akka.Util.Internal;
using FluentAssertions;
using FluentAssertions.Extensions;

namespace Akka.Cluster.Tools.Tests.MultiNode.Singleton
{
Expand All @@ -43,7 +44,7 @@ public ClusterSingletonManagerLeaseSpecConfig()
akka.actor.provider = ""cluster""
akka.remote.log-remote-lifecycle-events = off
#akka.cluster.auto-down-unreachable-after = off
#akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
# akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.testkit.auto-down-unreachable-after = 0s
test-lease {
Expand Down Expand Up @@ -134,7 +135,9 @@ public class ClusterSingletonManagerLeaseSpec : MultiNodeClusterSpec
protected override int InitialParticipantsValueFactory => Roles.Count;

// used on the controller
private TestProbe leaseProbe;
private TestProbe _leaseProbe;

private IActorRef _proxy;

public ClusterSingletonManagerLeaseSpec()
: this(new ClusterSingletonManagerLeaseSpecConfig())
Expand All @@ -145,7 +148,7 @@ protected ClusterSingletonManagerLeaseSpec(ClusterSingletonManagerLeaseSpecConfi
{
_config = config;

leaseProbe = CreateTestProbe();
_leaseProbe = CreateTestProbe();
}

[MultiNodeFact]
Expand All @@ -156,6 +159,7 @@ public void ClusterSingletonManagerLeaseSpecs()
Cluster_singleton_manager_with_lease_should_find_the_lease_on_every_node();
Cluster_singleton_manager_with_lease_should_Start_singleton_and_ping_from_all_nodes();
Cluster_singleton_manager_with_lease_should_Move_singleton_when_oldest_node_downed();
Cluster_singleton_manager_with_lease_proxy_should_reacquire_singleton_actor_when_lease_lost();
}

public void Cluster_singleton_manager_with_lease_should_form_a_cluster()
Expand Down Expand Up @@ -216,19 +220,21 @@ public void Cluster_singleton_manager_with_lease_should_Start_singleton_and_ping
{
Sys.ActorOf(
ClusterSingletonManager.Props(
ClusterSingletonManagerLeaseSpecConfig.ImportantSingleton.Props, PoisonPill.Instance, ClusterSingletonManagerSettings.Create(Sys).WithRole("worker")),
ClusterSingletonManagerLeaseSpecConfig.ImportantSingleton.Props,
PoisonPill.Instance,
ClusterSingletonManagerSettings.Create(Sys).WithRole("worker")),
"important");
}, _config.First, _config.Second, _config.Third, _config.Fourth);
EnterBarrier("singleton-started");

var proxy = Sys.ActorOf(
_proxy = Sys.ActorOf(
ClusterSingletonProxy.Props(
singletonManagerPath: "/user/important",
settings: ClusterSingletonProxySettings.Create(Sys).WithRole("worker")));

RunOn(() =>
{
proxy.Tell("Ping");
_proxy.Tell("Ping");
// lease has not been granted so not allowed to come up
ExpectNoMsg(TimeSpan.FromSeconds(2));
}, _config.First, _config.Second, _config.Third, _config.Fourth);
Expand Down Expand Up @@ -286,32 +292,88 @@ public void Cluster_singleton_manager_with_lease_should_Move_singleton_when_olde

EnterBarrier("first node downed");

var proxy = Sys.ActorOf(
ClusterSingletonProxy.Props(
singletonManagerPath: "/user/important",
settings: ClusterSingletonProxySettings.Create(Sys).WithRole("worker")));

RunOn(() =>
{
proxy.Tell("Ping");
_proxy.Tell("Ping");
// lease has not been granted so not allowed to come up
ExpectNoMsg(TimeSpan.FromSeconds(2));
}, _config.Second, _config.Third, _config.Fourth);
EnterBarrier("singleton-not-migrated");

RunOn(() =>
{
TestLeaseActorClientExt.Get(Sys).GetLeaseActor().Tell(new TestLeaseActor.ActionRequest(new TestLeaseActor.Acquire(GetAddress(_config.Second).HostPort()), true));
var leaseActor = TestLeaseActorClientExt.Get(Sys).GetLeaseActor();
leaseActor.Tell(new TestLeaseActor.ActionRequest(new TestLeaseActor.Release(GetAddress(_config.First).HostPort()), true));
leaseActor.Tell(new TestLeaseActor.ActionRequest(new TestLeaseActor.Acquire(GetAddress(_config.Second).HostPort()), true));
}, _config.Controller);

EnterBarrier("singleton-moving-to-second");

RunOn(() =>
{
ExpectMsg(new ClusterSingletonManagerLeaseSpecConfig.ImportantSingleton.Response("Ping", GetAddress(_config.Second)), TimeSpan.FromSeconds(20));
}, _config.Second, _config.Third, _config.Fourth);

EnterBarrier("singleton-moved-to-second");
}

// Reproduction for https://github.com/akkadotnet/Akka.Management/issues/2490
public void Cluster_singleton_manager_with_lease_proxy_should_reacquire_singleton_actor_when_lease_lost()
{
RunOn(() =>
{
var singletonManager = new RootActorPath(GetAddress(_config.Second)) / "user" / "important";
var selection = Sys.ActorSelection(singletonManager);
var actorRef = selection.ResolveOne(3.Seconds()).GetAwaiter().GetResult();
actorRef.Tell(new LeaseLost(new Exception("Lease not found")), TestLeaseActorClientExt.Get(Sys).GetLeaseActor());
}, _config.Second);

EnterBarrier("lease-deleted");

RunOn(() =>
{
TestLeaseActor.LeaseRequests requests = null;
AwaitAssert(() =>
{
TestLeaseActorClientExt.Get(Sys).GetLeaseActor().Tell(TestLeaseActor.GetRequests.Instance);
var msg = ExpectMsg<TestLeaseActor.LeaseRequests>();

msg.Requests.Count.Should().Be(2);
requests = msg;
}, TimeSpan.FromSeconds(10));

requests.Requests[0].Should().Be(new TestLeaseActor.Acquire(GetAddress(_config.Second).HostPort()));
requests.Requests[1].Should().Be(new TestLeaseActor.Release(GetAddress(_config.Second).HostPort()));

TestLeaseActorClientExt.Get(Sys).GetLeaseActor().Tell(
new TestLeaseActor.ActionRequest(new TestLeaseActor.Release(GetAddress(_config.Second).HostPort()), false));
}, _config.Controller);

EnterBarrier("singleton-actor-downed");

RunOn(() =>
{
_proxy.Tell("Ping");
// lease was lost
ExpectNoMsg(TimeSpan.FromSeconds(2));
}, _config.Second, _config.Third, _config.Fourth);
EnterBarrier("lease-lost");

RunOn(() =>
{
TestLeaseActorClientExt.Get(Sys).GetLeaseActor().Tell(new TestLeaseActor.ActionRequest(new TestLeaseActor.Acquire(GetAddress(_config.Second).HostPort()), true));
}, _config.Controller);

EnterBarrier("singleton-actor-recreated");

// In the bug, even though second node manages to reacquire the lease and restarts the singleton actor,
// all the proxies failed to reacquire the new singleton actor ref
RunOn(() =>
{
proxy.Tell("Ping");
ExpectMsg(new ClusterSingletonManagerLeaseSpecConfig.ImportantSingleton.Response("Ping", GetAddress(_config.Second)), TimeSpan.FromSeconds(20));
}, _config.Second, _config.Third, _config.Fourth);
EnterBarrier("finished");

EnterBarrier("singleton-proxy-reacquire-singleton-actor");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,12 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS
{
if (Equals(_singleton, terminated.ActorRef))
{
// buffering mode, identification of new will start when old node is removed
// buffering mode
_singleton = null;

// Bugfix: https://github.com/akkadotnet/Akka.Management/issues/2490
// try to re-acquire singleton in case this is caused by a lost lease condition
IdentifySingleton();
Comment on lines +150 to +151
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual fix, just try to re-acquire immediately

}
});
ReceiveAny(msg =>
Expand Down
Loading