spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Dispatcher has no subscribers when value serializer is set for reactive producer in Spring boot 2.3.3 #973

Closed: sabbyanandan closed this issue 4 years ago

sabbyanandan commented 4 years ago

@rgonciarz commented on Thu Oct 08 2020

Describe the issue

"Dispatcher has no subscribers" is thrown when a value serializer is set for a reactive producer in Spring Boot 2.3.3. It works fine when the following setting is left at its default (not set):

spring.cloud.stream.kafka.binder.producer-properties.value.serializer: org.apache.kafka.common.serialization.StringSerializer

In Spring Boot 2.3.4 I see different behavior: with or without the value serializer set, the Kafka consumer cannot be created and an NPE is thrown when starting the app (KafkaMessageListenerContainer).

To Reproduce

I created a small project that may help to reproduce the issue:

https://github.com/rgonciarz/reactive-topology-chain/tree/reactive-function-no-value-serialization - working version with no value serializer set

Steps to reproduce the behavior:

  1. Just set spring.cloud.stream.kafka.binder.producer-properties.value.serializer: org.apache.kafka.common.serialization.StringSerializer

or check out the branch: https://github.com/rgonciarz/reactive-topology-chain/tree/reactive-function-with-producer-value-serialization - does not work when value serializer is set

Affected version: 2.3.3, 2.3.4

2020-10-08 18:49:57.965  INFO 7573 --- [container-0-C-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.topologyR-in-0' has 0 subscriber(s).
2020-10-08 18:50:00.972 ERROR 7573 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.topologyR-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=9349ef3f-bf14-47ed-ba62-5a07d9f9462f, headers={kafka_offset=125, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1fccd0d6, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=9349ef3f-bf14-47ed-ba62-5a07d9f9462f, kafka_receivedTopic=topic2, kafka_receivedTimestamp=1602175742577, contentType=application/json, kafka_groupId=topologyR}], failedMessage=GenericMessage [payload=9349ef3f-bf14-47ed-ba62-5a07d9f9462f, headers={kafka_offset=125, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1fccd0d6, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=9349ef3f-bf14-47ed-ba62-5a07d9f9462f, kafka_receivedTopic=topic2, kafka_receivedTimestamp=1602175742577, contentType=application/json, kafka_groupId=topologyR}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1880)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1862)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1799)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1739)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1636)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1366)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1082)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:990)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=9349ef3f-bf14-47ed-ba62-5a07d9f9462f, headers={kafka_offset=125, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1fccd0d6, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=9349ef3f-bf14-47ed-ba62-5a07d9f9462f, kafka_receivedTopic=topic2, kafka_receivedTimestamp=1602175742577, contentType=application/json, kafka_groupId=topologyR}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 27 more

sobychacko commented 4 years ago

Hi @rgonciarz I ran your application and it starts fine standalone using java -jar build/libs/reactive-0.0.1-SNAPSHOT.jar --spring.cloud.stream.kafka.binder.producer-properties.value.serializer=org.apache.kafka.common.serialization.StringSerializer. Do you need to send any data to the Kafka topic to reproduce what you are seeing?
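
For context on the question above: the stack trace in the report originates from the channel's send path (AbstractSubscribableChannel.doSend), after the log line reporting 0 subscribers on 'application.topologyR-in-0', so the error is raised when a record is delivered rather than at startup. The sketch below is a simplified plain-Java model of that behavior, not the Spring Integration implementation. The class names NoSubscribersDemo and SimpleChannel are hypothetical, and the real dispatcher also handles load balancing and failover, which are left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class NoSubscribersDemo {

    /** Hypothetical stand-in for a subscribable channel; not a Spring class. */
    static final class SimpleChannel {
        private final String name;
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        SimpleChannel(String name) {
            this.name = name;
        }

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        int subscriberCount() {
            return subscribers.size();
        }

        /** Fails on send, not on creation, when nothing is subscribed. */
        void send(String payload) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException(
                        "Dispatcher has no subscribers for channel '" + name + "'.");
            }
            // Simplification: hand the message to the first subscriber only.
            subscribers.get(0).accept(payload);
        }
    }

    public static void main(String[] args) {
        // Creating the channel succeeds even though nothing is subscribed yet.
        SimpleChannel channel = new SimpleChannel("application.topologyR-in-0");
        System.out.println("subscribers: " + channel.subscriberCount());

        // The failure only shows up once a message is actually sent.
        try {
            channel.send("first record");
        } catch (IllegalStateException e) {
            System.out.println("send failed: " + e.getMessage());
        }

        // Once a handler is subscribed, the same send goes through.
        channel.subscribe(p -> System.out.println("handled: " + p));
        channel.send("second record");
    }
}
```

In this model, as in the reported log, an application can start cleanly and still fail on the first record, which is why sending data to the input topic is needed to reproduce the error.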

sobychacko commented 4 years ago

@rgonciarz We were able to get to the bottom of this issue. It turned out that there were some issues with the way native encoding works with reactive producers. It is now addressed through this commit: https://github.com/spring-cloud/spring-cloud-stream/commit/5b04295570f79d9bc9347c90f3412c2242febf5e. The changes are also backported to the 3.0.x line of Spring Cloud Stream (https://github.com/spring-cloud/spring-cloud-stream/commit/f22c8f6ace8608a46287bdc866acce3a68edf78d), so the next minor release (3.0.9) will have these fixes.

If you are trying it locally before the release, make sure that you have the following snapshot dependencies in your project:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>3.0.9.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-function-context</artifactId>
    <version>3.0.11.BUILD-SNAPSHOT</version>
</dependency>

rgonciarz commented 4 years ago

Thank you

minhnguyenvan95 commented 3 years ago

you saved my day. TY