spring-attic / hdfs

Apache License 2.0
5 stars 6 forks source link

Add support for KafkaAvroDeserializer #1

Open PedroAlvarado opened 7 years ago

PedroAlvarado commented 7 years ago

I'm using the confluent serializer and desializers. In the case of the hdfs-dataset sink, the deserializer returns a avro GenericData.Record instance for which the sink errors our with the exception below.

@sobychacko, any guidance on solving this issue will be great. I can commit to submit a pull request to make any necessary changes to fix this issue under your lead.

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.app.hdfs.dataset.sink.HdfsDatasetSinkConfiguration.datasetSinkMessageHandler.serviceActivator.handler]; nested exception is org.apache.avro.AvroRuntimeException: Not a Specific class: class org.apache.avro.generic.GenericData$Record
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:671) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:418) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.config.annotation.ServiceActivatorAnnotationPostProcessor$ReplyProducingMessageHandlerWrapper.handleRequestMessage(ServiceActivatorAnnotationPostProcessor.java:98) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:54) ~[spring-integration-kafka-2.1.0.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:288) ~[spring-integration-kafka-2.1.0.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:279) ~[spring-integration-kafka-2.1.0.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:77) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:276) ~[spring-retry-1.1.4.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:172) ~[spring-retry-1.1.4.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:39) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764) [spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708) [spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230) [spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:981) [spring-kafka-1.1.1.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_92]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_92]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class org.apache.avro.generic.GenericData$Record
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217) ~[avro-1.8.1.jar:1.8.1]
    at org.springframework.data.hadoop.store.dataset.DatasetDefinition.getSchema(DatasetDefinition.java:150) ~[spring-data-hadoop-store-2.4.0.RELEASE.jar:2.4.0.RELEASE]
    at org.springframework.data.hadoop.store.dataset.DatasetUtils.getOrCreateDataset(DatasetUtils.java:77) ~[spring-data-hadoop-store-2.4.0.RELEASE.jar:2.4.0.RELEASE]
    at org.springframework.data.hadoop.store.dataset.ParquetDatasetStoreWriter.createWriter(ParquetDatasetStoreWriter.java:79) ~[spring-data-hadoop-store-2.4.0.RELEASE.jar:2.4.0.RELEASE]
    at org.springframework.data.hadoop.store.dataset.AbstractDatasetStoreWriter.write(AbstractDatasetStoreWriter.java:75) ~[spring-data-hadoop-store-2.4.0.RELEASE.jar:2.4.0.RELEASE]
    at org.springframework.data.hadoop.store.dataset.ParquetDatasetStoreWriter.write(ParquetDatasetStoreWriter.java:73) ~[spring-data-hadoop-store-2.4.0.RELEASE.jar:2.4.0.RELEASE]
    at org.springframework.data.hadoop.store.dataset.DatasetTemplate.write(DatasetTemplate.java:271) ~[spring-data-hadoop-store-2.4.0.RELEASE.jar:2.4.0.RELEASE]
    at org.springframework.cloud.stream.app.hdfs.dataset.sink.HdfsDatasetSinkConfiguration$1.handleMessage(HdfsDatasetSinkConfiguration.java:121) ~[spring-cloud-starter-stream-sink-hdfs-dataset-1.1.0.RC1.jar:1.1.0.RC1]
    at org.springframework.integration.config.annotation.ServiceActivatorAnnotationPostProcessor$ReplyProducingMessageHandlerWrapper.handleRequestMessage(ServiceActivatorAnnotationPostProcessor.java:98) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    ... 54 common frames omitted