scylladb / kafka-connect-scylladb

Kafka Connect Scylladb Sink
Apache License 2.0

Remove CQL batching from connector #36

Closed avelanarius closed 3 years ago

avelanarius commented 3 years ago

Removes batching from ScyllaDbSinkTask. Rows are now inserted one by one, in parallel (one Future per INSERT).
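A minimal sketch of the "one Future per INSERT" pattern, assuming a hypothetical `executeAsync` function that stands in for the driver's asynchronous execute call (this is not the connector's actual code): each row is submitted on its own, and the caller waits for all of them before returning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Sketch only: `executeAsync` is a hypothetical stand-in for the driver's
// asynchronous execute call; no BATCH statement is ever built here.
final class ParallelInserts {
    static <R> void insertAll(List<R> rows,
                              Function<R, CompletableFuture<Void>> executeAsync) {
        List<CompletableFuture<Void>> futures = new ArrayList<>(rows.size());
        for (R row : rows) {
            // One asynchronous INSERT per row, all in flight concurrently.
            futures.add(executeAsync.apply(row));
        }
        // allOf() completes once every INSERT has completed; join() then throws a
        // CompletionException if any of them failed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }
}
```

For example, `ParallelInserts.insertAll(rows, row -> CompletableFuture.runAsync(() -> write(row)))` runs the hypothetical `write` for every row concurrently. Parallelism comes from many independent requests rather than from one large statement, so each INSERT is routed on its own.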

Using (unlogged) batches was incorrect because:

  1. The heuristic for calculating batch size was incorrect, which resulted in warning log messages (batch too big).
  2. Batches could span multiple Scylla partitions; such multi-partition batches are not recommended and can degrade performance.
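The second point can be made concrete with a small check: an unlogged batch is only a good fit when all of its rows share one partition key. This is a sketch, assuming a hypothetical `partitionKeyOf` extractor that returns a row's partition key values; it is not connector API.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Sketch only: `partitionKeyOf` is a hypothetical extractor for a row's
// partition key (for example, a List of the pk column values).
final class BatchCheck {
    // Number of distinct Scylla partitions the batch would touch.
    static <R, K> int distinctPartitions(List<R> batch, Function<R, K> partitionKeyOf) {
        Set<K> keys = new HashSet<>();
        for (R row : batch) {
            keys.add(partitionKeyOf.apply(row));
        }
        return keys.size();
    }

    // True when the batch is a multi-partition batch.
    static <R, K> boolean spansMultiplePartitions(List<R> batch, Function<R, K> partitionKeyOf) {
        return distinctPartitions(batch, partitionKeyOf) > 1;
    }
}
```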

I observed better performance and CPU utilization after this fix.

I also removed the insertion of offsets in (logged!) batches. I don't think there was any reason for it, and such a logged batch could touch rows on many different Scylla nodes (across multiple partitions).

Removed scylladb.timestamp.resolution.ms, as it is no longer relevant without batching. Similarly, removed scylladb.max.batch.size.kb.

tarzanek commented 3 years ago

Couldn't this just batch by partition? Then you would be able to get more out of it than with single statements (and I originally thought it batched by PK).
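As a sketch of what "batch by partition" could mean: group pending rows by partition key and split each group into chunks, so every chunk would become one single-partition unlogged batch. `partitionKeyOf` is a hypothetical extractor, and the `maxRowsPerBatch` cap is an assumption standing in for a real size limit; neither is a connector setting.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch only: groups rows by a hypothetical partition key and caps each
// group at maxRowsPerBatch rows, so no chunk mixes partitions.
final class PartitionBatcher {
    static <R, K> List<List<R>> groupByPartition(List<R> rows,
                                                 Function<R, K> partitionKeyOf,
                                                 int maxRowsPerBatch) {
        if (maxRowsPerBatch < 1) {
            throw new IllegalArgumentException("maxRowsPerBatch must be >= 1");
        }
        // LinkedHashMap keeps partitions in first-seen order; each list keeps the
        // rows' original relative order within that partition.
        Map<K, List<R>> byPartition = new LinkedHashMap<>();
        for (R row : rows) {
            byPartition.computeIfAbsent(partitionKeyOf.apply(row), k -> new ArrayList<>()).add(row);
        }
        List<List<R>> batches = new ArrayList<>();
        for (List<R> group : byPartition.values()) {
            for (int i = 0; i < group.size(); i += maxRowsPerBatch) {
                int end = Math.min(i + maxRowsPerBatch, group.size());
                batches.add(new ArrayList<>(group.subList(i, end)));
            }
        }
        return batches;
    }
}
```

Whether this pays off depends on how many rows for the same partition arrive together in one poll, which is exactly the question raised in the reply.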

avelanarius commented 3 years ago

Previously it was not batching correctly:

  1. It was not batching by PK: a single batch contained multiple PKs.
  2. The batch size calculation was off: generated batches were too large, causing thousands of warnings in the Scylla log ("batch too large").

Of course, I don't rule out bringing this back in the future, properly implemented. But to do that, we should run proper benchmarks and measure the speedup. In the meantime, I decided to remove this functionality.

The other consideration is: if we batched by PK, would we gain anything in a typical Kafka workload? By default, Kafka partitions messages by a hash of the key. That means messages are partitioned by murmur2(avro/json(pk1, pk2, ..., ck1, ck2, ...)), which is a different partitioning scheme from Scylla's murmur3(pk1, pk2, ...). As a result, messages belonging to the same Scylla partition typically land in different Kafka partitions.
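To illustrate the mismatch, the sketch below computes a Kafka partition the way the default partitioner does for keyed records: toPositive(murmur2(keyBytes)) % numPartitions, with murmur2 written to mirror Kafka's Utils.murmur2. The key format produced by `keyBytes` is a hypothetical JSON string chosen for illustration; real Avro or JSON converters produce different bytes, but any key that includes the clustering columns behaves the same way.

```java
import java.nio.charset.StandardCharsets;

// Sketch only: murmur2 mirrors Kafka's Utils.murmur2; keyBytes() is a
// hypothetical key serialization that includes both pk and ck.
final class KafkaKeyPartitioning {
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Kafka partition for a keyed record: clear the sign bit, then take the modulo.
    static int kafkaPartition(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Hypothetical key serialization: the Kafka key carries pk and ck together.
    static byte[] keyBytes(int pk, int ck) {
        return ("{\"pk\":" + pk + ",\"ck\":" + ck + "}").getBytes(StandardCharsets.UTF_8);
    }
}
```

Looping `ck` over many values with a fixed `pk` and a topic of, say, 6 partitions spreads the rows of one Scylla partition across several Kafka partitions, so a single sink task usually sees only a few rows per Scylla partition in each poll.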