akkadotnet / Akka.Streams.Kafka

Kafka Akka.Streams connectors - part of the Alpakka project
https://getakka.net/
Apache License 2.0
67 stars 22 forks source link

Failing to serialize/deserialize payload with serialize-messages = on #293

Open bezchristo opened 2 years ago

bezchristo commented 2 years ago

Version Information Akka 1.4.38 Akka.Serialization.Hyperion 1.4.38 Akka.Streams.Kafka 1.2.1

Describe the bug When setting serialize-messages=on in the HOCON file while using Hyperion I get an exception complaining about an ambiguous match that was found.

Failed to serialize and deserialize payload object Swallowing exception during message send
Cause: System.Runtime.Serialization.SerializationException: Failed to serialize and deserialize payload object [Akka.Streams.Kafka.Stages.Consumers.Actors.KafkaConsumerActor`2+Internal+Poll`2[Confluent.Kafka.Ignore,System.Byte[],Confluent.Kafka.Ignore,System.Byte[]]]. Envelope: [<Akka.Streams.Kafka.Stages.Consumers.Actors.KafkaConsumerActor`2+Internal+Poll`2[Confluent.Kafka.Ignore,System.Byte[],Confluent.Kafka.Ignore,System.Byte[]]> from [akka://ticker/system/kafka-consumer-2#430199385]], Actor type: [Akka.Streams.Kafka.Stages.Consumers.Actors.KafkaConsumerActor`2[Confluent.Kafka.Ignore,System.Byte[]]]
 ---> Hyperion.ValueSerializers.UnsupportedTypeException: Ambiguous match found.
   at Hyperion.ValueSerializers.UnsupportedTypeSerializer.WriteManifest(Stream stream, SerializerSession session)
   at Hyperion.Extensions.StreamEx.WriteObject(Stream stream, Object value, Type valueType, ValueSerializer valueSerializer, Boolean preserveObjectReferences, SerializerSession session)

If I don't use Hyperion but still leave serialize-messages = on enabled so that it uses Json serialization I get the following exception:

System.Runtime.Serialization.SerializationException: Failed to serialize and deserialize payload object [Akka.Streams.Kafka.Stages.Consumers.Actors.KafkaConsumerActor`2+Internal+Poll`2[Confluent.Kafka.Ignore,System.Byte[],Confluent.Kafka.Ignore,System.Byte[]]]. Envelope: [<Akka.Streams.Kafka.Stages.Consumers.Actors.KafkaConsumerActor`2+Internal+Poll`2[Confluent.Kafka.Ignore,System.Byte[],Confluent.Kafka.Ignore,System.Byte[]]> from [ticker://book/system/kafka-consumer-3#490851764]], Actor type: [Akka.Streams.Kafka.Stages.Consumers.Actors.KafkaConsumerActor`2[Confluent.Kafka.Ignore,System.Byte[]]]
 ---> Akka.Actor.ActorInitializationException: Do not create actors using 'new', always create them using an ActorContext/System

As soon as I remove serialize-messages = on the exceptions disappear.

To Reproduce Steps to reproduce the behavior:

  1. Add the following configuration to your HOCON file.

    actor {
     serialize-messages = on  
    
     serializers {
      hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
    }
    serialization-bindings {
      "System.Object" = hyperion
    }
    }
  2. Make sure that you use Akka.Streams.Kafka to consume from a Kafka topic.
  3. Run your application.

Expected behavior Serialization / Deserialization should work fine for Akka.Streams.Kafka using Hyperion or Json.

Actual behavior Getting serialization/deserialization exceptions when enabling local serialization via serialize-messages = on

Environment .Net 6 Windows

Aaronontheweb commented 1 year ago

Thanks for reporting this - I think we need to tag this plugins internal messages with INoSerializationVerificationNeeded so they get skipped during serialize-all-messages=on.