spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0
330 stars 301 forks

ContentType header value inconsistent between Kafka and Kafka Streams binders #456

Closed maddenj-ie closed 5 years ago

maddenj-ie commented 5 years ago

Hi, when using Avro for serialization with spring-cloud-stream-binder-kafka and spring-cloud-stream-schema, the content type in the outbound message header reflects the Avro type and version as documented, e.g. application/vnd.user.v2+avro.

When using the spring-cloud-stream-binder-kafka-streams binder, the content type reflects the content type property for the output binding rather than the actual type resolved by the MessageConverter.

The conversion appears to be done here : KafkaStreamsMessageConversionDelegate

Perhaps a Processor could be added that updates the header info, via the ProcessorContext, with the resolved content type.

Spring Cloud : Greenwich.M1 Spring Cloud Stream : 2.1.0.M3 Spring Cloud Stream Binder Kafka Streams : 2.1.0.M3 Spring Cloud Stream Schema : 2.1.0.M3

Cheers.

olegz commented 5 years ago

I think the issue is with the following code:

if (!StringUtils.isEmpty(contentType)) {
    headers.put(MessageHeaders.CONTENT_TYPE, contentType);
}

It should also check whether the content type is already set. In other words, regardless of how it was set, if a Message already has a content type it must not be overridden.
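To make the suggested fix concrete, here is a minimal, self-contained sketch (plain Java with a Map standing in for Spring's MessageHeaders; the helper name and the "contentType" key literal are illustrative, not the binder's actual code): only apply the binding's configured content type when the converter has not already set one.

```java
import java.util.HashMap;
import java.util.Map;

public class ContentTypeGuard {

    // Stands in for MessageHeaders.CONTENT_TYPE (hypothetical sketch).
    static final String CONTENT_TYPE = "contentType";

    // Apply the binding's configured content type only if no content
    // type is already present on the headers.
    static void applyContentType(Map<String, Object> headers, String configured) {
        if (configured != null && !configured.isEmpty()
                && !headers.containsKey(CONTENT_TYPE)) {
            headers.put(CONTENT_TYPE, configured);
        }
    }

    public static void main(String[] args) {
        // Converter already resolved the concrete Avro type: keep it.
        Map<String, Object> fromConverter = new HashMap<>();
        fromConverter.put(CONTENT_TYPE, "application/vnd.user.v2+avro");
        applyContentType(fromConverter, "application/avro");
        System.out.println(fromConverter.get(CONTENT_TYPE));

        // Nothing set yet: fall back to the binding configuration.
        Map<String, Object> empty = new HashMap<>();
        applyContentType(empty, "application/avro");
        System.out.println(empty.get(CONTENT_TYPE));
    }
}
```

With this guard, the converter-resolved type wins when present, and the binding's configured type is used only as a fallback.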

maddenj-ie commented 5 years ago

Yes, I can see how that would apply if the stream value object is an instance of Message<?>.

In the scenario I outlined previously, the stream value object is an instance of an Avro class, so no headers are propagated (ignoring Kafka headers here, since that metadata is not available to the ValueMapper). The headers instance is created to include the content type from the output binding configuration so that the appropriate message converter (the Avro schema registry converter in this case) from the composite converter can perform the conversion. The message converter will also update the content type header to reflect the schema info (type, version, etc.) from the Avro schema registry. Ultimately, the updated content type header is discarded because the ValueMapper only returns the payload of the converted Message<?> returned by the message converter.
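The header loss described above can be sketched in plain Java (no Spring or Kafka dependencies; the Msg class, the convert and mapValue helpers, and the header key are all illustrative stand-ins, not the binder's actual types): the converter attaches the resolved content type to the headers, but the ValueMapper-style step hands only the payload downstream.

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderLossSketch {

    // Minimal stand-in for Spring's Message<?> (hypothetical).
    static class Msg {
        final Object payload;
        final Map<String, Object> headers;
        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Stand-in converter: resolves and records the concrete Avro content type.
    static Msg convert(Object payload) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("contentType", "application/vnd.user.v2+avro");
        return new Msg(payload, headers);
    }

    // Stand-in for the ValueMapper used by the conversion delegate:
    // it returns only the payload, so the headers (including the
    // updated content type) are discarded at this point.
    static Object mapValue(Msg converted) {
        return converted.payload;
    }

    public static void main(String[] args) {
        Msg converted = convert("user-record");
        Object downstream = mapValue(converted);
        System.out.println(downstream);
        System.out.println(converted.headers.get("contentType"));
    }
}
```

The second printed value never reaches the outbound record, which is exactly the behavior reported in this issue.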

sobychacko commented 5 years ago

@maddenj-ie See this comment on this PR: https://github.com/spring-cloud/spring-cloud-stream-samples/pull/83#issuecomment-409408629 We should try adding header support; that way we can solve this issue.

maddenj-ie commented 5 years ago

@sobychacko Thanks for the background info. Header support would be useful. For the same app I also needed to set a reply channel header to take advantage of dynamic destination resolution from the downstream consumers' perspective. There doesn't appear to be any direct support for this in Kafka Streams yet, so I used the Processor API and updated the header via the ProcessorContext. I used the process method of the KStream, which I think is a terminal operation. Is this approach, or another, workable in the meantime until Kafka Streams has the additional support?

sobychacko commented 5 years ago

@maddenj-ie Did the approach you mentioned there work for your use case?

maddenj-ie commented 5 years ago

@sobychacko Yes, I was able to add the new header. I used something like the following.

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

import org.springframework.messaging.MessageHeaders;

new Processor<String, String>() {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // Record headers take byte[] values, hence the getBytes() here.
        context.headers().add(MessageHeaders.REPLY_CHANNEL,
                "topicName".getBytes());
    }

    @Override
    public void close() {
    }
};

My approach was informed by this question on stack overflow. https://stackoverflow.com/questions/46736484/is-it-possible-to-access-message-headers-with-kafka-streams

Full disclosure: I am new to the Kafka Streams library, so there may be drawbacks to this approach that I have not encountered yet. For instance, I'd like to understand whether this processing marks the stream for repartitioning.

sobychacko commented 5 years ago

@maddenj-ie We are trying to add the header support right now so that the content type set by the source is carried through for deserialization in the Kafka Streams binder (when native decoding is disabled, which is the default in the Kafka Streams binder). For the reply channel scenario with dynamic destinations you mentioned above, I think it's better for you to continue using the approach you described, i.e. provide a custom processor in the application that adds the header. I don't think the stream will be marked for repartitioning unless you are doing some re-keying operations (please check with the Kafka Streams folks on Stack Overflow for clarification on that).

maddenj-ie commented 5 years ago

Hi, apologies for posting on a closed issue and for the delay in my feedback. It's been a while since I looked at this, but the changes don't appear to address the original concerns of the ticket. I can see how the work done benefits the deserialisation of inbound messages and the consequent processing in a Kafka Streams topology, but it does not address the fact that the content type header info is not propagated after serialisation of an outbound message. Perhaps a new issue is required, at least to track it if it's not possible currently? Thanks.

codependent commented 5 years ago

Hi, I've come across this issue while struggling with a similar problem. It is thoroughly described on Stack Overflow: https://stackoverflow.com/questions/55796503/spring-cloud-stream-kafka-application-not-generating-messages-with-the-correct-a

I'm receiving a KStream<Int, TypeA> and returning a KStream<Int, TypeB> on the output binding; however, TypeA's Avro content type is used instead of TypeB's.

@sobychacko @olegz I'd really appreciate it if you could clear up how we can achieve this.

sobychacko commented 5 years ago

@maddenj-ie Sorry for letting this issue fall through the cracks. @codependent made me look at this again with the issues described on the SO thread referenced above. It is indeed an issue on the binder side, which we will need to fix. I am re-opening this issue and will use it to track the required changes.

sobychacko commented 5 years ago

@maddenj-ie @codependent PR issued for addressing the issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/648

sobychacko commented 5 years ago

@codependent The repo you shared for troubleshooting this issue contains some great stuff. Do you want to extract the spring bits and contribute as a sample to the samples repository?

maddenj-ie commented 5 years ago

Hi @sobychacko. Would it be worth considering the propagation of all headers, so that any headers added as part of the Kafka Streams topology would also be available on the outbound message?

sobychacko commented 5 years ago

@maddenj-ie All the other headers should already be there from the context. The headers modified by the message serialization process were the ones getting dropped out.

maddenj-ie commented 5 years ago

@sobychacko Thanks for the correction.

codependent commented 5 years ago

> @codependent The repo you shared for troubleshooting this issue contains some great stuff. Do you want to extract the spring bits and contribute as a sample to the samples repository?

@sobychacko absolutely! Could you give me some guidelines?

sobychacko commented 5 years ago

@codependent Yes, that's the right repo. For consistency reasons, if you can migrate it to a Maven project, that would be great. Kotlin is fine, and we can use it as a reference sample for Kotlin apps. I was thinking of making it an end-to-end sample with all 3 of your microservices (customers, order, and shipping), and for that you might want to create another root folder at the same level as the current kafka-streams-samples. However, the actual landing spot for this is very flexible. Thank you!