scylladb / scylla-cdc-source-connector

A Kafka source connector capturing Scylla CDC changes
Apache License 2.0

CDC log stream state (cdc$time) persisted via connect topic `connect-offsets` #16

Open hartmut-co-uk opened 3 years ago

hartmut-co-uk commented 3 years ago

Hi, when looking at the data published to the connect-offsets topic, I noticed that the latest window state is tracked per vnode_id (screenshot not preserved).

Why is this at the vnode_id level and where does this information come from? When querying the table the vnode_id is not used as a query condition, right?

A further implication (maybe?): the connect-offsets topic is created by Kafka Connect (not by the Scylla connector) and is not a compacted topic. A simple test (scylla.query.time.window.size: 2000) with 1 connector, 1 task, and 1 table resulted in ~1M messages on the docker-connect-offsets topic. @pkgonan may I ask whether you have numbers to confirm this for a more comprehensive setup?

@haaawk how is this topic consumed upon connector (re)start / task/consumer rebalancing? From beginning?


Update 2021-12-15:

ℹ️ For reference: the connect-offsets part has already been described and addressed in a section of the repo README: https://github.com/scylladb/scylla-cdc-source-connector/blob/ecbeb1d3f643a20b6387d1e54b4aa6837f171738/README.md?plain=1#L601-L605

Offset (progress) storage

Scylla CDC Source Connector reads the CDC log by querying at Vnode granularity. It uses Kafka Connect to store the current progress (offset) for each Vnode. By default, there are 256 Vnodes per Scylla node. Kafka Connect stores those offsets in its internal connect-offsets topic, which can grow large for big Scylla clusters. You can minimize the size of this topic by adjusting the following configuration options on it:

  1. segment.bytes or segment.ms - lowering them will make the compaction process trigger more often.
  2. cleanup.policy=delete together with retention.ms set to at least the TTL value of your Scylla CDC table (in milliseconds; the Scylla default is 24 hours). With this configuration, older offsets are deleted. Keeping retention.ms at or above the CDC table's TTL ensures that only offsets which have already expired in the source Scylla CDC table are deleted.
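
As a rough illustration of option 2 above, the following sketch derives retention.ms from the CDC TTL and renders a command line for Kafka's stock `kafka-configs.sh` tool. The helper names and the `localhost:9092` bootstrap address are placeholders made up for this example, not part of the connector; the topic name should be whatever your Connect worker uses for offset storage.

```python
# Sketch only: derive connect-offsets topic overrides from the CDC table TTL.
# Function names and the bootstrap address are illustrative assumptions.

DEFAULT_CDC_TTL_SECONDS = 24 * 60 * 60  # Scylla default CDC TTL (24 hours)


def offsets_topic_overrides(cdc_ttl_seconds: int = DEFAULT_CDC_TTL_SECONDS) -> dict:
    """Topic-level settings for option 2: delete offsets older than the CDC TTL."""
    if cdc_ttl_seconds <= 0:
        raise ValueError("cdc_ttl_seconds must be positive")
    return {
        "cleanup.policy": "delete",
        # retention.ms is expressed in milliseconds and should be >= the CDC TTL
        "retention.ms": str(cdc_ttl_seconds * 1000),
    }


def kafka_configs_command(topic: str, overrides: dict,
                          bootstrap: str = "localhost:9092") -> str:
    """Render a kafka-configs.sh call that applies the overrides to a topic."""
    pairs = ",".join(f"{key}={value}" for key, value in sorted(overrides.items()))
    return (
        f"kafka-configs.sh --bootstrap-server {bootstrap} "
        f"--entity-type topics --entity-name {topic} "
        f"--alter --add-config {pairs}"
    )


if __name__ == "__main__":
    print(kafka_configs_command("connect-offsets", offsets_topic_overrides()))
```

If your CDC table uses a non-default TTL, pass it in seconds to `offsets_topic_overrides` so that retention.ms never drops below it.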

haaawk commented 3 years ago

In a single query, the connector queries all the streams that belong to a given vnode. That's why the offset is tracked by vnode_id.
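
To make the per-vnode bookkeeping concrete, here is a minimal sketch of the idea: streams are grouped by vnode, one windowed query covers all streams of a vnode, and a single offset (the window end) is stored per vnode. All function names are hypothetical, and the CQL text is an assumption about the general query shape against the `<table>_scylla_cdc_log` table, not the statement the connector actually issues.

```python
from collections import defaultdict

# Sketch only: names and query text are illustrative assumptions,
# not the connector's real implementation.


def group_streams_by_vnode(streams):
    """Group (vnode_id, stream_id) pairs into {vnode_id: [stream_id, ...]}."""
    groups = defaultdict(list)
    for vnode_id, stream_id in streams:
        groups[vnode_id].append(stream_id)
    return dict(groups)


def window_query(keyspace, table, stream_ids):
    """Build one CQL query covering every stream of a vnode for a time window."""
    placeholders = ", ".join("?" for _ in stream_ids)
    return (
        f"SELECT * FROM {keyspace}.{table}_scylla_cdc_log "
        f'WHERE "cdc$stream_id" IN ({placeholders}) '
        f'AND "cdc$time" > ? AND "cdc$time" <= ?'
    )


def commit_window(offsets, vnode_id, window_end):
    """Record progress: one offset per vnode, shared by all of its streams."""
    offsets[vnode_id] = window_end
    return offsets


if __name__ == "__main__":
    groups = group_streams_by_vnode([(0, "s-a"), (1, "s-b"), (0, "s-c")])
    offsets = {}
    for vnode_id, stream_ids in groups.items():
        print(vnode_id, window_query("ks", "t", stream_ids))
        commit_window(offsets, vnode_id, window_end=2000)
    print(offsets)
```

Because every stream of a vnode is read in the same window, one stored value per vnode is enough to resume all of them.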

Does that answer your question @hartmut-co-uk ?

hartmut-co-uk commented 3 years ago

Thanks! How is this topic consumed upon connector (re)start or task/consumer rebalancing? From the beginning?

How do 'generation_start' and streams relate? Are there Scylla system topics where all of this is maintained?

haaawk commented 3 years ago

@avelanarius Could you please answer with details here?

hartmut-co-uk commented 2 years ago

I've been playing with the TableBackedProgressManager of scylla-cdc-go, and I think it might be a good alternative for persisting the current CDC log stream state (cdc$time)...

Are there plans to add similar functionality to either this repo or scylla-cdc-java?