openzipkin / brave

Java distributed tracing implementation compatible with Zipkin backend services.
Apache License 2.0

TracingConsumer (Kafka) may block the startup of Kafka Streams during the poll #1334

Closed frosiere closed 2 years ago

frosiere commented 2 years ago

The TracingConsumer is currently a wrapper around a KafkaConsumer. It implements the poll methods as follows:

public ConsumerRecords<K, V> poll(Duration timeout) {
  return poll(timeout.toMillis());
}

public ConsumerRecords<K, V> poll(long timeout) {
  ConsumerRecords<K, V> records = delegate.poll(timeout);
  // instrumentation of the poll method
  return records;
}

In Kafka, both methods call an internal method as follows:

// internal method using the includeMetadataInTimeout flag
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
  ...
}

public ConsumerRecords<K, V> poll(long timeoutMs) {
  return poll(time.timer(timeoutMs), false);
}

public ConsumerRecords<K, V> poll(Duration timeout) {
  return poll(time.timer(timeout), true);
}
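To make the difference concrete, here is a toy model of what a flag such as includeMetadataInTimeout can change. It is not Kafka code. It assumes, as the flag name suggests, that the flag decides whether time spent waiting for metadata counts against the caller's timeout. The class name, the parameters and the safetyCap guard are all hypothetical.

```java
import java.util.function.BooleanSupplier;

/** Toy model only; every name here is hypothetical and none of it is Kafka code. */
final class MetadataWaitModel {

    /**
     * Counts how many metadata attempts are made before the wait ends.
     * Each attempt is assumed to cost attemptMs of simulated time.
     * safetyCap only exists so that the unbounded case terminates in this sketch.
     */
    static int attempts(long timeoutMs, long attemptMs, boolean includeMetadataInTimeout,
                        BooleanSupplier metadataReady, int safetyCap) {
        long elapsedMs = 0;
        int attempts = 0;
        while (attempts < safetyCap) {
            // Only the flagged variant gives up once the caller's timeout has elapsed.
            if (includeMetadataInTimeout && elapsedMs >= timeoutMs) {
                break;
            }
            attempts++;
            elapsedMs += attemptMs;
            if (metadataReady.getAsBoolean()) {
                break;
            }
        }
        return attempts;
    }
}
```

With metadata that never becomes ready, the flagged variant stops once the simulated timeout is used up, while the unflagged variant keeps retrying until the artificial cap. A caller that asked for a bounded poll(Duration) but is routed to the unbounded variant can therefore wait much longer than requested.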

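Because TracingConsumer.poll(Duration) delegates to poll(long), the Kafka consumer is always polled with includeMetadataInTimeout set to false, even when the caller used the Duration overload. In some situations (possibly an unexpected restart of our applications; I am not 100% sure how to reproduce it), this may block the startup of Kafka Streams.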

The issue has been reproduced with Spring Boot 2.6.7, Spring Kafka 2.8.5, Kafka 3.0.1 and Brave 5.13.7 (version 5.13.9 is still using the same code for the poll).

See the thread dump in the next comment.

To avoid the issue, wouldn't it make sense to rework the TracingConsumer so that it takes the includeMetadataInTimeout flag into account? Is there a reason this flag was not considered?

Any feedback, comments, or help are more than welcome.

Thanks a lot for the nice tracing support provided by Brave.

frosiere commented 2 years ago

"my-service-5e17e9a6-4f23-4a13-9a42-fc115cef024f-GlobalStreamThread" #32 prio=5 os_prio=0 cpu=478.07ms elapsed=100.23s tid=0x00007f917a146000 nid=0x26 runnable  [0x00007f910ebfb000]
   java.lang.Thread.State: RUNNABLE
                at sun.nio.ch.EPoll.wait(java.base@11.0.13/Native Method)
                at sun.nio.ch.EPollSelectorImpl.doSelect(java.base@11.0.13/Unknown Source)
                at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@11.0.13/Unknown Source)
                - locked <0x000000008adb1b98> (a sun.nio.ch.Util$2)
                - locked <0x000000008adb1a48> (a sun.nio.ch.EPollSelectorImpl)
                at sun.nio.ch.SelectorImpl.select(java.base@11.0.13/Unknown Source)
                at org.apache.kafka.common.network.Selector.select(Selector.java:873)
                at org.apache.kafka.common.network.Selector.poll(Selector.java:465)
                at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551)
                at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
                at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
                at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
                at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
                at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1166)
                at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:96)
                at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:90)
                at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.restoreState(GlobalStateManagerImpl.java:271)
                at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.registerStore(GlobalStateManagerImpl.java:203)
                at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:119)
                at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:254)
                at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
                at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$1(MeteredKeyValueStore.java:126)
                at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$$Lambda$1546/0x0000000100da3840.run(Unknown Source)
                at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:769)
                at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:126)
                at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:137)
                at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:68)
                at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.initialize(GlobalStreamThread.java:251)
                at org.apache.kafka.streams.processor.internals.GlobalStreamThread.initialize(GlobalStreamThread.java:394)
                at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:286)

frosiere commented 2 years ago

The issue could easily be fixed by the following rework, in which each poll overload calls the matching overload on the delegate:

    public ConsumerRecords<K, V> poll(Duration timeout) {
        return poll(delegate.poll(timeout));
    }

    public ConsumerRecords<K, V> poll(long timeout) {
        return poll(delegate.poll(timeout));
    }

    private ConsumerRecords<K, V> poll(ConsumerRecords<K, V> records) {
        // current implementation
    }

jcchavezs commented 2 years ago

@jeqo could you please have a look?

On Wed, 22 Jun 2022, 23:22 François Rosière wrote:

The issue could easily be fixed by the following rework

public ConsumerRecords<K, V> poll(Duration timeout) {
    return poll(() -> delegate.poll(timeout));
}

public ConsumerRecords<K, V> poll(long timeout) {
    return poll(() -> delegate.poll(timeout));
}

private ConsumerRecords<K, V> poll(Supplier<ConsumerRecords<K, V>> recordsSupplier) {
    final var records = recordsSupplier.get();
    // current implementation
}

jeqo commented 2 years ago

@frosiere, great catch! I wasn't aware of this change in behavior based on the parameter type.

I will test the proposed solution locally and post a short PR. Thanks!
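For reference, a minimal self-contained sketch of the rework proposed in this thread. It does not depend on Kafka or Brave: PollingConsumer, ForwardingTracingConsumer and RecordingConsumer are hypothetical stand-ins, and the instrument method only records what it saw in place of the real span handling. It is an illustration of the idea, not the actual patch.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two poll overloads; not the real Kafka Consumer interface. */
interface PollingConsumer<R> {
    R poll(long timeoutMs);
    R poll(Duration timeout);
}

/** Forwards each overload to the matching delegate overload, then instruments the result. */
final class ForwardingTracingConsumer<R> implements PollingConsumer<R> {
    private final PollingConsumer<R> delegate;
    // Stands in for the tracing work done on polled records.
    final List<R> instrumented = new ArrayList<>();

    ForwardingTracingConsumer(PollingConsumer<R> delegate) {
        this.delegate = delegate;
    }

    @Override
    public R poll(long timeoutMs) {
        return instrument(delegate.poll(timeoutMs));
    }

    @Override
    public R poll(Duration timeout) {
        // No conversion to millis: the delegate keeps its Duration semantics.
        return instrument(delegate.poll(timeout));
    }

    private R instrument(R records) {
        instrumented.add(records);
        return records;
    }
}

/** Fake delegate that reports which overload was reached. */
final class RecordingConsumer implements PollingConsumer<String> {
    @Override
    public String poll(long timeoutMs) {
        return "poll(long)";
    }

    @Override
    public String poll(Duration timeout) {
        return "poll(Duration)";
    }
}
```

Wrapping a RecordingConsumer and calling poll(Duration.ofMillis(100)) reaches the delegate's Duration overload, so whatever timeout behavior the delegate attaches to that overload is preserved, while both paths still share the same instrumentation.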