microsoft / kafka-connect-cosmosdb

Kafka Connect connectors for Azure Cosmos DB
MIT License

Batching of updates/inserts #398

Open kahole opened 3 years ago

kahole commented 3 years ago

## Problem Statement

Writes to Cosmos from the sink are done one by one, causing a lot of traffic and expending more RUs than necessary.

## Proposed Solution

Batch updates/inserts that arrive close together. If, say, 50 updates arrive within 0.5 seconds, they should be written as a single batch. The way this is usually done is with a window that opens when a message is received and closes after a configurable amount of time; everything arriving in that timespan becomes part of the same batch update.
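
As a rough illustration of the windowing idea (not tied to this connector's code), a minimal sketch in Java: a batch is flushed either when the window that opened with the first record expires, or when a size cap is hit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative time-window batcher: the window opens with the first record,
// and everything arriving before it closes is flushed as one batch.
public class WindowedBatcher<T> {
    private final long windowMillis;
    private final int maxBatchSize;
    private final Consumer<List<T>> flushAction;

    private final List<T> buffer = new ArrayList<>();
    private long windowOpenedAt = -1;

    public WindowedBatcher(long windowMillis, int maxBatchSize, Consumer<List<T>> flushAction) {
        this.windowMillis = windowMillis;
        this.maxBatchSize = maxBatchSize;
        this.flushAction = flushAction;
    }

    public synchronized void add(T record) {
        if (buffer.isEmpty()) {
            windowOpenedAt = System.currentTimeMillis(); // first record opens the window
        }
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Expected to be called periodically, e.g. from the sink task's put/flush cycle.
    public synchronized void maybeFlush() {
        if (!buffer.isEmpty() && System.currentTimeMillis() - windowOpenedAt >= windowMillis) {
            flush();
        }
    }

    private void flush() {
        flushAction.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

Something like `new WindowedBatcher<SinkRecord>(500, 50, batch -> writeToCosmos(batch))` would give the 0.5-second / 50-record behaviour described above, where `writeToCosmos` stands in for whatever bulk write the connector ends up using.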

## Additional context

Other Kafka sinks in the same style as this one have this functionality. JDBC:

See `batch.size` here: https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/sink_config_options.html

Interesting discussion about batching in JDBC: https://github.com/confluentinc/kafka-connect-jdbc/issues/290. The batch size also seems to be adjustable to an extent through consumer configuration:

consumer.fetch.min.bytes=1500000
consumer.fetch.max.wait.ms=1500
consumer.max.poll.records=4000
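
For this connector, a comparable effect could presumably be achieved with per-connector consumer overrides (Kafka Connect supports a `consumer.override.` prefix when the worker's `connector.client.config.override.policy` allows it). The excerpt below is only illustrative; the values mirror the JDBC example above and the connector class name is assumed from this repo's docs:

```properties
# Illustrative sink connector config excerpt; values are examples only
connector.class=com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector
consumer.override.fetch.min.bytes=1500000
consumer.override.fetch.max.wait.ms=1500
consumer.override.max.poll.records=4000
```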

## Next Steps

ryancrawcour commented 3 years ago

Providing batching as a knob for tuning throughput appears to be quite a common pattern in sink connectors. Connectors such as the S3 sink, MongoDB sink, JDBC sink, HTTP sink, etc. all do this.

The prerequisite here is that the underlying client provides a batching interface for us to call, or is at least able to receive some concept of batched records with a separator. Each connector typically implements its own `batch.size`-style config to control the batch size before it sends a write to the backend.
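
As a sketch only (the names below are hypothetical, not the connector's actual settings), such a knob could be defined with Kafka Connect's `ConfigDef`:

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class SinkBatchConfig {
    // Hypothetical config key and default; illustrative only
    public static final String BATCH_SIZE_CONF = "connect.cosmos.sink.batch.size";
    public static final int BATCH_SIZE_DEFAULT = 50;

    public static ConfigDef configDef() {
        return new ConfigDef().define(
                BATCH_SIZE_CONF,
                Type.INT,
                BATCH_SIZE_DEFAULT,
                Importance.MEDIUM,
                "Maximum number of records to write to Cosmos DB in a single batched request.");
    }
}
```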

The implementation needs to be careful about offset tracking to only advance the offset on the topic/partition of records that have been acked by the backend.

This can be done in a couple of ways.

Since the interface to Kafka is already a batch interface (the `put` method takes a collection of records as its argument, and the user can tune consumer settings such as `max.poll.records` to adjust this batch size), it is typically more efficient to batch writes to the external system as well, to minimize network round trips.

*preferred implementation
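
A rough sketch of what the preferred approach could look like in the sink task; `writeBatchToCosmos` and the hard-coded batch size are illustrative placeholders, not the connector's actual code:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class BatchingSinkTaskSketch extends SinkTask {
    private final int batchSize = 50; // would come from a batch.size-style config

    @Override
    public void put(Collection<SinkRecord> records) {
        // Kafka Connect already hands us a batch (bounded by max.poll.records);
        // here we re-chunk it into writes of at most batchSize records each.
        List<SinkRecord> buffer = new ArrayList<>(batchSize);
        for (SinkRecord record : records) {
            buffer.add(record);
            if (buffer.size() >= batchSize) {
                writeBatchToCosmos(buffer); // hypothetical bulk write; throw on failure so Connect retries
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {
            writeBatchToCosmos(buffer);
        }
    }

    // Placeholder for whatever batch/bulk API the Cosmos client exposes.
    protected abstract void writeBatchToCosmos(List<SinkRecord> batch);
}
```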

ryancrawcour commented 3 years ago

The Java SDK for Cosmos DB does not have batching support yet; it is currently in preview. Once this feature has stabilized and is released as GA, we can adopt batching of inserts.
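
For reference, a hedged sketch (from memory, names may differ slightly) of the shape of the transactional batch API in the v4 Java SDK; note that it requires all operations in a batch to share a partition key, so the broader bulk-execution support is likely the better fit for a sink connector:

```java
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.models.CosmosBatch;
import com.azure.cosmos.models.CosmosBatchResponse;
import com.azure.cosmos.models.PartitionKey;

public class CosmosBatchSketch {
    // Minimal POJO for illustration; assumes the container's partition key path is /pk
    public static class Item {
        public String id;
        public String pk;
        public String value;
        public Item(String id, String pk, String value) { this.id = id; this.pk = pk; this.value = value; }
    }

    public static void main(String[] args) {
        CosmosClient client = new CosmosClientBuilder()
                .endpoint("<cosmos-endpoint>") // placeholder
                .key("<cosmos-key>")           // placeholder
                .buildClient();
        CosmosContainer container = client.getDatabase("db").getContainer("items");

        // Transactional batch: all operations must target the same partition key value.
        CosmosBatch batch = CosmosBatch.createCosmosBatch(new PartitionKey("some-pk"));
        batch.upsertItemOperation(new Item("1", "some-pk", "first"));
        batch.upsertItemOperation(new Item("2", "some-pk", "second"));

        CosmosBatchResponse response = container.executeCosmosBatch(batch);
        if (!response.isSuccessStatusCode()) {
            throw new IllegalStateException("Batch failed with status " + response.getStatusCode());
        }
    }
}
```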

ryancrawcour commented 3 years ago

If the user configures `max.poll.records` to adjust the size of the batches read from Kafka, and the connector then exposes a `batch.size` to control the size of the batches it writes, would these two different configurations need to be the same?

 _The batch sizes don't necessarily need to be the same, as long as the connector is correctly implementing offset tracking so that only persisted records have their offsets advanced._

What happens if the user configures a `max.poll.records` of 100 and configures the connector `batch.size` to be 20? Now we'd get 100 records, but write 20 at a time to Cosmos DB? This is where we can run into some issues, because we haven't yet written 80 of the records and there is a possibility of losing these. Is this where the precommit hook comes into play?

_In this example, we'd get up to 100 records in each iteration of `poll` . Assuming that the connector does a commit/write every 20 records, we should do up to 5 write operations to the backend for each `poll` . If we fail half way through commiting a batch, we would resume from when we last had committed offsets. This means we might redeliver some records if the connector task failed but we didn't get a chance to commit the offset yet. In most cases, this at-least-once delivery is good enough. With the cosmos connector in upsert mode and a deterministic ID on the record, we simply get duplicate upserts, which is not ideal, but also not harmful._
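
A hedged sketch of the offset-tracking side of this, using the `preCommit` hook so that only offsets of records acked by the backend are advanced; the `lastAcked` bookkeeping is illustrative, not the connector's actual code:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class OffsetTrackingSinkTaskSketch extends SinkTask {
    // Offset (plus one) of the last record per partition that Cosmos DB has acknowledged.
    private final Map<TopicPartition, OffsetAndMetadata> lastAcked = new HashMap<>();

    // Called after a batch write succeeds, to record how far we are safely persisted.
    protected void markAcked(SinkRecord lastRecordInBatch) {
        TopicPartition tp = new TopicPartition(lastRecordInBatch.topic(), lastRecordInBatch.kafkaPartition());
        lastAcked.put(tp, new OffsetAndMetadata(lastRecordInBatch.kafkaOffset() + 1));
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Only commit offsets for records that were actually written; anything newer
        // will be redelivered after a failure, which gives at-least-once delivery.
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (TopicPartition tp : currentOffsets.keySet()) {
            OffsetAndMetadata acked = lastAcked.get(tp);
            if (acked != null) {
                toCommit.put(tp, acked);
            }
        }
        return toCommit;
    }
}
```
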
pkleindl commented 2 years ago

@ryancrawcour Just a question: from the way the CosmosDBSinkTask is implemented, if I understand it correctly, the upserts are done sequentially. Even without batching, wouldn't it be an improvement if those were done in parallel to make better use of the connection?
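
For illustration, parallel (rather than batched) upserts could presumably be done with the async client and bounded concurrency, along these lines; this is a sketch assuming the task has a `CosmosAsyncContainer`, not the connector's actual code:

```java
import java.util.List;

import com.azure.cosmos.CosmosAsyncContainer;
import reactor.core.publisher.Flux;

public class ParallelUpsertSketch {
    // Upserts the given items with at most `concurrency` requests in flight at once,
    // blocking until all of them complete (so offsets are only committed afterwards).
    public static <T> void upsertInParallel(CosmosAsyncContainer container, List<T> items, int concurrency) {
        Flux.fromIterable(items)
            .flatMap(item -> container.upsertItem(item), concurrency)
            .blockLast();
    }
}
```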