In Pulsar Functions and Pulsar IO Connectors, the Pulsar Producer cache never expires
producers and has no maximum size.
This is a problem for functions/connectors that produce to a large number of topics.
Modifications
Extract common ProducerCache
A common producer cache with separate cache areas for:
producers created by calling Context, SinkContext, SourceContext methods
producers created in Pulsar Sources
multiple topics are possible when destination topics are returned by the SinkRecord.getDestinationTopic call
Keep backwards compatibility on the cache key
useThreadLocalProducers uses the thread's ID as part of the key so that each thread has its own isolated producer
In Sources, the SinkRecord.getPartitionId() is used as part of the cache key
Cache size is limited by the number of partitions of the cached producers
The default cache limit is 10000 partitions in total. Older entries are removed from the cache when it overflows
By default, cache entries expire 300 seconds after last access
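The key layout and the two limits listed above can be illustrated with a minimal sketch. This is not the ProducerCache class from this change: the class name, method names, key format, and the use of a plain LinkedHashMap are all hypothetical, and it assumes that each entry counts toward the limit by its number of partitions. Time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;

/** Hypothetical illustration only; not the ProducerCache from this change. */
class BoundedProducerCacheSketch {
    /** Stand-in for a cached producer: only its weight and last access time. */
    private static final class Entry {
        final int partitions;
        long lastAccessMillis;

        Entry(int partitions, long nowMillis) {
            this.partitions = partitions;
            this.lastAccessMillis = nowMillis;
        }
    }

    // accessOrder=true keeps iteration order from least to most recently used.
    private final LinkedHashMap<String, Entry> entries = new LinkedHashMap<>(16, 0.75f, true);
    private final int maxPartitions;
    private final long expireAfterAccessMillis;
    private int totalPartitions;

    BoundedProducerCacheSketch(int maxPartitions, long expireAfterAccessMillis) {
        this.maxPartitions = maxPartitions;
        this.expireAfterAccessMillis = expireAfterAccessMillis;
    }

    /** Assumed key layout: cache area, topic, optional thread ID, optional partition ID. */
    static String cacheKey(String area, String topic, Long threadId, String partitionId) {
        StringBuilder key = new StringBuilder(area).append('|').append(topic);
        if (threadId != null) {
            key.append("|thread-").append(threadId);
        }
        if (partitionId != null) {
            key.append("|partition-").append(partitionId);
        }
        return key.toString();
    }

    /** Records an access at nowMillis, inserting the entry if it is absent. */
    void access(String key, int partitions, long nowMillis) {
        removeExpired(nowMillis);
        Entry entry = entries.get(key); // get() marks the entry as most recently used
        if (entry == null) {
            entries.put(key, new Entry(partitions, nowMillis));
            totalPartitions += partitions;
        } else {
            entry.lastAccessMillis = nowMillis;
        }
        evictOverflow();
    }

    boolean contains(String key) {
        return entries.containsKey(key); // does not change the access order
    }

    int totalPartitions() {
        return totalPartitions;
    }

    /** Drops entries that were not accessed within the expiration window. */
    private void removeExpired(long nowMillis) {
        Iterator<Entry> it = entries.values().iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (nowMillis - entry.lastAccessMillis >= expireAfterAccessMillis) {
                totalPartitions -= entry.partitions;
                it.remove();
            }
        }
    }

    /** Drops least recently used entries until the partition total fits the limit. */
    private void evictOverflow() {
        Iterator<Entry> it = entries.values().iterator();
        while (totalPartitions > maxPartitions && it.hasNext()) {
            Entry entry = it.next();
            totalPartitions -= entry.partitions;
            it.remove();
        }
    }
}
```

With the defaults described above, such a cache would be constructed with a limit of 10000 partitions and an expiration of 300000 milliseconds. A real implementation would also need to close each producer when its entry is removed, which this sketch leaves out.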
Documentation
doc
doc-required
doc-not-needed
doc-complete