apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

Producer batching is not working in synchronous mode using the send API #22439

Open ragaur-tibco opened 6 months ago

ragaur-tibco commented 6 months ago

Search before asking

Read release policy

Version

3.2.x

Minimal reproduce step

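import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;

// `client` is an existing PulsarClient instance
ProducerBuilder<String> producerBuilderBatching = client.newProducer(Schema.STRING)
        .accessMode(ProducerAccessMode.Shared)    // enum, not the string "Shared"
        .compressionType(CompressionType.NONE)    // enum, not the string "NONE"
        .enableBatching(true)
        .blockIfQueueFull(true)
        .batchingMaxMessages(5)
        .batchingMaxBytes(13270)
        .batchingMaxPublishDelay(70000, TimeUnit.MILLISECONDS);

Producer<String> producerStringBatching = producerBuilderBatching.topic("batch-test-topic")
        .sendTimeout(20, TimeUnit.SECONDS).create();

// Synchronous send: blocks until the broker acknowledges the message or the send fails
MessageId msgId = producerStringBatching.newMessage().value("Check batching").send();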

What did you expect to see?

The "num messages in batch so far" count should increase from 0 to 4 within a single batch (the maximum number of messages per batch is configured as 5), and the Pulsar SDK should print logs like the following to the console:


2024-04-02T14:54:08,963 DEBUG [pool-22-thread-2] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-800] add message to batch, num messages in batch so far 0
2024-04-02T14:54:08,964 DEBUG [pool-22-thread-15] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-800] add message to batch, num messages in batch so far 1
2024-04-02T14:54:08,964 DEBUG [pool-22-thread-9] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-800] add message to batch, num messages in batch so far 2
2024-04-02T14:54:08,965 DEBUG [pool-22-thread-6] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-800] add message to batch, num messages in batch so far 3
2024-04-02T14:54:08,965 DEBUG [pool-22-thread-20] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-800] add message to batch, num messages in batch so far 4

This matches how batching behaves in async mode.

What did you see instead?

The "num messages in batch so far" count is always 0 when using synchronous mode:

2024-04-02T14:43:26,850 DEBUG [pool-22-thread-15] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-792] add message to batch, num messages in batch so far 0
2024-04-02T14:43:26,852 DEBUG [pool-22-thread-8] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-792] add message to batch, num messages in batch so far 0
2024-04-02T14:43:26,854 DEBUG [pool-22-thread-7] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-792] add message to batch, num messages in batch so far 0

Anything else?

No response

Are you willing to submit a PR?

dao-jun commented 6 months ago

To reduce the send latency of the sync API, the client flushes messages to the broker when you call the sync API. The sync API also blocks your application's threads: if we didn't flush messages to the broker and instead waited until the batch was full, it could block all of your application's threads.

/cc @gaoran10 @codelipenghui
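A toy model can make the difference between the two paths visible. The sketch below is hypothetical (it is not Pulsar's BatchMessageContainerImpl, and `ToyBatchContainer`, `sendAllAsync`, and `sendAllSync` are illustrative names). It assumes only what is described above: a batch is sent once it holds `batchingMaxMessages` messages, and the synchronous path flushes right after enqueuing. The publish-delay timer is not modeled. It records the open batch size before each add, mirroring the "num messages in batch so far" DEBUG lines in the report.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model of a producer-side batch container (not Pulsar's
 * BatchMessageContainerImpl). The publish-delay timer is not modeled.
 */
public class BatchFlushModel {
    static final class ToyBatchContainer {
        private final int maxMessages;
        private final List<String> open = new ArrayList<>();
        final List<Integer> soFarLog = new ArrayList<>();          // size of the open batch before each add
        final List<Integer> flushedBatchSizes = new ArrayList<>(); // size of every batch sent

        ToyBatchContainer(int maxMessages) {
            this.maxMessages = maxMessages;
        }

        void add(String message) {
            soFarLog.add(open.size());   // mirrors "num messages in batch so far N"
            open.add(message);
            if (open.size() >= maxMessages) {
                flush();                 // batch is full: send it
            }
        }

        void flush() {
            if (!open.isEmpty()) {
                flushedBatchSizes.add(open.size());
                open.clear();
            }
        }
    }

    /** Async-style: only enqueue; a batch is sent once it is full. */
    static ToyBatchContainer sendAllAsync(int count, int maxMessages) {
        ToyBatchContainer container = new ToyBatchContainer(maxMessages);
        for (int i = 0; i < count; i++) {
            container.add("msg-" + i);
        }
        return container;
    }

    /** Sync-style: enqueue, then flush right away so the caller is not left waiting. */
    static ToyBatchContainer sendAllSync(int count, int maxMessages) {
        ToyBatchContainer container = new ToyBatchContainer(maxMessages);
        for (int i = 0; i < count; i++) {
            container.add("msg-" + i);
            container.flush();
        }
        return container;
    }

    public static void main(String[] args) {
        ToyBatchContainer async = sendAllAsync(5, 5);
        ToyBatchContainer sync = sendAllSync(5, 5);
        System.out.println("async: so far " + async.soFarLog + ", batches " + async.flushedBatchSizes);
        System.out.println("sync:  so far " + sync.soFarLog + ", batches " + sync.flushedBatchSizes);
    }
}
```

The async loop yields counts [0, 1, 2, 3, 4] and one batch of five, while the sync loop yields [0, 0, 0, 0, 0] and five single-message batches, which is the same pattern as the two log excerpts in the report.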

lhotari commented 6 months ago

Thanks for the good issue report @ragaur-tibco !

lhotari commented 6 months ago

To reduce the send latency of the sync API, the client flushes messages to the broker when you call the sync API.

@dao-jun Yes, this is reasonable, but the problem is that this doesn't seem to be documented anywhere.

I found the flushing logic here: https://github.com/apache/pulsar/blob/ffff639a1b73a34bbb5115503d4c7783bb2a2770/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java#L82-L86
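Roughly, the pattern at that link is a blocking send layered on the async path: enqueue the message, flush the open batch if the returned future is not complete yet, then block on the future. The sketch below is a simplified paraphrase under that assumption, not the actual Pulsar source; `ToyBatchingProducer` and `flushPending` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical sketch: a blocking send() layered on an async, batching send path. */
public class SyncOverAsyncSketch {
    static final class ToyBatchingProducer {
        private final List<CompletableFuture<Long>> pending = new ArrayList<>();
        private long nextId = 0;
        int flushCalls = 0;

        /** Enqueue only: the returned future completes when the batch is flushed. */
        synchronized CompletableFuture<Long> sendAsync(String value) {
            CompletableFuture<Long> future = new CompletableFuture<>();
            pending.add(future);       // the value itself is not stored in this toy
            return future;
        }

        /** "Send" every queued message and complete its future with an id. */
        synchronized void flushPending() {
            flushCalls++;
            for (CompletableFuture<Long> future : pending) {
                future.complete(nextId++);
            }
            pending.clear();
        }

        /** Blocking send: enqueue, flush if the message is still queued, then wait. */
        long send(String value) throws InterruptedException, ExecutionException {
            CompletableFuture<Long> future = sendAsync(value);
            if (!future.isDone()) {
                // Without this flush nothing in this toy would ever complete the future;
                // in a real batching client the caller would wait for the batch to fill
                // or for the publish-delay timer.
                flushPending();
            }
            return future.get();
        }
    }

    public static void main(String[] args) throws Exception {
        ToyBatchingProducer producer = new ToyBatchingProducer();
        long first = producer.send("a");
        long second = producer.send("b");
        System.out.println(first + " " + second + " flushes=" + producer.flushCalls);
    }
}
```

Each blocking call triggers its own flush, so consecutive synchronous sends from one thread never share a batch.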

I think @ragaur-tibco's issue report is great and the minimum resolution is to document the behavior. It seems that there could be a use case where one would want to achieve batching while using the synchronous API. That's not currently supported. We have 2 options for resolving this:

  1. Documenting the behavior and making it explicit that synchronous send will trigger flushing of the current batch message.
  2. Adding support for controlling the triggering behavior so that batching would be possible also with the synchronous API. Since the caller thread will be blocked until the batch is complete, it would be useful only in cases where other caller threads could contribute to the same batch.
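Option 2 could look roughly like the following hypothetical sketch (`SyncBatchingSketch` and `runCallers` are illustrative names, not a proposed Pulsar API). It assumes a blocking send that does not flush on its own, so a caller waits until other threads fill the batch or until a maximum delay expires. It is meant only to show the trade-off, not a design.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of "option 2": a blocking send that does not flush by itself.
 * A caller waits until other callers fill the batch or until maxDelayMillis elapses.
 */
public class SyncBatchingSketch {
    private final int maxMessages;
    private final long maxDelayNanos;
    private int openBatchSize = 0;      // messages in the batch being filled
    private long batchGeneration = 0;   // incremented each time a batch is "sent"
    private final List<Integer> sentBatchSizes = new ArrayList<>();

    SyncBatchingSketch(int maxMessages, long maxDelayMillis) {
        this.maxMessages = maxMessages;
        this.maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
    }

    /** Blocks until the batch holding this message is sent (the value itself is ignored). */
    synchronized void send(String value) throws InterruptedException {
        long myGeneration = batchGeneration;
        openBatchSize++;
        if (openBatchSize >= maxMessages) {
            sendBatch();                // batch is full: send it and wake the other waiters
            return;
        }
        long deadline = System.nanoTime() + maxDelayNanos;
        while (batchGeneration == myGeneration) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                sendBatch();            // delay expired: send whatever has accumulated
                return;
            }
            TimeUnit.NANOSECONDS.timedWait(this, remaining); // releases the monitor while waiting
        }
    }

    private void sendBatch() {
        sentBatchSizes.add(openBatchSize);
        openBatchSize = 0;
        batchGeneration++;
        notifyAll();
    }

    /** Starts `callers` threads that each send one message, and returns the sent batch sizes. */
    static List<Integer> runCallers(int callers, int maxMessages, long maxDelayMillis)
            throws InterruptedException {
        SyncBatchingSketch batcher = new SyncBatchingSketch(maxMessages, maxDelayMillis);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < callers; i++) {
            String value = "msg-" + i;
            Thread t = new Thread(() -> {
                try {
                    batcher.send(value);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        synchronized (batcher) {
            return new ArrayList<>(batcher.sentBatchSizes);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("5 callers: " + runCallers(5, 5, 5_000)); // one full batch
        System.out.println("1 caller:  " + runCallers(1, 5, 200));   // partial batch after ~200 ms
    }
}
```

With five concurrent callers and a limit of five, `runCallers(5, 5, 5_000)` returns [5] almost immediately; with a single caller, `runCallers(1, 5, 200)` returns [1] only after roughly 200 ms, which is exactly the blocking cost of this option.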

I'm personally in favor of option 1., but I'm open to option 2. if someone supports that and the change goes through the typical PIP process we have in Pulsar.

dao-jun commented 6 months ago

@lhotari I think supporting batching for the sync API has potential risks. As I mentioned before, if we don't flush messages but instead wait until the batch is full, it may block users' application threads for a long time. Especially when many producers in a single application send batched messages, it may block all the threads.
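To put a rough number on this risk, take the configuration from the original report (batchingMaxMessages = 5, batchingMaxPublishDelay = 70000 ms) and assume, hypothetically, that a synchronous send did not flush and that a single application thread is the only sender. That thread can never fill a batch of 5, so each message sits in the open batch until the publish-delay timer fires, which can take up to batchingMaxPublishDelay. Sending 5 messages one after another would then take:

$$
T_{\text{worst}} \le N_{\text{msgs}} \times \text{batchingMaxPublishDelay} = 5 \times 70\,\text{s} = 350\,\text{s}
$$

Flushing on every synchronous send, as the client does today, reduces this to roughly one broker round trip per message.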

I prefer option 1, and I'll fix the doc later.