spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Cannot disable nativeEncoding/nativeDecoding in general for Kafka Streams binder #1182

Closed ferblaca closed 2 years ago

ferblaca commented 2 years ago

I want to disable the nativeEncoding and nativeDecoding properties across the board for the kstream binder. I followed the documentation and tried setting them at the binder, consumer, and producer levels, with no effect:

spring:
  cloud:
    stream:
      binders:
        kafka-stream:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      default:
                        producer:
                          useNativeEncoding: false
                        consumer:
                          useNativeDecoding: false
                      binder:
                        brokers: localhost:9092
                        producerProperties:
                          useNativeEncoding: false
                        consumerProperties:
                          useNativeDecoding: false

In every case, the serialization/deserialization does not go through the CompositeMessageConverter class.

Only when the setting is applied at the binding level do I get the expected behavior, with serialization/deserialization occurring in the converters:

      bindings:
        kafkaStreamCount-out-0:
          group: kstream-consumer-group
          destination: words-sink
          binder: kafka-stream
          producer:
            useNativeEncoding: false

Is it possible to configure it in general for the whole binder?

Thank you very much in advance.

Spring Boot v2.6.2, Spring Cloud v2021.0.0

This repository contains a demo that reproduces the problem. With the app as is, the serialization/deserialization does not occur in the converters. To make it go through the converters, uncomment the producer.useNativeEncoding setting of the kafkaStreamCount-out-0 binding.

sobychacko commented 2 years ago

@ferblaca Apologies for the delay in responding. useNativeDecoding and useNativeEncoding are core Spring Cloud Stream (SCSt) properties. You cannot set them at the binder level, for example as spring.cloud.stream.kafka.streams.default.producer.useNativeEncoding. In a multi-binder scenario with environment-specific properties, the Kafka Streams binder currently only considers connection properties such as the broker connection details (spring.cloud.stream.kafka.streams.binder.brokers). Additionally, consumerProperties and producerProperties specified at the binder level are only for Kafka-specific consumer/producer properties, not SCSt properties.

With that said, I checked out your sample app and the following configuration works for me:

spring:
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: false
        consumer:
          useNativeDecoding: false
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: localhost:9092
        kafka-stream:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: localhost:9092
      function:
        definition: nameSupplier;kafkaStreamCount
      kafka:
        streams:
          binder:
            functions:
              kafkaStreamCount:
                applicationId: app-id-1
                configuration:
                  client.id: client-id-kafkaStreamCount
      bindings:
        nameSupplier-out-0:
          group: kstream-consumer-group
          destination: words-source
          contentType: text/plain
          binder: kafka1
        kafkaStreamCount-in-0:
          group: kstream-consumer-group
          destination: words-source
          contentType: text/plain
          binder: kafka-stream
        kafkaStreamCount-out-0:
          group: kstream-consumer-group
          destination: words-sink
          binder: kafka-stream
#          producer:
#            useNativeEncoding: false

Note that I specify useNativeEncoding and useNativeDecoding under spring.cloud.stream.default.producer and spring.cloud.stream.default.consumer. The end result is that the whole application uses SCSt message-converter-based conversion. That is already the default in the message channel Kafka binder; setting the properties explicitly to false changes it in the Kafka Streams binder as well.

ferblaca commented 2 years ago

Thank you very much for the clarification, @sobychacko! It is quite confusing...

However, is it on the roadmap to allow core properties such as nativeEncoding/nativeDecoding to be configured at the kstream binder level, without also having to modify the behavior of the kafka binder?

sobychacko commented 2 years ago

Well, all the binding-level core properties are still applied on the binding, like the commented-out properties in your case, for example spring.cloud.stream.bindings.kafkaStreamCount-out-0.producer.useNativeEncoding. That way, it only affects that individual binding, nothing else. But I see your point: you want to apply useNativeEncoding to all producer bindings from Kafka Streams. We will certainly use this issue to brainstorm ideas for how we can enable that.
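
In the meantime, the per-binding approach could be sketched roughly as follows. This is only an illustration using the binding and binder names from the sample app. It assumes the fragment is merged into the existing binding entries (destination, group, and contentType stay as they are) and that spring.cloud.stream.default.* is not set, so that the nameSupplier-out-0 binding on the kafka1 binder keeps its default behavior.

```yaml
spring:
  cloud:
    stream:
      bindings:
        # Kafka Streams input binding: deserialize through the SCSt message converters
        kafkaStreamCount-in-0:
          binder: kafka-stream
          consumer:
            useNativeDecoding: false
        # Kafka Streams output binding: serialize through the SCSt message converters
        kafkaStreamCount-out-0:
          binder: kafka-stream
          producer:
            useNativeEncoding: false
        # nameSupplier-out-0 (binder kafka1) is deliberately not listed here,
        # so the message channel Kafka binder is not affected.
```

The trade-off is that every Kafka Streams binding has to be listed individually, which is exactly the repetition a binder-wide setting would avoid.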

sobychacko commented 2 years ago

Issue moved to spring-cloud/spring-cloud-stream #2321 via ZenHub