streamnative / pulsar-rs

Rust Client library for Apache Pulsar

Missing `batchingMaxPublishDelay` option #306

Open · jiangpengcheng opened this issue 9 months ago

jiangpengcheng commented 9 months ago

The Java client has a `batchingMaxPublishDelay` option, which defaults to 1 millisecond:

https://github.com/apache/pulsar/blob/02147454c425b92f0cd1caefa73b9339db6a0269/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java#L245

    public ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, @NonNull TimeUnit timeUnit) {
        conf.setBatchingMaxPublishDelayMicros(batchDelay, timeUnit);
        return this;
    }

And the corresponding configuration field, which stores the delay in microseconds:

    @ApiModelProperty(
            name = "batchingMaxPublishDelayMicros",
            value = "Batching time period of sending messages."
    )
    private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
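For comparison, a Rust counterpart could expose the same setting as a `std::time::Duration`. The sketch below is hypothetical: `BatchConfig`, `batching_max_publish_delay` and `max_publish_delay_micros` are not part of the pulsar-rs API; they only mirror the Java field and its 1 ms default shown above.

```rust
use std::time::Duration;

/// Hypothetical batching configuration mirroring the Java client's
/// `batchingMaxPublishDelayMicros` field. Not part of pulsar-rs.
#[derive(Debug, Clone, PartialEq)]
pub struct BatchConfig {
    /// Maximum number of messages per batch (like pulsar-rs `batch_size`).
    pub max_messages: usize,
    /// Maximum time a message may wait in a batch before the batch is sent.
    pub max_publish_delay: Duration,
}

impl Default for BatchConfig {
    fn default() -> Self {
        BatchConfig {
            // Example value, matching the `batch_size` used in this issue.
            max_messages: 1000,
            // Same default as the Java client: 1 ms.
            max_publish_delay: Duration::from_millis(1),
        }
    }
}

impl BatchConfig {
    /// Builder-style setter, analogous to Java's
    /// `batchingMaxPublishDelay(long, TimeUnit)`.
    pub fn batching_max_publish_delay(mut self, delay: Duration) -> Self {
        self.max_publish_delay = delay;
        self
    }

    /// The delay in microseconds, the unit the Java config stores it in.
    pub fn max_publish_delay_micros(&self) -> u128 {
        self.max_publish_delay.as_micros()
    }
}

fn main() {
    let default = BatchConfig::default();
    println!("{}", default.max_publish_delay_micros()); // 1000
    let custom = default.batching_max_publish_delay(Duration::from_millis(10));
    println!("{}", custom.max_publish_delay_micros()); // 10000
}
```

Using `Duration` folds Java's `(long, TimeUnit)` pair into a single value, so no separate unit argument is needed.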

pulsar-rs has no equivalent option. As a result, if `batch_size` is set to 1000 and only 500 messages are sent, none of them ever reaches the Pulsar broker:

use pulsar::{ProducerOptions, Pulsar, TokioExecutor};

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "pulsar://localhost:6650".to_string();
    let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;

    // Enable batching: up to 1000 messages or 128 KiB per batch.
    let mut producer = pulsar
        .producer()
        .with_topic("persistent://public/default/test-batch")
        .with_options(ProducerOptions {
            batch_size: Some(1000),
            batch_byte_size: Some(128 * 1024),
            ..Default::default()
        })
        .build()
        .await?;

    let mut counter = 0usize;
    loop {
        // As observed here, with batching enabled this only buffers the
        // message in the current batch; nothing is written to the broker yet.
        producer.send(format!("Hello-{}", counter)).await?;

        counter += 1;
        println!("{counter} messages");

        // Stop after 500 messages, fewer than `batch_size`, so the batch never fills.
        if counter >= 500 {
            producer.close().await?;
            break;
        }
    }

    Ok(())
}

It looks like pulsar-rs buffers messages internally and only performs the actual send once the buffer is full (i.e. when its size equals `batch_size`). Is that right?
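A stripped-down model of that behaviour, assuming buffer size is the only flush trigger, shows why 500 messages with a `batch_size` of 1000 never leave the buffer. `SizeOnlyBatcher` is a hypothetical illustration, not the pulsar-rs internals; "sending" is simulated by recording each flushed batch in a `Vec`.

```rust
/// Hypothetical size-only batcher: messages are buffered and a batch is
/// flushed only when the buffer reaches `batch_size`.
struct SizeOnlyBatcher {
    batch_size: usize,
    buffer: Vec<String>,
    /// Batches that would have been sent to the broker.
    sent: Vec<Vec<String>>,
}

impl SizeOnlyBatcher {
    fn new(batch_size: usize) -> Self {
        SizeOnlyBatcher { batch_size, buffer: Vec::new(), sent: Vec::new() }
    }

    fn push(&mut self, msg: String) {
        self.buffer.push(msg);
        // The only flush trigger: the buffer is full.
        if self.buffer.len() >= self.batch_size {
            let batch = std::mem::take(&mut self.buffer);
            self.sent.push(batch);
        }
    }
}

fn main() {
    let mut batcher = SizeOnlyBatcher::new(1000);
    for i in 0..500 {
        batcher.push(format!("Hello-{}", i));
    }
    // 500 < 1000, so nothing was flushed: all messages are still buffered.
    println!("sent batches: {}, buffered: {}", batcher.sent.len(), batcher.buffer.len());
    // prints "sent batches: 0, buffered: 500"
}
```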

I think a batch should be sent as soon as either of the following conditions is met:

  1. the buffer is full

  2. a configurable delay has elapsed (for example 1 millisecond, as in the Java client)
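The two conditions can be sketched as a batcher with two flush paths: `push` handles "buffer is full", and `poll` handles "delay has elapsed since the oldest buffered message arrived". This is a hypothetical model, not the pulsar-rs implementation; `TimedBatcher`, `push` and `poll` are illustrative names. It assumes something calls `poll` periodically (in a real client, for example, a timer task); the current time is passed in explicitly so the behaviour is deterministic.

```rust
use std::time::{Duration, Instant};

/// Hypothetical batcher that flushes when EITHER the buffer is full OR
/// `max_delay` has elapsed since the first message of the current batch.
struct TimedBatcher {
    batch_size: usize,
    max_delay: Duration,
    buffer: Vec<String>,
    /// Arrival time of the oldest buffered message; `None` if the buffer is empty.
    first_at: Option<Instant>,
    /// Batches that would have been sent to the broker.
    sent: Vec<Vec<String>>,
}

impl TimedBatcher {
    fn new(batch_size: usize, max_delay: Duration) -> Self {
        TimedBatcher { batch_size, max_delay, buffer: Vec::new(), first_at: None, sent: Vec::new() }
    }

    fn flush(&mut self) {
        if !self.buffer.is_empty() {
            self.sent.push(std::mem::take(&mut self.buffer));
        }
        self.first_at = None;
    }

    /// Condition 1: flush as soon as the buffer is full.
    fn push(&mut self, msg: String, now: Instant) {
        if self.buffer.is_empty() {
            self.first_at = Some(now);
        }
        self.buffer.push(msg);
        if self.buffer.len() >= self.batch_size {
            self.flush();
        }
    }

    /// Condition 2: flush a partial batch once its oldest message has waited `max_delay`.
    fn poll(&mut self, now: Instant) {
        if let Some(first) = self.first_at {
            if now.duration_since(first) >= self.max_delay {
                self.flush();
            }
        }
    }
}

fn main() {
    let start = Instant::now();
    let mut batcher = TimedBatcher::new(1000, Duration::from_millis(1));
    for i in 0..500 {
        batcher.push(format!("Hello-{}", i), start);
    }
    batcher.poll(start); // no time has passed: nothing is sent yet
    println!("before delay: {} batches", batcher.sent.len());
    batcher.poll(start + Duration::from_millis(1)); // 1 ms elapsed: partial batch is flushed
    println!("after delay: {} batch(es) of {} messages", batcher.sent.len(), batcher.sent[0].len());
}
```

With this scheme, the 500 messages from the reproduction above would go out in a single batch at most about 1 ms after the first one was buffered, instead of waiting indefinitely for the 1000th.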