micronaut-projects / micronaut-kafka

Integration between Micronaut and Apache Kafka
Apache License 2.0

JMX metric records-lag incorrect when the topic is produced to from outside the consuming thread #933

Open retinaburn opened 11 months ago

retinaburn commented 11 months ago

### Expected Behavior

When viewing the JMX metric `records-lag` for a given topic and partition, its value should roughly match the consumer lag reported by Redpanda/Kafka while messages are being produced.

If messages are produced faster than they are consumed, `records-lag` should increase over time.
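For reference, the two lag figures being compared are computed in different places. As understood from the Apache Kafka client (not verified against the exact client version bundled with micronaut-kafka 4.2.0), they are roughly:

```math
\begin{aligned}
\text{records-lag}(p) &= \mathrm{HW}_{\text{last fetch}}(p) - \mathrm{position}(p) \\
\text{group lag}(p) &= \mathrm{LEO}(p) - \mathrm{committed}(p)
\end{aligned}
```

Here $\mathrm{HW}_{\text{last fetch}}(p)$ is the high watermark carried in the consumer's most recent fetch response for partition $p$ (the last stable offset under `isolation.level=read_committed`), $\mathrm{position}(p)$ is the consumer's next fetch offset, $\mathrm{LEO}(p)$ is the log end offset reported by the broker, and $\mathrm{committed}(p)$ is the group's last committed offset. The first is computed client-side at fetch time and the second broker-side, so the two need not match exactly, but both should grow when production outpaces consumption.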

### Actual Behaviour

If messages are produced to Kafka from the consumer thread (inside the `@KafkaListener` method), `records-lag` is reported correctly and increases over time, as documented in https://github.com/micronaut-projects/micronaut-kafka/issues/634

If messages are produced to Kafka from a REST controller in the same project, `records-lag` in JMX does not increase, even though the consumer lag shown in Redpanda/Kafka does.

Stranger still, if the topic is pre-loaded with messages (say 10k) before the app starts, `records-lag` is reported correctly.

### Steps To Reproduce

  1. Navigate to https://micronaut.io/launch
  2. Select the features `kafka`, `micrometer-jmx` and `micrometer-prometheus`; set the name to `consumer` and the Java version to 21
  3. Generate the project
  4. Extract the generated project
  5. Define Consumer:

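```java
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import jakarta.inject.Inject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@KafkaListener(threads = 2)
public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    // Only used by the commented-out lines below, which produce from the consumer thread.
    @Inject
    Producer producer;

    @Topic("${topic.chat}")
    public void listener(ConsumerRecord<String, String> record) throws InterruptedException {
        log.debug("{}-{}:{} {}={}", record.topic(), record.partition(), record.offset(), record.key(), record.value());
        // Thread.sleep(1000);
        // Uncommenting these produces from the consumer thread, the case in which records-lag is reported correctly.
        // producer.sendChat("1", "1");
        // producer.sendChat("2", "2");
    }
}
```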

6. Define the Producer interface:

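```java
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient(id = "producer")
public interface Producer {

    @Topic("${topic.chat}")
    void sendChat(@KafkaKey String key, String value);
}
```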


7. Define the controller:

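```java
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.PathVariable;
import io.micronaut.http.annotation.Post;
import jakarta.inject.Inject;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Controller("/produce")
public class BatchController {

    @Inject
    Producer producer;

    // Starts numOfThreads threads, each sending numMessages records, and returns immediately;
    // production continues in the background, outside any Kafka consumer thread.
    @Post("/batch/{numOfThreads}/{numMessages}")
    public boolean batchProduce(@PathVariable Integer numOfThreads, @PathVariable Integer numMessages) {
        ExecutorService executor = Executors.newFixedThreadPool(numOfThreads);

        for (int i = 0; i < numOfThreads; i++) {
            executor.submit(new ThreadProducer(i, producer, numMessages));
        }
        // Already-submitted tasks still run to completion; this only releases the pool's threads afterwards.
        executor.shutdown();

        return true;
    }

    static class ThreadProducer implements Runnable {
        private final int numOfMessages;
        private final String threadId;
        private final Producer producer;

        ThreadProducer(int threadId, Producer producer, int numOfMessages) {
            this.threadId = String.valueOf(threadId);
            this.producer = producer;
            this.numOfMessages = numOfMessages;
        }

        @Override
        public void run() {
            // The thread id is the record key, so all of one thread's messages share a key.
            for (int j = 0; j < numOfMessages; j++) {
                producer.sendChat(threadId, String.valueOf(j));
            }
        }
    }
}
```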


8. Run the application
9. Confirm that the consumer lag is 0 in both JMX and Redpanda/Kafka
10. `curl -X POST http://localhost:8082/produce/batch/10/100000` (10 threads × 100,000 messages = 1,000,000 messages)
11. Confirm that the consumer lag is increasing in Redpanda/Kafka
12. In JMX, confirm that `kafka.consumer -> consumer-fetch-manager-metrics -> consumer-consumer-11 -> chat-room -> 0 -> Attributes -> records-lag` is not increasing
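Step 12 reads the metric by browsing JMX by hand. To sample it repeatedly while the `curl` load runs, a small JDK-only probe like the following may help. It is a sketch, not part of the original report: the class name `RecordsLagProbe` is hypothetical, it assumes the Kafka client's default `JmxReporter` naming (`kafka.consumer:type=consumer-fetch-manager-metrics,client-id=<id>,topic=<topic>,partition=<n>`, matching the JMX tree in step 12), and for remote use it assumes the application was started with remote JMX enabled, e.g. `-Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false` (port 9010 is an arbitrary choice).

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class RecordsLagProbe {

    // Assumption: the Kafka client's default JmxReporter naming. Matches the client-level,
    // per-topic and per-partition fetch-manager MBeans.
    private static final ObjectName FETCH_MANAGER_PATTERN =
            pattern("kafka.consumer:type=consumer-fetch-manager-metrics,*");

    private static ObjectName pattern(String value) {
        try {
            return new ObjectName(value);
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns "client-id/topic/partition" -> records-lag for every per-partition MBean found. */
    static Map<String, Double> readRecordsLag(MBeanServerConnection server) throws IOException {
        Map<String, Double> lags = new TreeMap<>();
        for (ObjectName name : server.queryNames(FETCH_MANAGER_PATTERN, null)) {
            String partition = name.getKeyProperty("partition");
            if (partition == null) {
                continue; // client-level or per-topic MBean: no per-partition records-lag
            }
            try {
                Object value = server.getAttribute(name, "records-lag");
                if (value instanceof Number lag) {
                    String key = name.getKeyProperty("client-id") + "/"
                            + name.getKeyProperty("topic") + "/" + partition;
                    lags.put(key, lag.doubleValue());
                }
            } catch (JMException e) {
                // MBean unregistered between query and read, or attribute missing: skip it.
            }
        }
        return lags;
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            // Same JVM: only meaningful when this code runs inside the Micronaut application.
            print(readRecordsLag(ManagementFactory.getPlatformMBeanServer()));
            return;
        }
        // Remote JVM: args[0] is a JMX service URL, e.g. service:jmx:rmi:///jndi/rmi://localhost:9010/jmxrmi
        try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(args[0]))) {
            print(readRecordsLag(connector.getMBeanServerConnection()));
        }
    }

    private static void print(Map<String, Double> lags) {
        lags.forEach((key, lag) -> System.out.println(key + " records-lag=" + lag));
    }
}
```

Running it every few seconds during step 10 (for example `java RecordsLagProbe.java service:jmx:rmi:///jndi/rmi://localhost:9010/jmxrmi`) should show whether `records-lag` for `chat-room/0` stays flat while the lag in Redpanda/Kafka grows.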

### Environment Information

- Operating System: Windows 11
- JDK: Adoptium Temurin 21

### Example Application

https://github.com/retinaburn/micronaut-kafka-investigation

### Version

4.2.0