vert-x3 / vertx-kafka-client

Reactive Kafka Client for Vert.x
Apache License 2.0

High load when broker becomes unavailable #194

Closed: cescoffier closed this issue 3 years ago

cescoffier commented 3 years ago

The consumer runs into a poll loop when the broker becomes unavailable. That introduces a high load on the system, and may even kill the application when the CPU usage exceeds the quota.

Version

3.9.5

Context

Just a simple consumer (used as a stream); shut down the broker while it is polling records.

Steps to reproduce

  1. Create a consumer and consume the records as a stream
  2. Close the broker
  3. Check the number of polls and the load
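
For step 3, one way to watch both numbers from inside the JVM is a small probe like the sketch below. It is only an illustration: `LoadProbe` and `recordPoll()` are hypothetical names, not part of vertx-kafka-client, and the poll counter assumes you call `recordPoll()` yourself wherever a poll happens (for example from a logging breakpoint). The load figure comes from the standard `OperatingSystemMXBean`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper for the reproduction steps; not part of vertx-kafka-client.
final class LoadProbe {
  private final AtomicLong polls = new AtomicLong();
  private final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();

  // Call once per poll (assumption: the caller wires this in around the consumer).
  void recordPoll() {
    polls.incrementAndGet();
  }

  // Number of polls recorded since the previous call; resets the counter.
  long pollsSinceLastCall() {
    return polls.getAndSet(0);
  }

  // One-minute system load average, or a negative value if the platform does not report it.
  double systemLoadAverage() {
    return os.getSystemLoadAverage();
  }

  public static void main(String[] args) {
    LoadProbe probe = new LoadProbe();
    for (int i = 0; i < 3; i++) {
      probe.recordPoll();
    }
    System.out.println("polls=" + probe.pollsSinceLastCall() + " load=" + probe.systemLoadAverage());
  }
}
```

Sampling `pollsSinceLastCall()` once per second gives a polls-per-second figure that can be compared before and after the broker is stopped.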

cescoffier commented 3 years ago

I also noticed that sometimes the high load happens after a metadata retrieval failure, as it polls constantly (without getting any records).

tsegismont commented 3 years ago

Does this apply to 4.0 too?

cescoffier commented 3 years ago

Yes, for both. The code didn't change much around that area.

tsegismont commented 3 years ago

@cescoffier can you help create a small reproducer? I tried:

import io.vertx.core.AbstractVerticle;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerVerticle extends AbstractVerticle {

  @Override
  public void start() throws Exception {
    Map<String, String> config = new HashMap<>();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    config.put("group.id", "my_group");
    config.put("auto.offset.reset", "earliest");
    config.put("enable.auto.commit", "false");

    KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);

    // Send one record per second, using an incrementing counter as the value
    AtomicInteger counter = new AtomicInteger();
    vertx.setPeriodic(1000, l -> {
      producer.write(KafkaProducerRecord.create("mytopic", String.valueOf(counter.getAndIncrement())))
        .onFailure(Throwable::printStackTrace);
    });
  }
}

import io.vertx.core.AbstractVerticle;
import io.vertx.kafka.client.consumer.KafkaReadStream;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ConsumerVerticle extends AbstractVerticle {

  @Override
  public void start() throws Exception {
    Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", "localhost:9294");
    config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("group.id", "my_group");
    config.put("auto.offset.reset", "earliest");
    config.put("enable.auto.commit", "false");

    // Consume the topic as a stream and print every record received
    KafkaReadStream<String, String> consumer = KafkaReadStream.create(vertx, config);
    consumer.handler(record -> {
      System.out.println("record = " + record);
    });
    consumer.subscribe(Collections.singleton("mytopic")).onFailure(Throwable::printStackTrace);
  }
}

But when I stop the broker, my system load remains low.

Thanks

cescoffier commented 3 years ago

We have this one: https://github.com/quarkusio/quarkus/issues/14366 (it also happens with the Kafka quickstart). I was able to reduce the load on the producer and admin side, but the consumer runs in a poll loop, rescheduling a poll immediately (schedule(0)).

It happens about 1 minute after the broker shuts down.

tsegismont commented 3 years ago

@cescoffier thanks, I'll try again tomorrow. I had only stopped the broker; perhaps that's why I was not able to reproduce.

Yeah I suspected this is due to the error handling in that method that keeps rescheduling. But if you can't reproduce, you can't be sure it's fixed :wink:

tsegismont commented 3 years ago

@cescoffier I tried again without success. No CPU spike with either 3.9.4 or 4.0.0.

I added some logging with the debugger: the pollRecords method is invoked every second (which I guess matches the Kafka library's default).

tsegismont commented 3 years ago

@cescoffier actually this 1 sec timeout is the poll timeout defined on KafkaReadStream. Perhaps it has been set to zero with io.vertx.kafka.client.consumer.KafkaReadStream#pollTimeout(java.time.Duration)?

There is no protection in our code to make sure the value is strictly positive. Perhaps we could add that.
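
A guard of the kind suggested here could look like the following sketch. It is not the vertx-kafka-client implementation: `PollTimeouts` and `requireStrictlyPositive` are hypothetical names, and where such a check would be called from (for example inside `pollTimeout(Duration)`) is an assumption.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical guard; not the actual vertx-kafka-client code.
final class PollTimeouts {
  private PollTimeouts() {
  }

  // Rejects null, zero and negative durations, so a zero timeout cannot be configured by accident.
  static Duration requireStrictlyPositive(Duration timeout) {
    Objects.requireNonNull(timeout, "timeout");
    if (timeout.isZero() || timeout.isNegative()) {
      throw new IllegalArgumentException("pollTimeout must be strictly positive, got " + timeout);
    }
    return timeout;
  }

  public static void main(String[] args) {
    // A valid timeout is returned unchanged
    System.out.println(requireStrictlyPositive(Duration.ofSeconds(1)));
    try {
      requireStrictlyPositive(Duration.ZERO);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Failing fast with an exception is one option; clamping to a small minimum would be another, at the cost of silently changing what the caller asked for.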

cescoffier commented 3 years ago

It's a tricky one. It took me 5 attempts to get one "high-load" run (~50%). It always happens after:

org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {prices-0=OffsetAndMetadata{offset=48, leaderEpoch=null, metadata=''}}

This is passed to the exception handler of the KafkaReadStream.

Let me try to set the pollTimeout to a higher value to see if that improves it.

cescoffier commented 3 years ago

No, same thing with a pollTimeout set to 10 seconds.

What I'm seeing is located in https://github.com/vert-x3/vertx-kafka-client/blob/master/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java#L155-L173

  1. Broker is down, after 1 minute, you get the exception, but it retries (which is good) as the polling task is re-enqueued
  2. As the broker is down, it now just gets an empty set of records (almost immediately), and so submitted stays false
  3. The finally block enqueues the next task without delay, and it just goes through that loop continuously.
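
The three steps can be modelled with a toy task queue, as in the sketch below. This is a simplified model of the pattern described, not the KafkaReadStreamImpl code: `PollLoopSketch` is a hypothetical class, the `IntSupplier` stands in for the consumer poll, and the exact meaning of `submitted` in the real code is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.IntSupplier;

// Simplified model of the loop described above; NOT the KafkaReadStreamImpl code.
final class PollLoopSketch {
  private final Queue<Runnable> tasks = new ArrayDeque<>();
  private final IntSupplier poll; // stands in for the consumer poll; returns a record count
  private int polls;

  PollLoopSketch(IntSupplier poll) {
    this.poll = poll;
  }

  private void pollTask() {
    boolean submitted = false;
    try {
      polls++;
      int records = poll.getAsInt();
      if (records > 0) {
        // Assumption for this model: delivering records takes over scheduling the next poll.
        submitted = true;
      }
    } finally {
      if (!submitted) {
        // Re-enqueue with no delay, the equivalent of schedule(0).
        tasks.add(this::pollTask);
      }
    }
  }

  // Runs at most maxSteps queued tasks and returns how many polls happened.
  int run(int maxSteps) {
    tasks.add(this::pollTask);
    for (int i = 0; i < maxSteps && !tasks.isEmpty(); i++) {
      tasks.remove().run();
    }
    return polls;
  }

  public static void main(String[] args) {
    // A poll that returns no records right away: every executed task is another poll.
    System.out.println(new PollLoopSketch(() -> 0).run(1_000));
  }
}
```

In this model the loop is only as slow as the poll itself: if the poll blocks for its timeout, the rate stays low, and if it returns immediately, the queue never drains.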

tsegismont commented 3 years ago

@cescoffier I tried with the kafka-quickstart of Quarkus without success. (Note that the master branch uses Vert.x 3.7.)

> What I'm seeing is located in https://github.com/vert-x3/vertx-kafka-client/blob/master/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java#L155-L173

I see which method you're talking about but:

> Broker is down, after 1 minute, you get the exception, but it retries (which is good) as the polling task is re-enqueued

I can't reproduce this.

> As the broker is down, it now just gets an empty set of records (almost immediately), and so submitted stays false

This does not match my observation: when pollRecords is invoked, it returns an empty set of records after the timeout has expired, not immediately.

> The finally block enqueues the next task without delay, and it just goes through that loop continuously.

Looking at the code again, the only way I can see the task ending and submitting another immediately is when pollRecords throws an exception. If you're able to reproduce, can you set an exceptionHandler so that we understand which exception it is?

cescoffier commented 3 years ago

The exception handler gets a timeout exception about the metadata retrieval.

BTW, you need to try the development branch on the quickstart repository.

tsegismont commented 3 years ago

@cescoffier today I was able to reproduce on the development branch of the quickstart repo. I will come back to you soon.

tsegismont commented 3 years ago

I can't reproduce anymore when I force the kafka-clients version to 2.6.0. Can you?

With version 2.5.0, I did some debugging, and it seems that after the 60s delay you mentioned, the Kafka client goes into a broken state (a tight loop in pollRecords that also keeps waking up the heartbeat checker).

cescoffier commented 3 years ago

That's interesting, I actually didn't try with 2.6 as at the moment I'm stuck with 2.5. I will try with 2.6 and see if we can manage to update.

I completely agree about the "broken state", very odd behavior.

cescoffier commented 3 years ago

@tsegismont I confirm. I just updated locally to 2.6 and the issue is gone!

Closing this one. Now let's try to bump the Kafka version in Quarkus.