timeplus-io / proton

A streaming SQL engine, a fast and lightweight alternative to ksqlDB and Apache Flink, 🚀 powered by ClickHouse.
https://timeplus.com
Apache License 2.0
1.5k stars 64 forks

Reuse same pooled Kafka(Simple)Consumer #544

Open chenziliang opened 7 months ago

chenziliang commented 7 months ago

Describe what enhancement you'd like to have

librdkafka should allow reusing an existing Kafka consumer for a different topic partition.

For example, at time t1 a user queries topic1 through an external stream; we create a Kafka consumer and pool it. At time t2, while the t1 query is still running, a user queries topic2 through an external stream; for now we create another Kafka consumer and pool it.

We should be able to reuse the Kafka consumer created at t1 for the t2 query instead of creating another Kafka consumer, which is actually very expensive and, most importantly, does not scale.

The same probably applies to producers, if we create a separate producer on the fly for every external stream and there are hundreds of them.
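To make the t1/t2 scenario concrete, here is a minimal sketch of the kind of reuse being asked for. It is not proton's actual pool: `ClientHandle`, `ClientPool` and `handlesCreatedForTwoQueries` are hypothetical names, the handle is a plain stand-in for an expensive Kafka client object, and the sketch assumes that one client can serve several topic partitions on the same brokers.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for an expensive client handle; no librdkafka calls are made.
struct ClientHandle
{
    explicit ClientHandle(std::string brokers_) : brokers(std::move(brokers_)) {}
    std::string brokers;
};

// Hypothetical pool: at most one live handle per broker list, independent of topic.
class ClientPool
{
public:
    std::shared_ptr<ClientHandle> acquire(const std::string & brokers)
    {
        std::lock_guard<std::mutex> lock(mutex);
        auto & slot = handles[brokers];
        if (auto existing = slot.lock())
            return existing; // a running query already holds a handle: reuse it

        auto created = std::make_shared<ClientHandle>(brokers);
        ++created_count;
        slot = created; // weak reference: the handle is freed once the last query drops it
        return created;
    }

    std::size_t created() const
    {
        std::lock_guard<std::mutex> lock(mutex);
        return created_count;
    }

private:
    mutable std::mutex mutex;
    std::map<std::string, std::weak_ptr<ClientHandle>> handles;
    std::size_t created_count = 0;
};

// Two overlapping queries against the same brokers; returns how many handles were created.
std::size_t handlesCreatedForTwoQueries()
{
    ClientPool pool;
    auto t1_query = pool.acquire("broker1:9092"); // query on topic1 at t1
    auto t2_query = pool.acquire("broker1:9092"); // query on topic2 at t2, t1 still running
    assert(t1_query == t2_query);
    return pool.created();
}
```

Storing only a `std::weak_ptr` in the pool is a design choice for this sketch: the pool never keeps a handle alive on its own, so a handle is destroyed when the last query using it finishes.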

zliang-min commented 6 months ago

A consumer is bound to a specific topic partition (using the legacy simple consumer API) or to a specific list of partitions (using the new consumer API). So I am not quite sure what exactly it means to reuse a consumer, or maybe you are talking about the KafkaWALSimpleConsumer? But I want to understand more about why "recreating another Kafka consumer is very expensive". At the end of the day, in order to consume from another topic/partition, another consumer has to be created anyway (from the point of view of librdkafka). Could you elaborate, please?

From the librdkafka side, what is cacheable are rd_kafka_t and rd_kafka_topic_t (if we use the new consumer API, we don't even need to cache rd_kafka_topic_t).
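As a rough illustration of that two-level caching, the sketch below keeps per-topic handles inside one long-lived client object and creates each of them only on first use. `TopicHandle` and `CachedClient` are hypothetical stand-ins for rd_kafka_topic_t and a wrapper around rd_kafka_t; no librdkafka function is called, and the class is deliberately not thread-safe.

```cpp
#include <cstddef>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for a per-topic handle (the role rd_kafka_topic_t plays).
struct TopicHandle
{
    std::string name;
};

// Hypothetical wrapper around one client handle (the role rd_kafka_t plays).
// Not thread-safe: a real cache would need locking around the map.
class CachedClient
{
public:
    // Return the cached topic handle, creating and caching it on first use.
    std::shared_ptr<TopicHandle> topic(const std::string & name)
    {
        auto it = topics.find(name);
        if (it != topics.end())
            return it->second;

        auto handle = std::make_shared<TopicHandle>(TopicHandle{name});
        topics.emplace(name, handle);
        return handle;
    }

    std::size_t cachedTopics() const { return topics.size(); }

private:
    std::map<std::string, std::shared_ptr<TopicHandle>> topics;
};
```

With the new consumer API mentioned above, the per-topic map would presumably go away and only the client-level object would remain to be cached.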

BTW, I think we should refactor the pooling/caching for the Kafka external stream. Currently it uses KafkaWALPool, which I think (from the code) was originally designed for logstore storage. It looks to me that mixing logstore and external stream together is not the best idea. It also has leaking issues like this. There are some useful concepts/tools in the klog namespace that can be reused in external stream, but that does not mean we have to reuse everything there and make it more complex than it has to be.