spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

A lot of logs are produced due to metrics #1107

Closed rodion-fisenko closed 3 years ago

rodion-fisenko commented 3 years ago

Hi!

I found an issue that produces a lot of redundant log entries like this one: ConsumerCoordinator: [Consumer clientId=consumer-progress-pass-4, groupId=progress-pass] Found no committed offset for partition bingoblitz_app_progress_pass_steps_passed-1

After some investigation, I found that the issue can be reproduced under the following conditions:

  1. A freshly initialised partition is used which has not received any messages yet. In this case the partition has no committed offset;
  2. Offset auto commit is disabled (the property spring.cloud.stream.kafka.binder.consumer-properties.enable.auto.commit is set to false);
  3. Micrometer metrics are enabled (a bean of type io.micrometer.core.instrument.MeterRegistry exists in the application context).

Under these conditions, log entries like the one above are produced every 60 seconds for each partition with no committed offset.

This happens because, after the merge of this PR (https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/965), some metrics are collected every 60 seconds. As far as I can see in the code, the KafkaBinderMetrics class creates a new consumer to calculate the topic group lag. Offsets for this consumer are not auto-reset to sensible values (0 in this case) the way they are for regular consumers.
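For context, the lag calculation done by that metrics consumer is conceptually along the lines of the sketch below (this is not the binder's actual code; the class and method names are made up for illustration). Partitions that have never had an offset committed return no committed offset to the metrics consumer, which is what triggers the ConsumerCoordinator log entry on every collection cycle:

import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class LagSketch {

    // Sum of (end offset - committed offset) over the group's partitions.
    static long totalLag(Consumer<?, ?> metricsConsumer, Set<TopicPartition> partitions) {
        Map<TopicPartition, Long> endOffsets = metricsConsumer.endOffsets(partitions);
        Map<TopicPartition, OffsetAndMetadata> committed = metricsConsumer.committed(partitions);
        long lag = 0;
        for (TopicPartition tp : partitions) {
            // For a partition that has never had an offset committed, the broker
            // returns no committed offset (null here), which is what shows up in the
            // log as "Found no committed offset for partition ...".
            OffsetAndMetadata offset = committed.get(tp);
            long committedOffset = (offset != null) ? offset.offset() : 0L;
            lag += endOffsets.getOrDefault(tp, 0L) - committedOffset;
        }
        return lag;
    }
}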

As a workaround, one of the following options can be used:

  1. Disable logs below ERROR level for the org.apache.kafka.clients.consumer.internals.ConsumerCoordinator logger;
  2. Set the property spring.cloud.stream.kafka.binder.consumer-properties.enable.auto.commit: true to enable auto commit. But this option is not preferable, as it can lead to problems.

Do you have any plans to rework KafkaBinderMetrics to fix this issue?

UPD: I found a more suitable solution to avoid this issue.

If we set ContainerProperties.assignmentCommitOption to ALWAYS, then uncommitted offsets will be committed automatically during the first partition assignment.

To do this, we can provide the following bean:

import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

// Commit the initial offsets as soon as partitions are assigned to the binder's containers.
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
    return (container, dest, group) ->
        container.getContainerProperties()
            .setAssignmentCommitOption(ContainerProperties.AssignmentCommitOption.ALWAYS);
}
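Since the customizer receives the destination and group as arguments, the option could also be applied only to selected bindings instead of to every container.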
garyrussell commented 3 years ago

Why not change the log level of o.a.k.c.c.internals.ConsumerCoordinator to WARN?
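In a Spring Boot application that is usually done declaratively (for example with a logging.level.… property or in logback.xml). Purely as an illustration, and assuming Logback is the logging backend, the same change can be sketched programmatically:

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.slf4j.LoggerFactory;

public final class ConsumerCoordinatorLogLevel {

    // Raise the ConsumerCoordinator logger to WARN so the periodic
    // "Found no committed offset" entries are no longer emitted.
    public static void apply() {
        Logger coordinatorLogger = (Logger) LoggerFactory.getLogger(
                "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator");
        coordinatorLogger.setLevel(Level.WARN);
    }
}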

sobychacko commented 3 years ago

@rodion-fisenko Please follow the suggestion from @garyrussell above by changing the log level in ConsumerCoordinator to WARN. For more information on this reasoning see this. See also this commit, where a single call to committed() is now used instead of invoking it per partition. Closing this issue.

st-h commented 2 years ago

We have been trying out version 3.1.4 in order to get rid of this issue; however, we still see the same volume of logs being produced. I also noticed that there seems to be an issue where Micrometer does not provide up-to-date metrics when deploying our app with a recent spring-cloud-stream-binder-kafka dependency. Strangely, this can be fixed by deploying a second instance to Kubernetes - then the missing metrics suddenly appear all at once.