object / Akka.Persistence.Reminders

Long-running message scheduler on top of Akka.NET persistence
Apache License 2.0
34 stars 6 forks source link

Serialization payload #6

Open Zetanova opened 5 years ago

Zetanova commented 5 years ago

If there are multiple nodes with different roles and assemblies in the cluster. Currently Reminder can only accept a payload where the type is known to every node. Else reminder could not receive and/or deserialize the payload message.

But for the functionality of the reminder service it is not important to deserialize it. Only the receiving node should be able to deserialize the payload back.

Horusiath commented 5 years ago

@Zetanova underneat the Reminder is just an actor. Like any actor it communicates by sending messages. If any recipient actor is supposed to receive the message from reminder, the details of serialization/deserialization must be known to both sides (just like when two actors talk with each other).

If you want to send serializer-agnostic message, you can serialize/deserialize it explicitly like:

var message = new MyMessage();
var payload = system.Serialization.FindSerializerFor(message).ToBytes(message, typeof(MyMessage));

var task = new Reminder.Schedule(taskId, recipient.Path, payload, delay);
reminder.Tell(task);

// then in the recipient actor
class Recipient : ReceiveActor
{
    public Recipient()
    {
        var serializer = Context.System.Serialization.FindSerializerForType(typeof(MyMessage));
        Receive<byte[]>(raw =>{
            var messager = (MyMessage)serializer.FromBytes(raw);
        });
    }
}
Zetanova commented 5 years ago

I thought about some kind of surrogate type defined by akka.net Is there somekind of mechanismus to lazy deserialize a received massage?

Horusiath commented 5 years ago

In theory you could do some kind of wrapper message, that's internally uses ISurrogate/ISurrogated mechanism to do late serialization, but that's only shifting the problem - instead of knowing how to deserialize your destination message, now you need to know how to deserialize its wrapper.

Zetanova commented 5 years ago

It looks like a generic problem that should be resolved over the akka.net base libery The gateway pattern would profit from a lazy deserialized payload-message-type too.

I will investigate more.

Zetanova commented 5 years ago

I have made now one payload message type. But what is missing is an auto unwarp-feature in akka for dual-pass a message with somekind of interface-flag like "IPayload"

This could then be used for sender defined remote success/failure messages or just a muti hope/forward route or in Akka.Persistence.Reminders as Payload

Example of auto unwarp/dual-pass in ActorBase:

internal protected virtual bool AroundReceive(Receive receive, object message)
{
    var wasHandled = receive(message);

    //try to unwarp payload message
    if(!wasHandled && message is IPayload payload)
    {
        if(payload.TryDeserialize(var pmsg)) 
        {
             wasHandled = receive(pmsg);
        }
    }
    if(!wasHandled)
    {
         Unhandled(message);
    }
    return wasHandled;
}

Payload class

public sealed class Payload : ISurrogated, IPayload
    {
        sealed class Surrogate : ISurrogate
        {
            public readonly int SerializerId;

            public readonly string Manifest;

            public readonly byte[] Data;

            public Surrogate(byte[] data, int serializerId, string manifest)
            {
                SerializerId = serializerId;
                Manifest = manifest;
                Data = data;
            }

            ISurrogated ISurrogate.FromSurrogate(ActorSystem system)
            {
                return new Payload(this, system);
            }

            public object Deserialize(ActorSystem system)
            {
                return system.Serialization.Deserialize(Data, SerializerId, Manifest);
            }
        }

        Surrogate _surrogate;
        ActorSystem _system;
        object _content;

        public Object Content => _content ?? (_content = _surrogate?.Deserialize(_system));

        public Payload(Object content)
        {
            _content = content;
        }

        private Payload(Surrogate surrogate, ActorSystem system)
        {
            _surrogate = surrogate;
            _system = system;
        }

        ISurrogate ISurrogated.ToSurrogate(ActorSystem system)
        {
            if(_surrogate is null)
            {
                String manifest;
                Byte[] data;
                int serializerId;

                switch (system.Serialization.FindSerializerFor(Content))
                {
                    case SerializerWithStringManifest serial:
                        manifest = serial.Manifest(Content);
                        data = serial.ToBinary(Content);
                        serializerId = serial.Identifier;
                        break;
                    case Serializer serial when serial.IncludeManifest:
                        manifest = Content.GetType().TypeQualifiedName();
                        data = serial.ToBinary(Content);
                        serializerId = serial.Identifier;
                        break;
                    case var serial:
                        manifest = String.Empty;
                        data = serial.ToBinary(Content);
                        serializerId = serial.Identifier;
                        break;
                }

                _surrogate = new Surrogate(data, serializerId, manifest);
            }

            return _surrogate;
        }

        public bool TryDeserialize(out object value)
        {
            try
            {
                value = Content;
                return true;
            }
            catch (SerializationException _)
            {
                value = null;
                return false;
            }
        }
    }
Horusiath commented 5 years ago

@Zetanova I think, that the right place to include feature like that would be either in core akka projects or directly as serializer feature.

Zetanova commented 5 years ago

@Horusiath Yes, i thinking of the same, posted it know into the https://github.com/akkadotnet/akka.net/issues/3811