diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubMediatorSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubMediatorSpec.cs index 7f6f4847394..458784c7142 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubMediatorSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubMediatorSpec.cs @@ -286,6 +286,15 @@ private void AwaitCount(int expected) }); } + private void AwaitCountSubscribers(int expected, string topic) + { + AwaitAssert(() => + { + Mediator.Tell(new CountSubscribers(topic)); + Assert.Equal(expected, ExpectMsg()); + }); + } + #endregion [MultiNodeFact] @@ -306,6 +315,7 @@ public void DistributedPubSubMediatorSpecs() DistributedPubSubMediator_must_remove_entries_when_node_is_removed(); DistributedPubSubMediator_must_receive_proper_UnsubscribeAck_message(); DistributedPubSubMediator_must_get_topics_after_simple_publish(); + DistributedPubSubMediator_must_remove_topic_subscribers_when_they_terminate(); } public void DistributedPubSubMediator_must_startup_2_nodes_cluster() @@ -820,5 +830,23 @@ public void DistributedPubSubMediator_must_get_topics_after_simple_publish() EnterBarrier("after-get-topics"); }); } + + public void DistributedPubSubMediator_must_remove_topic_subscribers_when_they_terminate() + { + Within(TimeSpan.FromSeconds(15), () => + { + RunOn(() => + { + var s1 = new Subscribe("topic_b1", CreateChatUser("u18")); + Mediator.Tell(s1); + ExpectMsg(x => x.Subscribe.Equals(s1)); + + AwaitCountSubscribers(1, "topic_b1"); + ChatUser("u18").Tell(PoisonPill.Instance); + AwaitCountSubscribers(0, "topic_b1"); + }, _first); + EnterBarrier("after-15"); + }); + } } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs index 12c44ddce98..81020dbffd9 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs @@ -378,6 +378,22 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) { Sender.Tell(deltaCount); }); + Receive(msg => + { + var encTopic = Internal.Utils.EncodeName(msg.Topic); + _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path / encTopic), msg, Sender, () => + { + var child = Context.Child(encTopic); + if (!child.IsNobody()) + { + child.Tell(Count.Instance, Sender); + } + else + { + Sender.Tell(0); + } + }); + }); } private bool OtherHasNewerVersions(IDictionary versions) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/TopicMessages.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/TopicMessages.cs index 060dfccf1be..1ed8efb75cd 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/TopicMessages.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/TopicMessages.cs @@ -41,6 +41,19 @@ internal sealed class Count private Count() { } } + /// + /// TBD + /// + internal sealed class CountSubscribers + { + public string Topic { get; } + + public CountSubscribers(string topic) + { + Topic = topic; + } + } + /// /// TBD /// diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs index 2a84525beba..6c014798437 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs @@ -111,6 +111,10 @@ protected bool DefaultReceive(object message) Context.Parent.Tell(NewSubscriberArrived.Instance); } } + else if (message is Count) + { + Sender.Tell(Subscribers.Count); + } else { foreach (var subscriber in Subscribers) @@ -137,7 +141,7 @@ protected override bool Receive(object message) return Business(message) || DefaultReceive(message); } - private void Remove(IActorRef actorRef) + protected void Remove(IActorRef actorRef) { Subscribers.Remove(actorRef); @@ -233,6 +237,7 @@ protected override bool Business(object message) var terminated = (Terminated)message; var key = Utils.MakeKey(terminated.ActorRef); _buffer.RecreateAndForwardMessagesIfNeeded(key, () => NewGroupActor(terminated.ActorRef.Path.Name)); + Remove(terminated.ActorRef); } else return false; return true;