morganstanley / modern-cpp-kafka

A C++ API for Kafka clients (i.e. KafkaProducer, KafkaConsumer, AdminClient)
Apache License 2.0
348 stars 88 forks source link

Interceptor for broker's state #182

Closed kenneth-jia closed 1 year ago

kenneth-jia commented 1 year ago

This PR would be pending for a while, until a new librdkafka release is available (with https://github.com/edenhill/librdkafka/pull/4043)

Test output,

[ RUN      ] KafkaConsumer.BasicPoll
[2022-11-01 22:19:17.786359] kafka::Topic[6f794424-c55fa042] would be used
[2022-11-01 22:19:23.136371] CreateKafkaTopic: create topic[6f794424-c55fa042] with numPartitions[5], replicationFactor[3]. Result: Success
Broker[-1 - plaintext://GroupCoordinator:0] ==> INIT
Broker[-1 - plaintext://:0] ==> INIT
Broker[-1 - plaintext://127.0.0.1:40091] ==> INIT
Broker[-1 - plaintext://127.0.0.1:40092] ==> INIT
Broker[-1 - plaintext://127.0.0.1:40093] ==> INIT
[2022-11-01 22:19:28.138534]NOTICE KafkaConsumer[d86cbe85-bce18c70] initialized with properties[auto.commit.interval.ms=0|bootstrap.servers=127.0.0.1:40091,127.0.0.1:40092,127.0.0.1:40093|client.id=d86cbe85-bce18c70|enable.auto.commit=true|enable.auto.offset.store=true|group.id=d86d20f5-e1acece|log_level=5|max.poll.records=500]
[2022-11-01 22:19:28.138573] KafkaConsumer[d86cbe85-bce18c70] started
Broker[-1 - plaintext://127.0.0.1:40092] ==> TRY_CONNECT
Broker[-1 - plaintext://127.0.0.1:40092] ==> CONNECT
Broker[-1 - plaintext://127.0.0.1:40092] ==> APIVERSION_QUERY
Broker[-1 - plaintext://127.0.0.1:40092] ==> UP
Broker[1 - plaintext://127.0.0.1:40092] ==> UPDATE
Broker[1 - plaintext://127.0.0.1:40092] ==> UP
Broker[2 - plaintext://GroupCoordinator:0] ==> TRY_CONNECT
Broker[2 - plaintext://GroupCoordinator:0] ==> CONNECT
Broker[2 - plaintext://GroupCoordinator:0] ==> APIVERSION_QUERY
Broker[2 - plaintext://GroupCoordinator:0] ==> UP
[2022-11-01 22:19:31.413118]NOTICE KafkaConsumer[d86cbe85-bce18c70] re-balance event triggered[ASSIGN_PARTITIONS], cooperative[disabled], topic-partitions[6f794424-c55fa042-0,6f794424-c55fa042-1,6f794424-c55fa042-2,6f794424-c55fa042-3,6f794424-c55fa042-4]
[2022-11-01 22:19:31.413542] assigned partitions: 6f794424-c55fa042-0,6f794424-c55fa042-1,6f794424-c55fa042-2,6f794424-c55fa042-3,6f794424-c55fa042-4
[2022-11-01 22:19:31.413569]NOTICE KafkaConsumer[d86cbe85-bce18c70] subscribed, topics[6f794424-c55fa042]
Broker[0 - plaintext://127.0.0.1:40091] ==> TRY_CONNECT
Broker[2 - plaintext://127.0.0.1:40093] ==> TRY_CONNECT
Broker[0 - plaintext://127.0.0.1:40091] ==> CONNECT
Broker[2 - plaintext://127.0.0.1:40093] ==> CONNECT
Broker[0 - plaintext://127.0.0.1:40091] ==> APIVERSION_QUERY
Broker[2 - plaintext://127.0.0.1:40093] ==> APIVERSION_QUERY
Broker[0 - plaintext://127.0.0.1:40091] ==> UP
Broker[2 - plaintext://127.0.0.1:40093] ==> UP
[2022-11-01 22:19:32.415571] KafkaConsumer[d86cbe85-bce18c70] polled 0 messages
[2022-11-01 22:19:32.415622] Consumer get the beginningOffset[0]
[2022-11-01 22:19:32.522364]NOTICE KafkaProducer[ddc358d6-e8b1c6c1] initializes with properties[bootstrap.servers=127.0.0.1:40091,127.0.0.1:40092,127.0.0.1:40093|client.id=ddc358d6-e8b1c6c1|log_level=5]
[2022-11-01 22:19:32.839425] ProduceMessages: 3 messages have been sent to 6f794424-c55fa042-0
[2022-11-01 22:19:37.857922] KafkaConsumer[d86cbe85-bce18c70] polled 3 messages
[2022-11-01 22:19:37.867516]NOTICE KafkaConsumer[d86cbe85-bce18c70] re-balance event triggered[REVOKE_PARTITIONS], cooperative[disabled], topic-partitions[6f794424-c55fa042-0,6f794424-c55fa042-1,6f794424-c55fa042-2,6f794424-c55fa042-3,6f794424-c55fa042-4]
[2022-11-01 22:19:37.872005]NOTICE KafkaConsumer[d86cbe85-bce18c70] closed
Broker[-1 - plaintext://GroupCoordinator:0] ==> DOWN
Broker[-1 - plaintext://GroupCoordinator:0] ==> INIT
Broker[1 - plaintext://127.0.0.1:40092] ==> DOWN
Broker[-1 - plaintext://:0] ==> DOWN
Broker[0 - plaintext://127.0.0.1:40091] ==> DOWN
Broker[-1 - plaintext://GroupCoordinator:0] ==> DOWN
Broker[2 - plaintext://127.0.0.1:40093] ==> DOWN
[       OK ] KafkaConsumer.BasicPoll (20086 ms)