scylladb / kafka-connect-scylladb

Kafka Connect Scylladb Sink

Wrong replication of Kafka messages happening in the same millisecond #20

Open · avelanarius opened this issue 3 years ago

avelanarius commented 3 years ago

Overview

When Kafka messages are sent in quick succession, some of them may carry the same timestamp (millisecond precision). If multiple messages for the same row fall within a single millisecond, the Connector incorrectly applies only the first message in that millisecond to the database, because it relies on the timestamp to determine order.

The environment I was testing the Connector on involved only a single partition. Kafka guarantees that the order of messages within a partition is preserved (by partition offset). The same scenario can also happen on a multi-partition topic, as Kafka producers send messages with the same key to the same partition, so messages regarding the same row will end up in the same topic partition.

As shown in "Further investigation" below, the Connector already receives the messages in the correct order (in my testing), but is unable to apply them correctly. The partition offset is also available for determining the correct order.

Reproduction

  1. Start up Confluent and the ScyllaDB Sink Connector. Configure the Connector with topic t.

  2. Download the input file: input. It consists of 10 operation triplets, each of which adds a row with v = 0, deletes the row, and adds it again with v = 1. The final table should therefore contain 10 rows with v = 1:

    {"pk":{"int":1},"ck":{"int":1}}${"ks.t.value_schema":{"pk":{"int":1},"ck":{"int":1},"v":{"int":0}}}
    {"pk":{"int":1},"ck":{"int":1}}$null
    {"pk":{"int":1},"ck":{"int":1}}${"ks.t.value_schema":{"pk":{"int":1},"ck":{"int":1},"v":{"int":1}}}
    {"pk":{"int":2},"ck":{"int":2}}${"ks.t.value_schema":{"pk":{"int":2},"ck":{"int":2},"v":{"int":0}}}
    {"pk":{"int":2},"ck":{"int":2}}$null
    {"pk":{"int":2},"ck":{"int":2}}${"ks.t.value_schema":{"pk":{"int":2},"ck":{"int":2},"v":{"int":1}}}
    [... MORE ...]
  3. Use the kafka-avro-console-producer provided by Confluent to write the messages from input:

    bin/kafka-avro-console-producer --broker-list localhost:9092 --topic t --property parse.key=true \
    --property key.schema='{"fields":[{"name":"pk","type":["null","int"]},{"name":"ck","type":["null","int"]}],"name":"key_schema","namespace":"ks.t","type":"record"}'  \
    --property "key.separator=$" --property value.schema='["null",{"fields":[{"name":"pk","type":["null","int"]},{"name":"ck","type":["null","int"]},{"name":"v","type":["null","int"]}],"name":"value_schema","namespace":"ks.t","type":"record"}]' \
    --timeout 100 --request-required-acks 0 < input
  4. Select rows in the destination table:

    SELECT * FROM test.t;

    Got:

[screenshot: SELECT output with the row pk = 1, ck = 1 missing]

Expected: 10 rows with v = 1 and pk, ck from 1 to 10.

Further investigation

Using kafka-avro-console-consumer, I verified that the messages were sent in the correct order (only the value part is shown):

[screenshot: kafka-avro-console-consumer output showing the messages in the correct order]

After adding additional log statements to the Connector, it turned out that (surprisingly?) it also received the messages in the correct order:

[screenshot: Connector log showing the messages received in the correct order]

Root cause

https://github.com/scylladb/kafka-connect-scylladb/blob/aa89618ccbab7aa2d14da67c7b86a305dd1e914d/src/main/java/io/connect/scylladb/ScyllaDbSinkTaskHelper.java#L94-L98

The Connector calls the setDefaultTimestamp() method with the timestamp of the Kafka message. It is translated into CQL USING TIMESTAMP when the query is executed, and it prevents subsequent queries with a lesser or equal timestamp from taking effect.
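
For context, a minimal sketch of that pattern (DataStax Java driver 3.x API; the class and method here are illustrative, not the Connector's exact code):

    import com.datastax.driver.core.BoundStatement;
    import com.datastax.driver.core.PreparedStatement;
    import org.apache.kafka.connect.sink.SinkRecord;

    final class TimestampSketch {
        // The Kafka record timestamp (epoch *milliseconds*) becomes the
        // statement's default timestamp, which the driver sends as CQL
        // "USING TIMESTAMP". Records produced within the same millisecond
        // therefore collide on the write timestamp.
        static BoundStatement bindWithRecordTimestamp(PreparedStatement prepared,
                                                      SinkRecord record) {
            BoundStatement statement = prepared.bind();
            statement.setDefaultTimestamp(record.timestamp());
            return statement;
        }
    }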

In the reproduction example, the row with pk = 1, ck = 1 is missing. This is caused by the DELETE and INSERT operations having the same USING TIMESTAMP; on a timestamp tie the DELETE's tombstone wins, so the INSERT is ignored:

[screenshot: DELETE and INSERT sharing the same USING TIMESTAMP]

There is another, smaller issue in those lines of ScyllaDbSinkTaskHelper.java: setDefaultTimestamp() expects an epoch timestamp in microseconds, but a millisecond timestamp (from Kafka) is assigned, so the WRITETIME in the database is off by a factor of 1000:

[screenshot: WRITETIME values off by a factor of 1000]
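
A minimal sketch of the required conversion (illustrative name; assuming the scaling happens where the timestamp is assigned):

    final class TimestampUnits {
        // CQL USING TIMESTAMP is interpreted as *microseconds* since epoch,
        // so the millisecond Kafka timestamp must be scaled by 1000.
        static long toWriteTimestampMicros(long kafkaTimestampMs) {
            return kafkaTimestampMs * 1000L;
        }
    }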

hartmut-co-uk commented 2 years ago

hmm, since the Kafka timestamp is in milliseconds, how about adding the Kafka record offset % 1000 (mod 1000) as microseconds?

e.g. with the offset of the first Kafka message below at 32176 -> 32176 % 1000 = 176

Offset: 32176, CreateTime:1595177491954    {pk...}    -> USING TIMESTAMP 1595177491954176
Offset: 32177, CreateTime:1595177491978    null       -> USING TIMESTAMP 1595177491978177
Offset: 32178, CreateTime:1595177491978    {pk...}    -> USING TIMESTAMP 1595177491978178
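
A sketch of that proposal (class and method names assumed, not from the Connector):

    final class ProposedTimestamps {
        // Scale the millisecond Kafka timestamp to microseconds and add the
        // record offset modulo 1000, so records falling within the same
        // millisecond still get distinct, increasing write timestamps.
        static long timestampMicros(long kafkaTimestampMs, long offset) {
            return kafkaTimestampMs * 1000L + (offset % 1000L);
        }
    }

For the first record above: 1595177491954 * 1000 + 176 = 1595177491954176, matching the example.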
hartmut-co-uk commented 2 years ago

seems the incorrect timestamp unit has already been fixed in https://github.com/scylladb/kafka-connect-scylladb/pull/29

hartmut-co-uk commented 2 years ago

but not yet released? I was testing with version 1.0.0, which still has the old timestamps... https://www.confluent.io/hub/scylladb/kafka-connect-scylladb

hartmut-co-uk commented 2 years ago

I'd create a PR for the proposed solution, what do you think @avelanarius?

hartmut-co-uk commented 2 years ago

changed it into a draft, I didn't consider that the order might actually be reversed when the offset happens to roll over from *999 to *000 🤐

Will give that some more thought...
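
A hypothetical worked example of that reversal, using the scheme above:

    final class OffsetRollover {
        // Two records in the same millisecond whose offsets cross a multiple
        // of 1000 end up with *decreasing* write timestamps.
        static void demo() {
            long t = 1595177491978L;                    // same CreateTime for both
            long first  = t * 1000L + (32999L % 1000L); // -> 1595177491978999
            long second = t * 1000L + (33000L % 1000L); // -> 1595177491978000, smaller!
            assert second < first; // the later record would lose to the earlier one
        }
    }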