fede1024 / rust-rdkafka

A fully asynchronous, futures-based Kafka client library for Rust based on librdkafka
MIT License

Mocking doesn't work #629

Open connormullett opened 10 months ago

connormullett commented 10 months ago

We use this library in production and it's been great so far. We noticed that there was a new mocking module and upgraded to help extend test coverage to ensure stability.

I'm getting the error KafkaError (Client creation error: librdkafka failed to create consumer queue), which appears to originate very close to the C code in the base library. Our mocks look like the following; we used the example in examples/mocking.rs for reference.

fn create_mock_cluster<'c>() -> MockCluster<'c, DefaultProducerContext> {
    let cluster = MockCluster::new(3).expect("failed to create mock cluster");
    cluster
        .create_topic("<our topic>", 32, 3)
        .expect("failed to set topic");
    cluster
}

fn create_mock_producer(bootstrap_servers: String) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", bootstrap_servers)
        .create()
        .expect("failed to create mock producer")
}

fn create_mock_consumer(bootstrap_servers: String) -> StreamingConsumer {
    StreamingConsumer::mock(bootstrap_servers)
}

We use the newtype pattern on StreamConsumer for additional mocking and expectations such as committing. That's where StreamingConsumer comes from.

Because of the above error, I cloned the repo and tried to run the mocking.rs example locally, and it failed with the following output.

     Running `target/debug/examples/mocking`
%5|1699901725.314|CONFWARN|rdkafka#producer-1| [thrd:app]: No `bootstrap.servers` configured: client will not be able to connect to Kafka cluster
Warming up for 10s...
Recording for 10s...
measurements: 1938
mean latency: 255.35655314757489ms
p50 latency:  255ms
p90 latency:  456ms
p99 latency:  501ms
%3|1699901745.548|FAIL|rdkafka#producer-2| [thrd:127.0.0.1:42977/bootstrap]: 127.0.0.1:42977/1: Connect to ipv4#127.0.0.1:42977 failed: Connection refused (after 0ms in state CONNECT)
%3|1699901745.548|FAIL|rdkafka#producer-2| [thrd:127.0.0.1:33881/bootstrap]: 127.0.0.1:33881/2: Connect to ipv4#127.0.0.1:33881 failed: Connection refused (after 0ms in state CONNECT)
%3|1699901745.548|FAIL|rdkafka#producer-2| [thrd:127.0.0.1:37597/bootstrap]: 127.0.0.1:37597/3: Connect to ipv4#127.0.0.1:37597 failed: Connection refused (after 0ms in state CONNECT)

Because the example failed, I'm optimistic that this is something on my end, but I can't find any documentation on this.

Thanks for looking into this

scanterog commented 10 months ago

@connormullett The example actually works and finishes as expected, though it is indeed confusing that bootstrap connection-refused failures are being logged (that might be a bug in librdkafka). If you check the code, the consumer does receive messages and the latency is measured.

Client creation error: librdkafka failed to create consumer queue

This sounds like group.id was not provided to the consumer. What is StreamingConsumer::mock doing?

connormullett commented 10 months ago

This sounds like group.id was not provided to the consumer. What is StreamingConsumer::mock doing?

This was the issue and it works!

Does it make sense here to have a message to check configuration in case of these types of errors?
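
As a rough illustration of the kind of check being asked about, here is a minimal sketch that inspects a plain key/value configuration map before any consumer is created and returns a readable message when group.id is missing. The names check_consumer_config and config_from are hypothetical and are not part of rust-rdkafka or librdkafka; the sketch only assumes, as suggested in this thread, that a missing group.id is what leads to the "failed to create consumer queue" error.

```rust
use std::collections::HashMap;

/// Hypothetical pre-flight check (NOT part of rust-rdkafka): return a readable
/// error when a consumer configuration has no usable `group.id`.
fn check_consumer_config(config: &HashMap<String, String>) -> Result<(), String> {
    match config.get("group.id") {
        // A non-blank group.id is present, so the check passes.
        Some(id) if !id.trim().is_empty() => Ok(()),
        // Missing or blank: explain what to fix instead of failing later.
        _ => Err(String::from(
            "consumer config has no `group.id`; set it before creating the consumer",
        )),
    }
}

/// Hypothetical helper: build a config map from string pairs.
fn config_from(pairs: &[(&str, &str)]) -> HashMap<String, String> {
    pairs
        .iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect()
}
```

Calling check_consumer_config(&config_from(&[("bootstrap.servers", "127.0.0.1:9092")])) returns an Err that names group.id, while adding a ("group.id", "test-group") pair makes it return Ok(()). A wrapper such as the StreamingConsumer::mock constructor above could run a check like this before handing the settings to ClientConfig.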

scanterog commented 10 months ago

Does it make sense here to have a message to check configuration in case of these types of errors?

mm I need to check but I think the latest does it already. Which version are you using?

connormullett commented 9 months ago

Which version are you using?

0.35.0

bretthoerner commented 9 months ago

Something else seems funny about the Mock consumer, given this diff:

git --no-pager diff
diff --git a/examples/mocking.rs b/examples/mocking.rs
index c4a1d1c4..8b967e25 100644
--- a/examples/mocking.rs
+++ b/examples/mocking.rs
@@ -38,7 +38,7 @@ async fn main() {
                 .send_result(
                     FutureRecord::to(TOPIC)
                         .key(&i.to_string())
-                        .payload("dummy")
+                        .payload(&i.to_string())
                         .timestamp(now()),
                 )
                 .unwrap()
@@ -54,6 +54,7 @@ async fn main() {
     println!("Warming up for 10s...");
     loop {
         let message = consumer.recv().await.unwrap();
+        println!("{:?}", message.payload_view::<str>().unwrap());
         let then = message.timestamp().to_millis().unwrap();
         if start.elapsed() < Duration::from_secs(10) {
             // Warming up.

The first message I see via the consumer is at around i=500. This makes the example useful for benchmarking, but rough for testing messages produced by some bit of code.

If I only send a small handful of events my consumer never finishes the first recv().await call. (i.e., change the example to produce 1 message.) I messed with consumer configuration for a while (I was using the default, exactly like the example), thinking this had to do with fetch.min.bytes or fetch.wait.max.ms or something, but nothing seemed to work.

Plus, I feel like the fact that this example (with my diff) doesn't start by printing 0, but instead only picks up at some random point around 500, is proof that messages produced to the mock cluster are being lost? It's like some buffer has to be filled, but then a big chunk of the first messages in the topic are lost?

bretthoerner commented 9 months ago

Ahhh, I finally checked out the mocking code, and saw the unit test inside, which publishes and consumes 1 item and works. The only difference was .set("auto.offset.reset", "earliest")

I guess there's something about mock topic creation where latest (the default) doesn't work as I'd expect. I just created the topic and subscribed with the consumer before I published anything, and I didn't see any messages.

Changing to earliest worked, which is good enough for my needs.
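
One plausible reading of the behaviour above, sketched as a toy model rather than as librdkafka's actual implementation: with latest, a consumer that has no committed offset starts at whatever the end of the log is when its position gets resolved, so anything already in the log at that moment is skipped. With earliest it starts at offset 0. The model assumes the position is resolved only after some messages have been produced, which can happen even when subscribe is called first, because group join and partition assignment complete asynchronously. PartitionLog, OffsetReset, and their methods are hypothetical names used only for this illustration.

```rust
/// Where a consumer with no committed offset starts reading
/// (models the two `auto.offset.reset` values discussed above).
#[derive(Clone, Copy, Debug, PartialEq)]
enum OffsetReset {
    Earliest,
    Latest,
}

/// Simplified, hypothetical stand-in for a single topic partition.
struct PartitionLog {
    messages: Vec<String>,
}

impl PartitionLog {
    fn new() -> Self {
        PartitionLog { messages: Vec::new() }
    }

    /// Append one message at the end of the log.
    fn produce(&mut self, payload: &str) {
        self.messages.push(payload.to_string());
    }

    /// Offset chosen at the moment the consumer's position is resolved.
    fn start_offset(&self, reset: OffsetReset) -> usize {
        match reset {
            OffsetReset::Earliest => 0,
            OffsetReset::Latest => self.messages.len(),
        }
    }

    /// Everything readable from `offset` onward (empty if nothing is there yet).
    fn read_from(&self, offset: usize) -> &[String] {
        self.messages.get(offset..).unwrap_or(&[])
    }
}
```

In this model, if 500 messages are in the log when the position is resolved, start_offset(OffsetReset::Latest) is 500 and the first message read afterwards is the one produced next, which matches the output starting near i=500. If only a single message was ever produced, read_from at the latest offset stays empty, which matches a recv().await call that never returns. With OffsetReset::Earliest both cases start from offset 0.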