spring-cloud / spring-cloud-stream-samples

Samples for Spring Cloud Stream
Apache License 2.0
956 stars 676 forks source link

How do we override default adminConfig, ProducerConfig and ConsumerConfigs. #246

Open voodemsanthosh opened 1 year ago

voodemsanthosh commented 1 year ago

We are using spring cloud stream kafka binder,

              <dependency>
                   <groupId>org.springframework.cloud</groupId>
                   <artifactId>spring-cloud-stream-binder-kafka</artifactId>
             </dependency>

We are trying to override detault consumer properties mostly for below properties.

spring: cloud: stream: kafka:

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_problem_statement

    bindings:
      process-in-0:
        consumer:
          autoCommitOnError: true
          ackEachRecord: true
          max.poll.records: 100
          max.poll.interval.ms: 600000
          configuration:
            auto.commit.interval.ms: 60000

these are not ever showing under consumer configs level

{ "date_time":"2023-08-20 19:21:20,013", "thread":"[main]", "log_level":"INFO ", "class_name":"ConsumerConfig", "globalId":"", {}, "log_message":ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 60000 auto.include.jmx.reporter = true auto.offset.reset = earliest bootstrap.servers = [localhost:8080] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = test-input-1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = test-input group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 600000 max.poll.records = 100 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.connect.timeout.ms = null sasl.login.read.timeout.ms = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.login.retry.backoff.max.ms = 10000 sasl.login.retry.backoff.ms = 100 sasl.mechanism = GSSAPI sasl.oauthbearer.clock.skew.seconds = 30 sasl.oauthbearer.expected.audience = null sasl.oauthbearer.expected.issuer = null sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000 sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000 sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100 sasl.oauthbearer.jwks.endpoint.url = null sasl.oauthbearer.scope.claim.name = scope sasl.oauthbearer.sub.claim.name = sub sasl.oauthbearer.token.endpoint.url = null security.protocol = SSL security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 60000 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = [hidden] ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = security/keystore.jks ssl.keystore.password = [hidden] ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = security/truststore.jks ssl.truststore.password = [hidden] ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer }

Please provider right documents to set or override default consumer configuration.

My problem statement:

Application designed as functional approach and we are trying to consume and produce message to kafka topic,

@Bean
public Function<Message<List<String>>, Message<String>> process() {
  return request -> {
   //logic here
  } 
}

my consumer message is large and processing take more time(1000000 ms) and it is retrying continuously.

Please help us how to resolve this offset commit for larger message and process take more time ?