Closed jorgebsa closed 4 years ago
Hi Olegz Did you get it working? I am unable to make StreamBridge work with Message<> argument in SpringBoot: 2.4.1 and SpringCloud: 2020.0.1 Is it correct when you define spring.cloud.stream.source: test then framework will create necessary function bean and its bindings so I dont need to create my Supplier<> bean. But when I did it at runtime it throws error saying the bean 'test' not found. Any clue?
We are in 3.1.3 version which is SpringCloud: 2020.0.3. Try that as the issue has been fixed on on Mar 23, 2020
Thanks I will do that. Please clarify few things for me. I am writing a producer-only microservice that:
This is implemented:
Contract contract = new Contract(); Message<Contract> msg = MessageBuilder.withPayload(contract) .setHeader(KafkaHeaders.MESSAGE_KEY, "1234") .build(); streamBridge.send("sendMessage-out-0", msg)
spring.cloud.stream.source: sendMessage spring.cloud.stream.bindings.sendMessage-out-0.destination: test-topic spring.cloud.stream.bindings.sendMessage-out-0.content-type: application/*+avro spring.cloud.stream.kafka.binder.producer-properties.key.serializer: org.apache.kafka.common.serialization.StringSerializer spring.cloud.stream.kafka.binder.producer-properties.value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url: xxxxx
My understanding is that this is all needed. The framework would create necessary Bean and bindings also, so I dont need to even do this
@Bean public Supplier<Flux<Message<Object>>> sendMessage(Message<Object> msg) { ... }
I this correct? I am getting error at boot time that "bean named 'sendMessage' could not be found" if I dont create Supplier<> bean. What you need to do to pass right headers? Any clue?Btw, if you dont provide "spring.cloud.stream.function.definition: sendMessage" and are using Swagger2 jars then the framework try to bind to output topic of Swagger because the jar has the Supplier<> bean in it. This is known issue.
I am not sure what version you are using, but streamBridge.send("blahblah", msg)
will always work regardless of blahblah
bean being there. That is to support dynamic destinations.
Anyway, if you believe you still have an issue please submit a small sample that reproduces it so we can take a look
Versions
Spring Boot
2.2.5
Spring CloudHoxton.SR3
Description
Basically, there isn't a simple way of sending a message with custom headers through
StreamBridge.send(..)
because it doesn't care if the data is of type message, even though the documentation leads a developer to believe that aMessage<?>
would have its content preserved. What happens is that the send method ignores all the headers of the data object, constructing a new GenericMessage instance that gets sent to the output binding. This message instance only has 3 headers: id, timestamp and contentType.This is better detailed in the
README.md
of the example project.Example code
https://github.com/jorgebsa/stream-bridge-header-error