Azure / azure-functions-kafka-extension

Kafka extension for Azure Functions

Key deserializer was not specified - ISpecificRecord Key #439

Open danielbojczuk opened 1 year ago

danielbojczuk commented 1 year ago

Hi everyone!

My team is migrating an on-premises application to Azure, and we decided to use Azure Functions with KafkaTrigger.

To have the necessary objects in place, we are using the trigger like this:

    public void Run(
        [KafkaTrigger(...)] KafkaEventData<Application, Event>[] events,
        ILogger log)

where both Event and Application implement the ISpecificRecord interface.

The problem occurs when the key type implements the ISpecificRecord interface. In that case, this exception is thrown:

System.InvalidOperationException
  HResult=0x80131509
  Message=Key deserializer was not specified and there is no default deserializer defined for type Application.
  Source=Confluent.Kafka
  StackTrace:
   at Confluent.Kafka.Consumer`2..ctor(ConsumerBuilder`2 builder)
   at Confluent.Kafka.ConsumerBuilder`2.Build()
   at Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaListener`2.CreateConsumer() in C:\Users\Daniel Bojczuk\source\repos\azure-functions-kafka-extension\src\Microsoft.Azure.WebJobs.Extensions.Kafka\Listeners\KafkaListener.cs:line 112
   at Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaListener`2.<.ctor>b__23_0() in C:\Users\Daniel Bojczuk\source\repos\azure-functions-kafka-extension\src\Microsoft.Azure.WebJobs.Extensions.Kafka\Listeners\KafkaListener.cs:line 79
   at System.Lazy`1.ViaFactory(LazyThreadSafetyMode mode)
   at System.Lazy`1.ExecutionAndPublication(LazyHelper executionAndPublication, Boolean useDefaultConstructor)
   at System.Lazy`1.CreateValue()
   at System.Lazy`1.get_Value()
   at Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaListener`2.CreateTopicScaler() in C:\Users\Daniel Bojczuk\source\repos\azure-functions-kafka-extension\src\Microsoft.Azure.WebJobs.Extensions.Kafka\Listeners\KafkaListener.cs:line 117
   at Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaListener`2.<.ctor>b__23_1() in C:\Users\Daniel Bojczuk\source\repos\azure-functions-kafka-extension\src\Microsoft.Azure.WebJobs.Extensions.Kafka\Listeners\KafkaListener.cs:line 80
   at System.Lazy`1.ViaFactory(LazyThreadSafetyMode mode)
   at System.Lazy`1.ExecutionAndPublication(LazyHelper executionAndPublication, Boolean useDefaultConstructor)
   at System.Lazy`1.CreateValue()
   at System.Lazy`1.get_Value()
   at Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaListener`2.GetMonitor() in C:\Users\Daniel Bojczuk\source\repos\azure-functions-kafka-extension\src\Microsoft.Azure.WebJobs.Extensions.Kafka\Listeners\KafkaListener.cs:line 403
   at Microsoft.Azure.WebJobs.Host.Listeners.HostListenerFactory.RegisterScaleMonitor(IListener listener, IScaleMonitorManager monitorManager)
   at Microsoft.Azure.WebJobs.Host.Listeners.HostListenerFactory.<CreateAsync>d__14.MoveNext()
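
For context, the trace shows the exception coming from Confluent.Kafka's ConsumerBuilder.Build(), which only has default deserializers for a few built-in types (string, byte[], and similar). Below is a minimal sketch of a consumer built directly with Confluent.Kafka that sets an Avro deserializer for the key as well as the value. It assumes the Confluent.SchemaRegistry.Serdes.Avro package and a reachable schema registry; the class and method names are hypothetical, and this is not necessarily how the extension or the proposed PR handles it.

```csharp
using Avro.Specific;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Hypothetical helper for illustration only; not part of the extension.
public static class AvroConsumerSketch
{
    public static IConsumer<TKey, TValue> Build<TKey, TValue>(
        ConsumerConfig config,
        ISchemaRegistryClient registry)
        where TKey : ISpecificRecord
        where TValue : ISpecificRecord
    {
        return new ConsumerBuilder<TKey, TValue>(config)
            // Without this call, Build() throws "Key deserializer was not
            // specified..." because TKey has no default deserializer.
            .SetKeyDeserializer(new AvroDeserializer<TKey>(registry).AsSyncOverAsync())
            // The value needs an explicit deserializer for the same reason.
            .SetValueDeserializer(new AvroDeserializer<TValue>(registry).AsSyncOverAsync())
            .Build();
    }
}
```

With the types from this issue, the call would look like `AvroConsumerSketch.Build<Application, Event>(config, registry)`, where `registry` could be a `CachedSchemaRegistryClient`.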

danielbojczuk commented 1 year ago

I have opened a PR with a suggested implementation.