Skip to content

Commit

Permalink
Fix sharding recovery error and WithTransport serialization (#3744)
Browse files Browse the repository at this point in the history
* fixed typo in RemoteActorRefProvider comment

* Working on #3414 - bringing SerializeWithTransport API up to par with JVM

* added spec to help validate CurrentTransportInformation issues

Based on the equivalent JVM spec

* working on bringing serialization up to snuff

* brought serialization class up to snuff

* wrapping up RmeoteActorRefProvider implementation

* WIP

* cleaning up Serialization class

* looks like there's a Lazy<SerializationInfo> translation from Scala to C# that we haven't quite done

* fixed Serialization class

* fixed bug with Akka.Remote.Serialization.SerializationTransportInformationSpec

* forced a couple of specs using default akka.remote configs to run sequentially

This was done in order to avoid the two specs trying to bind on the same port at the same time.

* added serialization verification to the Akka.Persistence.TCK

* fixed issues with default Akka.Perisstence.TCK specs

* fixed IActorRef serialziation support in Akka.Persistence journals and snapshot stores

* fixed compilation issuyes

* fixed Akka.Sql.Common serialization in a backwards-compatible fashion

* had to disable serialization specs for Sql Journals

* Added API approvals

* updated creator and serialize-all-messages serialization

* added ITestOutputHelper to Akka.Cluster.Sharding.Tests.SupervisionSpec

* made changes to LocalSnapshotSerializer

* fixed bug in WithTransport method

* updated Akka.Remote MessageSerializer
  • Loading branch information
Aaronontheweb authored Jul 18, 2019
1 parent 9e20444 commit 8f9d19f
Show file tree
Hide file tree
Showing 36 changed files with 919 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Akka.Event;
using Akka.Pattern;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Sharding.Tests
{
Expand Down Expand Up @@ -93,7 +94,7 @@ protected override void OnReceive(object message)
private readonly ExtractShardId _extractShard = message =>
message is Msg msg ? (msg.Id % 2).ToString(CultureInfo.InvariantCulture) : null;

public SupervisionSpec() : base(GetConfig())
public SupervisionSpec(ITestOutputHelper output) : base(GetConfig(), output: output)
{ }

public static Config GetConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1099,8 +1099,7 @@ private async Task HandleWriteMessages(WriteMessages req, TCommand command)

foreach (var envelope in req.Messages)
{
var write = envelope as AtomicWrite;
if (write != null)
if (envelope is AtomicWrite write)
{
var writes = (IImmutableList<IPersistentRepresentation>)write.Payload;
foreach (var unadapted in writes)
Expand All @@ -1111,9 +1110,8 @@ private async Task HandleWriteMessages(WriteMessages req, TCommand command)
tagBuilder.Clear();

var persistent = AdaptToJournal(unadapted);
if (persistent.Payload is Tagged)
if (persistent.Payload is Tagged tagged)
{
var tagged = (Tagged)persistent.Payload;
if (tagged.Tags.Count != 0)
{
tagBuilder.Append(';');
Expand Down Expand Up @@ -1205,29 +1203,35 @@ protected virtual void WriteEvent(TCommand command, IPersistentRepresentation pe
var payloadType = persistent.Payload.GetType();
var serializer = _serialization.FindSerializerForType(payloadType, Setup.DefaultSerializer);

string manifest = "";
if (serializer is SerializerWithStringManifest)
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
Akka.Serialization.Serialization.WithTransport(_serialization.System, () =>
{
manifest = ((SerializerWithStringManifest)serializer).Manifest(persistent.Payload);
}
else
{
if (serializer.IncludeManifest)
string manifest = "";
if (serializer is SerializerWithStringManifest stringManifest)
{
manifest = persistent.Payload.GetType().TypeQualifiedName();
manifest = stringManifest.Manifest(persistent.Payload);
}
}
else
{
if (serializer.IncludeManifest)
{
manifest = persistent.Payload.GetType().TypeQualifiedName();
}
}
var binary = serializer.ToBinary(persistent.Payload);
var binary = serializer.ToBinary(persistent.Payload);
AddParameter(command, "@PersistenceId", DbType.String, persistent.PersistenceId);
AddParameter(command, "@SequenceNr", DbType.Int64, persistent.SequenceNr);
AddParameter(command, "@Timestamp", DbType.Int64, 0L);
AddParameter(command, "@IsDeleted", DbType.Boolean, false);
AddParameter(command, "@Manifest", DbType.String, manifest);
AddParameter(command, "@Payload", DbType.Binary, binary);
AddParameter(command, "@Tag", DbType.String, tags);
AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier);
AddParameter(command, "@PersistenceId", DbType.String, persistent.PersistenceId);
AddParameter(command, "@SequenceNr", DbType.Int64, persistent.SequenceNr);
AddParameter(command, "@Timestamp", DbType.Int64, 0L);
AddParameter(command, "@IsDeleted", DbType.Boolean, false);
AddParameter(command, "@Manifest", DbType.String, manifest);
AddParameter(command, "@Payload", DbType.Binary, binary);
AddParameter(command, "@Tag", DbType.String, tags);
AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier);
return manifest;
});
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,20 +690,26 @@ protected virtual void WriteEvent(DbCommand command, IPersistentRepresentation e
var payloadType = e.Payload.GetType();
var serializer = Serialization.FindSerializerForType(payloadType, Configuration.DefaultSerializer);

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
string manifest = "";
if (serializer is SerializerWithStringManifest)
var binary = Akka.Serialization.Serialization.WithTransport(Serialization.System, () =>
{
manifest = ((SerializerWithStringManifest)serializer).Manifest(e.Payload);
}
else
{
if (serializer.IncludeManifest)
if (serializer is SerializerWithStringManifest stringManifest)
{
manifest = e.Payload.GetType().TypeQualifiedName();
manifest = stringManifest.Manifest(e.Payload);
}
else
{
if (serializer.IncludeManifest)
{
manifest = e.Payload.GetType().TypeQualifiedName();
}
}
}
var binary = serializer.ToBinary(e.Payload);
return serializer.ToBinary(e.Payload);
});


AddParameter(command, "@PersistenceId", DbType.String, e.PersistenceId);
AddParameter(command, "@SequenceNr", DbType.Int64, e.SequenceNr);
Expand Down Expand Up @@ -746,11 +752,13 @@ protected virtual IPersistentRepresentation ReadEvent(DbDataReader reader)
// Support old writes that did not set the serializer id
var type = Type.GetType(manifest, true);
var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
deserialized = deserializer.FromBinary((byte[])payload, type);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
deserialized = Akka.Serialization.Serialization.WithTransport(Serialization.System, () => deserializer.FromBinary((byte[])payload, type) );
}
else
{
var serializerId = reader.GetInt32(SerializerIdIndex);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
deserialized = Serialization.Deserialize((byte[])payload, serializerId, manifest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ protected virtual void SetPayloadParameter(object snapshot, DbCommand command)
{
var snapshotType = snapshot.GetType();
var serializer = Serialization.FindSerializerForType(snapshotType, Configuration.DefaultSerializer);

var binary = serializer.ToBinary(snapshot);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var binary = Akka.Serialization.Serialization.WithTransport(Serialization.System, () => serializer.ToBinary(snapshot));
AddParameter(command, "@Payload", DbType.Binary, binary);
}

Expand Down Expand Up @@ -585,8 +585,9 @@ protected object GetSnapshot(DbDataReader reader)
if (reader.IsDBNull(5))
{
var type = Type.GetType(manifest, true);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var serializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
obj = serializer.FromBinary(binary, type);
obj = Akka.Serialization.Serialization.WithTransport(Serialization.System, () => serializer.FromBinary(binary, type));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Akka.Event;
using Akka.Persistence.Snapshot;
using Akka.Util;
using Akka.Util.Internal;

namespace Akka.Persistence.Sql.Common.Snapshot
{
Expand All @@ -39,12 +40,15 @@ private Initialized() { }

private readonly SnapshotStoreSettings _settings;

private readonly ExtendedActorSystem _actorSystem;

/// <summary>
/// Initializes a new instance of the <see cref="SqlSnapshotStore"/> class.
/// </summary>
/// <param name="config">The configuration used to configure the snapshot store.</param>
protected SqlSnapshotStore(Config config)
{
_actorSystem = Context.System.AsInstanceOf<ExtendedActorSystem>();
_settings = new SnapshotStoreSettings(config);
_pendingRequestsCancellation = new CancellationTokenSource();
}
Expand Down Expand Up @@ -224,7 +228,9 @@ private SnapshotEntry ToSnapshotEntry(SnapshotMetadata metadata, object snapshot
var snapshotType = snapshot.GetType();
var serializer = Context.System.Serialization.FindSerializerForType(snapshotType, _settings.DefaultSerializer);

var binary = serializer.ToBinary(snapshot);
var binary = Akka.Serialization.Serialization.WithTransport(_actorSystem,
() => serializer.ToBinary(snapshot));


return new SnapshotEntry(
persistenceId: metadata.PersistenceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<PropertyGroup>
<AssemblyTitle>Akka.Persistence.Sqlite.Tests</AssemblyTitle>
<TargetFrameworks>$(NetCoreTestVersion)</TargetFrameworks>
<RuntimeIdentifier Condition=" '$(TargetFramework)' == '$(NetFrameworkTestVersion)' And '$(OS)' == 'Windows_NT'">win7-x64</RuntimeIdentifier>

</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,8 @@ class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistenc
}
}");
}

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
protected override bool SupportsSerialization => false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public SqliteJournalSpec(ITestOutputHelper output)
Initialize();
}

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
protected override bool SupportsSerialization => false;

private static Config CreateSpecConfig(string connectionString)
{
return ConfigurationFactory.ParseString(@"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,7 @@ class = ""Akka.Persistence.Sqlite.Snapshot.SqliteSnapshotStore, Akka.Persistence
}
}");
}

protected override bool SupportsSerialization => true;
}
}
22 changes: 20 additions & 2 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Tools")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.MultiNodeTestRunner.Shared.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Persistence")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Persistence.TCK")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Remote")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Remote.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Remote.Tests.MultiNode")]
Expand Down Expand Up @@ -985,6 +986,8 @@ namespace Akka.Actor
Akka.Actor.LocalActorRef Guardian { get; }
Akka.Actor.IInternalActorRef RootGuardian { get; }
Akka.Actor.ActorPath RootPath { get; }
[Akka.Annotations.InternalApiAttribute()]
Akka.Serialization.Information SerializationInformation { get; }
Akka.Actor.Settings Settings { get; }
Akka.Actor.LocalActorRef SystemGuardian { get; }
Akka.Actor.IInternalActorRef TempContainer { get; }
Expand Down Expand Up @@ -1270,6 +1273,7 @@ namespace Akka.Actor
public Akka.Event.ILoggingAdapter Log { get; }
public Akka.Actor.IInternalActorRef RootGuardian { get; }
public Akka.Actor.ActorPath RootPath { get; }
public Akka.Serialization.Information SerializationInformation { get; }
public Akka.Actor.Settings Settings { get; }
public Akka.Actor.LocalActorRef SystemGuardian { get; }
public Akka.Actor.IInternalActorRef TempContainer { get; }
Expand Down Expand Up @@ -4541,6 +4545,16 @@ namespace Akka.Serialization
public override object FromBinary(byte[] bytes, System.Type type) { }
public override byte[] ToBinary(object obj) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class Information : System.IEquatable<Akka.Serialization.Information>
{
public Information(Akka.Actor.Address address, Akka.Actor.ActorSystem system) { }
public Akka.Actor.Address Address { get; }
public Akka.Actor.ActorSystem System { get; }
public bool Equals(Akka.Serialization.Information other) { }
public override bool Equals(object obj) { }
public override int GetHashCode() { }
}
public class NewtonSoftJsonSerializer : Akka.Serialization.Serializer
{
public NewtonSoftJsonSerializer(Akka.Actor.ExtendedActorSystem system) { }
Expand Down Expand Up @@ -4572,7 +4586,7 @@ namespace Akka.Serialization
public class Serialization
{
public Serialization(Akka.Actor.ExtendedActorSystem system) { }
public Akka.Actor.ActorSystem System { get; }
public Akka.Actor.ExtendedActorSystem System { get; }
public void AddSerializationMap(System.Type type, Akka.Serialization.Serializer serializer) { }
[System.ObsoleteAttribute("No longer supported. Use the AddSerializer(name, serializer) overload instead.", true)]
public void AddSerializer(Akka.Serialization.Serializer serializer) { }
Expand All @@ -4581,8 +4595,12 @@ namespace Akka.Serialization
public object Deserialize(byte[] bytes, int serializerId, string manifest) { }
public Akka.Serialization.Serializer FindSerializerFor(object obj, string defaultSerializerName = null) { }
public Akka.Serialization.Serializer FindSerializerForType(System.Type objectType, string defaultSerializerName = null) { }
public static Akka.Serialization.Information GetCurrentTransportInformation() { }
public byte[] Serialize(object o) { }
public static string SerializedActorPath(Akka.Actor.IActorRef actorRef) { }
public static T SerializeWithTransport<T>(Akka.Actor.ActorSystem system, Akka.Actor.Address address, System.Func<T> action) { }
[System.ObsoleteAttribute("Obsolete. Use the SerializeWithTransport<T>(ExtendedActorSystem) method instead.")]
public static T WithTransport<T>(Akka.Actor.ActorSystem system, Akka.Actor.Address address, System.Func<T> action) { }
public static T WithTransport<T>(Akka.Actor.ExtendedActorSystem system, System.Func<T> action) { }
}
public abstract class Serializer
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ namespace Akka.Remote
public Akka.Actor.IActorRef RemoteWatcher { get; }
public Akka.Actor.IInternalActorRef RootGuardian { get; }
public Akka.Actor.ActorPath RootPath { get; }
public Akka.Serialization.Information SerializationInformation { get; }
public Akka.Actor.Settings Settings { get; }
public Akka.Actor.LocalActorRef SystemGuardian { get; }
public Akka.Actor.IInternalActorRef TempContainer { get; }
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public LocalSnapshotStoreSpec(ITestOutputHelper output)
Initialize();
}

protected override bool SupportsSerialization => true;

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.Persistence.TCK.Tests/MemoryJournalSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ public MemoryJournalSpec(ITestOutputHelper output)
}

protected override bool SupportsRejectingNonSerializableObjects { get { return false; } }

protected override bool SupportsSerialization => true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public MemorySnapshotStoreSpec(ITestOutputHelper output)
Initialize();
}

protected override bool SupportsSerialization => true;

[Fact]
public void MemorySnapshotStore_is_threadsafe()
{
Expand Down
Loading

0 comments on commit 8f9d19f

Please sign in to comment.