mfontanini / cppkafka

Modern C++ Apache Kafka client library (wrapper for librdkafka)
BSD 2-Clause "Simplified" License

getting old messages when reconnecting consumer #253

Open alagiz opened 4 years ago

alagiz commented 4 years ago

Hello,

I have several consumers in the same consumer group. When a consumer goes offline and comes back online, it still gets old messages.

I was trying to follow the directions given in https://github.com/mfontanini/cppkafka/issues/243 and https://github.com/mfontanini/cppkafka/issues/244, but I am still getting old messages when a consumer comes back online.

I would appreciate any pointers toward a potential solution.

Here's my code:

```cpp
#include <string>
#include <vector>
#include <iostream>
#include <exception>
#include <cstdlib>
#include "cppkafka/topic_partition_list.h"
#include "cppkafka/consumer.h"
#include "cppkafka/producer.h"
#include "cppkafka/group_information.h"
#include "cppkafka/kafka_handle_base.h"
#include "cppkafka/utils/roundrobin_poll_strategy.h"

using std::string;
using std::exception;
using std::vector;
using std::cout;
using std::endl;

using cppkafka::Consumer;
using cppkafka::Producer;
using cppkafka::Configuration;
using cppkafka::TopicConfiguration;
using cppkafka::GroupInformation;
using cppkafka::Message;
using cppkafka::TopicPartitionList;
using cppkafka::TopicPartition;
using cppkafka::RoundRobinPollStrategy;
using cppkafka::GroupMemberInformation;
using cppkafka::MemberAssignmentInformation;
using cppkafka::GroupInformationList;

int main(void) {
    try {
        std::string topicName = "queueing.job.test";
        std::string groupId = "testGroup";

        // create consumer config
        Configuration configConsumer = {
            {"group.id", groupId.c_str()},
            {"metadata.broker.list", "kafka:9092"},
            {"enable.auto.commit", false},
            {"client.id", rand()}
        };

        // Build a topic configuration
        TopicConfiguration topicConfig = {
            {"auto.offset.reset", "latest"}
        };
        configConsumer.set_default_topic_configuration(topicConfig);

        // create consumer
        Consumer consumer(configConsumer);
        TopicPartitionList assignment;

        // manage assigned partitions
        consumer.set_assignment_callback([&](const TopicPartitionList &partitions) {
            cout << "Got assigned: " << partitions << endl;
            TopicPartitionList initialPartitionList = consumer.get_offsets_committed(partitions);
            cout << "initialPartitionList: " << initialPartitionList << endl;
            TopicPartitionList topicPartitionList = {};
            for (int i = 0; i < initialPartitionList.size(); i++) {
                cout << "new offset " << initialPartitionList[i].get_offset() + 1 << endl;
                TopicPartition topicPartition(topicName,
                                              initialPartitionList[i].get_partition(),
                                              initialPartitionList[i].get_offset() + 1);
                topicPartitionList.push_back(topicPartition);
            }
            assignment = topicPartitionList;
            consumer.assign(topicPartitionList);
        });

        // print the revoked partitions on revocation
        consumer.set_revocation_callback([](const TopicPartitionList &partitions) {
            cout << "Got revoked: " << partitions << endl;
        });

        consumer.subscribe({topicName});
        cout << "Consuming messages from topic " << topicName << endl;

        RoundRobinPollStrategy poll_strategy(consumer);

        // poll for messages
        while (true) {
            // try to consume a message
            Message message = poll_strategy.poll();
            if (message) {
                // if we managed to get a message
                if (message.get_error()) {
                    // ignore EOF notifications from rdkafka
                    if (!message.is_eof()) {
                        cout << "[+] Received error notification: " << message.get_error() << endl;
                    }
                } else {
                    // print the key (if any)
                    if (message.get_key()) {
                        cout << message.get_key() << " -> ";
                    }
                    // print the payload
                    cout << message.get_payload() << endl;
                    // commit the message
                    consumer.commit(message);
                }
            }
        }
    } catch (std::exception &ex) {
        std::cerr << "Exception: " << ex.what() << endl;
        return -1;
    }
    cout << "No exceptions" << endl;
    return 0;
}
```
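For comparison, here is a minimal sketch of an alternative assignment callback: it only overrides a partition's offset when the broker actually has a committed offset for it, and otherwise leaves the partition untouched so `auto.offset.reset` applies. It assumes that a committed offset already points at the next message to consume (so no `+ 1`), and that a negative offset from `get_offsets_committed` means "nothing committed yet" (librdkafka reports -1001, `RD_KAFKA_OFFSET_INVALID`, in that case, which is where the `new offset -1000` lines in the logs below come from). This is just an illustration of one possible intent, not a confirmed fix:

```cpp
#include <iostream>
#include "cppkafka/consumer.h"
#include "cppkafka/topic_partition_list.h"

// Sketch only: install on an existing cppkafka::Consumer.
// Partitions with a committed offset resume from it; the rest keep their
// invalid offset so the topic's auto.offset.reset ("latest") decides.
void install_assignment_callback(cppkafka::Consumer& consumer) {
    consumer.set_assignment_callback([&consumer](const cppkafka::TopicPartitionList& partitions) {
        cppkafka::TopicPartitionList committed = consumer.get_offsets_committed(partitions);
        cppkafka::TopicPartitionList adjusted;
        for (const cppkafka::TopicPartition& tp : committed) {
            if (tp.get_offset() >= 0) {
                // A committed offset exists; it is assumed to already point at
                // the next message to consume, so resume exactly there.
                adjusted.emplace_back(tp.get_topic(), tp.get_partition(), tp.get_offset());
            } else {
                // No committed offset yet (-1001): leave the offset unset so
                // auto.offset.reset applies to this partition.
                adjusted.emplace_back(tp.get_topic(), tp.get_partition());
            }
        }
        std::cout << "Assigning: " << adjusted << std::endl;
        consumer.assign(adjusted);
    });
}
```

The `install_assignment_callback` helper and the `>= 0` check are my own naming and assumptions, not cppkafka API; whether the `+ 1` in the original callback is wanted depends on what exactly was committed.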

Here are the logs for one consumer:

```
# start a consumer with a topic that has 2 partitions
kafka_1       | [2020-06-18 12:39:49,957] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group testGroup in Empty state. Created a new member id 1804289383-2c0edf6a-4823-4c1c-b070-7cade019c109 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
kafka_1       | [2020-06-18 12:39:49,959] INFO [GroupCoordinator 1001]: Preparing to rebalance group testGroup in state PreparingRebalance with old generation 0 (__consumer_offsets-49) (reason: Adding new member 1804289383-2c0edf6a-4823-4c1c-b070-7cade019c109 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
kafka_1       | [2020-06-18 12:39:49,961] INFO [GroupCoordinator 1001]: Stabilized group testGroup generation 1 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
kafka_1       | [2020-06-18 12:39:49,967] INFO [GroupCoordinator 1001]: Assignment received from leader for group testGroup for generation 1 (kafka.coordinator.group.GroupCoordinator)
consumer-0_1  | Got assigned: [ queueing.job.test[0:#], queueing.job.test[1:#] ]
consumer-0_1  | initialPartitionList: [ queueing.job.test[0:#], queueing.job.test[1:#] ]
consumer-0_1  | new offset -1000
consumer-0_1  | new offset -1000

# produce 2 messages
consumer-0_1  | {"userId":"jimmy","jobId":"jimmy_40c70617-ec08-40ab-b81a-c69081df3a6c","jobStep":0,"isJobDone":false}
consumer-0_1  | {"userId":"jimmy","jobId":"jimmy_1ed542b3-b82c-44a7-8b99-61c036046039","jobStep":0,"isJobDone":false}

# kill the consumer
consumer-0_1 exited with code 137
kafka_1       | [2020-06-18 12:42:05,023] INFO [GroupCoordinator 1001]: Member 1804289383-2c0edf6a-4823-4c1c-b070-7cade019c109 in group testGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka_1       | [2020-06-18 12:42:05,025] INFO [GroupCoordinator 1001]: Preparing to rebalance group testGroup in state PreparingRebalance with old generation 1 (__consumer_offsets-49) (reason: removing member 1804289383-2c0edf6a-4823-4c1c-b070-7cade019c109 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
kafka_1       | [2020-06-18 12:42:05,028] INFO [GroupCoordinator 1001]: Group testGroup with generation 2 is now empty (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)

# bring the consumer back to life (back to reality)
consumer-0_1  | Consuming messages from topic queueing.job.test
consumer-0_1  | Got assigned: [ queueing.job.test[0:#], queueing.job.test[1:#] ]
consumer-0_1  | initialPartitionList: [ queueing.job.test[0:#], queueing.job.test[1:#] ]
consumer-0_1  | new offset -1000
consumer-0_1  | new offset -1000

# here messages are consumed again at startup
consumer-0_1  | {"userId":"jimmy","jobId":"jimmy_40c70617-ec08-40ab-b81a-c69081df3a6c","jobStep":0,"isJobDone":false}
consumer-0_1  | {"userId":"jimmy","jobId":"jimmy_1ed542b3-b82c-44a7-8b99-61c036046039","jobStep":0,"isJobDone":false}
consumer-0_1  | Consuming messages from topic queueing.job.test
kafka_1       | [2020-06-18 12:42:31,485] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group testGroup in Empty state. Created a new member id 1804289383-612701f3-d5ee-4e69-804c-83e4b520cb8f for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
kafka_1       | [2020-06-18 12:42:31,486] INFO [GroupCoordinator 1001]: Preparing to rebalance group testGroup in state PreparingRebalance with old generation 2 (__consumer_offsets-49) (reason: Adding new member 1804289383-612701f3-d5ee-4e69-804c-83e4b520cb8f with group instance id None) (kafka.coordinator.group.GroupCoordinator)
kafka_1       | [2020-06-18 12:42:31,488] INFO [GroupCoordinator 1001]: Stabilized group testGroup generation 3 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
kafka_1       | [2020-06-18 12:42:31,492] INFO [GroupCoordinator 1001]: Assignment received from leader for group testGroup for generation 3 (kafka.coordinator.group.GroupCoordinator)
consumer-0_1  | Got assigned: [ queueing.job.test[0:#], queueing.job.test[1:#] ]
consumer-0_1  | initialPartitionList: [ queueing.job.test[0:1], queueing.job.test[1:1] ]
consumer-0_1  | new offset 2
consumer-0_1  | new offset 2
```

alagiz commented 4 years ago

Here's my setup:

Install cmake 3.15.5 manually (running `apt-get install cmake` installs 3.10.0, as that is the default in ubuntu:bionic):

```dockerfile
ADD https://cmake.org/files/v3.15/cmake-3.15.5-Linux-x86_64.sh /cmake-3.15.5-Linux-x86_64.sh
RUN mkdir /opt/cmake
RUN sh /cmake-3.15.5-Linux-x86_64.sh --prefix=/opt/cmake --skip-license
RUN ln -s /opt/cmake/bin/cmake /usr/local/bin/cmake
RUN cmake --version

RUN apt-get update && apt-get install -y build-essential libgl1-mesa-dev curl unzip tar git libboost-all-dev \
    librdkafka-dev libssl-dev freeglut3 libglu1-mesa-dev \
    freeglut3-dev libturbojpeg0-dev && rm -rf /var/lib/apt/lists/*

RUN git clone https://github.com/mfontanini/cppkafka.git
RUN cd cppkafka && mkdir build && cd build && cmake .. && make && make install

COPY src/messaging/consumer/consumer.cpp /cppkafka/

RUN cd cppkafka && g++ consumer.cpp -std=c++17 -lpthread -lz -lstdc++ -lcppkafka -lrdkafka -o main

# ================== run image ==================

FROM ubuntu:bionic

RUN apt-get update && apt-get install -y librdkafka-dev

# cppkafka library is in /usr/local/lib
ENV LD_LIBRARY_PATH /usr/local/lib:$LD_LIBRARY_PATH

COPY --from=BUILD /cppkafka/build/* /usr/app/
COPY --from=BUILD /cppkafka/main /usr/app/
COPY --from=BUILD /usr/local/lib/libcppkafka.so /usr/local/lib/
COPY --from=BUILD /usr/local/lib/libcppkafka.so.0.3.1 /usr/local/lib/

WORKDIR /usr/app

CMD ["./main"]
```

* docker-compose file I am using:
```yaml
version: '3.2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - "zookeeper"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_HEAP_OPTS: -Xmx256m -Xms256m
      KAFKA_NUM_PARTITIONS: 2
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - "9092:9092"
  performer:
    image: kafka-utils
    command: ["java",  "-jar", "/app.jar"]
    ports:
      - "3103:8080"
  consumer-0:
    image: consumer
    command: sh -c "
      sleep 15 && ./main"
    depends_on:
    - kafka
  consumer-1:
    image: consumer
    command: sh -c "
      sleep 15 && ./main"
    depends_on:
      - kafka

```

* to kill the consumer

```bash
docker stop <id-of-consumer-0>
```

* to bring the consumer back to life
```bash
docker start <id-of-consumer-0>
```
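
In case it helps with debugging, here is a small standalone sketch that only asks the broker for the group's committed offsets and prints them, without subscribing (so it does not join the group or trigger a rebalance). The broker address `kafka:9092`, the topic `queueing.job.test` with its 2 partitions, and the group `testGroup` are taken from the setup above; running it before and after restarting a consumer should show whether the commits from the previous run actually reached the broker:

```cpp
#include <iostream>
#include "cppkafka/consumer.h"
#include "cppkafka/topic_partition_list.h"

using cppkafka::Configuration;
using cppkafka::Consumer;
using cppkafka::TopicPartition;
using cppkafka::TopicPartitionList;

int main() {
    // Same broker and group as the consumer above (assumed values).
    Configuration config = {
        {"metadata.broker.list", "kafka:9092"},
        {"group.id", "testGroup"}
    };
    Consumer consumer(config);

    // Query the coordinator for the committed offsets of both partitions
    // without subscribing, so this tool stays outside the consumer group.
    TopicPartitionList partitions = {
        TopicPartition("queueing.job.test", 0),
        TopicPartition("queueing.job.test", 1)
    };
    TopicPartitionList committed = consumer.get_offsets_committed(partitions);
    std::cout << "Committed offsets: " << committed << std::endl;
    return 0;
}
```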