spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0

Spring Cloud Stream routing-to channel (using spring.cloud.stream.sendto.destination) gives ClassCastException #2719

Open KarthikRIyer opened 1 year ago

KarthikRIyer commented 1 year ago

Describe the issue

Publishing messages using .setHeader("spring.cloud.stream.sendto.destination", "output binding name here") gives me the following exception:

Caused by: java.lang.ClassCastException: class com.example.airplane.FlightEvent cannot be cast to class [B (com.example.airplane.FlightEvent is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) ~[kafka-clients-3.3.2.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-3.3.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1005) ~[kafka-clients-3.3.2.jar:na]
    ... 59 common frames omitted

More details:

I have a functionRouter that routes messages to different functions based on a header. These functions in turn process the message and return a result, which I need to publish to another topic.

application.properties

spring.application.name=airplane
server.port=8095

spring.cloud.stream.bindings.planeEventProducer.destination=plane-events

spring.cloud.stream.bindings.flightEventProducer.destination=flight-events
spring.cloud.stream.bindings.landEventProducer.destination=land-events
spring.cloud.stream.bindings.arrivalEventProducer.destination=arrival-events

spring.cloud.stream.output-bindings=replayProducer;planeEventProducer;flightEventProducer;landEventProducer;arrivalEventProducer

spring.cloud.stream.bindings.functionRouter-in-0.destination=plane-events,flight-events,land-events,arrival-events
spring.cloud.stream.bindings.functionRouter-in-0.group=airplane
spring.cloud.stream.bindings.functionRouter-in-0.consumer.concurrency=8

spring.cloud.stream.function.definition=functionRouter;
spring.cloud.stream.function.routing.enabled=true
spring.cloud.function.routing-expression=headers['Type']
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.replication-factor=1
spring.cloud.stream.kafka.binder.min-partition-count=8
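
To illustrate what the routing configuration above is meant to do, here is a minimal plain-Java sketch of header-based dispatch. It is not the framework's implementation: the class name HeaderRoutingSketch, the FUNCTIONS registry, and the String payloads are assumptions made for illustration. Only the 'Type' header key and the function names come from this issue.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class HeaderRoutingSketch {

    // Hypothetical registry keyed by bean name, mirroring @Bean("PlaneEvent") etc.
    static final Map<String, Function<String, String>> FUNCTIONS = new HashMap<>();

    static {
        FUNCTIONS.put("PlaneEvent", payload -> "FlightEvent for " + payload);
        FUNCTIONS.put("FlightEvent", payload -> "LandEvent for " + payload);
        FUNCTIONS.put("LandEvent", payload -> "ArrivalEvent for " + payload);
    }

    // Looks up the target function by the 'Type' header, as headers['Type'] is meant to.
    static String route(Map<String, Object> headers, String payload) {
        Object type = headers.get("Type");
        Function<String, String> target = FUNCTIONS.get(type);
        if (target == null) {
            throw new IllegalStateException("No function registered for Type=" + type);
        }
        return target.apply(payload);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("Type", "PlaneEvent");
        System.out.println(route(headers, "plane-1"));
    }
}
```

A message whose 'Type' header does not match any registered name fails fast in this sketch, which makes a missing or misspelled header easy to spot.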

AirplaneApplication.java

package com.example.airplane;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.function.Consumer;
import java.util.function.Function;

@SpringBootApplication
public class AirplaneApplication {

    public static void main(String[] args) {
        SpringApplication.run(AirplaneApplication.class, args);
    }

    Logger logger = LoggerFactory.getLogger(AirplaneApplication.class);

    @Bean("ArrivalEvent")
    public Consumer<Message<ArrivalEvent>> arrivalEventConsumer() {
        return msg -> {
            logger.info("ArrivalEvent consumed: {}", msg.getPayload());
        };
    }

    @Bean("LandEvent")
    public Function<Message<LandEvent>, Message<ArrivalEvent>> landEventProcessor() {
        return landEventMessage -> {
            ArrivalEvent arrivalEvent = new ArrivalEvent(landEventMessage.getPayload().getFlightId()+"-flight", landEventMessage.getPayload().getCurrentAirport());
            logger.info("Publishing ArrivalEvent: {}", arrivalEvent);
            return MessageBuilder.withPayload(arrivalEvent)
                    .setHeader("Type", ArrivalEvent.class.getSimpleName())
                    .setHeader("spring.cloud.stream.sendto.destination", "arrivalEventProducer")
                    .build();
        };
    }

    @Bean("FlightEvent")
    public Function<Message<FlightEvent>, Message<LandEvent>> flightEventProcessor() {
        return flightEventMessage -> {
            LandEvent landEvent = new LandEvent(flightEventMessage.getPayload().getFlightId()+"-flight", flightEventMessage.getPayload().getCurrentAirport());
            logger.info("Publishing LandEvent: {}", landEvent);
            return MessageBuilder.withPayload(landEvent)
                    .setHeader("Type", LandEvent.class.getSimpleName())
                    .setHeader("spring.cloud.stream.sendto.destination", "landEventProducer")
                    .build();
        };
    }

    @Bean("PlaneEvent")
    public Function<Message<PlaneEvent>, Message<FlightEvent>> planeEventProcessor() {
        return planeEventMessage -> {
            FlightEvent flightEvent = new FlightEvent(planeEventMessage.getPayload().getPlaneId()+"-flight", planeEventMessage.getPayload().getCurrentAirport());
            logger.info("Publishing Flight: {}", flightEvent);
            return MessageBuilder.withPayload(flightEvent)
                    .setHeader("Type", FlightEvent.class.getSimpleName())
                    .setHeader("spring.cloud.stream.sendto.destination", "flightEventProducer")
                    .build();
        };
    }

}

AirplaneController.java

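package com.example.airplane;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
public class AirplaneController {
    // StreamBridge publishes to an output binding by name.
    @Autowired
    StreamBridge streamBridge;

    @GetMapping("publishPlane")
    public void publishPlane() {
        PlaneEvent planeEvent = new PlaneEvent(UUID.randomUUID().toString(), "CITY");
        Message<PlaneEvent> message = MessageBuilder.withPayload(planeEvent).setHeader("Type", PlaneEvent.class.getSimpleName())
                .build();
        streamBridge.send("planeEventProducer", message);
    }
}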

On calling the GET endpoint (publish plane) I get a ClassCastException when planeEventProcessor tries to publish to the flightEventProducer binding:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2953) ~[spring-kafka-3.0.4.jar:3.0.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2894) ~[spring-kafka-3.0.4.jar:3.0.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2860) ~[spring-kafka-3.0.4.jar:3.0.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2783) ~[spring-kafka-3.0.4.jar:3.0.4]
    at io.micrometer.observation.Observation.observe(Observation.java:559) ~[micrometer-observation-1.10.4.jar:1.10.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2781) ~[spring-kafka-3.0.4.jar:3.0.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2633) ~[spring-kafka-3.0.4.jar:3.0.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2519) ~[spring-kafka-3.0.4.jar:3.0.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2171) ~[spring-kafka-3.0.4.jar:3.0.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1526) ~[spring-kafka-3.0.4.jar:3.0.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1490) ~[spring-kafka-3.0.4.jar:3.0.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1365) ~[spring-kafka-3.0.4.jar:3.0.4]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.springframework.kafka.KafkaException: Failed to execute runnable
    at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:75) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:461) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:425) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2880) ~[spring-kafka-3.0.4.jar:3.0.4]
    ... 13 common frames omitted
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@24f4760]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1118) ~[spring-cloud-stream-4.0.1.jar:4.0.1]
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:327) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.doSendMessage(FunctionConfiguration.java:648) ~[spring-cloud-stream-4.0.1.jar:4.0.1]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:637) ~[spring-cloud-stream-4.0.1.jar:4.0.1]
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:327) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.0.6.jar:6.0.6]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.0.6.jar:6.0.6]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.0.6.jar:6.0.6]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.0.6.jar:6.0.6]
    at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at io.micrometer.observation.Observation.observe(Observation.java:492) ~[micrometer-observation-1.10.4.jar:1.10.4]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:394) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.lambda$onMessage$0(KafkaMessageDrivenChannelAdapter.java:464) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
    at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.lambda$doWithRetry$0(KafkaInboundEndpoint.java:70) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-2.0.0.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-2.0.0.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:66) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
    ... 16 common frames omitted
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.example.airplane.FlightEvent to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1008) ~[kafka-clients-3.3.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:952) ~[kafka-clients-3.3.2.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1022) ~[spring-kafka-3.0.4.jar:3.0.4]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:729) ~[spring-kafka-3.0.4.jar:3.0.4]
    at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:700) ~[spring-kafka-3.0.4.jar:3.0.4]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:510) ~[spring-kafka-3.0.4.jar:3.0.4]
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:528) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136) ~[spring-integration-core-6.0.3.jar:6.0.3]
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.0.3.jar:6.0.3]
    ... 51 common frames omitted
Caused by: java.lang.ClassCastException: class com.example.airplane.FlightEvent cannot be cast to class [B (com.example.airplane.FlightEvent is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) ~[kafka-clients-3.3.2.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-3.3.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1005) ~[kafka-clients-3.3.2.jar:na]
    ... 59 common frames omitted

Publishing messages using output-bindings and streamBridge works as expected, but using the same output-binding with .setHeader("spring.cloud.stream.sendto.destination", "output binding name here") doesn't.

But, instead of using one of the bindings specified in spring.cloud.stream.output-bindings when I try to use the function's output binding PlaneEvent-out-0, and add PlaneEvent to spring.cloud.stream.function.definition, it works as expected.

On debugging a little I can see that streamBridge sets the outputContentType correctly (application/json).

public boolean send(String bindingName, Object data) {
        BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(bindingName);
        MimeType contentType = StringUtils.hasText(bindingProperties.getContentType()) ? MimeType.valueOf(bindingProperties.getContentType()) : MimeTypeUtils.APPLICATION_JSON;
        return this.send(bindingName, data, contentType);
    }

However, this does not seem to happen with spring.cloud.stream.sendto.destination for bindings which are not specified in spring.cloud.stream.function.definition. The correct outputContentType is present in the BindingProperties, but it isn't being used.
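
The difference between the two paths can be sketched in plain Java. This is only an illustration under stated assumptions, not framework code: ContentTypeSketch, convert, and serialize are hypothetical names, the JSON is hand-rolled, and serialize merely mimics a serializer that expects a byte[] (which is what the ByteArraySerializer frames in the stack trace suggest). resolveContentType mirrors the fallback in the StreamBridge snippet above.

```java
import java.nio.charset.StandardCharsets;

public class ContentTypeSketch {

    // Hypothetical stand-in for the FlightEvent payload class from the issue.
    static final class FlightEvent {
        final String flightId;
        final String currentAirport;

        FlightEvent(String flightId, String currentAirport) {
            this.flightId = flightId;
            this.currentAirport = currentAirport;
        }
    }

    // Same fallback as the StreamBridge snippet: configured type, else JSON.
    static String resolveContentType(String configured) {
        boolean hasText = configured != null && !configured.trim().isEmpty();
        return hasText ? configured : "application/json";
    }

    // Hand-rolled JSON conversion; a real application would rely on a message converter.
    static byte[] convert(FlightEvent event, String contentType) {
        if (!"application/json".equals(contentType)) {
            throw new IllegalArgumentException("Unsupported content type: " + contentType);
        }
        String json = "{\"flightId\":\"" + event.flightId
                + "\",\"currentAirport\":\"" + event.currentAirport + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Mimics a serializer that assumes the payload is already a byte[].
    static byte[] serialize(Object payload) {
        return (byte[]) payload; // ClassCastException if conversion was skipped
    }

    public static void main(String[] args) {
        FlightEvent event = new FlightEvent("42-flight", "CITY");

        // Conversion applied: the serializer receives bytes.
        byte[] ok = serialize(convert(event, resolveContentType(null)));
        System.out.println(new String(ok, StandardCharsets.UTF_8));

        // Conversion skipped: the serializer receives the POJO and the cast fails.
        try {
            serialize(event);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the stack trace");
        }
    }
}
```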


To Reproduce

Steps to reproduce the behavior:

  1. Clone this repo: https://github.com/KarthikRIyer/spring-cloud-stream-function-routing
  2. Set the kafka brokers with this property: spring.cloud.stream.kafka.binder.brokers
  3. Run the app. Call the GET endpoint localhost:8095/publishPlane. The sample app includes Swagger for convenience.
  4. See the error in the logs.

Version of the framework

Spring Boot 3.0.4, Java 17, Spring Cloud 2022.0.1

Expected behavior

Publishing messages with the spring.cloud.stream.sendto.destination header set to an output binding listed in spring.cloud.stream.output-bindings should take into account the output content type from BindingProperties.

Screenshots

When publishing with Stream Bridge (Debug point here):

SCStr-bytearray

When publishing with spring.cloud.stream.sendto.destination (same debug point as mentioned above):

SCStr-object

KarthikRIyer commented 1 year ago

Update:

The exception when using spring.cloud.stream.sendto.destination from a Function<> does not depend on whether I use a separate output binding or the function's own output binding. It works as long as the Function<>'s bean name is specified in spring.cloud.function.definition.

fleboulch commented 1 year ago

Many thanks @KarthikRIyer, your last comment saved me hours of debugging.

sobychacko commented 1 year ago

@KarthikRIyer Is the original issue still there? Or can we close this issue?

KarthikRIyer commented 1 year ago

Yeah, the original issue is still there.

olegz commented 1 year ago

I am confused. What does this have to do with the routing function? I see you have a REST controller from which you use StreamBridge to always send to planeEventProducer, which itself always routes to flightEventProducer, and so on. The routing function has nothing to do with spring.cloud.stream.sendto.destination.

I am confused

olegz commented 1 year ago

But, instead of using one of the bindings specified in spring.cloud.stream.output-bindings when I try to use the function's output binding PlaneEvent-out-0, and add PlaneEvent to spring.cloud.stream.function.definition, it works as expected.

You have multiple functions. None of them will turn into a binding by default. We only auto-bind if the user has a single function, so specifying all these functions in the function.definition property is the correct and expected behaviour.

olegz commented 1 year ago

Also, you seem to be using all three routing techniques that spring-cloud-stream has to offer, which seems a bit too much.

If you are using StreamBridge to effectively connect a REST call with a stream, why not determine right there where exactly you want to send the message and just do it with bridge.send(..)? You would save tons of configuration, and the app would be much more readable and performant.
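
One possible reading of this suggestion, sketched in plain Java so it runs without Spring: pick the output binding at the call site and send directly. The class DirectSendSketch, the BINDING_BY_TYPE map, and the sender parameter are hypothetical; sender stands in for a method reference such as streamBridge::send, and the binding names are the ones from the issue's application.properties.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class DirectSendSketch {

    // Hypothetical lookup from payload type to output binding name.
    static final Map<String, String> BINDING_BY_TYPE = Map.of(
            "PlaneEvent", "planeEventProducer",
            "FlightEvent", "flightEventProducer",
            "LandEvent", "landEventProducer",
            "ArrivalEvent", "arrivalEventProducer");

    // Empty stand-in for the FlightEvent class from the issue.
    static final class FlightEvent { }

    // Chooses the binding from the payload's simple class name.
    static String bindingFor(Object payload) {
        String type = payload.getClass().getSimpleName();
        String binding = BINDING_BY_TYPE.get(type);
        if (binding == null) {
            throw new IllegalArgumentException("No binding configured for " + type);
        }
        return binding;
    }

    // 'sender' is an assumption: any (bindingName, payload) -> boolean function.
    static boolean publish(BiFunction<String, Object, Boolean> sender, Object payload) {
        return sender.apply(bindingFor(payload), payload);
    }

    public static void main(String[] args) {
        List<String> sentTo = new ArrayList<>();
        // A recording sender replaces the real bridge in this sketch.
        publish((binding, payload) -> sentTo.add(binding), new FlightEvent());
        System.out.println(sentTo);
    }
}
```

Because the destination is decided in one place, neither a routing expression nor a sendto header is needed on this path.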

If you still insist on using the routing function, then make sure it has all the information it needs to route the message to the proper function.

So I really do not see a case where spring.cloud.stream.sendto.destination would make any sense in your application.

olegz commented 1 year ago

Keep in mind that spring.cloud.stream.sendto.destination will send the message to a destination. This means that an extra queue and an extra binding must be there, so it is a network call. And everything you are calling is in the same application. So you are leaving your app just to come back to it. Why?