Open yeomko22 opened 3 years ago
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<String, String> record: records) {
ConsumerWorker worker = new ConsumerWorker(record.value());
executorService.execute(worker);
}
ExcutorService executorSrervice = Executors.newCachedThreadPool();
for (int i=0; i<CONSUMER_COUNT; i++) {
ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
executorService.execute(worker);
}
for (Map.Extry<MetricName, ? extends Metric> entry : kafkaConsumer.metrics().entrySet()) {
if ("records-lag-max".equl(etnry.getKey().name()) |
"records-lag".equl(etnry.getKey().name()) |
"records-lag-avg".equl(etnry.getKey().name())) {
Metric metric = entry.getValue();
logger.info("{}:{}", entry.getKey().name(), metric.metricValue());
}
}
multithread consumer
컨슈머 운영 전략