sheryl-susman opened this issue 1 year ago
Looks like a bug. I will create an internal issue for our team.
@sheryl-susman You may be encountering the default limit of 500 records defined by the max.poll.records configuration: https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records .
To see larger batches, both the consumer and the connector need to be configured for them. You can raise `max.poll.records` worker-wide, or for a single connector via `consumer.override.max.poll.records` using the client overrides feature: https://kafka.apache.org/documentation/#connect_running
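As a hedged sketch of what a per-connector override might look like (the `"10000"` values are illustrative, not a recommendation; the `name` is taken from the config posted in this thread):

```json
{
  "name": "ELASTICSEARCH_SINK",
  "consumer.override.max.poll.records": "10000",
  "batch.size": "10000"
}
```

Note that client overrides are only honored if the worker permits them; as far as I know the worker-level `connector.client.config.override.policy` defaults to `None`, so it typically needs to be set to `All` for `consumer.override.*` properties to take effect.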
Based on the batch implementation in the connector, I expect it actually accumulates multiple `poll()` results into a single write to OpenSearch; but if the consumer is the bottleneck, that won't substantially improve throughput.
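To illustrate that accumulation, here is a minimal, hypothetical sketch (not the connector's actual source) of how a sink task whose consumer is capped at 500 records per poll can still flush larger write batches, provided records arrive faster than they are flushed:

```python
# Illustrative model: the consumer-side cap (max.poll.records) limits each
# poll, while the connector-side threshold (batch.size) controls flush size.
MAX_POLL_RECORDS = 500   # Kafka consumer default
BATCH_SIZE = 10_000      # connector flush threshold (hypothetical value)

def poll(backlog, max_records=MAX_POLL_RECORDS):
    """Return at most max_records items from the backlog, like consumer.poll()."""
    taken, backlog[:] = backlog[:max_records], backlog[max_records:]
    return taken

def run_task(backlog):
    """Accumulate small polls into large flushes; return the flush sizes."""
    buffer, flushes = [], []
    while backlog:
        buffer.extend(poll(backlog))      # each poll yields <= 500 records
        if len(buffer) >= BATCH_SIZE:     # only write once the batch is full
            flushes.append(len(buffer))
            buffer = []
    if buffer:                            # flush the remainder at end of stream
        flushes.append(len(buffer))
    return flushes

print(run_task(list(range(25_000))))  # → [10000, 10000, 5000]
```

The point of the sketch: even though no single poll exceeds 500 records, the writes to the sink can be much larger, so the 500 cap shows up in the `WorkerSinkTask` trace logs rather than in the OpenSearch bulk-request size.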
I'm working with version 2.0.4 of the OpenSearch connector, sinking to an AWS OpenSearch cluster version 2.3. We've enabled TRACE logging for `org.apache.kafka.connect.runtime.WorkerSinkTask`, and we're seeing that the batch size never exceeds 500, even when the consumer for that partition is tens of thousands of messages behind. This is my sink configuration:

```json
{
  "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
  "type.name": "_doc",
  "behavior.on.null.values": "delete",
  "tasks.max": "24",
  "connection.timeout.ms": "3000",
  "max.retries": "10",
  "key.ignore": "false",
  "retry.backoff.ms": "1000",
  "max.buffered.records": "100000",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
  "read.timeout.ms": "10000",
  "behavior.on.version.conflict": "warn",
  "topics": "SEARCH",
  "batch.size": "10000",
  "max.in.flight.requests": "25",
  "schema.ignore": "false",
  "behavior.on.malformed.documents": "fail",
  "value.converter.schema.registry.url": "http://cp-schema-registry:8081",
  "flush.timeout.ms": "20000",
  "errors.deadletterqueue.topic.name": "dlq_search",
  "name": "ELASTICSEARCH_SINK",
  "errors.tolerance": "all",
  "connection.url": "https://....",
  "linger.ms": "1000"
}
```
Is there some hard limit of 500 on the batch size? Even when I change `batch.size` to 400, the trace shows `batch.size = 400` followed by `Delivering batch of 500 messages to task`. Can you help me understand how to control the batch size?
Thank you.