Closed aymkhalil closed 1 year ago
This approach is not correct, because at this point the ackTimeout has already started for the message, so messages will start being redelivered.
If we want to throttle, we have to do it in the Pulsar runtime.
There is already a setting, receiverQueueSize, that you can use to tune the maximum number of messages dispatched to the Consumer but not yet passed to the application.
If you really want to add some throttling in the Sink, you must do it "after" processing a message (after calling "acknowledge"); this way the Sink slows down but messages won't be redelivered.
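For illustration, here is a minimal sketch of the "throttle after acknowledge" ordering. It deliberately avoids the Pulsar API: `PostAckThrottle`, `handle`, and the two `Runnable` callbacks are hypothetical names standing in for the Sink's processing step and the record's acknowledge call, and the fixed-rate pause is just one possible throttling policy.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: pauses only AFTER a record has been processed and acked. */
final class PostAckThrottle {
    private final long minIntervalNanos;
    private long lastDoneNanos;

    PostAckThrottle(long maxRecordsPerSecond) {
        if (maxRecordsPerSecond <= 0) {
            throw new IllegalArgumentException("maxRecordsPerSecond must be positive");
        }
        this.minIntervalNanos = TimeUnit.SECONDS.toNanos(1) / maxRecordsPerSecond;
        // Start "one interval in the past" so the first record is never delayed.
        this.lastDoneNanos = System.nanoTime() - minIntervalNanos;
    }

    /**
     * @param process stand-in for the Sink's work on one record (assumption)
     * @param ack     stand-in for acknowledging that record (assumption)
     */
    void handle(Runnable process, Runnable ack) {
        process.run();
        ack.run();          // ack first, so no unacked record is held while we wait
        pauseIfTooFast();   // only then slow the Sink down
    }

    private void pauseIfTooFast() {
        long remaining = minIntervalNanos - (System.nanoTime() - lastDoneNanos);
        if (remaining > 0) {
            try {
                TimeUnit.NANOSECONDS.sleep(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
        }
        lastDoneNanos = System.nanoTime();
    }
}
```

The point of the ordering is that the pause happens while the Sink holds no unacknowledged record, so the delay cannot push a record past its ack timeout.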
In that case let me experiment with the consumer queue size settings and see if it mitigates the OOM errors. If it does the job, I can close this PR and we can decide later if we want to implement a sink throttling solution (or a generic reusable "batched" sink).
Configuring the receiverQueueSize per topic consumer via inputSpecs worked well. Closing this PR for now (will reopen the explicit sink throttling discussion as needed).
Convert incomingList to a blocking queue to protect the sink from OOM.
Implementation notes:
2 * batchSize so that:
batchSize on the flush path (as opposed to clearing the entire list in the previous impl)
The isFlushing variable is sufficient to ensure 1 flush task at any given time
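A rough sketch of how these notes could fit together, using only java.util.concurrent. This is not the PR's actual code: `BoundedBatchBuffer` and its method names are hypothetical, and reading 2 * batchSize as the queue capacity is an assumption based on the notes above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical bounded buffer standing in for the sink's incomingList. */
final class BoundedBatchBuffer<T> {
    private final int batchSize;
    private final BlockingQueue<T> incoming;
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);

    BoundedBatchBuffer(int batchSize) {
        this.batchSize = batchSize;
        // Assumed capacity: 2 * batchSize, so writers can keep filling the
        // second half while a flush drains the first.
        this.incoming = new ArrayBlockingQueue<>(2 * batchSize);
    }

    /** Blocks the caller while the queue is full; this is what bounds memory. */
    void write(T record) throws InterruptedException {
        incoming.put(record);
    }

    /** Non-blocking variant: returns false instead of waiting when full. */
    boolean tryWrite(T record) {
        return incoming.offer(record);
    }

    /**
     * Drains at most batchSize records rather than clearing everything.
     * Returns an empty list if another flush is already in progress.
     */
    List<T> flush() {
        if (!isFlushing.compareAndSet(false, true)) {
            return new ArrayList<>();
        }
        try {
            List<T> batch = new ArrayList<>(batchSize);
            incoming.drainTo(batch, batchSize);
            return batch;
        } finally {
            isFlushing.set(false);
        }
    }

    int pending() {
        return incoming.size();
    }
}
```

With a bounded queue, a slow flush makes `write` block instead of letting the buffer grow without limit, which is the OOM protection the description is after. As noted in the review, blocking at this point still happens after the record has been handed to the Sink, so it does not stop the ack timeout from running.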