SeaQL / sea-streamer

🌊 A real-time stream processing toolkit for Rust
https://www.sea-ql.org/SeaStreamer/
Apache License 2.0

Cannot consume from Kafka using the streams. #31

Closed gadLinux closed 4 months ago

gadLinux commented 4 months ago

Hi Team,

Thank you for this marvelous piece of software. It has the bits I needed for my project, which I hope to publish soon.

I'm doing something like this:

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::registry()
        .with(EnvFilter::from_default_env())
        .with(tracing_subscriber::fmt::layer())
        .try_init()
        .context("initialize tracing").expect("Cannot initialize tracing");

    let config = Config::load().context("load configuration").expect("Cannot load configuration");
    println!("Starting with configuration: {config:?}");
    let stream: StreamUrl = config.event_log.bootstrap.parse().unwrap();
    let options = KafkaConsumerOptions::new(ConsumerMode::RealTime);

    let streamer = KafkaStreamer::connect(stream.streamer(), Default::default())
        .await.unwrap();

    let mut consumer: KafkaConsumer = streamer
        .create_consumer(stream.stream_keys(), options)
        .await.unwrap();

    let test_mess = consumer.next().await.expect("Cannot consume");
    info!("Message: {}", test_mess.message().as_str().unwrap());
    Ok(())
}

But the program hangs in consumer.next(). I also tried the stream API, with the same result.

I tried setting the consumer group, consuming from earliest, and sending more messages to the topic. Nothing. It just hangs in poll().
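One way to tell "no message arrived" apart from "the program is stuck" is to put a deadline on the wait. The sketch below uses only the standard library: the channel receiver is a stand-in for the Kafka consumer, and `Poll` and `next_with_timeout` are hypothetical names, not part of sea-streamer or rdkafka. In the async program above, wrapping `consumer.next()` in `tokio::time::timeout` would presumably play the same role, though that was not tried in this thread.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Outcome of waiting for one message with a deadline.
#[derive(Debug, PartialEq)]
enum Poll {
    Message(String),
    TimedOut,
    Closed,
}

/// Wait up to `timeout` for the next message instead of blocking forever.
/// `rx` is a stand-in for the consumer; it is not a sea-streamer type.
fn next_with_timeout(rx: &mpsc::Receiver<String>, timeout: Duration) -> Poll {
    match rx.recv_timeout(timeout) {
        Ok(msg) => Poll::Message(msg),
        Err(mpsc::RecvTimeoutError::Timeout) => Poll::TimedOut,
        Err(mpsc::RecvTimeoutError::Disconnected) => Poll::Closed,
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    // Nothing has been sent yet: the call returns instead of hanging.
    println!("{:?}", next_with_timeout(&rx, Duration::from_millis(50)));

    // A producer thread delivers one message shortly afterwards.
    let producer = thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        tx.send(r#"{"test-message": "testPayload"}"#.to_string())
            .unwrap();
    });
    println!("{:?}", next_with_timeout(&rx, Duration::from_secs(2)));
    producer.join().unwrap();
}
```

A `TimedOut` result at least confirms that the consumer is alive and simply not receiving anything, which narrows the search to subscription, offsets, or the broker.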

What could be going on? The version I use is 0.5.0.

But I do have messages:

❯ kcat -t test-topic-2 -b 172.16.0.163:9092 -p 4
% Auto-selecting Consumer mode (use -P or -C to override)
{"test-message": "testPayload"}
{"test-message": "testPayload"}
{"test-message": "testPayload"}
{"test-message": "testPayload"}
{"test-message": "testPayload"}
% Reached end of topic test-topic-2 [4] at offset 5
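For readers unfamiliar with Kafka offsets, the kcat output means partition 4 holds five messages and the next offset to be written is 5. The toy model below shows why the start position matters for a consumer with no committed offset: starting at the end sees nothing until something new is produced, while starting at the beginning sees all five. `StartAt` and `visible` are hypothetical names for illustration only and do not correspond to any sea-streamer or rdkafka API. The model does not explain this report by itself, since consuming from earliest was also tried here.

```rust
/// Where a new consumer starts reading in a partition (simplified model).
#[derive(Clone, Copy)]
enum StartAt {
    Earliest,
    Latest,
}

/// Return the slice of `log` a consumer would see, given its start policy.
/// `log` models the messages already stored in one partition.
fn visible<T>(log: &[T], start: StartAt) -> &[T] {
    let offset = match start {
        StartAt::Earliest => 0,
        // "Latest" points just past the last stored message.
        StartAt::Latest => log.len(),
    };
    &log[offset..]
}

fn main() {
    // Five stored messages, as in the kcat output above.
    let partition = vec![r#"{"test-message": "testPayload"}"#; 5];

    println!("earliest sees {}", visible(&partition, StartAt::Earliest).len());
    println!("latest sees {}", visible(&partition, StartAt::Latest).len());
}
```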

gadLinux commented 4 months ago

I tried LoadBalanced with a consumer group and got the same result; it seems like there are no messages... I also tried reset and seek, with the same results.

gadLinux commented 4 months ago

2024-06-03T09:00:11.676721Z TRACE rdkafka::client: Create new librdkafka client 0x7fc1e803cc00
2024-06-03T09:00:11.677003Z TRACE rdkafka::admin: Admin polling thread loop started
2024-06-03T09:00:11.688990Z TRACE rdkafka::admin: Stopping polling
2024-06-03T09:00:11.689049Z TRACE rdkafka::admin: Waiting for polling thread termination
2024-06-03T09:00:11.782150Z TRACE rdkafka::admin: Admin polling thread loop terminated
2024-06-03T09:00:11.782447Z TRACE rdkafka::admin: Polling stopped
2024-06-03T09:00:11.782492Z TRACE rdkafka::util: Destroying client: 0x7fc1e803cc00
2024-06-03T09:00:11.782882Z TRACE rdkafka::util: Destroyed client: 0x7fc1e803cc00
2024-06-03T09:00:11.782920Z TRACE rdkafka::util: Destroying queue: 0x60000271ae20
2024-06-03T09:00:11.782941Z TRACE rdkafka::util: Destroyed queue: 0x60000271ae20
2024-06-03T09:00:11.783666Z TRACE rdkafka::client: Create new librdkafka client 0x7fc1e883d400
2024-06-03T09:00:11.784046Z TRACE rdkafka::util: Destroying topic partition list: 0x600002514570
2024-06-03T09:00:11.784032Z TRACE rdkafka::consumer::stream_consumer: Starting stream consumer wake loop: 0x7fc1e883d400
2024-06-03T09:00:11.784072Z TRACE rdkafka::util: Destroyed topic partition list: 0x600002514570
2024-06-03T09:00:11.784665Z TRACE rdkafka::util: Destroying topic partition list: 0x6000025143f0
2024-06-03T09:00:11.784687Z TRACE rdkafka::util: Destroyed topic partition list: 0x6000025143f0
2024-06-03T09:00:11.785127Z TRACE rdkafka::client: Starting metadata fetch
2024-06-03T09:00:11.785145Z TRACE rdkafka::util: Destroying native topic: 0x7fc1e803c400
2024-06-03T09:00:11.785159Z TRACE rdkafka::util: Destroyed native topic: 0x7fc1e803c400
2024-06-03T09:00:11.795941Z TRACE rdkafka::client: Metadata fetch completed
2024-06-03T09:00:11.796194Z TRACE rdkafka::util: Destroying metadata: 0x7fc1d0808200
2024-06-03T09:00:11.796216Z TRACE rdkafka::util: Destroyed metadata: 0x7fc1d0808200
2024-06-03T09:00:11.796282Z INFO counter_kafka: Consumer created test-topic-3

tyt2y3 commented 4 months ago

Doesn't ring a bell at first glance. Can you try using a local Kafka cluster under Docker? Maybe it has something to do with the server?

gadLinux commented 4 months ago

It's a Docker Kafka cluster of 1 node, clean and created for the test. I can get the records with kcat, with offsetexplorer, and with a Java client. I will do some more tests... but I have done a few already.

gadLinux commented 4 months ago

I wiped both images, ZooKeeper and Kafka, rebuilt everything, and now it's fetching. I don't know why it happened only with rdkafka; the problem seems to be there, because as I said I could read with other programs. Now it's fixed.

Love the lib, hope people start using it and contributing...

tyt2y3 commented 3 months ago

Thank you