scylladb / scylla-cdc-source-connector

A Kafka source connector capturing Scylla CDC changes
Apache License 2.0
46 stars 18 forks source link

Detected performance impact query #15

Open pkgonan opened 2 years ago

pkgonan commented 2 years ago

In scylla-cdc-source-connector, the following query pattern is used to retrieve the changed data from the CDC table.

SELECT * FROM test.user_scylla_cdc_log WHERE "cdc$stream_id" IN ? AND "cdc$time">? AND "cdc$time"<=?;

"cdc$stream_id" IN ? The query pattern seems to be sending a lookup request to all the ScyllaDB Cluster Nodes. So ScyllaDB will see spikes in load and crash.

How about improving these query patterns by looking up and merging them in parallel inside the CDC Connector?

SELECT * FROM test.user_scylla_cdc_log WHERE "cdc$stream_id" = "stream_id_A" AND "cdc$time">? AND "cdc$time"<=?;

SELECT * FROM test.user_scylla_cdc_log WHERE "cdc$stream_id" = "stream_id_B" AND "cdc$time">? AND "cdc$time"<=?;

SELECT * FROM test.user_scylla_cdc_log WHERE "cdc$stream_id" = "stream_id_C" AND "cdc$time">? AND "cdc$time"<=?;

....

SELECT * FROM test.user_scylla_cdc_log WHERE "cdc$stream_id" = "stream_id_Z" AND "cdc$time">? AND "cdc$time"<=?;

Then, merge all query results in source connector.

Currently, ScyllaDB has failed after attaching the Source Connector in the production environment.


[ScyllaDB Cluster Environment]

[Scylla CDC Source Connector Configuration]

  "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector",
  "tasks.max": "3",
  "scylla.cluster.ip.addresses": "'${database_url}'",
  "scylla.user": "'${database_user}'",
  "scylla.password": "'${database_password}'",
  "scylla.name": "cdc-data.test",
  "scylla.table.names": "test.a,test.b",
  "scylla.query.time.window.size": "10000",
  "scylla.confidence.window.size": "5000",
  "scylla.consistency.level": "LOCAL_QUORUM",
  "scylla.local.dc": "AWS_US_WEST_1",
  "producer.override.acks": "-1",
  "producer.override.max.in.flight.requests.per.connection": "1",
  "producer.override.compression.type": "snappy",
  "producer.override.linger.ms": "50",
  "producer.override.batch.size": "327680",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topic.creation.default.replication.factor": "3",
  "topic.creation.default.partitions": "11"
스크린샷 2021-11-05 오후 3 34 29
pkgonan commented 2 years ago

@avelanarius Hi, what do you think about the above issue? The code to generate the query is in another project. (https://github.com/scylladb/scylla-cdc-java/blob/ab828eb0305171ffa48d6114dc73aaba0257ffdc/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3WorkerCQL.java#)

Connecting the ScyllaDB CDC source connector will cause too many queries to ScyllaDB, causing spikes in CPU load and facing timeouts and failures. I suspect this issue is caused by a query pattern.

haaawk commented 2 years ago

Hi @pkgonan

Please see my answers below.

In scylla-cdc-source-connector, the following query pattern is used to retrieve the changed data from the CDC table.

SELECT * FROM test.user_scylla_cdc_log WHERE "cdc$stream_id" IN ? AND "cdc$time">? AND "cdc$time"<=?;

"cdc$stream_id" IN ? The query pattern seems to be sending a lookup request to all the ScyllaDB Cluster Nodes. So ScyllaDB will see spikes in load and crash.

This is not exactly correct. All the stream ids used for a single query should live on the same set of replicas so if the RF = 3. A select should affect only 3 nodes in the cluster.

How about improving these query patterns by looking up and merging them in parallel inside the CDC Connector?

It's not immediately obvious that splitting the queries will improve the performance.

On one hand, all partition keys used (stream ids) should leave on the same set of nodes so grouping them should actually help performance.

On the other hand, those partition keys (stream ids) can leave on different shards and it might turn out that splitting the queries and sending them one by one, each to the right shard will be more efficient.

If from your experiments splitting the queries helps then we can certainly do that.

Currently, ScyllaDB has failed after attaching the Source Connector in the production environment.

What was the reason of the failure? Was the cluster just overloaded?

At the same time, could you please share more context on your use case @pkgonan ? That would make it easier for us to understand the problem.

Thanks, Piotr

pkgonan commented 2 years ago

@haaawk Hi. There was a requirement to add ScyllaDB's CDC Table to Kafka Connect Cluster.

So, when I register the ScyllaDB Source Connector with the Kafka Connect Cluster, the read requests increase dramatically.

As a result, the load of ScyllaDB Nodes increased rapidly and the CPU increased. Systems using ScyllaDB have crashed due to ScyllaDB read and write timeouts. (See, read & write latency at the images registered in the body)

If you look at the images registered in the body, you can see that the number of read requests has skyrocketed. (20 kops/s -> 60 kops/s)

haaawk commented 2 years ago

It is somehow expected to see the increase in the number of reads. I see that you have 24 nodes meaning that there will be 24 256 number of CPUs per node streams. The aggregated IN select will be executed 24 * 256 times per some period of time. That's ~6k which does not explain why you see the increase by 40k .

pkgonan commented 2 years ago

@haaawk Usually, ScyllaDB is receiving 20k requests, and after registering the ScyllaDB Source Connector, the number of requests has skyrocketed several times. Afterwards, we recognized that there was a failure in other services using ScyllaDB in common and removed the registered ScyllaDB Source Connector. As soon as it is removed, you can see that ScyllaDB's read requests returned to the usual number of 20k.

I remember that when I registered ScyllaDB Source Connectors in the Kafka Connect Cluster in the past, the read requests increased several times for a certain period of time at the beginning, and the read requests returned to the normal level after a certain period of time.

Based on this, I suspect there may be a problem with the source code making the read request when first registering the ScyllaDB Source Connector.

haaawk commented 2 years ago

d removed the registered ScyllaDB Source Connector. As soon as it is removed, you can see that ScyllaDB's read requests returned to the usual number of 20k.

I remember that when I registered ScyllaDB Source Connectors in the Kafka Connect Cluster in the past, the read requests increased several times for a certain period of time at the beginning, and the read requests returned to the normal level after a certain period of time.

Based on this, I suspect there may be a problem with the source code making the read request when first registering the ScyllaDB Source Connector.

That's a very good point. We're usually trying to select a bit quicker at the beginning to catch up but maybe we do this too aggressively. @avelanarius what do you think?

pkgonan commented 2 years ago

@haaawk @avelanarius I currently operate a large Kafka Connect Cluster, and I have 3 ~ 4 CDC Tables registered as ScyllaDB Source Connector.

I need to register at least 3 additional ScyllaDB Source Connectors. However, due to this problem, we are no longer able to register the ScyllaDB Source Connector.

If I additionally register the ScyllaDB Source Connector, the load on ScyllaDB increases rapidly and failures occur in the systems using ScyllaDB.

Therefore, i could not register the new ScyllaDB Source Connector and was waiting for an answer to be registered in the current issue.

Thank you for your quick reply.

haaawk commented 2 years ago

@pkgonan Thanks for sharing. @avelanarius Could you please look at making the startup of the connector less demanding for Scylla?

Why do you need more than one ScyllaDB Source Connector, @pkgonan? Are you putting data on different topics? If so maybe we could change the connector to allow that with a single instance? That would greatly reduce the traffic to Scylla.

pkgonan commented 2 years ago

@haaawk Many microservices each use ScyllaDB, and each needs to use the CDC feature.

Therefore, each microservice must apply the ScyllaDB Source Connector for each keyspace it uses.

1 Microservice == 1 Keyspace == 1 ScyllaDB Source Connector

Each ScyllaDB Source Connector`s tasks.max is 3. We can change to 1. (tasks.max 3 -> 1)

haaawk commented 2 years ago

I see.

hartmut-co-uk commented 2 years ago

pkgonan: I remember that when I registered ScyllaDB Source Connectors in the Kafka Connect Cluster in the past, the read requests increased several times for a certain period of time at the beginning, and the read requests returned to the normal level after a certain period of time.

As I understand from my own debugging - the initial high load comes from the connector 'catching up' from beginning of the CDC table log, is this right? Though even for tables created immediately before creating the connector - it seems the connector begins querying 24h into the past.

Based on the previous calculations

haaawk: It is somehow expected to see the increase in the number of reads. I see that you have 24 nodes meaning that there will be 24 256 number of CPUs per node streams. The aggregated IN select will be executed 24 * 256 times per some period of time. That's ~6k which does not explain why you see the increase by 40k .

I think it might result in another multiplier 24h / "scylla.query.time.window.size". -> 24(h) 60(min) 60(s) / 10(window.size) * 6k = 51.84M queries

@haaawk Is there some logic reading the table CDC TTL (24h default) and backdating the connector 'start time'?

haaawk commented 2 years ago

pkgonan: I remember that when I registered ScyllaDB Source Connectors in the Kafka Connect Cluster in the past, the read requests increased several times for a certain period of time at the beginning, and the read requests returned to the normal level after a certain period of time.

As I understand from my own debugging - the initial high load comes from the connector 'catching up' from beginning of the CDC table log, is this right?

That would be my understanding.

Though even for tables created immediately before creating the connector - it seems the connector begins querying 24h into the past.

Based on the previous calculations

haaawk: It is somehow expected to see the increase in the number of reads. I see that you have 24 nodes meaning that there will be 24 256 number of CPUs per node streams. The aggregated IN select will be executed 24 * 256 times per some period of time. That's ~6k which does not explain why you see the increase by 40k .

I think it might result in another multiplier 24h / "scylla.query.time.window.size". -> 24(h) 60(min) 60(s) / 10(window.size) * 6k = 51.84M queries

@haaawk Is there some logic reading the table CDC TTL (24h default) and backdating the connector 'start time'?

Yes. The connector assumes there might be data as old as now - CDC TTL

pkgonan commented 2 years ago

@haaawk @hartmut-co-uk The test was performed on a new empty table that does not generate cdc log. The problem arises even though it is a table with no create, modify, delete, select queries.

haaawk commented 2 years ago

@haaawk @hartmut-co-uk The test was performed on a new empty table that does not generate cdc log. The problem arises even though it is a table with no create, modify, delete, select queries.

It might happen that we do the queries anyway even though they return nothing. We should optimize this case. @avelanarius could you please look at this issue?

hartmut-co-uk commented 2 years ago
  1. Maybe allow to configure the connector (+scylla-cdc-java) to start reading CDC from either earliest|latest?
  2. Also while 'far behind' maybe a bigger window.size could be used to catch up?
haaawk commented 2 years ago
  1. Maybe allow to configure the connector (+scylla-cdc-java) to start reading CDC from either earliest|latest?

This can be done easily but will require the user not to send any traffic until the connector is running. Another option is to specify in config how far back the connector should start.

  1. Also while 'far behind' maybe a bigger window.size could be used to catch up?

This may be problematic if there's a lot of data accumulated. I guess paging would do the trick so we could try this as well.

hartmut-co-uk commented 2 years ago

Regarding window.size - I think it's also sort of a general issue when trying to reduce the latency (how long it takes from X := write to scylla table ... until Y := message on Kafka topic).

I can see the original post uses

  "scylla.query.time.window.size": "10000",
  "scylla.confidence.window.size": "5000",

Where I was testing/evaluating even more aggressively with e.g.

  "scylla.query.time.window.size": "2000",
  "scylla.confidence.window.size": "2000",
  "poll.interval.ms": "1000",

trying to get this latency <5s.

Once the connector is running and caught up it seemed somewhat stable and OK - but as mentioned - when started anew or after some downtime - it's a huge spike in utilisation (even if there's literally no data in the cdc log table...).

I guess what I'm trying to say is - configuring a small window size (e.g. 2s) results in a massive number of queries...

hartmut-co-uk commented 2 years ago

ref/related: https://github.com/scylladb/scylla-cdc-source-connector/issues/16

Apologies @pkgonan I didn't mean to hijack this issue thread.

haaawk commented 2 years ago

Regarding window.size - I think it's also sort of a general issue when trying to reduce the latency (how long it takes from X := write to scylla table ... until Y := message on Kafka topic).

I can see the original post uses

  "scylla.query.time.window.size": "10000",
  "scylla.confidence.window.size": "5000",

Where I was testing/evaluating even more aggressively with e.g.

  "scylla.query.time.window.size": "2000",
  "scylla.confidence.window.size": "2000",
  "poll.interval.ms": "1000",

trying to get this latency <5s.

Once the connector is running and caught up it seemed somewhat stable and OK - but as mentioned - when started anew or after some downtime - it's a huge spike in utilisation (even if there's literally no data in the cdc log table...).

I guess what I'm trying to say is - configuring a small window size (e.g. 2s) results in a massive number of queries...

At the moment, due to the eventual consistency nature of Scylla we need to keep the confidence window big enough to allow delayed writes to reach the replicas before we query for the window that contains those writes. It is not recommended to reduce the confidence window size as it can lead to some updates being missed. In the future with a consistent raft-based tables we will be able to relax this restriction.

hartmut-co-uk commented 2 years ago

🙇 Oh! Great to hear raft will also be beneficial for this use case. May I kindly ask on which release train this is?

haaawk commented 2 years ago

🙇 Oh! Great to hear raft will also be beneficial for this use case. May I kindly ask on which release train this is?

I'm not sure. Raft is being developed by another team and I'm not even sure there is an official release date already announced.

hartmut-co-uk commented 2 years ago
  1. Maybe allow to configure the connector (+scylla-cdc-java) to start reading CDC from either earliest|latest?

This can be done easily but will require the user not to send any traffic until the connector is running. Another option is to specify in config how far back the connector should start.

added https://github.com/scylladb/scylla-cdc-source-connector/issues/17

mykaul commented 1 year ago

bow Oh! Great to hear raft will also be beneficial for this use case. May I kindly ask on which release train this is?

I'm not sure. Raft is being developed by another team and I'm not even sure there is an official release date already announced.

Raft is enabled by default - for schema changes - in 5.2. Consistent tables is still in the backlog, but we are moving forward (after migrating topology changes and so on to the Raft).