Skip to content

Commit

Permalink
Fix missing sendOneMessageToEachGroup field in Publish (#7202)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored May 23, 2024
1 parent 8be4730 commit 45b5193
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,10 @@ private long NextVersion()

private IActorRef NewTopicActor(string encodedTopic)
{
var t = Context.ActorOf(Actor.Props.Create(() => new Topic(_settings.RemovedTimeToLive, _settings.RoutingLogic, _settings.SendToDeadLettersWhenNoSubscribers)), encodedTopic);
var t = Context.ActorOf(
Actor.Props.Create(() => new Topic(_settings.RemovedTimeToLive, _settings.RoutingLogic, _settings.SendToDeadLettersWhenNoSubscribers))
.WithDeploy(Deploy.Local),
encodedTopic);
HandleRegisterTopic(t);
return t;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,10 @@ protected override bool Business(object message)

private IActorRef NewGroupActor(string encodedGroup)
{
var g = Context.ActorOf(Props.Create(() => new Group(EmptyTimeToLive, _routingLogic, SendToDeadLettersWhenNoSubscribers)), encodedGroup);
var g = Context.ActorOf(
Props.Create(() => new Group(EmptyTimeToLive, _routingLogic, SendToDeadLettersWhenNoSubscribers))
.WithDeploy(Deploy.Local),
encodedGroup);
Context.Watch(g);
Context.Parent.Tell(new RegisterTopic(g));
return g;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,14 @@ private byte[] PublishToProto(Publish publish)
var protoMessage = new Proto.Msg.Publish();
protoMessage.Topic = publish.Topic;
protoMessage.Payload = _payloadSupport.PayloadToProto(publish.Message);
protoMessage.SendOneMessageToEachGroup = publish.SendOneMessageToEachGroup;
return protoMessage.ToByteArray();
}

private Publish PublishFrom(byte[] bytes)
{
var publishProto = Proto.Msg.Publish.Parser.ParseFrom(bytes);
return new Publish(publishProto.Topic, _payloadSupport.PayloadFrom(publishProto.Payload));
return new Publish(publishProto.Topic, _payloadSupport.PayloadFrom(publishProto.Payload), publishProto.SendOneMessageToEachGroup);
}

private byte[] SendToOneSubscriberToProto(SendToOneSubscriber sendToOneSubscriber)
Expand Down
1 change: 1 addition & 0 deletions src/protobuf/DistributedPubSubMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ message SendToAll {
message Publish {
string topic = 1;
Akka.Remote.Serialization.Proto.Msg.Payload payload = 3;
bool sendOneMessageToEachGroup = 4;
}

// Send a message to only one subscriber of a group.
Expand Down

0 comments on commit 45b5193

Please sign in to comment.