I'm available all week 9-10am PST. Does that work for you?
Yes; would tomorrow (Friday 19th) at that time work for you?
One semi-random note I realized when discussing this in another context: key and value schema names will need adjustment to include the database name.
Yes; would tomorrow (Friday 19th) at that time work for you?
Yes, that works.
One semi-random note I realized when discussing this in another context: key and value schema names will need adjustment to include the database name.
Yes, we implemented that currently w/o a migration path. Will be happy to discuss the ideas on making it smooth from the upgrade perspective.
Open question: how to deal with metrics?
I looked a bit deeper at using the Signaling Table API to implement snapshotting new databases by the same connector that captures streaming changes from the same server instance.
If I understood the idea correctly, the mechanics would be the following:
The max.tasks config parameter is indeed advisory. The code doesn't prevent the connector from returning more configurations than max.tasks. There are a few unclear moments:
[…] the SourceTaskContext API).
The SchemaChanges command implemented via the signaling table and the Snapshot command are handled differently. The result of processing the SchemaChange command is produced to one of the topics produced by the connector before processing the following command, so if the connector commits its offset, there is a guarantee that the command has been fully processed. This doesn't apply to the Snapshot command since it's meant to be performed asynchronously to capturing CDC events.
At the same time, there's no way for the connector to persist its configuration changes (database.names stored in Kafka Connect will remain the same until changed via the API).
Technically, most of the concerns could be addressed by adding a "signaling" topic through which the task could interact with the connector; it could also store the state of snapshots. But I'm not sure if the framework will benefit from that. Maybe the same is possible via the existing Kafka Connect topics (assuming they are supposed to be used by the implementations).
Did I get the idea correctly? What do you think?
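For illustration only, here is a minimal sketch (not Debezium's actual code) of the multi-task idea and of why max.tasks is merely advisory: taskConfigs() can return one configuration per database, and nothing in the method caps the result at maxTasks. The class name and the task.database.name property are made up for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

// Illustrative only: one task config per configured database, regardless of maxTasks.
public class MultiDatabaseConnector extends SourceConnector {

    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        this.config = props;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // "database.names" is the comma-separated list used in this thread.
        String[] databases = config.get("database.names").split(",");

        // Nothing here caps the result at maxTasks -- the parameter is advisory.
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (String database : databases) {
            Map<String, String> taskConfig = new HashMap<>(config);
            taskConfig.put("task.database.name", database.trim()); // hypothetical per-task key
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }

    @Override
    public void stop() {
    }

    @Override
    public Class<? extends Task> taskClass() {
        return null; // omitted in this sketch
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public String version() {
        return "sketch";
    }
}
```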
@morozov
@jpechane in your comment above, do you refer to the first list or to the second one? Both have 4 points in them. Judging by #3 in your response, it's about the second list.
In this regard IIRC the proposal was to have one task parked for handling exactly such situations.
I must have missed this bit. In this case, as I understand, this task will read the changes from the database where the signaling table resides and do the snapshots. Right? This way, the connector could do only one snapshot at a time which isn't a problem per se but seems to conflict with some statements below.
That's IMHO unnecessary
Makes sense if I understand the above right.
The idea is to store the currently running command in the offsets so it will resume upon restart.
What database should these offsets belong to? It's either the database where the signaling table resides or the database to be snapshotted. I assume it's the former because if the connector restarts it won't even know that it needs to consume the offsets of the database to be snapshotted.
In this case, as far as I understand, the task that captures the changes from the signaling table will have to produce some source records from which Kafka Connect would derive and store the offsets. To which topic should those messages be produced? Or maybe a task can commit its offsets directly w/o producing source messages?
The offsets could contain extra list of database names that would be used for filtering
I assume it's expected that the connector is configured with a full list of databases including those that haven't been snapshotted yet. The streaming tasks will ignore the databases that haven't been snapshotted until their snapshots are done. Does it mean that the streaming tasks should consume the offsets topic?
A "list of database names" implies that there may be multiple snapshots being done in parallel. Is it possible if the parked task will read the signaling table and do the snapshots, one at a time?
@morozov Hi, yes, these were answers to the unclear points.
I must have missed this bit. In this case, as I understand, this task will read the changes from the database where the signaling table resides and do the snapshots. Right? This way, the connector could do only one snapshot at a time which isn't a problem per se but seems to conflict with some statements below.
Well, IIRC that was one of the proposals for how to solve the problem that it is not possible to start the tasks as needed. @gunnarmorling then proposed to have a backup task parked to handle these cases. And yes, you are right, in this case it will execute the snapshots serially.
What database should these offsets belong to? It's either the database where the signaling table resides or the database to be snapshotted. I assume it's the former because if the connector restarts it won't even know that it needs to consume the offsets of the database to be snapshotted.
I understand this is a little bit unclear right now, but I believe it should go to both databases. For the signaling table database, it is necessary to be stored there to highlight that the action has already been started. For the new database, it is exactly for the purpose of letting the connector know that the database is already captured.
In this case, as far as I understand, the task that captures the changes from the signaling table will have to produce some source records from which Kafka Connect would derive and store the offsets. To which topic should those messages be produced? Or maybe a task can commit its offsets directly w/o producing source messages?
Well, in the case of a high-traffic database the offsets could be delivered as part of a regular data message. But you are right, this is not guaranteed, so we might need to introduce a separate topic to maybe even send some status messages, or reuse the heartbeat topic for that?
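As a hedged illustration of the "reuse the heartbeat topic" idea: a source task could emit a record to a dedicated topic whose only purpose is to carry the source partition and offset, so Kafka Connect persists the offset even when the captured database produces no regular data messages. The topic name and helper class below are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

// Illustrative helper: build a heartbeat/status record whose only job is to carry
// a source partition + offset so the framework stores the offset once the record
// has been successfully produced.
public final class OffsetCarrier {

    private static final String HEARTBEAT_TOPIC = "sql-01.heartbeat"; // hypothetical name

    public static SourceRecord heartbeat(String serverName, String databaseName, Map<String, ?> offset) {
        Map<String, String> partition = Map.of(
                "server", serverName,
                "database", databaseName);

        // Key/value content is irrelevant here; only the attached partition/offset matters.
        return new SourceRecord(
                partition,
                offset,
                HEARTBEAT_TOPIC,
                Schema.STRING_SCHEMA,
                databaseName,
                Schema.STRING_SCHEMA,
                "heartbeat");
    }

    public static List<SourceRecord> asBatch(SourceRecord record) {
        return Collections.singletonList(record);
    }
}
```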
I assume it's expected that the connector is configured with a full list of databases including those that haven't been snapshotted yet. The streaming tasks will ignore the databases that haven't been snapshotted until their snapshots are done. Does it mean that the streaming tasks should consume the offsets topic?
The streaming tasks will receive thier offsets via the offset loading API call. Win't they?
A "list of database names" implies that there may be multiple snapshots being done in parallel. Is it possible if the parked task will read the signaling table and do the snapshots, one at a time?
Sorry for the confusion, it was not intended that way. It was intended as a queue of databases not yet snapshotted, and they'd be snapshotted one by one as mentioned above.
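A minimal sketch (purely illustrative, not actual Debezium code) of such a parked task draining a queue of not-yet-snapshotted databases strictly one at a time; the Snapshotter interface is made up for this example:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: the "parked" task snapshots queued databases serially.
public class ParkedSnapshotTask implements Runnable {

    public interface Snapshotter {
        void snapshot(String databaseName) throws InterruptedException;
    }

    private final BlockingQueue<String> pendingDatabases = new LinkedBlockingQueue<>();
    private final Snapshotter snapshotter;
    private volatile boolean running = true;

    public ParkedSnapshotTask(Snapshotter snapshotter) {
        this.snapshotter = snapshotter;
    }

    /** Called when a snapshot signal for a new database is read from the signaling table. */
    public void enqueue(String databaseName) {
        pendingDatabases.add(databaseName);
    }

    @Override
    public void run() {
        try {
            while (running) {
                String database = pendingDatabases.take(); // blocks while "parked"
                snapshotter.snapshot(database);            // strictly one at a time
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void stop() {
        running = false;
    }
}
```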
@jpechane thank you for the clarification. I think I understand the idea at a high level, and it looks like it departs farther from the existing Debezium architecture than my initial proposal about multi-tasking/multi-partitioning does, at least given my current knowledge of Debezium.
If it's okay with you and Gunnar, I'd like to put this part of the proposal on hold and keep focusing on the multi-tasking/multi-partitioning support. In the meanwhile, we will temporarily address the scenario of adding a new database by a semi-automated scripted solution via a temporary snapshotting connector. I'll be happy to document the details if you're interested. And yes, you were right: the stored offset unconditionally contains the connector name :-)
The streaming tasks will receive their offsets via the offset loading API call. Won't they?
They will receive all relevant offsets once during the start, yes. But if we want to hand over a snapshotted database from the snapshotting task to the streaming one, something (maybe the connector) should consume the offsets topic and wait until the last snapshotted message is produced. Or poll the OffsetStorageReader (no idea how efficient that would be).
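For reference, a small sketch of what loading the offset for a single database partition via the Kafka Connect offset loading API could look like; the partition keys mirror the offset keys shown later in this thread, and the helper class itself is made up:

```java
import java.util.Map;

import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;

// Illustrative only: look up the stored offset for one (server, database) partition.
public final class OffsetLookup {

    public static Map<String, Object> offsetFor(SourceTaskContext context, String server, String database) {
        OffsetStorageReader reader = context.offsetStorageReader();
        Map<String, String> partition = Map.of(
                "server", server,
                "database", database);
        // Returns null if no offset has been committed for this partition yet,
        // e.g. the database has not been snapshotted.
        return reader.offset(partition);
    }
}
```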
[…] we might need to introduce a separate topic to maybe even send some status messages or reuse heartbeat topic for that?
Yeah, I wasn't aware of the heartbeat feature but it looks relevant to what we'd need.
In the meanwhile, we will temporarily address the scenario of adding a new database by a semi-automated scripted solution via a temporary snapshotting connector. I'll be happy to document the details if you're interested.
I'm definitely interested :) In particular, how do you intend to implement the switch from that snapshotting connector over to the regular connector?
Here's the full process including the switch-over (starting at step 4).
A few notes:
Assuming there is an already running connector (hereinafter 1:M connector) that captures changes from the databases "a" and "b" on a given server:
```yaml
metadata:
  name: sql-01
spec:
  database.server.name: sql-01.example.com
  database.names: a,b
  database.history.kafka.topic: sql-01.dbhistory
```
A new database "c" needs to be added to the connector.
Deploy a temporary snapshot connector:
```yaml
metadata:
  name: sql-01-c-snapshot
spec:
  database.server.name: sql-01.example.com
  database.names: c
  snapshot.mode: initial_only
  database.history.kafka.topic: c.snapshot.dbhistory
```
It's safer to use a dedicated database history topic to be able to start from scratch if the snapshot fails. The rest of the configuration should be the same as in the 1:M connector. Maybe there's a better solution.
Wait until the snapshot is done:
Start consuming from the end of the Kafka Connect offsets topic using kafka-console-consumer.
Consider only the messages that belong to the snapshot connector. Their key should look like the following (formatted for readability):
```json
[
  "sql-01-c-snapshot",
  {
    "server": "sql-01",
    "database": "c"
  }
]
```
If the offset has {"snapshot": "last"}, break. Otherwise, continue consuming.
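A rough Java equivalent of this watch loop, assuming the Kafka Connect offsets topic is named connect-offsets and using naive string matching instead of real JSON parsing:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Illustrative only: watch the offsets topic for the snapshot connector's offsets
// and stop once {"snapshot": "last"} shows up.
public class SnapshotWatcher {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "snapshot-watcher");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // start from the end
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("connect-offsets")); // assumed offsets topic name
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Consider only offsets that belong to the snapshot connector.
                    if (record.key() == null || !record.key().contains("\"sql-01-c-snapshot\"")) {
                        continue;
                    }
                    System.out.println(record.key() + " -> " + record.value());
                    if (record.value() != null
                            && record.value().replace(" ", "").contains("\"snapshot\":\"last\"")) {
                        System.out.println("Snapshot completed; last offset: " + record.value());
                        return;
                    }
                }
            }
        }
    }
}
```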
Delete the snapshot connector.
Copy database history from the temporary topic to the primary one without any modification (kafka-console-consumer -t c.snapshot.dbhistory | kafka-console-producer sql-01.dbhistory).
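If the console-tool pipe is not convenient, a small Java sketch could do the same copy; the topic names come from the steps above, everything else is illustrative:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Illustrative only: copy every record of the temporary history topic unchanged.
public class HistoryTopicCopier {

    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "history-copier");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(List.of("c.snapshot.dbhistory"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                if (records.isEmpty()) {
                    break; // assume the history topic has been fully read
                }
                for (ConsumerRecord<String, String> record : records) {
                    producer.send(new ProducerRecord<>("sql-01.dbhistory", record.key(), record.value()));
                }
            }
            producer.flush();
        }
    }
}
```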
Use the value of the last message consumed in step 2 and produce a message to the Kafka Connect offsets topic with the same value and the key of the 1:M connector (formatted for readability):
```json
[
  "sql-01",
  {
    "server": "sql-01",
    "database": "c"
  }
]
```
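A hedged sketch of producing that hand-over record with the Java client, again assuming the offsets topic is named connect-offsets; the offset value must be exactly the one consumed in step 2:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Illustrative only: write the snapshot connector's last offset under the 1:M connector's key.
public class OffsetHandover {

    public static void main(String[] args) {
        String key = "[\"sql-01\",{\"server\":\"sql-01\",\"database\":\"c\"}]";
        String lastSnapshotOffsetValue = args[0]; // value copied verbatim from step 2

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("connect-offsets", key, lastSnapshotOffsetValue));
            producer.flush();
        }
    }
}
```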
Update the list of database names on the running connector:
```yaml
spec:
  database.names: a,b,c
```
Delete the c.snapshot.dbhistory topic.
Upon restart, the connector should pick up from where the snapshot connector left off.
@morozov, hey there, completely missed that this one is still pending. Could you perhaps take a look into this, see whether there are any open discussion items inline, and try to wrap those up? Would be nice to have this merged, so we have a referenceable description of the intended (and hopefully actual) implementation of this feature. Thanks a lot, and sorry again for the long delay here.
@gunnarmorling please see the update.
@gunnarmorling do you want to merge this document in its current state? It's primarily up to date except for the "metrics" part. The only mismatch is that instead of reworking the core class hierarchy, we introduced the Partition arguments to the listeners and implemented a separate hierarchy for SQL Server.
I'm afraid describing the design in a text format will take quite some time, and hopefully the code speaks for itself.
What do you think?
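For readers without the code at hand, a very rough sketch of the Partition-argument idea mentioned above (not Debezium's actual classes): each listener callback receives a Partition identifying the database an event belongs to, so one connector can track state per database.

```java
import java.util.Map;

// Illustrative only: per-partition identification passed to listeners.
public interface Partition {

    Map<String, String> getSourcePartition();

    // Hypothetical listener that is notified per partition rather than per connector.
    interface SnapshotProgressListener {
        void snapshotStarted(Partition partition);

        void snapshotCompleted(Partition partition);
    }

    // Minimal SQL Server flavored implementation for illustration.
    record SqlServerPartition(String serverName, String databaseName) implements Partition {
        @Override
        public Map<String, String> getSourcePartition() {
            return Map.of("server", serverName, "database", databaseName);
        }
    }
}
```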
@morozov, yeah, let's merge it as-is, I think it definitely provides value in the current form. Thanks a lot!