Skip to content

Commit

Permalink
disable auto-downing on restart specs (#7214)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb authored May 31, 2024
1 parent 3befea4 commit 1f0d09c
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
using FluentAssertions;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tools.Tests.Singleton
{
Expand Down Expand Up @@ -77,14 +78,14 @@ protected TestException(SerializationInfo info, StreamingContext context)
}
}

private Cluster cluster;
private TestLeaseExt testLeaseExt;
private readonly Cluster _cluster;
private readonly TestLeaseExt _testLeaseExt;

private AtomicCounter counter = new(0);
private TimeSpan shortDuration = TimeSpan.FromMilliseconds(50);
private string leaseOwner;
private readonly AtomicCounter _counter = new(0);
private readonly TimeSpan _shortDuration = TimeSpan.FromMilliseconds(50);
private readonly string _leaseOwner;

public ClusterSingletonLeaseSpec() : base(ConfigurationFactory.ParseString(@"
public ClusterSingletonLeaseSpec(ITestOutputHelper output) : base(ConfigurationFactory.ParseString(@"
#akka.loglevel = INFO
akka.loglevel = DEBUG
akka.actor.provider = ""cluster""
Expand All @@ -99,22 +100,22 @@ public ClusterSingletonLeaseSpec() : base(ConfigurationFactory.ParseString(@"
hostname = ""127.0.0.1""
port = 0
}
}").WithFallback(TestLease.Configuration))
}").WithFallback(TestLease.Configuration), output)
{

cluster = Cluster.Get(Sys);
testLeaseExt = TestLeaseExt.Get(Sys);
_cluster = Cluster.Get(Sys);
_testLeaseExt = TestLeaseExt.Get(Sys);

leaseOwner = cluster.SelfMember.Address.HostPort();
_leaseOwner = _cluster.SelfMember.Address.HostPort();

cluster.Join(cluster.SelfAddress);
_cluster.Join(_cluster.SelfAddress);
AwaitAssert(() =>
{
cluster.SelfMember.Status.ShouldBe(MemberStatus.Up);
_cluster.SelfMember.Status.ShouldBe(MemberStatus.Up);
});
}

private string NextName() => $"important-{counter.GetAndIncrement()}";
private string NextName() => $"important-{_counter.GetAndIncrement()}";

private ClusterSingletonManagerSettings NextSettings() => ClusterSingletonManagerSettings.Create(Sys).WithSingletonName(NextName());

Expand All @@ -133,10 +134,10 @@ public void ClusterSingleton_with_lease_should_not_start_until_lease_is_availabl
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease

probe.ExpectNoMsg(shortDuration);
probe.ExpectNoMsg(_shortDuration);
testLease.InitialPromise.SetResult(true);
probe.ExpectMsg("preStart");
}
Expand All @@ -154,12 +155,12 @@ public void ClusterSingleton_with_lease_should_do_not_start_if_lease_acquire_ret
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease

probe.ExpectNoMsg(shortDuration);
probe.ExpectNoMsg(_shortDuration);
testLease.InitialPromise.SetResult(false);
probe.ExpectNoMsg(shortDuration);
probe.ExpectNoMsg(_shortDuration);
}

[Fact]
Expand All @@ -175,17 +176,17 @@ public void ClusterSingleton_with_lease_should_retry_trying_to_get_lease_if_acqu
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease

testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
singletonProbe.ExpectNoMsg(shortDuration);
TaskCompletionSource<bool> nextResponse = new TaskCompletionSource<bool>();
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
singletonProbe.ExpectNoMsg(_shortDuration);
var nextResponse = new TaskCompletionSource<bool>();

testLease.SetNextAcquireResult(nextResponse.Task);
testLease.InitialPromise.SetResult(false);
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
singletonProbe.ExpectNoMsg(shortDuration);
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
singletonProbe.ExpectNoMsg(_shortDuration);
nextResponse.SetResult(true);
singletonProbe.ExpectMsg("preStart");
}
Expand All @@ -203,13 +204,13 @@ public void ClusterSingleton_with_lease_should_do_not_start_if_lease_acquire_fai
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease


probe.ExpectNoMsg(shortDuration);
probe.ExpectNoMsg(_shortDuration);
testLease.InitialPromise.SetException(new TestException("no lease for you"));
probe.ExpectNoMsg(shortDuration);
probe.ExpectNoMsg(_shortDuration);
}

[Fact]
Expand All @@ -225,16 +226,16 @@ public void ClusterSingleton_with_lease_should_retry_trying_to_get_lease_if_acqu
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease

testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
singletonProbe.ExpectNoMsg(shortDuration);
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
singletonProbe.ExpectNoMsg(_shortDuration);
TaskCompletionSource<bool> nextResponse = new TaskCompletionSource<bool>();
testLease.SetNextAcquireResult(nextResponse.Task);
testLease.InitialPromise.SetException(new TestException("no lease for you"));
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
singletonProbe.ExpectNoMsg(shortDuration);
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
singletonProbe.ExpectNoMsg(_shortDuration);
nextResponse.SetResult(true);
singletonProbe.ExpectMsg("preStart");
}
Expand All @@ -252,19 +253,19 @@ public void ClusterSingleton_with_lease_should_stop_singleton_if_the_lease_fails
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease

testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
testLease.InitialPromise.SetResult(true);
lifecycleProbe.ExpectMsg("preStart");
var callback = testLease.GetCurrentCallback();
callback(null);
lifecycleProbe.ExpectMsg("postStop");
testLease.Probe.ExpectMsg(new TestLease.ReleaseReq(leaseOwner));
testLease.Probe.ExpectMsg(new TestLease.ReleaseReq(_leaseOwner));

// should try and reacquire lease
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
lifecycleProbe.ExpectMsg("preStart");
}

Expand All @@ -281,15 +282,15 @@ public void ClusterSingleton_with_lease_should_release_lease_when_leaving_oldest
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease

singletonProbe.ExpectNoMsg(shortDuration);
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
singletonProbe.ExpectNoMsg(_shortDuration);
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
testLease.InitialPromise.SetResult(true);
singletonProbe.ExpectMsg("preStart");
cluster.Leave(cluster.SelfAddress);
testLease.Probe.ExpectMsg(new TestLease.ReleaseReq(leaseOwner));
_cluster.Leave(_cluster.SelfAddress);
testLease.Probe.ExpectMsg(new TestLease.ReleaseReq(_leaseOwner));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tools.Tests.Singleton
{
Expand All @@ -25,26 +26,33 @@ public class ClusterSingletonRestart2Spec : AkkaSpec
private readonly ActorSystem _sys3;
private ActorSystem _sys4 = null;

public ClusterSingletonRestart2Spec() : base(@"
akka.loglevel = INFO
akka.actor.provider = ""cluster""
akka.cluster.roles = [singleton]
akka.cluster.auto-down-unreachable-after = 2s
akka.cluster.singleton.min-number-of-hand-over-retries = 5
akka.remote {
dot-netty.tcp {
hostname = ""127.0.0.1""
port = 0
}
}")
public ClusterSingletonRestart2Spec(ITestOutputHelper output) : base("""

akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.cluster.roles = [singleton]
#akka.cluster.auto-down-unreachable-after = 2s
akka.cluster.singleton.min-number-of-hand-over-retries = 5
akka.remote {
dot-netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
""", output)
{
_sys1 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
_sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
_sys3 = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.cluster.roles = [other]")
.WithFallback(Sys.Settings.Config));

// ensure xunit output is captured
InitializeLogger(_sys1);
InitializeLogger(_sys2);
InitializeLogger(_sys3);
}

public void Join(ActorSystem from, ActorSystem to)
public Task Join(ActorSystem from, ActorSystem to)
{
if (Cluster.Get(from).SelfRoles.Contains("singleton"))
{
Expand All @@ -54,9 +62,9 @@ public void Join(ActorSystem from, ActorSystem to)
}


Within(TimeSpan.FromSeconds(45), () =>
return WithinAsync(TimeSpan.FromSeconds(45), () =>
{
AwaitAssert(() =>
return AwaitAssertAsync(() =>
{
Cluster.Get(from).Join(Cluster.Get(to).SelfAddress);
Cluster.Get(from).State.Members.Select(x => x.UniqueAddress).Should().Contain(Cluster.Get(from).SelfUniqueAddress);
Expand All @@ -70,56 +78,61 @@ public void Join(ActorSystem from, ActorSystem to)
}

[Fact]
public void Restarting_cluster_node_during_hand_over_must_restart_singletons_in_restarted_node()
public async Task Restarting_cluster_node_during_hand_over_must_restart_singletons_in_restarted_node()
{
Join(_sys1, _sys1);
Join(_sys2, _sys1);
Join(_sys3, _sys1);
var joinTasks = new[]
{
Join(_sys1, _sys1),
Join(_sys2, _sys1),
Join(_sys3, _sys1)
};
await Task.WhenAll(joinTasks);

var proxy3 = _sys3.ActorOf(
ClusterSingletonProxy.Props("user/echo", ClusterSingletonProxySettings.Create(_sys3).WithRole("singleton")), "proxy3");

Within(TimeSpan.FromSeconds(5), () =>
await WithinAsync(TimeSpan.FromSeconds(5), () =>
{
AwaitAssert(() =>
return AwaitAssertAsync(async () =>
{
var probe = CreateTestProbe(_sys3);
proxy3.Tell("hello", probe.Ref);
probe.ExpectMsg<UniqueAddress>(TimeSpan.FromSeconds(1))
.Should()
var msg = await probe.ExpectMsgAsync<UniqueAddress>(TimeSpan.FromSeconds(1));
msg.Should()
.Be(Cluster.Get(_sys1).SelfUniqueAddress);
});
});

Cluster.Get(_sys1).Leave(Cluster.Get(_sys1).SelfAddress);


// ReSharper disable once PossibleInvalidOperationException
var sys2Port = Cluster.Get(_sys2).SelfAddress.Port.Value; // grab value before shutdown
// at the same time, shutdown sys2, which would be the expected next singleton node
Shutdown(_sys2);
// it will be downed by the join attempts of the new incarnation

// then restart it
// ReSharper disable once PossibleInvalidOperationException
var sys2Port = Cluster.Get(_sys2).SelfAddress.Port.Value;
var sys4Config = ConfigurationFactory.ParseString(@"akka.remote.dot-netty.tcp.port=" + sys2Port)
.WithFallback(_sys1.Settings.Config);
_sys4 = ActorSystem.Create(_sys1.Name, sys4Config);
InitializeLogger(_sys4); // ensure xunit output is captured

Join(_sys4, _sys3);
await Join(_sys4, _sys3);

// let it stabilize
Task.Delay(TimeSpan.FromSeconds(5)).Wait();
//Task.Delay(TimeSpan.FromSeconds(5)).Wait();

Within(TimeSpan.FromSeconds(10), () =>
await WithinAsync(TimeSpan.FromSeconds(10), () =>
{
AwaitAssert(() =>
return AwaitAssertAsync(async () =>
{
var probe = CreateTestProbe(_sys3);
proxy3.Tell("hello2", probe.Ref);
// note that sys3 doesn't have the required singleton role, so singleton instance should be
// on the restarted node
probe.ExpectMsg<UniqueAddress>(TimeSpan.FromSeconds(1))
.Should()
var msg = await probe.ExpectMsgAsync<UniqueAddress>(TimeSpan.FromSeconds(1));
msg.Should()
.Be(Cluster.Get(_sys4).SelfUniqueAddress);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ClusterSingletonRestart3Spec(ITestOutputHelper output) : base(@"
akka.loglevel = DEBUG
akka.actor.provider = ""cluster""
akka.cluster.app-version = ""1.0.0""
akka.cluster.auto-down-unreachable-after = 2s
#akka.cluster.auto-down-unreachable-after = 2s
akka.cluster.singleton.min-number-of-hand-over-retries = 5
akka.cluster.singleton.consider-app-version = true
akka.remote {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Akka.TestKit.TestActors;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tools.Tests.Singleton
{
Expand All @@ -25,16 +26,16 @@ public class ClusterSingletonRestartSpec : AkkaSpec
private readonly ActorSystem _sys2;
private ActorSystem _sys3 = null;

public ClusterSingletonRestartSpec() : base(@"
public ClusterSingletonRestartSpec(ITestOutputHelper output) : base(@"
akka.loglevel = INFO
akka.actor.provider = ""cluster""
akka.cluster.auto-down-unreachable-after = 2s
#akka.cluster.auto-down-unreachable-after = 2s
akka.remote {
dot-netty.tcp {
hostname = ""127.0.0.1""
port = 0
}
}")
}", output)
{
_sys1 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
_sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
Expand Down
Loading

0 comments on commit 1f0d09c

Please sign in to comment.