streamnative / pulsar-rs

Rust Client library for Apache Pulsar

feat: Change send to not block on waiting receipt #298

Closed cirias closed 1 year ago

cirias commented 1 year ago

This is a follow-up to https://github.com/streamnative/pulsar-rs/pull/297.

I figured it's probably better to reuse the SendFuture for async send, since batch send already delays the resolution of the SendFuture.
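
The idea of a send that returns immediately and hands back something that resolves later can be sketched outside of pulsar-rs. The example below is only an illustration of that general pattern, assuming plain std threads and channels instead of futures; `MockProducer`, `Receipt`, and the simulated round trip are hypothetical names and do not reflect the pulsar-rs API or this PR's implementation.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a send receipt; not a pulsar-rs type.
#[derive(Debug, PartialEq)]
struct Receipt {
    sequence_id: u64,
}

/// Hypothetical producer that hands messages to a background "broker" thread.
struct MockProducer {
    outbox: Sender<(u64, Sender<Receipt>)>,
    next_id: u64,
}

impl MockProducer {
    fn new(round_trip: Duration) -> Self {
        let (outbox, inbox) = channel::<(u64, Sender<Receipt>)>();
        // The background thread acknowledges each message after a simulated delay.
        thread::spawn(move || {
            for (sequence_id, ack) in inbox {
                thread::sleep(round_trip);
                // The caller may have dropped its handle; ignore that case.
                let _ = ack.send(Receipt { sequence_id });
            }
        });
        MockProducer { outbox, next_id: 0 }
    }

    /// Queues the message and returns at once; the receiver yields the receipt later.
    fn send(&mut self, _payload: &str) -> Receiver<Receipt> {
        let (ack_tx, ack_rx) = channel();
        let id = self.next_id;
        self.next_id += 1;
        self.outbox.send((id, ack_tx)).expect("broker thread stopped");
        ack_rx
    }
}

fn main() {
    let mut producer = MockProducer::new(Duration::from_millis(20));
    // Queue three messages without waiting for any receipt.
    let pending: Vec<Receiver<Receipt>> =
        (0..3).map(|i| producer.send(&format!("msg-{i}"))).collect();
    // Wait for the receipts only when they are actually needed.
    for rx in pending {
        let receipt = rx.recv().expect("receipt dropped");
        println!("acknowledged sequence {}", receipt.sequence_id);
    }
}
```

The caller decides when, or whether, to wait on each returned handle, which is the property the comment above is after.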

mattisonchao commented 1 year ago

Hi @cirias, by the way, do you have any measurements showing the performance improvement?

cirias commented 1 year ago

> Hi @cirias, by the way, do you have any measurements showing the performance improvement?

I ran a not very scientific test. My goal was to send one message every 40 milliseconds, which works out to 1000 ms / 40 ms = 25 messages per second (mps). Without the change, it only reaches 4 mps; with the change applied, it reaches 25 mps easily. I'll attach the code I used for testing below. Also note that the ping from the test machine to the Pulsar cluster is about 200 ms, which probably reduces the mps significantly when each send blocks waiting for the receipt: one round trip per message alone caps the rate at roughly 1000 ms / 200 ms = 5 mps.

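// Test loop: send one message every 40 ms and log the running average rate.
// `BoxedError` and `mock_message` are helpers from the test program (not shown here).
async fn produce(
    mut producer: pulsar::Producer<pulsar::TokioExecutor>,
) -> Result<(), BoxedError> {
    let mut count = 0;
    let mut itv = tokio::time::interval(std::time::Duration::from_millis(40));
    let start_time = std::time::Instant::now();
    loop {
        // Wait for the next 40 ms tick.
        itv.tick().await;
        let message = mock_message();
        // Before this change, this await blocked until the receipt arrived.
        producer.send(message).await?;
        count += 1;
        // Average rate since the loop started.
        let mps = count as f32 / start_time.elapsed().as_secs_f32();
        tracing::info!("messages per second: {mps:.2}");
    }
}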

Edit: I also have a consumer running on the same test machine. The message receiving rate matches the producing rate.
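
The latency arithmetic in this comment can be checked with a small model. This is a rough sketch under simplifying assumptions: each blocking send costs exactly one round trip, the loop otherwise keeps up with the timer, and the function names are hypothetical. It is not a benchmark of pulsar-rs.

```rust
use std::time::Duration;

/// Upper bound on messages per second when each send waits for its receipt
/// before the next one starts (simplified model, hypothetical helper).
fn blocking_rate_ceiling(interval: Duration, round_trip: Duration) -> f64 {
    // Each iteration takes at least the longer of the tick interval and the round trip.
    let per_message = interval.max(round_trip);
    1.0 / per_message.as_secs_f64()
}

/// Rate when sends do not wait for receipts: only the tick interval limits it.
fn non_blocking_rate(interval: Duration) -> f64 {
    1.0 / interval.as_secs_f64()
}

fn main() {
    let interval = Duration::from_millis(40); // target send interval from the test
    let round_trip = Duration::from_millis(200); // approximate ping to the cluster
    println!(
        "blocking ceiling: {:.1} mps",
        blocking_rate_ceiling(interval, round_trip)
    );
    println!("non-blocking: {:.1} mps", non_blocking_rate(interval));
}
```

Under these assumptions the blocking ceiling is about 5 mps and the non-blocking rate is 25 mps, which is in line with the measured 4 mps and 25 mps.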

tisonkun commented 1 year ago

@cirias do you expect a new release for this patch? If so, I can prepare a 6.1.0 version for it.

cirias commented 1 year ago

> @cirias do you expect a new release for this patch? If so, I can prepare a 6.1.0 version for it.

Yes! I'm currently referencing the commit hash directly in Cargo.toml. But it would be much better to have a version. Thank you!

tisonkun commented 1 year ago

@cirias Thanks for your information!

I'm also going to include https://github.com/streamnative/pulsar-rs/pull/299 in 6.1.0, and I'd appreciate it if you could look at its description and changes to check that the "fix" aligns with your use case: that is, the client does not automatically redirect the broker URL to the TLS scheme if a non-TLS broker URL is set.

If it's OK, I'll merge it and schedule the release tomorrow :D

cirias commented 1 year ago

@tisonkun Thanks for the information!

The PR looks good to me. It shouldn't cause problems in my use case. I left some comments but it's totally fine for you to merge and release. Good luck!

tisonkun commented 1 year ago

v6.1.0 is available now - https://crates.io/crates/pulsar/6.1.0