scylladb / scylla-cdc-source-connector

A Kafka source connector capturing Scylla CDC changes
Apache License 2.0

cdc events are not sent to kafka #2

Closed pkgonan closed 3 years ago

pkgonan commented 3 years ago

Hi. CDC events are not sent to Kafka. When we tested in a dev environment (single DC) it worked well, but in the production environment (multi DC) it does not work.

When a create, update, or delete statement is executed on my_table (CDC enabled), a CDC log entry is written to my_table_scylla_cdc_log successfully, but no CDC event is sent to the Kafka topic. However, heartbeat events are produced to Kafka successfully. (Kafka topic: __debezium-heartbeat.cdc-data.test)

If an error were logged we could tell what the problem is, but it is difficult to diagnose because no error log is produced at all.

[Versions]

Kafka Broker Version : 2.6.0
Confluent Kafka Connect Version : 6.1.1
scylla-cdc-source-connector Version : 1.0.0
ScyllaDB Open Source Version : 4.4.1

[Configs - Same in all environments.]

  "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector",
  "tasks.max": "3",  
  "scylla.cluster.ip.addresses": "'${database_url}'",  
  "scylla.user": "'${database_user}'",
  "scylla.password": "'${database_password}'",
  "scylla.name": "cdc-data.test",
  "scylla.table.names": "'${table_include_list}'",
  "scylla.query.time.window.size": "1000",
  "scylla.confidence.window.size": "1000",
  "producer.override.acks": "-1",
  "producer.override.max.in.flight.requests.per.connection": "1",
  "producer.override.compression.type": "snappy",
  "producer.override.linger.ms": "50",
  "producer.override.batch.size": "327680",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topic.creation.default.replication.factor": "'${replication_factor}'",
  "topic.creation.default.partitions": "11"

[Table Definition]

CREATE TABLE IF NOT EXISTS test_service.my_table (
    user_id varchar,
    blocked_user_id varchar,
    type varchar,
    created_at timeuuid,
    PRIMARY KEY ((user_id, blocked_user_id))
) WITH cdc = {'enabled': true, 'ttl': 172800};

[Dev Environment Config - Single DC]

CREATE KEYSPACE IF NOT EXISTS test_service WITH REPLICATION = {
'class' : 'NetworkTopologyStrategy',
'ap-northeast-1' : 3
};

[Production Environment Config - Multi DC]

CREATE KEYSPACE IF NOT EXISTS test_service WITH REPLICATION = {
'class' : 'NetworkTopologyStrategy',
'us-west-1' : 3,
'eu-central-1' : 3
};

[Confluent Kafka Connect Log]

[2021-04-26 08:01:55,208] INFO    tasks.max = 3 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    scylla.table.names = test_service.my_table (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    producer.override.batch.size = 327680 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    scylla.cluster.ip.addresses = AA:9042,BB:9042,CC:9042 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    heartbeat.interval.ms = 30000 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    scylla.password = ******** (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    producer.override.linger.ms = 50 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    scylla.query.time.window.size = 1000 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    task.class = com.scylladb.cdc.debezium.connector.ScyllaConnectorTask (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    producer.override.acks = -1 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    scylla.confidence.window.size = 1000 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    topic.creation.default.replication.factor = 2 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    name = my_table_db_connector (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    errors.tolerance = all (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    errors.log.enable = true (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,209] INFO    scylla.name = cdc-data.test (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,209] INFO    producer.override.max.in.flight.requests.per.connection = 1 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,219] INFO Requested thread factory for connector ScyllaConnector, id = cdc-data.test named = change-event-source-coordinator (io.debezium.util.Threads:270)
[2021-04-26 08:01:55,219] INFO Creating thread debezium-scyllaconnector-cdc-data.test-change-event-source-coordinator (io.debezium.util.Threads:287)
[2021-04-26 08:01:55,220] INFO WorkerSourceTask{id=my_table_db_connector-1} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
[2021-04-26 08:01:55,220] INFO Metrics registered (io.debezium.pipeline.ChangeEventSourceCoordinator:91)
[2021-04-26 08:01:55,220] INFO Context created (io.debezium.pipeline.ChangeEventSourceCoordinator:94)
[2021-04-26 08:01:55,220] INFO Snapshot ended with SnapshotResult [status=SKIPPED, offset=com.scylladb.cdc.debezium.connector.ScyllaOffsetContext@58cdf7fb] (io.debezium.pipeline.ChangeEventSourceCoordinator:106)
[2021-04-26 08:01:55,220] INFO Connected metrics set to 'true' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:60)
[2021-04-26 08:01:55,220] INFO Starting streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:139)
[2021-04-26 08:01:55,220] INFO Using native clock to generate timestamps. (shaded.com.scylladb.cdc.driver3.driver.core.ClockFactory:57)
===== Using optimized driver!!! =====
[2021-04-26 08:01:55,220] INFO ===== Using optimized driver!!! ===== (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:186)
[2021-04-26 08:01:55,222] INFO Requested thread factory for connector ScyllaConnector, id = cdc-data.test named = change-event-source-coordinator (io.debezium.util.Threads:270)
[2021-04-26 08:01:55,222] INFO Creating thread debezium-scyllaconnector-cdc-data.test-change-event-source-coordinator (io.debezium.util.Threads:287)
[2021-04-26 08:01:55,223] INFO WorkerSourceTask{id=my_table_db_connector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
[2021-04-26 08:01:55,253] WARN Unable to register the MBean 'debezium.scylla:type=connector-metrics,context=snapshot,server=cdc-data.test': debezium.scylla:type=connector-metrics,context=snapshot,server=cdc-data.test (io.debezium.pipeline.ChangeEventSourceCoordinator:56)
[2021-04-26 08:01:55,253] WARN Unable to register the MBean 'debezium.scylla:type=connector-metrics,context=streaming,server=cdc-data.test': debezium.scylla:type=connector-metrics,context=streaming,server=cdc-data.test (io.debezium.pipeline.ChangeEventSourceCoordinator:56)
[2021-04-26 08:01:55,253] INFO Metrics registered (io.debezium.pipeline.ChangeEventSourceCoordinator:91)
[2021-04-26 08:01:55,253] INFO Context created (io.debezium.pipeline.ChangeEventSourceCoordinator:94)
[2021-04-26 08:01:55,253] INFO Snapshot ended with SnapshotResult [status=SKIPPED, offset=com.scylladb.cdc.debezium.connector.ScyllaOffsetContext@7b13f3ae] (io.debezium.pipeline.ChangeEventSourceCoordinator:106)
[2021-04-26 08:01:55,254] INFO Connected metrics set to 'true' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:60)
[2021-04-26 08:01:55,254] INFO Starting streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:139)
[2021-04-26 08:01:55,254] INFO Using native clock to generate timestamps. (shaded.com.scylladb.cdc.driver3.driver.core.ClockFactory:57)
===== Using optimized driver!!! =====
[2021-04-26 08:01:55,254] INFO ===== Using optimized driver!!! ===== (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:186)
[2021-04-26 08:01:55,831] INFO Using data-center name 'us-west-1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor) (shaded.com.scylladb.cdc.driver3.driver.core.policies.DCAwareRoundRobinPolicy:110)
[2021-04-26 08:01:55,832] INFO New Cassandra host /AA:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /BB:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /CC:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /DD:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO Using data-center name 'us-west-1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor) (shaded.com.scylladb.cdc.driver3.driver.core.policies.DCAwareRoundRobinPolicy:110)
[2021-04-26 08:01:55,832] INFO New Cassandra host /EE:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /FF:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /AA:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /BB:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /DD:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /CC:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /EE:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /FF:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)

[The following log appears only rarely and occurred just once.]

[2021-04-26 05:19:11,602] INFO Query SELECT * FROM test_service.my_table_scylla_cdc_log WHERE "cdc$stream_id" IN ? AND "cdc$time">? AND "cdc$time"<=?; is not prepared on null, preparing before retrying executing. Seeing this message a few times is fine, but seeing it a lot may be source of performance problems (shaded.com.scylladb.cdc.driver3.driver.core.RequestHandler:822)
[2021-04-26 05:19:11,604] INFO Query SELECT * FROM test_service.my_table_scylla_cdc_log WHERE "cdc$stream_id" IN ? AND "cdc$time">? AND "cdc$time"<=?; is not prepared on null, preparing before retrying executing. Seeing this message a few times is fine, but seeing it a lot may be source of performance problems (shaded.com.scylladb.cdc.driver3.driver.core.RequestHandler:822)
[2021-04-26 05:19:11,644] INFO Query SELECT * FROM test_service.my_table_scylla_cdc_log WHERE "cdc$stream_id" IN ? AND "cdc$time">? AND "cdc$time"<=?; is not prepared on null, preparing before retrying executing. Seeing this message a few times is fine, but seeing it a lot may be source of performance problems (shaded.com.scylladb.cdc.driver3.driver.core.RequestHandler:822)
pkgonan commented 3 years ago

@avelanarius @haaawk Hi. Do you have any idea what might be causing the issue above?

haaawk commented 3 years ago

@pkgonan It might be related to the cross-DC latency. Could you please increase scylla.confidence.window.size to 30s and see if you still observe the problem?
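
A minimal sketch of that change, assuming the window size is specified in milliseconds as in the config above (so 30 s would be 30000):

  "scylla.confidence.window.size": "30000",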

pkgonan commented 3 years ago

@haaawk Thank you very much. After changing the configuration to scylla.query.time.window.size : 30 and scylla.confidence.window.size : 30, it works well.

pkgonan commented 3 years ago

@haaawk I think max.response.time = scylla.query.time.window.size + scylla.confidence.window.size.

If the maximum response time is 60 seconds (30 s + 30 s), that seems too long. I would like to receive CDC events in as near real time as possible. What is the best way to set scylla.query.time.window.size and scylla.confidence.window.size in a multi-datacenter environment?

haaawk commented 3 years ago

Try setting them both to 10s.
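
A sketch of that suggestion, again assuming the values are given in milliseconds:

  "scylla.query.time.window.size": "10000",
  "scylla.confidence.window.size": "10000",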

kbr- commented 3 years ago

scylla.query.time.window.size should not have a significant effect on the lag; the lag should always be roughly equal to scylla.confidence.window.size. So your lag should be ~10s if you set scylla.confidence.window.size to 10s. You could experiment with a lower value, but you should check what your inter-DC latencies are, see how long it takes for a row to show up in the other DC, and adjust for that. Be aware that latencies may spike due to network interruptions, so the lower you set the confidence window, the higher the chance that a row reaches the remote DC only after the connector's query window has passed, and the connector misses it. It's a tradeoff between consistency and availability: if you are OK with some rows being missed by the connector during periods of network problems, you may lower the window size.

pkgonan commented 3 years ago

@kbr- Thank you. I have additional questions. I ran some tests based on the environment settings below.

When scylla.query.time.window.size is set to 5000, ScyllaDB receives 500~600 read requests per second. The load from reading the CDC log table is much higher than expected, and if a separate Scylla source connector is configured per table, far too many CDC log read requests will be generated.

We plan to add one Scylla CDC connector per keyspace in order to keep keyspace access rights separate. If two tables in one keyspace need CDC log collection, they are registered in the same connector.

One problem arises in this situation: with a single Scylla CDC connector and scylla.query.time.window.size set to 5000, the 500~600 read requests per second already cause Scylla's load to increase rapidly.

If a CDC connector is registered for each of three keyspaces, 1500~1800 read requests per second are generated just to read the CDC log.

Is there a way to effectively reduce the read load? It does not seem appropriate to give up collecting CDC logs at near-real-time speed by increasing scylla.query.time.window.size just to reduce the read load.

[Multi DC Environment]

CREATE KEYSPACE IF NOT EXISTS test_service WITH REPLICATION = {
'class' : 'NetworkTopologyStrategy',
'us-west-1' : 3,
'eu-central-1' : 3
};

[Connector Settings]

"tasks.max": "3",
"scylla.query.time.window.size": "5000",
"scylla.confidence.window.size":"2000"

[Traffic in us-west-1]

(Screenshot: read request traffic in us-west-1, 2021-04-27)

haaawk commented 3 years ago

I'm not sure what the exact problem is, @pkgonan. If you need changes to be handled shortly after they appear, the connector needs to check (read) the CDC log frequently.

What's your write throughput? If you're writing a lot, then frequent connector reads are exactly what's needed. If you're not writing much, then the connector may be reading too frequently. We're working on an optimisation for this low-write scenario, but it's not there yet. At the moment all you can do is increase scylla.query.time.window.size, which will cause the connector to read less frequently but in bigger chunks.
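
For illustration, a sketch of trading some latency for fewer reads; the values are in milliseconds and chosen only as an example, not as a recommendation for your specific workload:

  "scylla.query.time.window.size": "30000",
  "scylla.confidence.window.size": "10000",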