fede1024 / rust-rdkafka

A fully asynchronous, futures-based Kafka client library for Rust based on librdkafka
MIT License
1.62k stars 274 forks source link

Nothing gets consumed from at_least_once example #678

Open miladamery opened 6 months ago

miladamery commented 6 months ago

Hi, im trying to consume messages from kafka with rust-rdkafka. i copy pasted at_least_once.rs code but nothing gets consumed from kafka. in console i get following logs:

rebalance: RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
tpl: TPL {ifbrlc-20230805/0: offset=Invalid metadata="", error=Ok(())}

and in my broker:

[2024-05-02 05:00:06,947] INFO [GroupCoordinator 3]: Assignment received from leader rdkafka-e3728e3e-11ae-43b1-a9dd-71716d3642b3 for group milad.group for generation 3. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:00:14,683] INFO [GroupCoordinator 3]: Dynamic member with unknown member id joins group milad.group in Stable state. Created a new member id rdkafka-e808bf6b-1acb-4a46-940c-9e3b48a0a8ac and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:00:51,954] INFO [GroupCoordinator 3]: Member rdkafka-e3728e3e-11ae-43b1-a9dd-71716d3642b3 in group milad.group has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:00:51,955] INFO [GroupCoordinator 3]: Preparing to rebalance group milad.group in state PreparingRebalance with old generation 3 (__consumer_offsets-34) (reason: removing member rdkafka-e3728e3e-11ae-43b1-a9dd-71716d3642b3 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:00:51,957] INFO [GroupCoordinator 3]: Group milad.group with generation 4 is now empty (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:00:59,684] INFO [GroupCoordinator 3]: Pending member rdkafka-e808bf6b-1acb-4a46-940c-9e3b48a0a8ac in group milad.group has been removed after session timeout expiration. (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:03:58,331] INFO [GroupCoordinator 3]: Dynamic member with unknown member id joins group milad.group in Empty state. Created a new member id rdkafka-069fb102-d938-4916-a7b5-411c635d4c6c and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:03:58,332] INFO [GroupCoordinator 3]: Preparing to rebalance group milad.group in state PreparingRebalance with old generation 4 (__consumer_offsets-34) (reason: Adding new member rdkafka-069fb102-d938-4916-a7b5-411c635d4c6c with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:04:01,335] INFO [GroupCoordinator 3]: Stabilized group milad.group generation 5 (__consumer_offsets-34) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:04:01,341] INFO [GroupCoordinator 3]: Assignment received from leader rdkafka-069fb102-d938-4916-a7b5-411c635d4c6c for group milad.group for generation 5. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

here is my code:

use std::future;
use std::time::Duration;
use futures::StreamExt;
use rdkafka::{ClientConfig, ClientContext, Message, Statistics, TopicPartitionList};
use rdkafka::client::NativeClient;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::types::RDKafkaRespErr;

// A simple context to customize the consumer behavior and print a log line every time
// offsets are committed
struct LoggingConsumerContext;

impl ClientContext for LoggingConsumerContext {
    fn stats(&self, statistics: Statistics) {
        println!("Stats Received: {:?}", statistics);
    }

    fn error(&self, error: KafkaError, reason: &str) {
        println!("error: {}, reason: {}", error, reason);
    }
}

impl ConsumerContext for LoggingConsumerContext {
    fn rebalance(&self, native_client: &NativeClient, err: RDKafkaRespErr, tpl: &mut TopicPartitionList) {
        println!("rebalance: {:?}", err);
        println!("tpl: {:?}", tpl);
    }

    fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {
        println!("pre_rebalance: {:?}", rebalance);
    }

    fn post_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {
        println!("post_rebalance: {:?}", rebalance);
    }

    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
        match result {
            Ok(_) => println!("Offsets committed successfully"),
            Err(e) => println!("Error while committing offsets: {}", e),
        };
    }

}

type LoggingConsumer = StreamConsumer<LoggingConsumerContext>;

fn create_consumer(brokers: &str, group_id: &str, topic: &str) -> LoggingConsumer {
    let context = LoggingConsumerContext;

    let consumer: LoggingConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        //.set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("max.poll.interval.ms", "20000")
        // Commit automatically every 5 seconds.
        .set("enable.auto.commit", "true")
        //.set("auto.commit.interval.ms", "5000")
        // but only commit the offsets explicitly stored via `consumer.store_offset`.
        //.set("enable.auto.offset.store", "false")
        //.set("auto.offset.reset", "earliest")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create_with_context(context)
        .expect("Consumer creation failed");

    consumer
        .subscribe(&[topic])
        .expect("Can't subscribe to specified topic");

    consumer
}

#[tokio::main]
async fn main() {
    let consumer = create_consumer(
        "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094",
        "milad.group",
        "ifbrlc-20230805"
    );

    loop {
        match consumer.recv().await {
            Err(e) => {
                println!("Kafka error: {}", e);
            }
            Ok(m) => {
                println!("{:?}", m.payload());
            }
        }
    }
}

and this is my docker compose that runs kafka:

version: '2.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:latest
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

  kafka2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

  kafka3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

  kafka-ui:
    container_name: kafka-ui
    image: docker.arvancloud.ir/provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    depends_on:
      - kafka1
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:19092

In the loop section of main nothing gets printed and even when i debug and set breakpoint on both Error and Ok section it doesnt stop.

my cargo.toml is :

[package]
name = "kafka-consumer-test"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rdkafka = "0.36.2"
tokio = {version = "1.37.0", features = ["full"]}
futures = "0.3.30"

What part Im doing wrong? How can i fix this?

buraktabn commented 2 months ago

Did you manage to solve this?

miladamery commented 2 months ago

@buraktabn I can't say I solved this issue. But creating a custom consumer won't consume anything. but if you use default consumer of rdkafka it will work