morganstanley / modern-cpp-kafka

A C++ API for Kafka clients (i.e. KafkaProducer, KafkaConsumer, AdminClient)
Apache License 2.0
348 stars 88 forks source link

Using assign() creates group id #209

Open jonathan-dev opened 1 year ago

jonathan-dev commented 1 year ago

I am using the assign() method to poll messages from a specific kafka topic/partion. According to this stack overflow post here when using the assign() api (at least for java) we don't need consumer groups for this. When running my code however I notice that consumer groups with an auto generated name are created every time.

kafka::Properties properties = kafka::Properties(
      {
       {"bootstrap.servers", {"kafka:29092"}},
       {"max.poll.records", {"500"}},
       {"fetch.min.bytes",{"1"}},
       {"enable.auto.commit",{"false"}},
       {"enable.auto.offset.store", {"false"}},
       {"auto.offset.reset",{"earliest"}}
       });
kafka::clients::consumer::KafkaConsumer timeConsumer =
kafka::clients::consumer::KafkaConsumer(properties);
kafka::TopicPartition tp = std::pair("Time",0);
kafka::TopicPartitions tps({tp});
timeConsumer.assign(tps);
auto now = std::chrono::system_clock::now();
std::chrono::duration<int> tenSec(10);
auto timePoint = now-tenSec;
auto offsets = timeConsumer.offsetsForTime(tps,timePoint);
timeConsumer.seek(tp,offsets[tp]);

while(true){
    records = timeConsumer.poll(std::chrono::milliseconds(100));
    if (records.size() == 0) {
      std::cout << "not messages fetched"<< std::endl;
      break;
    }
...
}

Is there anything I am doing wrong here that is causing the creation of the consumer groups? Or did I possibly get something wrong completely?

Thanks in advance for helping me out :)

kenneth-jia commented 1 year ago

Hi, @jonathan-dev The group.id is vital for a consumer, and each consumer belongs to a "consumer group". https://www.confluent.io/blog/configuring-apache-kafka-consumer-group-ids/ About modern-cpp-kafka API, if there's no group.id configured for consumer properties, a random string would be used for it.

jonathan-dev commented 1 year ago

So does this mean that there is simply no equivalent to the assign method documented here in your library?