Farfetch / kafkaflow-retry-extensions

Kafka Flow Retry Patterns Extensions
https://farfetch.github.io/kafkaflow-retry-extensions/
MIT License
[Feature Request]: Add support to Avro messages serialization/deserialization in retry #133

Open sergioamribeiro opened 1 year ago

sergioamribeiro commented 1 year ago

Is your request related to a problem you have?

Yes. We have a consumer that receives Avro messages. When an exception occurs and the message goes to retry, the retry job fails while trying to deserialize the message with Newtonsoft.Json, logging the following error:

{"EventSeverity":"Error","Timestamp":"2023-06-21T16:20:05.9589524Z","Thread":"35","ActId":"b5219cba-364d-4e93-be67-051641fd7583","Msg":{"Message":"[KafkaFlow]Error processing message","Data":{"Message":{"Key":"dGVzdC1jaGFyZ2ViYWNr","Value":"H4sIAAAAAAAECuxYTW/bOBD9K8ScJUGy/NHo5sjKboBiu9gke+iiB1oa20Qp0ktRDYzA/31BfStVCil1iwKbo01qODN8b94Mn+AuPmBKIXiCG4Y8ySD45wn+oClCAPgFhf4daYIKLKCc0QwzCETOuQXr/s8/ZQaBa8FGxnmKQlPNpKj3bnBHc67/pjzH+r8PKkHFxB6CmfVNL1LMMrrHh4fbzWu9gN+43FLOTyQX7N8cCUtQaLZjqMhOKqIPSKpjwIIJ3laJynQRiQX3dA/ByoKbnHNRJrFaO5+tJq+apZhpmh7HxeN9lVW4Zyl2vSYHejyiwGSa+9c0wxYAVTBcij1Uofi9UIqVswXv5Z7FlN+fjlh900Rkp4xzlkFvk0HXSxvPVr3ES6v10d7i2dnFmdDLIwoq9K0JegQ4ZwNpPCDRhZEOIqalsExfjzaGH3UUbi+IYqUDhDG4+WRBKHOhC55UWcyFIVd1R96sd0a5dj633/k1v8qvm7uIevS+O9LYkH5H1Q51fHBKQjCxd2KZplI4XzywIBLxVzvNOqoYnZ2ieeLEB6r2uKXxZ6eoIJnDhEYlKC9NDJaIDmO+5YHTd9pElI1xvIXZKw30cjzBw8FgizLxrgvlNmdj4TxQFW4TIndFWQibKyBakscDiw9EH1hGigshsVQKs6MUSUa0nAb4CoJjoNtBujTVPpQJvpqrhV4YC3WMhclfJDyaFhQdVYf84TqUsOyYa0xIacsiTBRXmaWUc5JKgZqqQsB0o1lxrhSK+GQRdPYOYeXla0VFRmMjwKTeQVhGNpJzasSOZeSRcU62aA6JUWhTsCfI3qBubE8ajZ0C3MseX8qlQeVIMGYpNfWyoyodwajXWwJP14kyo2GVrHEInI+6pU5+Bbm9+2DPZ95qWi5fxyd9Oo6k0mI4EGPAMIljskdFUGh1mub43SndSm6UDx7EZyEfjSJtcMs0WBAqTJiGTw2sIOjsqgTtJWF6X/hk2guw4HvV5mcK1hQhdHpRDgrZFHPQEuTChntMbpRvim/9UF9WxKuuIhZKtaF6JMyXwzBvdbWSvqS0+L3F7ldvktvub/VC91eiJDK8DxVSXcwOlyNbOSQ+u2somq9Cvii5qRpNcqRKC1Skc1fdquTAxRD4LNrLk+7iB/yo0C91MVUna/XeCoKnt4G+fuYYeiYZaN3fBnoze5az+NgJ6G2gL3vHSw30gyW7/PP/OJ1b0H15DGDhr6PI8zz7Krp27bl7PbfXvr+05+566buRF3phBBY0T2EQwMyd+ba7tGfevbcIFvPAWzoz1/9ottVvVwF4ruu6pn9rBfA2gQA211G4ulqF9tqPXHv+bjm3rxYr374J59HierHcbGZm1mhH6wCaIdkMOeVMauw3v5oZKIDo4S/jhmmyzQts22713fYD3w0Wi49w/g8AAP//AwAIogReLBYAAA=="},"Topic":"dev.commerce.fraud.chargeback.events.internal.v1.LedgerEntryCreated-retry","MessageKey":"dGVzdC1jaGFyZ2ViYWNr","ConsumerName":"f4e39865-ce91-4319-917b-5ce6d3ecee25"},"Exception":"Newtonsoft.Json.JsonSerializationException: Unable to find a constructor to use for type Avro.Field. 
A class should either have a default constructor, one constructor with arguments or a constructor marked with the JsonConstructor attribute. Path 'Schema.Fields[0].Name', line 1, position 29.\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateNewObject(JsonReader reader, JsonObjectContract objectContract, JsonProperty containerMember, JsonProperty containerProperty, String id, Boolean& createdFromNonDefaultCreator)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObject(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateValueInternal(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.PopulateList(IList list, JsonReader reader, JsonArrayContract contract, JsonProperty containerProperty, String id)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateList(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, Object existingValue, String id)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateValueInternal(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.SetPropertyValue(JsonProperty property, JsonConverter propertyConverter, JsonContainerContract containerContract, JsonProperty containerProperty, JsonReader reader, Object target)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.PopulateObject(Object newObject, JsonReader reader, JsonObjectContract contract, JsonProperty member, 
String id)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObject(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateValueInternal(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.SetPropertyValue(JsonProperty property, JsonConverter propertyConverter, JsonContainerContract containerContract, JsonProperty containerProperty, JsonReader reader, Object target)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.PopulateObject(Object newObject, JsonReader reader, JsonObjectContract contract, JsonProperty member, String id)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObject(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateValueInternal(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)\r\n at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.Deserialize(JsonReader reader, Type objectType, Boolean checkAdditionalContent)\r\n at Newtonsoft.Json.JsonSerializer.DeserializeInternal(JsonReader reader, Type objectType)\r\n at Newtonsoft.Json.JsonConvert.DeserializeObject(String value, Type type, JsonSerializerSettings settings)\r\n at KafkaFlow.Retry.Durable.RetryDurableConsumerNewtonsoftJsonSerializerMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)\r\n at 
KafkaFlow.Retry.Durable.RetryDurableConsumerUtf8EncoderMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)\r\n at KafkaFlow.Retry.Durable.RetryDurableConsumerCompressorMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)\r\n at KafkaFlow.Consumers.ConsumerWorker.ProcessMessageAsync(ConsumeResult2 message, CancellationToken cancellationToken)"},"HostName":"FFPTW5CG035795M"}

Describe the solution you'd like

Add support for Avro serialization/deserialization in the retry flow, alongside the existing Newtonsoft.Json serialization, along the lines of the code in the attached image.

Are you able to help bring it to life and contribute with a Pull Request?

Yes

Additional context

No response