opensource4you / astraea

Unleash the infinite potential of Kafka
Apache License 2.0

[PERF] consumer polling overhead optimization #1586

Closed: garyparrot closed this issue 1 year ago

garyparrot commented 1 year ago

Context: https://github.com/skiptests/astraea/issues/1567#issuecomment-1477200413

The Consumer that Astraea wraps has a performance problem. The figure above was produced with an Apache Kafka Consumer I wrote myself (under the same consumer group size and consumer config, the Astraea Consumer could not even last one minute).

Astraea's Consumer carries some extra overhead. The code below first pre-populates the cluster with data, then fetches it with both KafkaConsumer and Astraea's Consumer to show the difference in overall fetch throughput.

```java
package org.astraea.app;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.astraea.common.DataRate;
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.Replica;
import org.astraea.common.consumer.Consumer;
import org.astraea.common.consumer.SubscribedConsumer;
import org.astraea.common.producer.Producer;
import org.astraea.it.Service;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class IgnoreConsumerTest {

  private static final String topicName = "TheTopic";
  private static final String bootstrap = "192.168.0.4:10636";

  @BeforeAll
  static void setupTopic() {
    try (var admin = Admin.of(bootstrap)) {
      admin.creator().topic(topicName).numberOfPartitions(3).run().toCompletableFuture().join();
      long sum =
          admin.clusterInfo(Set.of(topicName)).toCompletableFuture().join().replicas().stream()
              .mapToLong(Replica::size)
              .sum();
      // only populate the topic if it is still empty
      if (sum == 0) {
        try (var producer = Producer.of(bootstrap)) {
          var bytes = new byte[1024];
          producer
              .send(
                  IntStream.range(0, 5_000_000)
                      .mapToObj(
                          i ->
                              org.astraea.common.producer.Record.builder()
                                  .topic(topicName)
                                  .value(bytes)
                                  .build())
                      .collect(Collectors.toUnmodifiableList()))
              .forEach(i -> i.toCompletableFuture().join());
        }
      }
    }
  }

  @Test
  void testAstraeaConsumer() {
    try (var consumer =
        Consumer.forTopics(Set.of(topicName))
            .bootstrapServers(bootstrap)
            .config(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
            .build()) {
      int consumed = 0;
      var s = System.nanoTime();
      while (!Thread.currentThread().isInterrupted()) {
        int cnt = consumer.poll(Duration.ofSeconds(1)).size();
        consumed += cnt;
        if (consumed >= 5_000_000) break;
      }
      var t = System.nanoTime();
      System.out.println("Record: " + consumed);
      System.out.println("AstraeaConsumer Time: " + Duration.ofNanos(t - s).toMillis() + " ms");
      System.out.println("AstraeaConsumer Record Rate: " + consumed / ((t - s) / 1e9));
      System.out.println(
          "AstraeaConsumer Byte Rate: "
              + DataRate.Byte.of((long) (consumed * 1024.0 / ((t - s) / 1e9))).perSecond());
    }
  }

  @Test
  void testKafkaConsumer() {
    try (var consumer =
        new KafkaConsumer(
            Map.ofEntries(
                Map.entry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap),
                Map.entry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class),
                Map.entry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class),
                Map.entry(ConsumerConfig.GROUP_ID_CONFIG, Utils.randomString()),
                Map.entry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")))) {
      consumer.subscribe(Set.of(topicName));
      int consumed = 0;
      var s = System.nanoTime();
      while (!Thread.currentThread().isInterrupted()) {
        int cnt = consumer.poll(Duration.ofSeconds(1)).count();
        consumed += cnt;
        if (consumed >= 5_000_000) break;
      }
      System.out.println("Record: " + consumed);
      var t = System.nanoTime();
      System.out.println("KafkaConsumer Time: " + Duration.ofNanos(t - s).toMillis() + " ms");
      System.out.println("KafkaConsumer Record Rate: " + consumed / ((t - s) / 1e9));
      System.out.println(
          "KafkaConsumer Byte Rate: "
              + DataRate.Byte.of((long) (consumed * 1024.0 / ((t - s) / 1e9))).perSecond());
    }
  }
}
```

Results on my machine:

```
# KafkaConsumer
Record: 5000000
KafkaConsumer Time: 11549 ms
KafkaConsumer Record Rate: 432927.1026343041
KafkaConsumer Byte Rate: 443.32 MB/seconds

# Astraea Consumer
Record: 5000000
AstraeaConsumer Time: 13201 ms
AstraeaConsumer Record Rate: 378732.7562436388
AstraeaConsumer Byte Rate: 387.82 MB/seconds
```
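As a sanity check, the reported byte rates follow directly from the record rates: every record carries a 1024-byte value, and DataRate.Byte reports decimal megabytes. A small standalone sketch recomputing them (the reported times are truncated to milliseconds, so the recomputed rates agree only to within rounding):

```java
public class RateCheck {
  public static void main(String[] args) {
    // Recompute the rates from the reported record counts and elapsed times.
    double kafkaRecRate = 5_000_000 / (11549 / 1000.0);   // records per second
    double astraeaRecRate = 5_000_000 / (13201 / 1000.0);

    // Each record carries a 1024-byte value; report decimal MB per second.
    double kafkaMBs = kafkaRecRate * 1024 / 1e6;
    double astraeaMBs = astraeaRecRate * 1024 / 1e6;

    System.out.printf("Kafka ~%.2f MB/s, Astraea ~%.2f MB/s%n", kafkaMBs, astraeaMBs);
    // Within rounding of the reported 443.32 MB/s and 387.82 MB/s:
    System.out.println("kafka matches: " + (Math.abs(kafkaMBs - 443.32) < 0.5));
    System.out.println("astraea matches: " + (Math.abs(astraeaMBs - 387.82) < 0.5));
  }
}
```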

As shown, fetching directly with KafkaConsumer reaches about 443 MB/s, while the Astraea version only manages around 388 MB/s.
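One plausible (unconfirmed) source of the gap is per-record wrapping: if the wrapping layer allocates a fresh record object for every polled record before handing the batch to the caller, that adds hot-path work that KafkaConsumer itself avoids. A Kafka-free sketch of that idea, where `Wrapped` and the batch shape are hypothetical stand-ins, not Astraea's actual types:

```java
import java.util.ArrayList;
import java.util.List;

public class WrapOverheadSketch {
  // Hypothetical stand-in for a wrapping consumer's record type: every
  // polled record is copied into a fresh object before reaching the caller.
  record Wrapped(byte[] value) {}

  public static void main(String[] args) {
    var batch = new byte[1_000][];
    for (int i = 0; i < batch.length; i++) batch[i] = new byte[1024];

    // "KafkaConsumer-style": hand the batch back as-is and just count it.
    long direct = 0;
    long s = System.nanoTime();
    for (int iter = 0; iter < 1_000; iter++) direct += batch.length;
    long directNs = System.nanoTime() - s;

    // "Wrapping-style": allocate a new list plus one wrapper per record,
    // mimicking the extra per-poll work a wrapping layer can introduce.
    long wrapped = 0;
    s = System.nanoTime();
    for (int iter = 0; iter < 1_000; iter++) {
      List<Wrapped> out = new ArrayList<>(batch.length);
      for (byte[] v : batch) out.add(new Wrapped(v));
      wrapped += out.size();
    }
    long wrappedNs = System.nanoTime() - s;

    System.out.println("direct count: " + direct);
    System.out.println("wrapped count: " + wrapped);
    System.out.println("direct ns: " + directNs + ", wrapped ns: " + wrappedNs);
  }
}
```

On a typical JVM the wrapping loop takes noticeably longer, though a micro-benchmark like this is only suggestive; profiling the real poll path would be the way to confirm where the time actually goes.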

chia7712 commented 1 year ago

@garyparrot do you want to take this? If you're busy with other work, I can take it over.

garyparrot commented 1 year ago

> do you want to take this? If you're busy with other work, I can take it over.

If it's convenient for you, please go ahead and take it; I still have the Lag issue to investigate on my side.