From 4d1d79b568bdae6565423c3ed914f8a9606dc0e8 Mon Sep 17 00:00:00 2001 From: "maxim.salamatko" Date: Sat, 21 Nov 2015 08:37:25 +0400 Subject: [PATCH] #1416 created ReceiveActor implementation of AtLeastOnceDeliveryActor base class --- .../Akka.Persistence.Tests.csproj | 1 + .../AtLeastOnceDeliveryReceiveActorSpec.cs | 562 ++++++++++++++++++ .../AtLeastOnceDeliverySpec.cs | 11 +- .../Akka.Persistence/Akka.Persistence.csproj | 116 ++++ .../Akka.Persistence/AtLeastOnceDelivery.cs | 481 +++------------ .../AtLeastOnceDeliveryReceiveActor.cs | 164 +++++ .../AtLeastOnceDeliverySemantic.cs | 513 ++++++++++++++++ src/core/Akka.Persistence/Persistence.cs | 59 ++ 8 files changed, 1508 insertions(+), 399 deletions(-) create mode 100644 src/core/Akka.Persistence.Tests/AtLeastOnceDeliveryReceiveActorSpec.cs create mode 100644 src/core/Akka.Persistence/AtLeastOnceDeliveryReceiveActor.cs create mode 100644 src/core/Akka.Persistence/AtLeastOnceDeliverySemantic.cs diff --git a/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj b/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj index 1e9f1591a8a..39c87ab7801 100644 --- a/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj +++ b/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj @@ -76,6 +76,7 @@ + diff --git a/src/core/Akka.Persistence.Tests/AtLeastOnceDeliveryReceiveActorSpec.cs b/src/core/Akka.Persistence.Tests/AtLeastOnceDeliveryReceiveActorSpec.cs new file mode 100644 index 00000000000..e1c312830d5 --- /dev/null +++ b/src/core/Akka.Persistence.Tests/AtLeastOnceDeliveryReceiveActorSpec.cs @@ -0,0 +1,562 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Linq; +using Akka.Actor; +using Akka.Event; +using Akka.TestKit; +using Xunit; + +namespace Akka.Persistence.Tests +{ + public class AtLeastOnceDeliveryReceiveActorSpec : PersistenceSpec + { + #region internal test classes + + [Serializable] + private sealed class AcceptedReq : IEvt + { + public AcceptedReq(string payload, string destinationPath) + { + Payload = payload; + DestinationPath = destinationPath; + } + + public string Payload { get; private set; } + + //FIXME: change to Akka.Actor.ActorPath when serialization problems will be solved + public string DestinationPath { get; private set; } + } + + [Serializable] + private sealed class Action : IEquatable + { + public Action(long id, string payload) + { + Id = id; + Payload = payload; + } + + public long Id { get; private set; } + public string Payload { get; private set; } + + public bool Equals(Action other) + { + return Id == other.Id && string.Equals(Payload, other.Payload); + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + return obj is Action && Equals((Action) obj); + } + + public override int GetHashCode() + { + unchecked + { + return (Id.GetHashCode()*397) ^ (Payload != null ? Payload.GetHashCode() : 0); + } + } + } + + [Serializable] + private sealed class ActionAck : IEquatable + { + public ActionAck(long id) + { + Id = id; + } + + public long Id { get; private set; } + + public bool Equals(ActionAck other) + { + return other != null && other.Id == Id; + } + } + + [Serializable] + private sealed class Boom + { + public static readonly Boom Instance = new Boom(); + } + + private class Destination : ReceiveActor + { + private readonly ISet _allReceived; + + public Destination(IActorRef testActor) + { + _allReceived = new HashSet(); + Receive(a => + { + if (!_allReceived.Contains(a.Id)) + { + testActor.Tell(a); + + _allReceived.Add(a.Id); + } + Sender.Tell(new ActionAck(a.Id)); + }); + } + } + + private interface IEvt + { + } + + [Serializable] + private sealed class InvalidReq + { + public static readonly InvalidReq Instance = new InvalidReq(); + + private InvalidReq() + { + } + + public override bool Equals(object obj) + { + return obj is InvalidReq; + } + } + + internal class Receiver : AtLeastOnceDeliveryReceiveActor + { + private readonly IDictionary _destinations; + private readonly bool _isAsync; + private readonly ILoggingAdapter _log; + private readonly IActorRef _testActor; + private IActorRef _lastSnapshotAskedForBy; + private readonly string _name; + + public Receiver(IActorRef testActor, string name, TimeSpan redeliverInterval, int warn, + int redeliveryBurstLimit, bool isAsync, IDictionary destinations) + : base(new PersistenceSettings.AtLeastOnceDeliverySettings(redeliverInterval, redeliveryBurstLimit, warn, 100000)) + { + _testActor = testActor; + _name = name; + _isAsync = isAsync; + _destinations = destinations; + _log = Context.GetLogger(); + + + Command(req => + { + if (string.IsNullOrEmpty(req.Payload)) Sender.Tell(InvalidReq.Instance); + else + { + char c = char.ToUpper(req.Payload[0]); + ActorPath destination = _destinations[c.ToString()]; + if (_isAsync) + PersistAsync(new AcceptedReq(req.Payload, destination.ToString()), e => + { + UpdateState(e); + Sender.Tell(ReqAck.Instance); + }); + else + Persist(new AcceptedReq(req.Payload, destination.ToString()), e => + { + UpdateState(e); + Sender.Tell(ReqAck.Instance); + }); + } + }); + Command(ack => + { + _log.Debug("Sender got ack: {0}", ack.Id); + if (ConfirmDelivery(ack.Id)) + { + if (_isAsync) + { + PersistAsync(new ReqDone(ack.Id), UpdateState); + } + else + { + Persist(new ReqDone(ack.Id), UpdateState); + } + } + }); + + Command(boom => + { + _log.Debug("Boom!"); + throw new Exception("boom"); + }); + + Command(save => + { + _log.Debug("Save snapshot"); + _lastSnapshotAskedForBy = Sender; + SaveSnapshot(new Snap(GetDeliverySnapshot())); + }); + + + Command(succ => + { + _log.Debug("Save snapshot success!"); + if (_lastSnapshotAskedForBy != null) + _lastSnapshotAskedForBy.Tell(succ); + }); + + Command(unconfirmedWarn => + { + _log.Debug("Sender got unconfirmed warning: unconfirmed deliveries count {0}", + unconfirmedWarn.UnconfirmedDeliveries.Count()); + _testActor.Tell(unconfirmedWarn); + }); + + Recover(evt => UpdateState(evt)); + + Recover(o => + { + var snap = (Snap) o.Snapshot; + SetDeliverySnapshot(snap.DeliverySnapshot); + }); + } + + public override string PersistenceId + { + get { return _name; } + } + + + + private void UpdateState(IEvt evt) + { + evt.Match() + .With(a => + { + _log.Debug("Deliver(destination, deliveryId => Action(deliveryId, {0})), recovering: {1}", + a.Payload, IsRecovering); + Deliver(ActorPath.Parse(a.DestinationPath), deliveryId => new Action(deliveryId, a.Payload)); + }) + .With(r => + { + _log.Debug("ConfirmDelivery({0}), recovering: {1}", r.Id, IsRecovering); + ConfirmDelivery(r.Id); + }); + } + } + + [Serializable] + private sealed class Req + { + public Req(string payload) + { + Payload = payload; + } + + public string Payload { get; private set; } + } + + [Serializable] + private sealed class ReqAck + { + public static readonly ReqAck Instance = new ReqAck(); + + private ReqAck() + { + } + + public override bool Equals(object obj) + { + return obj is ReqAck; + } + } + + [Serializable] + private sealed class ReqDone : IEvt, IEquatable + { + public ReqDone(long id) + { + Id = id; + } + + public long Id { get; private set; } + + public bool Equals(ReqDone other) + { + return other != null && other.Id == Id; + } + } + + [Serializable] + private sealed class SaveSnap + { + public static readonly SaveSnap Instance = new SaveSnap(); + } + + [Serializable] + private sealed class Snap + { + public Snap(AtLeastOnceDeliverySnapshot deliverySnapshot) + { + DeliverySnapshot = deliverySnapshot; + } + + public AtLeastOnceDeliverySnapshot DeliverySnapshot { get; private set; } + } + + private class Unreliable : ReceiveActor + { + private int _count; + + public Unreliable(int dropMod, IActorRef target) + { + Receive(message => + { + _count++; + if (_count%dropMod != 0) + { + target.Forward(message); + } + return true; + }); + } + } + + #endregion + + public AtLeastOnceDeliveryReceiveActorSpec() + : base(Configuration("inmem", "AtLeastOnceDeliveryReceiveActorSpec")) + { + } + + [Fact] + public void PersistentReceive_must_deliver_messages_in_order_when_nothing_is_lost() + { + TestProbe probe = CreateTestProbe(); + var destinations = new Dictionary + { + {"A", Sys.ActorOf(Props.Create(() => new Destination(probe.Ref))).Path} + }; + IActorRef sender = + Sys.ActorOf( + Props.Create( + () => + new Receiver(TestActor, Name, TimeSpan.FromMilliseconds(500), 5, 1000, false, destinations)), + Name); + + sender.Tell(new Req("a")); + ExpectMsg(ReqAck.Instance); + probe.ExpectMsg(a => a.Id == 1 && a.Payload == "a"); + probe.ExpectNoMsg(TimeSpan.FromSeconds(1)); + } + + [Fact] + public void PersistentReceive_must_redeliver_lost_messages() + { + TestProbe probe = CreateTestProbe(); + IActorRef dest = Sys.ActorOf(Props.Create(() => new Destination(probe.Ref))); + var destinations = new Dictionary + { + {"A", Sys.ActorOf(Props.Create(() => new Unreliable(3, dest))).Path} + }; + IActorRef sender = + Sys.ActorOf( + Props.Create( + () => + new Receiver(TestActor, Name, TimeSpan.FromMilliseconds(500), 5, 1000, false, destinations)), + Name); + + sender.Tell(new Req("a-1")); + ExpectMsg(ReqAck.Instance); + probe.ExpectMsg(a => a.Id == 1 && a.Payload == "a-1"); + + sender.Tell(new Req("a-2")); + ExpectMsg(ReqAck.Instance); + probe.ExpectMsg(a => a.Id == 2 && a.Payload == "a-2"); + + sender.Tell(new Req("a-3")); + sender.Tell(new Req("a-4")); + ExpectMsg(ReqAck.Instance); + ExpectMsg(ReqAck.Instance); + // a-3 was lost ... + probe.ExpectMsg(a => a.Id == 4 && a.Payload == "a-4"); + // ... and then redelivered + probe.ExpectMsg(a => a.Id == 3 && a.Payload == "a-3"); + probe.ExpectNoMsg(TimeSpan.FromSeconds(1)); + } + + [Fact] + public void PersistentReceive_must_redeliver_lost_messages_after_restart() + { + TestProbe probe = CreateTestProbe(); + IActorRef dest = Sys.ActorOf(Props.Create(() => new Destination(probe.Ref))); + var destinations = new Dictionary + { + {"A", Sys.ActorOf(Props.Create(() => new Unreliable(3, dest))).Path} + }; + IActorRef sender = + Sys.ActorOf( + Props.Create( + () => + new Receiver(TestActor, Name, TimeSpan.FromMilliseconds(500), 5, 1000, false, destinations)), + Name); + + sender.Tell(new Req("a-1")); + ExpectMsg(ReqAck.Instance); + probe.ExpectMsg(a => a.Id == 1 && a.Payload == "a-1"); + + sender.Tell(new Req("a-2")); + ExpectMsg(ReqAck.Instance); + probe.ExpectMsg(a => a.Id == 2 && a.Payload == "a-2"); + + sender.Tell(new Req("a-3")); + sender.Tell(new Req("a-4")); + ExpectMsg(ReqAck.Instance); + ExpectMsg(ReqAck.Instance); + // a-3 was lost ... + probe.ExpectMsg(a => a.Id == 4 && a.Payload == "a-4"); + // ... trigger restart ... + sender.Tell(Boom.Instance); + // ... and then redeliver + probe.ExpectMsg(a => a.Id == 3 && a.Payload == "a-3"); + + sender.Tell(new Req("a-5")); + ExpectMsg(ReqAck.Instance); + probe.ExpectMsg(a => a.Id == 5 && a.Payload == "a-5"); + + probe.ExpectNoMsg(TimeSpan.FromSeconds(1)); + } + + [Fact] + public void + PersistentReceive_must_resend_replayed_deliveries_with_an_initially_in_order_strategy_before_delivering_fresh_messages + () + { + TestProbe probe = CreateTestProbe(); + IActorRef dest = Sys.ActorOf(Props.Create(() => new Destination(probe.Ref))); + var destinations = new Dictionary + { + {"A", Sys.ActorOf(Props.Create(() => new Unreliable(2, dest))).Path} + }; + IActorRef sender = + Sys.ActorOf( + Props.Create( + () => + new Receiver(TestActor, Name, TimeSpan.FromMilliseconds(500), 5, 1000, false, destinations)), + Name); + + sender.Tell(new Req("a-1")); + ExpectMsg(ReqAck.Instance); + probe.ExpectMsg(a => a.Id == 1 && a.Payload == "a-1"); + + sender.Tell(new Req("a-2")); + ExpectMsg(ReqAck.Instance); + // a-2 was lost + + sender.Tell(new Req("a-3")); + ExpectMsg(ReqAck.Instance); + probe.ExpectMsg(a => a.Id == 3 && a.Payload == "a-3"); + + sender.Tell(new Req("a-4")); + ExpectMsg(ReqAck.Instance); + // a-4 was lost + + // trigger restart + sender.Tell(Boom.Instance); + sender.Tell(new Req("a-5")); + ExpectMsg(ReqAck.Instance); + + // and redeliver + probe.ExpectMsg(a => a.Id == 2 && a.Payload == "a-2"); // redelivered + // a-4 was redelivered but lost again + probe.ExpectMsg(a => a.Id == 5 && a.Payload == "a-5"); // redelivered + //FIXME: expression below works, just for some reason won't fit in 10 sec. interval + probe.ExpectMsg(a => a.Id == 4 && a.Payload == "a-4", TimeSpan.FromSeconds(20)); + // redelivered, 3th time + + probe.ExpectNoMsg(TimeSpan.FromSeconds(1)); + } + + [Fact] + public void PersistentReceive_must_restore_state_from_snapshot() + { + TestProbe probe = CreateTestProbe(); + IActorRef dest = Sys.ActorOf(Props.Create(() => new Destination(probe.Ref))); + var destinations = new Dictionary + { + {"A", Sys.ActorOf(Props.Create(() => new Unreliable(3, dest))).Path} + }; + IActorRef sender = + Sys.ActorOf( + Props.Create( + () => + new Receiver(TestActor, Name, TimeSpan.FromMilliseconds(500), 5, 1000, false, destinations)), + Name); + + sender.Tell(new Req("a-1")); + ExpectMsg(ReqAck.Instance); + probe.ExpectMsg(a => a.Id == 1 && a.Payload == "a-1"); + + sender.Tell(new Req("a-2")); + ExpectMsg(ReqAck.Instance); + probe.ExpectMsg(a => a.Id == 2 && a.Payload == "a-2"); + + sender.Tell(new Req("a-3")); + sender.Tell(new Req("a-4")); + sender.Tell(SaveSnap.Instance); + ExpectMsg(ReqAck.Instance); + ExpectMsg(ReqAck.Instance); + // a-3 was lost + + probe.ExpectMsg(a => a.Id == 4 && a.Payload == "a-4"); + + // after snapshot succeed + ExpectMsg(); + // trigger restart + sender.Tell(Boom.Instance); + // and then redelivered + probe.ExpectMsg(a => a.Id == 3 && a.Payload == "a-3"); + + sender.Tell(new Req("a-5")); + ExpectMsg(ReqAck.Instance); + probe.ExpectMsg(a => a.Id == 5 && a.Payload == "a-5"); + + probe.ExpectNoMsg(TimeSpan.FromSeconds(1)); + } + + [Fact] + public void PersistentReceive_must_warn_about_unconfirmed_messages() + { + TestProbe probeA = CreateTestProbe(); + TestProbe probeB = CreateTestProbe(); + + var destinations = new Dictionary {{"A", probeA.Ref.Path}, {"B", probeB.Ref.Path}}; + IActorRef sender = + Sys.ActorOf( + Props.Create( + () => + new Receiver(TestActor, Name, TimeSpan.FromMilliseconds(500), 3, 1000, false, destinations)), + Name); + + sender.Tell(new Req("a-1")); + sender.Tell(new Req("b-1")); + sender.Tell(new Req("b-2")); + ExpectMsg(ReqAck.Instance); + ExpectMsg(ReqAck.Instance); + ExpectMsg(ReqAck.Instance); + + UnconfirmedDelivery[] unconfirmed = ReceiveWhile(TimeSpan.FromSeconds(3), x => + x is UnconfirmedWarning + ? ((UnconfirmedWarning) x).UnconfirmedDeliveries + : Enumerable.Empty()) + .SelectMany(e => e).ToArray(); + + ActorPath[] resultDestinations = unconfirmed.Select(x => x.Destination).Distinct().ToArray(); + resultDestinations.ShouldOnlyContainInOrder(probeA.Ref.Path, probeB.Ref.Path); + object[] resultMessages = unconfirmed.Select(x => x.Message).Distinct().ToArray(); + resultMessages.ShouldOnlyContainInOrder(new Action(1, "a-1"), new Action(2, "b-1"), new Action(3, "b-2")); + + Sys.Stop(sender); + } + } +} \ No newline at end of file diff --git a/src/core/Akka.Persistence.Tests/AtLeastOnceDeliverySpec.cs b/src/core/Akka.Persistence.Tests/AtLeastOnceDeliverySpec.cs index bc3784c40b3..619768eeb6b 100644 --- a/src/core/Akka.Persistence.Tests/AtLeastOnceDeliverySpec.cs +++ b/src/core/Akka.Persistence.Tests/AtLeastOnceDeliverySpec.cs @@ -24,31 +24,22 @@ class Sender : AtLeastOnceDeliveryActor { private readonly IActorRef _testActor; private readonly string _name; - private readonly TimeSpan _redeliverInterval; - private readonly int _warn; - private readonly int _redeliveryBurstLimit; private readonly bool _isAsync; private readonly IDictionary _destinations; private readonly ILoggingAdapter _log; private IActorRef _lastSnapshotAskedForBy; public Sender(IActorRef testActor, string name, TimeSpan redeliverInterval, int warn, int redeliveryBurstLimit, bool isAsync, IDictionary destinations) - : base() + : base(new PersistenceSettings.AtLeastOnceDeliverySettings(redeliverInterval, redeliveryBurstLimit, warn, 100000)) { _testActor = testActor; _name = name; - _redeliverInterval = redeliverInterval; - _warn = warn; - _redeliveryBurstLimit = redeliveryBurstLimit; _isAsync = isAsync; _destinations = destinations; _log = Context.GetLogger(); } public override string PersistenceId { get { return _name; } } - public override TimeSpan RedeliverInterval { get { return _redeliverInterval; } } - public override int UnconfirmedDeliveryAttemptsToWarn { get { return _warn; } } - public override int RedeliveryBurstLimit { get { return _redeliveryBurstLimit; } } protected override bool ReceiveRecover(object message) { diff --git a/src/core/Akka.Persistence/Akka.Persistence.csproj b/src/core/Akka.Persistence/Akka.Persistence.csproj index 1eddee4b7e7..c97b3f82ce2 100644 --- a/src/core/Akka.Persistence/Akka.Persistence.csproj +++ b/src/core/Akka.Persistence/Akka.Persistence.csproj @@ -53,6 +53,8 @@ Properties\SharedAssemblyInfo.cs + + @@ -112,4 +114,118 @@ --> + + + + + ..\..\..\packages\Google.ProtocolBuffers\lib\net40\Google.ProtocolBuffers.Serialization.dll + True + True + + + ..\..\..\packages\Google.ProtocolBuffers\lib\net40\Google.ProtocolBuffers.dll + True + True + + + + + + + ..\..\..\packages\Google.ProtocolBuffers\lib\sl30\Google.ProtocolBuffers.Serialization.dll + True + True + + + ..\..\..\packages\Google.ProtocolBuffers\lib\sl30\Google.ProtocolBuffers.dll + True + True + + + + + + + ..\..\..\packages\Google.ProtocolBuffers\lib\sl40\Google.ProtocolBuffers.Serialization.dll + True + True + + + ..\..\..\packages\Google.ProtocolBuffers\lib\sl40\Google.ProtocolBuffers.dll + True + True + + + + + + + ..\..\..\packages\Google.ProtocolBuffers\lib\portable-net40+sl4+sl5+wp7+wp8+win8\Google.ProtocolBuffers.Serialization.dll + True + True + + + ..\..\..\packages\Google.ProtocolBuffers\lib\portable-net40+sl4+sl5+wp7+wp8+win8\Google.ProtocolBuffers.dll + True + True + + + + + + + + + ..\..\..\packages\Newtonsoft.Json\lib\net35\Newtonsoft.Json.dll + True + True + + + + + + + ..\..\..\packages\Newtonsoft.Json\lib\net20\Newtonsoft.Json.dll + True + True + + + + + + + ..\..\..\packages\Newtonsoft.Json\lib\net40\Newtonsoft.Json.dll + True + True + + + + + + + ..\..\..\packages\Newtonsoft.Json\lib\net45\Newtonsoft.Json.dll + True + True + + + + + + + ..\..\..\packages\Newtonsoft.Json\lib\portable-net45+wp80+win8+wpa81+dnxcore50\Newtonsoft.Json.dll + True + True + + + + + + + ..\..\..\packages\Newtonsoft.Json\lib\portable-net40+sl5+wp80+win8+wpa81\Newtonsoft.Json.dll + True + True + + + + \ No newline at end of file diff --git a/src/core/Akka.Persistence/AtLeastOnceDelivery.cs b/src/core/Akka.Persistence/AtLeastOnceDelivery.cs index 0d3ccb3da91..47a36d490cb 100644 --- a/src/core/Akka.Persistence/AtLeastOnceDelivery.cs +++ b/src/core/Akka.Persistence/AtLeastOnceDelivery.cs @@ -6,468 +6,171 @@ //----------------------------------------------------------------------- using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Runtime.Serialization; using Akka.Actor; using Akka.Actor.Internal; -using Akka.Persistence.Serialization; namespace Akka.Persistence { - #region Messages - /// - /// Snapshot of a current state. Can be retrieved with - /// and saved with . - /// During recovery the snapshot received in should be sent with - /// . + /// Persistent actor type, that sends messages with at-least-once delivery semantics to it's destinations. + /// It takes care of re-sending messages when they haven't been confirmed withing expected timeout. The same + /// message may be send twice or more to the same destination as a result of possible resends. + /// Use a method to send a message to a destination. Call the + /// + /// method once destination has replied with a confirmation message. The interval between redelivery attempts + /// can be defined with . After a number of delivery attempts an + /// message will be sent to . The resending will + /// continue, + /// but you may choose to cancel resending. + /// This actor type has state consisting of unconfirmed messages and a sequence number. It doesn't store it by + /// itself, so you must persist corresponding events so that state can be restored by calling the same + /// delivery-related methods during recovery phase of the persistent actor. During recovery calls to + /// won't send out a message, but it will be sent later if no + /// + /// call was performed. + /// Support for snapshot is provided by get and set delivery snapshot methods. These snapshots contains full + /// delivery state including unconfirmed messages. For custom snapshots remember to include those delivery ones. /// - [Serializable] - public sealed class AtLeastOnceDeliverySnapshot : IMessage, IEquatable - { - public AtLeastOnceDeliverySnapshot(long deliveryId, UnconfirmedDelivery[] unconfirmedDeliveries) - { - if(unconfirmedDeliveries == null) - throw new ArgumentNullException("unconfirmedDeliveries", "AtLeastOnceDeliverySnapshot expects not null array of unconfirmed deliveries"); - - DeliveryId = deliveryId; - UnconfirmedDeliveries = unconfirmedDeliveries; - } - - public readonly long DeliveryId; - public readonly UnconfirmedDelivery[] UnconfirmedDeliveries; - - public bool Equals(AtLeastOnceDeliverySnapshot other) - { - if (ReferenceEquals(other, null)) return false; - if (ReferenceEquals(this, other)) return true; - - return Equals(DeliveryId, other.DeliveryId) - && Equals(UnconfirmedDeliveries, UnconfirmedDeliveries); - } - - public override bool Equals(object obj) - { - return Equals(obj as AtLeastOnceDeliverySnapshot); - } - - public override int GetHashCode() - { - unchecked - { - return (DeliveryId.GetHashCode() * 397) ^ (UnconfirmedDeliveries != null ? UnconfirmedDeliveries.GetHashCode() : 0); - } - } - - public override string ToString() - { - return string.Format("AtLeastOnceDeliverySnapshot", DeliveryId, UnconfirmedDeliveries.Length); - } - } - - /// - /// message should be sent after - /// limit will be reached. - /// - [Serializable] - public sealed class UnconfirmedWarning : IEquatable - { - public UnconfirmedWarning(UnconfirmedDelivery[] unconfirmedDeliveries) - { - if(unconfirmedDeliveries == null) - throw new ArgumentNullException("unconfirmedDeliveries", "UnconfirmedWarning expects not null array of unconfirmed deliveries"); - - UnconfirmedDeliveries = unconfirmedDeliveries; - } - - public readonly UnconfirmedDelivery[] UnconfirmedDeliveries; - - public bool Equals(UnconfirmedWarning other) - { - if (ReferenceEquals(other, null)) return false; - if (ReferenceEquals(this, other)) return true; - - return Equals(UnconfirmedDeliveries, other.UnconfirmedDeliveries); - } - - public override bool Equals(object obj) - { - return Equals(obj as UnconfirmedWarning); - } - - public override int GetHashCode() - { - return (UnconfirmedDeliveries != null ? UnconfirmedDeliveries.GetHashCode() : 0); - } - - public override string ToString() - { - return string.Format("UnconfirmedWarning", UnconfirmedDeliveries.Length); - } - } - - /// - /// contains details about unconfirmed messages. - /// It's included inside and . - /// - [Serializable] - public sealed class UnconfirmedDelivery : IEquatable - { - public UnconfirmedDelivery(long deliveryId, ActorPath destination, object message) - { - DeliveryId = deliveryId; - Destination = destination; - Message = message; - } - - public readonly long DeliveryId; - public readonly ActorPath Destination; - public readonly object Message; - - public bool Equals(UnconfirmedDelivery other) - { - if (ReferenceEquals(other, null)) return false; - if (ReferenceEquals(this, other)) return true; - - return Equals(DeliveryId, other.DeliveryId) - && Equals(Destination, other.Destination) - && Equals(Message, other.Message); - } - - public override bool Equals(object obj) - { - return Equals(obj as UnconfirmedDelivery); - } - - public override int GetHashCode() - { - unchecked - { - var hashCode = DeliveryId.GetHashCode(); - hashCode = (hashCode * 397) ^ (Destination != null ? Destination.GetHashCode() : 0); - hashCode = (hashCode * 397) ^ (Message != null ? Message.GetHashCode() : 0); - return hashCode; - } - } - - public override string ToString() - { - return string.Format("UnconfirmedDelivery", DeliveryId, Destination, Message); - } - } - - [Serializable] - internal sealed class Delivery : IEquatable + public abstract class AtLeastOnceDeliveryActor : PersistentActor, IInitializableActor { - public Delivery(ActorPath destination, object message, DateTime timestamp, int attempt) - { - Destination = destination; - Message = message; - Timestamp = timestamp; - Attempt = attempt; - } - - public readonly ActorPath Destination; - public readonly object Message; - public readonly DateTime Timestamp; - public readonly int Attempt; - - public Delivery IncrementedCopy() - { - return new Delivery(Destination, Message, Timestamp, Attempt + 1); - } - - public bool Equals(Delivery other) - { - if (ReferenceEquals(other, null)) return false; - if (ReferenceEquals(this, other)) return true; - - return Equals(Attempt, other.Attempt) - && Equals(Timestamp, other.Timestamp) - && Equals(Destination, other.Destination) - && Equals(Message, other.Message); - } - - public override bool Equals(object obj) - { - return Equals(obj as Delivery); - } + private readonly AtLeastOnceDeliverySemantic _atLeastOnceDeliverySemantic; - public override int GetHashCode() + protected AtLeastOnceDeliveryActor() { - unchecked - { - var hashCode = (Destination != null ? Destination.GetHashCode() : 0); - hashCode = (hashCode * 397) ^ (Message != null ? Message.GetHashCode() : 0); - hashCode = (hashCode * 397) ^ Timestamp.GetHashCode(); - hashCode = (hashCode * 397) ^ Attempt; - return hashCode; - } + _atLeastOnceDeliverySemantic = new AtLeastOnceDeliverySemantic(Context, Extension.Settings.AtLeastOnceDelivery); } - public override string ToString() - { - return string.Format("Delivery - /// This exception is thrown when the threshold has been exceeded. - /// - public class MaxUnconfirmedMessagesExceededException : AkkaException - { /// - /// Initializes a new instance of the class. + /// Interval between redelivery attempts. /// - /// The message that describes the error. - /// The exception that is the cause of the current exception. - public MaxUnconfirmedMessagesExceededException(string message, Exception cause = null) - : base(message, cause) + public TimeSpan RedeliverInterval { + get { return _atLeastOnceDeliverySemantic.RedeliverInterval; } } /// - /// Initializes a new instance of the class. + /// Maximum number of unconfirmed messages that will be sent at each redelivery burst. This is to help to + /// prevent overflowing amount of messages to be sent at once, for eg. when destination cannot be reached for a long + /// time. /// - /// The that holds the serialized object data about the exception being thrown. - /// The that contains contextual information about the source or destination. - protected MaxUnconfirmedMessagesExceededException(SerializationInfo info, StreamingContext context) - : base(info, context) + public int RedeliveryBurstLimit { + get { return _atLeastOnceDeliverySemantic.RedeliveryBurstLimit; } } - } - /// - /// Persistent actor type, that sends messages with at-least-once delivery semantics to it's destinations. - /// It takes care of re-sending messages when they haven't been confirmed withing expected timeout. The same - /// message may be send twice or more to the same destination as a result of possible resends. - /// - /// Use a method to send a message to a destination. Call the - /// method once destination has replied with a confirmation message. The interval between redelivery attempts - /// can be defined with . After a number of delivery attempts an - /// message will be sent to . The resending will continue, - /// but you may choose to cancel resending. - /// - /// This actor type has state consisting of unconfirmed messages and a sequence number. It doesn't store it by - /// itself, so you must persist corresponding events so that state can be restored by calling the same - /// delivery-related methods during recovery phase of the persistent actor. During recovery calls to - /// won't send out a message, but it will be sent later if no - /// call was performed. - /// - /// Support for snapshot is provided by get and set delivery snapshot methods. These snapshots contains full - /// delivery state including unconfirmed messages. For custom snapshots remember to include those delivery ones. - /// - public abstract class AtLeastOnceDeliveryActor : PersistentActor, IInitializableActor - { - private ICancelable _redeliverScheduleCancelable; - private long _deliverySequenceNr = 0L; - private ConcurrentDictionary _unconfirmed = new ConcurrentDictionary(); - /// - /// Invoked after actor has been created and all of it's fields have been initialized. + /// After this number of delivery attempts a message will be sent to + /// . + /// The count is reset after restart. /// - public void Init() + public int UnconfirmedDeliveryAttemptsToWarn { - _redeliverScheduleCancelable = ScheduleRedelivery(); + get { return _atLeastOnceDeliverySemantic.UnconfirmedDeliveryAttemptsToWarn; } } - /// - /// Interval between redelivery attempts. + /// Maximum number of unconfirmed messages, that this actor is allowed to hold in the memory. When this + /// number is exceed, will throw + /// + /// instead of accepting messages. /// - public virtual TimeSpan RedeliverInterval { get { return DefaultRedeliverInterval; } } - protected TimeSpan DefaultRedeliverInterval { get { return Extension.Settings.AtLeastOnceDelivery.RedeliverInterval; } } - - /// - /// Maximum number of unconfirmed messages that will be sent at each redelivery burst. This is to help to - /// prevent overflowing amount of messages to be sent at once, for eg. when destination cannot be reached for a long time. - /// - public virtual int RedeliveryBurstLimit { get { return DefaultRedeliveryBurstLimit; } } - protected int DefaultRedeliveryBurstLimit { get { return Extension.Settings.AtLeastOnceDelivery.RedeliveryBurstLimit; } } - - /// - /// After this number of delivery attempts a message will be sent to . - /// The count is reset after restart. - /// - public virtual int UnconfirmedDeliveryAttemptsToWarn { get { return DefaultUnconfirmedDeliveryAttemptsToWarn; } } - protected int DefaultUnconfirmedDeliveryAttemptsToWarn { get { return Extension.Settings.AtLeastOnceDelivery.UnconfirmedAttemptsToWarn; } } - - /// - /// Maximum number of unconfirmed messages, that this actor is allowed to hold in the memory. When this - /// number is exceed, will throw - /// instead of accepting messages. - /// - public virtual int MaxUnconfirmedMessages { get { return DefaultMaxUnconfirmedMessages; } } - protected int DefaultMaxUnconfirmedMessages { get { return Extension.Settings.AtLeastOnceDelivery.MaxUnconfirmedMessages; } } - - /// - /// Number of messages, that have not been confirmed yet. - /// - public int UnconfirmedCount { get { return _unconfirmed.Count; } } - - /// - /// Send the message created with function to the - /// actor. It will retry sending the message until the delivery is confirmed with . - /// Correlation between these two methods is performed by delivery id - parameter of . - /// Usually it's passed inside the message to the destination, which replies with the message having the same id. - /// - /// During recovery this method won't send out any message, but it will be sent later until corresponding - /// method will be invoked. - /// - /// - /// Thrown when is greater than or equal to . - /// - public void Deliver(ActorPath destination, Func deliveryMessageMapper) + public int MaxUnconfirmedMessages { - if (UnconfirmedCount >= MaxUnconfirmedMessages) - { - throw new MaxUnconfirmedMessagesExceededException(string.Format("{0} has too many unconfirmed messages. Maximum allowed is {1}", PersistenceId, MaxUnconfirmedMessages)); - } - - var deliveryId = NextDeliverySequenceNr(); - var now = IsRecovering ? DateTime.UtcNow - RedeliverInterval : DateTime.UtcNow; - var delivery = new Delivery(destination, deliveryMessageMapper(deliveryId), now, attempt: 0); - - if (IsRecovering) - { - _unconfirmed.AddOrUpdate(deliveryId, delivery, (id, d) => delivery); - } - else - { - Send(deliveryId, delivery, now); - } + get { return _atLeastOnceDeliverySemantic.MaxUnconfirmedMessages; } } + /// - /// Call this method to confirm that message with has been sent - /// or to cancel redelivery attempts. + /// Number of messages, that have not been confirmed yet. /// - /// True if delivery was confirmed first time, false for duplicate confirmations. - public bool ConfirmDelivery(long deliveryId) + public int UnconfirmedCount { - Delivery delivery; - return _unconfirmed.TryRemove(deliveryId, out delivery); + get { return _atLeastOnceDeliverySemantic.UnconfirmedCount; } } - /// - /// Returns full state of the current delivery actor. Could be saved using method. - /// During recovery a snapshot received in should be set with . - /// - public AtLeastOnceDeliverySnapshot GetDeliverySnapshot() + public void Init() { - var unconfirmedDeliveries = _unconfirmed - .Select(e => new UnconfirmedDelivery(e.Key, e.Value.Destination, e.Value.Message)) - .ToArray(); - - return new AtLeastOnceDeliverySnapshot(_deliverySequenceNr, unconfirmedDeliveries); + _atLeastOnceDeliverySemantic.Init(); } - /// - /// If snapshot from was saved, it will be received during recovery phase in a - /// message and should be set with this method. - /// - /// - public void SetDeliverySnapshot(AtLeastOnceDeliverySnapshot snapshot) - { - _deliverySequenceNr = snapshot.DeliveryId; - var now = DateTime.UtcNow; - var unconfirmedDeliveries = snapshot.UnconfirmedDeliveries - .Select(u => new KeyValuePair(u.DeliveryId, new Delivery(u.Destination, u.Message, now, 0))); - - _unconfirmed = new ConcurrentDictionary(unconfirmedDeliveries); - } public override void AroundPostRestart(Exception cause, object message) { - _redeliverScheduleCancelable.Cancel(); + _atLeastOnceDeliverySemantic.Cancel(); base.AroundPostRestart(cause, message); } public override void AroundPostStop() { - _redeliverScheduleCancelable.Cancel(); + _atLeastOnceDeliverySemantic.Cancel(); base.AroundPostStop(); } protected override void OnReplaySuccess() { - RedeliverOverdue(); - base.OnReplaySuccess(); + _atLeastOnceDeliverySemantic.OnReplaySuccess(); } protected override bool AroundReceive(Receive receive, object message) { - if (message is RedeliveryTick) - { - RedeliverOverdue(); - return true; - } - - return base.AroundReceive(receive, message); + return _atLeastOnceDeliverySemantic.AroundReceive(receive, message) || base.AroundReceive(receive, message); } - private void Send(long deliveryId, Delivery delivery, DateTime timestamp) + /// + /// If snapshot from was saved, it will be received during recovery phase in a + /// message and should be set with this method. + /// + /// + public void SetDeliverySnapshot(AtLeastOnceDeliverySnapshot snapshot) { - var destination = Context.ActorSelection(delivery.Destination); - destination.Tell(delivery.Message); - - var dcopy = new Delivery(delivery.Destination, delivery.Message, timestamp, delivery.Attempt + 1); - _unconfirmed.AddOrUpdate(deliveryId, dcopy, (id, d) => dcopy); + _atLeastOnceDeliverySemantic.SetDeliverySnapshot(snapshot); } - private void RedeliverOverdue() + /// + /// Call this method to confirm that message with has been sent + /// or to cancel redelivery attempts. + /// + /// True if delivery was confirmed first time, false for duplicate confirmations. + public bool ConfirmDelivery(long deliveryId) { - var now = DateTime.UtcNow; - var deadline = now - RedeliverInterval; - var warnings = new List(); - - foreach (var entry in _unconfirmed.Where(e => e.Value.Timestamp <= deadline).Take(RedeliveryBurstLimit).ToArray()) - { - var deliveryId = entry.Key; - var unconfirmedDelivery = entry.Value; - - Send(deliveryId, unconfirmedDelivery, now); - - if (unconfirmedDelivery.Attempt == UnconfirmedDeliveryAttemptsToWarn) - { - warnings.Add(new UnconfirmedDelivery(deliveryId, unconfirmedDelivery.Destination, unconfirmedDelivery.Message)); - } - } - - if (warnings.Count != 0) - { - Self.Tell(new UnconfirmedWarning(warnings.ToArray())); - } + return _atLeastOnceDeliverySemantic.ConfirmDelivery(deliveryId); } - private long NextDeliverySequenceNr() + + /// + /// Returns full state of the current delivery actor. Could be saved using + /// method. + /// During recovery a snapshot received in should be set with + /// . + /// + public AtLeastOnceDeliverySnapshot GetDeliverySnapshot() { - return (++_deliverySequenceNr); + return _atLeastOnceDeliverySemantic.GetDeliverySnapshot(); } - private ICancelable ScheduleRedelivery() + /// + /// Send the message created with function to the + /// + /// actor. It will retry sending the message until the delivery is confirmed with . + /// Correlation between these two methods is performed by delivery id - parameter of + /// . + /// Usually it's passed inside the message to the destination, which replies with the message having the same id. + /// During recovery this method won't send out any message, but it will be sent later until corresponding + /// method will be invoked. + /// + /// + /// Thrown when is greater than or equal to . + /// + public void Deliver(ActorPath destination, Func deliveryMessageMapper) { - var interval = new TimeSpan(RedeliverInterval.Ticks / 2); - return Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(interval, interval, Self, RedeliveryTick.Instance, Self); + _atLeastOnceDeliverySemantic.Deliver(destination, deliveryMessageMapper, IsRecovering); } } -} - +} \ No newline at end of file diff --git a/src/core/Akka.Persistence/AtLeastOnceDeliveryReceiveActor.cs b/src/core/Akka.Persistence/AtLeastOnceDeliveryReceiveActor.cs new file mode 100644 index 00000000000..88ad476e3db --- /dev/null +++ b/src/core/Akka.Persistence/AtLeastOnceDeliveryReceiveActor.cs @@ -0,0 +1,164 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Actor.Internal; +using Akka.Dispatch; +using Akka.Tools.MatchHandler; + +namespace Akka.Persistence +{ + /// + /// Receive persistent actor type, that sends messages with at-least-once delivery semantics to it's destinations. + /// + public abstract class AtLeastOnceDeliveryReceiveActor : ReceivePersistentActor + { + private readonly AtLeastOnceDeliverySemantic _atLeastOnceDeliverySemantic; + + protected AtLeastOnceDeliveryReceiveActor() + : base() + { + _atLeastOnceDeliverySemantic = new AtLeastOnceDeliverySemantic(Context, Extension.Settings.AtLeastOnceDelivery); + _atLeastOnceDeliverySemantic.Init(); + + } + protected AtLeastOnceDeliveryReceiveActor(PersistenceSettings.AtLeastOnceDeliverySettings settings) + : base() + { + _atLeastOnceDeliverySemantic = new AtLeastOnceDeliverySemantic(Context, settings); + _atLeastOnceDeliverySemantic.Init(); + + } + + /// + /// Interval between redelivery attempts. + /// + public virtual TimeSpan RedeliverInterval + { + get { return _atLeastOnceDeliverySemantic.RedeliverInterval; ; } + } + + + /// + /// Maximum number of unconfirmed messages that will be sent at each redelivery burst. This is to help to + /// prevent overflowing amount of messages to be sent at once, for eg. when destination cannot be reached for a long + /// time. + /// + public int RedeliveryBurstLimit + { + get { return _atLeastOnceDeliverySemantic.RedeliveryBurstLimit; } + } + + /// + /// After this number of delivery attempts a message will be sent to + /// . + /// The count is reset after restart. + /// + public int UnconfirmedDeliveryAttemptsToWarn + { + get { return _atLeastOnceDeliverySemantic.UnconfirmedDeliveryAttemptsToWarn; } + } + + /// + /// Maximum number of unconfirmed messages, that this actor is allowed to hold in the memory. When this + /// number is exceed, will throw + /// + /// instead of accepting messages. + /// + public int MaxUnconfirmedMessages + { + get { return _atLeastOnceDeliverySemantic.MaxUnconfirmedMessages; } + } + + /// + /// Number of messages, that have not been confirmed yet. + /// + public int UnconfirmedCount + { + get { return _atLeastOnceDeliverySemantic.UnconfirmedCount; } + } + + + public override void AroundPostRestart(Exception cause, object message) + { + _atLeastOnceDeliverySemantic.Cancel(); + base.AroundPostRestart(cause, message); + } + + public override void AroundPostStop() + { + _atLeastOnceDeliverySemantic.Cancel(); + base.AroundPostStop(); + } + + + protected override void OnReplaySuccess() + { + _atLeastOnceDeliverySemantic.OnReplaySuccess(); + } + + protected override bool AroundReceive(Receive receive, object message) + { + return _atLeastOnceDeliverySemantic.AroundReceive(receive, message) || base.AroundReceive(receive, message); + } + + /// + /// If snapshot from was saved, it will be received during recovery phase in a + /// message and should be set with this method. + /// + /// + public void SetDeliverySnapshot(AtLeastOnceDeliverySnapshot snapshot) + { + _atLeastOnceDeliverySemantic.SetDeliverySnapshot(snapshot); + } + + /// + /// Call this method to confirm that message with has been sent + /// or to cancel redelivery attempts. + /// + /// True if delivery was confirmed first time, false for duplicate confirmations. + public bool ConfirmDelivery(long deliveryId) + { + return _atLeastOnceDeliverySemantic.ConfirmDelivery(deliveryId); + } + + + /// + /// Returns full state of the current delivery actor. Could be saved using + /// method. + /// During recovery a snapshot received in should be set with + /// . + /// + public AtLeastOnceDeliverySnapshot GetDeliverySnapshot() + { + return _atLeastOnceDeliverySemantic.GetDeliverySnapshot(); + } + + /// + /// Send the message created with function to the + /// + /// actor. It will retry sending the message until the delivery is confirmed with . + /// Correlation between these two methods is performed by delivery id - parameter of + /// . + /// Usually it's passed inside the message to the destination, which replies with the message having the same id. + /// During recovery this method won't send out any message, but it will be sent later until corresponding + /// method will be invoked. + /// + /// + /// Thrown when is greater than or equal to . + /// + public void Deliver(ActorPath destination, Func deliveryMessageMapper) + { + _atLeastOnceDeliverySemantic.Deliver(destination, deliveryMessageMapper, IsRecovering); + } + + + } +} \ No newline at end of file diff --git a/src/core/Akka.Persistence/AtLeastOnceDeliverySemantic.cs b/src/core/Akka.Persistence/AtLeastOnceDeliverySemantic.cs new file mode 100644 index 00000000000..b751c037898 --- /dev/null +++ b/src/core/Akka.Persistence/AtLeastOnceDeliverySemantic.cs @@ -0,0 +1,513 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.Serialization; +using Akka.Actor; +using Akka.Persistence.Serialization; + +namespace Akka.Persistence +{ + + #region Messages + + /// + /// Snapshot of a current state. Can be retrieved with + /// and saved with + /// . + /// During recovery the snapshot received in should be sent with + /// . + /// + public sealed class AtLeastOnceDeliverySnapshot : IMessage, IEquatable + { + public readonly long DeliveryId; + public readonly UnconfirmedDelivery[] UnconfirmedDeliveries; + + public AtLeastOnceDeliverySnapshot(long deliveryId, UnconfirmedDelivery[] unconfirmedDeliveries) + { + if (unconfirmedDeliveries == null) + throw new ArgumentNullException("unconfirmedDeliveries", + "AtLeastOnceDeliverySnapshot expects not null array of unconfirmed deliveries"); + + DeliveryId = deliveryId; + UnconfirmedDeliveries = unconfirmedDeliveries; + } + + public bool Equals(AtLeastOnceDeliverySnapshot other) + { + if (ReferenceEquals(other, null)) return false; + if (ReferenceEquals(this, other)) return true; + + return Equals(DeliveryId, other.DeliveryId) + && Equals(UnconfirmedDeliveries, UnconfirmedDeliveries); + } + + public override bool Equals(object obj) + { + return Equals(obj as AtLeastOnceDeliverySnapshot); + } + + public override int GetHashCode() + { + unchecked + { + return (DeliveryId.GetHashCode()*397) ^ + (UnconfirmedDeliveries != null ? UnconfirmedDeliveries.GetHashCode() : 0); + } + } + + public override string ToString() + { + return string.Format("AtLeastOnceDeliverySnapshot", DeliveryId, + UnconfirmedDeliveries.Length); + } + } + + /// + /// contains details about unconfirmed messages. + /// It's included inside and . + /// + [Serializable] + public sealed class UnconfirmedDelivery : IEquatable + { + public readonly long DeliveryId; + public readonly ActorPath Destination; + public readonly object Message; + + public UnconfirmedDelivery(long deliveryId, ActorPath destination, object message) + { + DeliveryId = deliveryId; + Destination = destination; + Message = message; + } + + public bool Equals(UnconfirmedDelivery other) + { + if (ReferenceEquals(other, null)) return false; + if (ReferenceEquals(this, other)) return true; + + return Equals(DeliveryId, other.DeliveryId) + && Equals(Destination, other.Destination) + && Equals(Message, other.Message); + } + + public override bool Equals(object obj) + { + return Equals(obj as UnconfirmedDelivery); + } + + public override int GetHashCode() + { + unchecked + { + int hashCode = DeliveryId.GetHashCode(); + hashCode = (hashCode*397) ^ (Destination != null ? Destination.GetHashCode() : 0); + hashCode = (hashCode*397) ^ (Message != null ? Message.GetHashCode() : 0); + return hashCode; + } + } + + public override string ToString() + { + return string.Format("UnconfirmedDelivery", DeliveryId, + Destination, Message); + } + } + + /// + /// message should be sent after + /// limit will be reached. + /// + [Serializable] + public sealed class UnconfirmedWarning : IEquatable + { + public readonly UnconfirmedDelivery[] UnconfirmedDeliveries; + + public UnconfirmedWarning(UnconfirmedDelivery[] unconfirmedDeliveries) + { + if (unconfirmedDeliveries == null) + throw new ArgumentNullException("unconfirmedDeliveries", + "UnconfirmedWarning expects not null array of unconfirmed deliveries"); + + UnconfirmedDeliveries = unconfirmedDeliveries; + } + + public bool Equals(UnconfirmedWarning other) + { + if (ReferenceEquals(other, null)) return false; + if (ReferenceEquals(this, other)) return true; + + return Equals(UnconfirmedDeliveries, other.UnconfirmedDeliveries); + } + + public override bool Equals(object obj) + { + return Equals(obj as UnconfirmedWarning); + } + + public override int GetHashCode() + { + return (UnconfirmedDeliveries != null ? UnconfirmedDeliveries.GetHashCode() : 0); + } + + public override string ToString() + { + return string.Format("UnconfirmedWarning", UnconfirmedDeliveries.Length); + } + } + + + [Serializable] + internal sealed class Delivery : IEquatable + { + public readonly int Attempt; + public readonly ActorPath Destination; + public readonly object Message; + public readonly DateTime Timestamp; + + public Delivery(ActorPath destination, object message, DateTime timestamp, int attempt) + { + Destination = destination; + Message = message; + Timestamp = timestamp; + Attempt = attempt; + } + + public bool Equals(Delivery other) + { + if (ReferenceEquals(other, null)) return false; + if (ReferenceEquals(this, other)) return true; + + return Equals(Attempt, other.Attempt) + && Equals(Timestamp, other.Timestamp) + && Equals(Destination, other.Destination) + && Equals(Message, other.Message); + } + + public Delivery IncrementedCopy() + { + return new Delivery(Destination, Message, Timestamp, Attempt + 1); + } + + public override bool Equals(object obj) + { + return Equals(obj as Delivery); + } + + public override int GetHashCode() + { + unchecked + { + int hashCode = (Destination != null ? Destination.GetHashCode() : 0); + hashCode = (hashCode*397) ^ (Message != null ? Message.GetHashCode() : 0); + hashCode = (hashCode*397) ^ Timestamp.GetHashCode(); + hashCode = (hashCode*397) ^ Attempt; + return hashCode; + } + } + + public override string ToString() + { + return string.Format("Delivery + /// This exception is thrown when the threshold has been exceeded. + /// + public class MaxUnconfirmedMessagesExceededException : AkkaException + { + /// + /// Initializes a new instance of the class. + /// + /// The message that describes the error. + /// The exception that is the cause of the current exception. + public MaxUnconfirmedMessagesExceededException(string message, Exception cause = null) + : base(message, cause) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// + /// The that holds the serialized object data about the exception being + /// thrown. + /// + /// + /// The that contains contextual information about the source or + /// destination. + /// + protected MaxUnconfirmedMessagesExceededException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + } + + internal sealed class RedeliveryTick + { + public static readonly RedeliveryTick Instance = new RedeliveryTick(); + + private RedeliveryTick() + { + } + + public override bool Equals(object obj) + { + return obj is RedeliveryTick; + } + } + + #region actor methods + + private readonly IActorContext _context; + private long _deliverySequenceNr; + private ICancelable _redeliverScheduleCancelable; + private readonly PersistenceSettings.AtLeastOnceDeliverySettings _settings; + private ConcurrentDictionary _unconfirmed = new ConcurrentDictionary(); + + + public AtLeastOnceDeliverySemantic(IActorContext context, PersistenceSettings.AtLeastOnceDeliverySettings settings) + { + _context = context; + _settings = settings; + } + + + /// + /// Interval between redelivery attempts. + /// + public virtual TimeSpan RedeliverInterval + { + get { return DefaultRedeliverInterval; } + } + + protected TimeSpan DefaultRedeliverInterval + { + get { return _settings.RedeliverInterval; } + } + + /// + /// Maximum number of unconfirmed messages that will be sent at each redelivery burst. This is to help to + /// prevent overflowing amount of messages to be sent at once, for eg. when destination cannot be reached for a long + /// time. + /// + public virtual int RedeliveryBurstLimit + { + get { return DefaultRedeliveryBurstLimit; } + } + + protected int DefaultRedeliveryBurstLimit + { + get { return _settings.RedeliveryBurstLimit; } + } + + /// + /// After this number of delivery attempts a message will be sent to + /// . + /// The count is reset after restart. + /// + public virtual int UnconfirmedDeliveryAttemptsToWarn + { + get { return DefaultUnconfirmedDeliveryAttemptsToWarn; } + } + + protected int DefaultUnconfirmedDeliveryAttemptsToWarn + { + get { return _settings.UnconfirmedAttemptsToWarn; } + } + + /// + /// Maximum number of unconfirmed messages, that this actor is allowed to hold in the memory. When this + /// number is exceed, will throw + /// instead of accepting messages. + /// + public virtual int MaxUnconfirmedMessages + { + get { return DefaultMaxUnconfirmedMessages; } + } + + protected int DefaultMaxUnconfirmedMessages + { + get { return _settings.MaxUnconfirmedMessages; } + } + + /// + /// Number of messages, that have not been confirmed yet. + /// + public int UnconfirmedCount + { + get { return _unconfirmed.Count; } + } + + /// + /// Invoked after actor has been created and all of it's fields have been initialized. + /// + public void Init() + { + _redeliverScheduleCancelable = ScheduleRedelivery(); + } + + /// + /// Send the message created with function to the + /// + /// actor. It will retry sending the message until the delivery is confirmed with . + /// Correlation between these two methods is performed by delivery id - parameter of + /// . + /// Usually it's passed inside the message to the destination, which replies with the message having the same id. + /// During recovery this method won't send out any message, but it will be sent later until corresponding + /// method will be invoked. + /// + /// + /// Thrown when is greater than or equal to . + /// + public void Deliver(ActorPath destination, Func deliveryMessageMapper, bool isRecovering = false) + { + if (UnconfirmedCount >= MaxUnconfirmedMessages) + { + throw new MaxUnconfirmedMessagesExceededException( + string.Format("{0} has too many unconfirmed messages. Maximum allowed is {1}", _context.Self, + MaxUnconfirmedMessages)); + } + + long deliveryId = NextDeliverySequenceNr(); + DateTime now = isRecovering ? DateTime.UtcNow - RedeliverInterval : DateTime.UtcNow; + var delivery = new Delivery(destination, deliveryMessageMapper(deliveryId), now, 0); + + if (isRecovering) + { + _unconfirmed.AddOrUpdate(deliveryId, delivery, (id, d) => delivery); + } + else + { + Send(deliveryId, delivery, now); + } + } + + /// + /// Call this method to confirm that message with has been sent + /// or to cancel redelivery attempts. + /// + /// True if delivery was confirmed first time, false for duplicate confirmations. + public bool ConfirmDelivery(long deliveryId) + { + Delivery delivery; + return _unconfirmed.TryRemove(deliveryId, out delivery); + } + + /// + /// Returns full state of the current delivery actor. Could be saved using + /// method. + /// During recovery a snapshot received in should be set with + /// . + /// + public AtLeastOnceDeliverySnapshot GetDeliverySnapshot() + { + UnconfirmedDelivery[] unconfirmedDeliveries = _unconfirmed + .Select(e => new UnconfirmedDelivery(e.Key, e.Value.Destination, e.Value.Message)) + .ToArray(); + + return new AtLeastOnceDeliverySnapshot(_deliverySequenceNr, unconfirmedDeliveries); + } + + /// + /// If snapshot from was saved, it will be received during recovery phase in a + /// message and should be set with this method. + /// + /// + public void SetDeliverySnapshot(AtLeastOnceDeliverySnapshot snapshot) + { + _deliverySequenceNr = snapshot.DeliveryId; + DateTime now = DateTime.UtcNow; + IEnumerable> unconfirmedDeliveries = snapshot.UnconfirmedDeliveries + .Select( + u => new KeyValuePair(u.DeliveryId, new Delivery(u.Destination, u.Message, now, 0))); + + _unconfirmed = new ConcurrentDictionary(unconfirmedDeliveries); + } + + public void Cancel() + { + _redeliverScheduleCancelable.Cancel(); + } + + + public void OnReplaySuccess() + { + RedeliverOverdue(); + } + + public bool AroundReceive(Receive receive, object message) + { + if (message is RedeliveryTick) + { + RedeliverOverdue(); + return true; + } + return false; + } + + private void Send(long deliveryId, Delivery delivery, DateTime timestamp) + { + ActorSelection destination = _context.ActorSelection(delivery.Destination); + destination.Tell(delivery.Message); + + var dcopy = new Delivery(delivery.Destination, delivery.Message, timestamp, delivery.Attempt + 1); + _unconfirmed.AddOrUpdate(deliveryId, dcopy, (id, d) => dcopy); + } + + private void RedeliverOverdue() + { + DateTime now = DateTime.UtcNow; + DateTime deadline = now - RedeliverInterval; + var warnings = new List(); + + foreach ( + var entry in _unconfirmed.Where(e => e.Value.Timestamp <= deadline).Take(RedeliveryBurstLimit).ToArray() + ) + { + long deliveryId = entry.Key; + Delivery unconfirmedDelivery = entry.Value; + + Send(deliveryId, unconfirmedDelivery, now); + + if (unconfirmedDelivery.Attempt == UnconfirmedDeliveryAttemptsToWarn) + { + warnings.Add(new UnconfirmedDelivery(deliveryId, unconfirmedDelivery.Destination, + unconfirmedDelivery.Message)); + } + } + + if (warnings.Count != 0) + { + _context.Self.Tell(new UnconfirmedWarning(warnings.ToArray())); + } + } + + private long NextDeliverySequenceNr() + { + return (++_deliverySequenceNr); + } + + private ICancelable ScheduleRedelivery() + { + var interval = new TimeSpan(RedeliverInterval.Ticks/2); + return _context.System.Scheduler.ScheduleTellRepeatedlyCancelable(interval, interval, _context.Self, + RedeliveryTick.Instance, _context.Self); + } + + #endregion + } +} \ No newline at end of file diff --git a/src/core/Akka.Persistence/Persistence.cs b/src/core/Akka.Persistence/Persistence.cs index 3070112a953..8eb7d639fc9 100644 --- a/src/core/Akka.Persistence/Persistence.cs +++ b/src/core/Akka.Persistence/Persistence.cs @@ -235,6 +235,15 @@ public ViewSettings(Config config) public AtLeastOnceDeliverySettings AtLeastOnceDelivery { get; set; } public class AtLeastOnceDeliverySettings { + public AtLeastOnceDeliverySettings(TimeSpan redeliverInterval, int redeliveryBurstLimit, + int unconfirmedAttemptsToWarn, int maxUnconfirmedMessages) + { + RedeliverInterval = redeliverInterval; + RedeliveryBurstLimit = redeliveryBurstLimit; + UnconfirmedAttemptsToWarn = unconfirmedAttemptsToWarn; + MaxUnconfirmedMessages = maxUnconfirmedMessages; + } + public AtLeastOnceDeliverySettings(Config config) { RedeliverInterval = config.GetTimeSpan("at-least-once-delivery.redeliver-interval"); @@ -243,10 +252,60 @@ public AtLeastOnceDeliverySettings(Config config) RedeliveryBurstLimit = config.GetInt("at-least-once-delivery.redelivery-burst-limit"); } + /// + /// Interval between redelivery attempts. + /// public TimeSpan RedeliverInterval { get; private set; } + + /// + /// Maximum number of unconfirmed messages, that this actor is allowed to hold in the memory. When this + /// number is exceed, will throw + /// + /// instead of accepting messages. + /// public int MaxUnconfirmedMessages { get; private set; } + + /// + /// After this number of delivery attempts a message will be sent to + /// . + /// The count is reset after restart. + /// public int UnconfirmedAttemptsToWarn { get; private set; } + /// + /// Maximum number of unconfirmed messages that will be sent at each redelivery burst. This is to help to + /// prevent overflowing amount of messages to be sent at once, for eg. when destination cannot be reached for a long + /// time. + /// public int RedeliveryBurstLimit { get; private set; } + + + public AtLeastOnceDeliverySettings WithRedeliverInterval(TimeSpan redeliverInterval) + { + return Copy(redeliverInterval); + } + + public AtLeastOnceDeliverySettings WithMaxUnconfirmedMessages(int maxUnconfirmedMessages) + { + return Copy(null, null, null, maxUnconfirmedMessages); + } + + public AtLeastOnceDeliverySettings WithRedeliveryBurstLimit(int redeliveryBurstLimit) + { + return Copy(null, redeliveryBurstLimit); + } + + public AtLeastOnceDeliverySettings WithUnconfirmedAttemptsToWarn(int unconfirmedAttemptsToWarn) + { + return Copy(null, null, unconfirmedAttemptsToWarn); + } + + private AtLeastOnceDeliverySettings Copy(TimeSpan? redeliverInterval = null, int? redeliveryBurstLimit = null, + int? unconfirmedAttemptsToWarn = null, int? maxUnconfirmedMessages = null) + { + return new AtLeastOnceDeliverySettings(redeliverInterval ?? RedeliverInterval, + redeliveryBurstLimit ?? RedeliveryBurstLimit, unconfirmedAttemptsToWarn ?? UnconfirmedAttemptsToWarn, + maxUnconfirmedMessages ?? MaxUnconfirmedMessages); + } } public InternalSettings Internal { get; private set; }