cockroachdb / replicator

replicator is a toolkit for ingesting logical replication feeds into a CockroachDB cluster

RFC: Debezium integration #564

Open sravotto opened 11 months ago

sravotto commented 11 months ago

Integration with Debezium Connectors

Debezium is an open-source distributed platform for change data capture. See https://debezium.io/ for documentation and code.

While Debezium provides a full environment to stream events from upstream databases to other systems via the Kafka Connect service or a Debezium server, the various connectors can also be used directly within a Java application, leveraging the debezium-api module. This module defines a small API that allows an application to easily configure and run Debezium connectors using the Debezium Engine.

cdc-sink currently supports several databases directly as sources; however, leveraging Debezium connectors could significantly expand the integration points. One challenge is that the Debezium APIs are Java-based, while cdc-sink is written in Go. One way around this is to deploy a thin sidecar application that uses the debezium-api module, connects to the upstream database, and forwards the events to cdc-sink in one of the supported output message formats (e.g., JSON), as shown in the following diagram:

High level design (diagram)

Sidecar design

The Debezium sidecar reads a properties file to configure the connector for the source database, then calls the DebeziumEngine API to connect to the database and wait for events.

Sidecar responsibilities:

The handler that posts batches to cdc-sink should implement the ChangeConsumer interface (see https://javadoc.io/doc/io.debezium/debezium-api/2.4.0.Final/io/debezium/engine/DebeziumEngine.ChangeConsumer.html); a sketch of such a handler appears after the main loop below.

The handler posts a JSON array of events with the following structure:

[
    { "key":
        {
            "schema": "... provides information about the schema for the key ...",
            "payload": "... a JSON object that represents the key ..."
        },
      "value":
        {
            "before": "... a JSON object that represents the row before the change (for updates, deletes) ...",
            "after": "... a JSON object that represents the row after the change ...",
            "op": "... the operation (c=insert; d=delete; u=update) ...",
            "source": "... information about the source emitting the change, e.g. database vendor, version, etc. ..."
        }
    }
]
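For illustration, a single insert into a hypothetical myDB.dbo.users table could be posted as follows (schema and source fields abbreviated; the table and column names are made up):

[
    { "key":
        {
            "schema": { "...": "..." },
            "payload": { "id": 1 }
        },
      "value":
        {
            "before": null,
            "after": { "id": 1, "name": "alice" },
            "op": "c",
            "source": { "connector": "sqlserver", "db": "myDB", "table": "users" }
        }
    }
]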

Optionally, transaction boundary events may be included in the batch to signal the beginning and end of a transaction. Note: mutations associated with a transaction may extend across multiple batches. To enable transaction boundary events, set the provide.transaction.metadata property to true.

{ "key" : 
        {  
            "schema":   "... provides information about the schema for the key ..."
             "payload" : {
                "id" : "... string representation of the unique transaction identifier."
             }
        },
   "value":
       {
          "status": "... BEGIN or END",
          "id": "... string representation of the unique transaction identifier.",
          "ts_ms": "... time at the data source",
      }
}
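As a concrete, abbreviated illustration based on the transaction-metadata format described in the Debezium documentation (the transaction id and timestamp are made up), the END event for a transaction containing two mutations might look like this; note that Debezium also reports per-table event counts on END events:

{ "key":
    {
        "payload": { "id": "571" }
    },
  "value":
    {
        "status": "END",
        "id": "571",
        "ts_ms": 1486500577691,
        "event_count": 2,
        "data_collections": [
            { "data_collection": "myDB.dbo.users", "event_count": 2 }
        ]
    }
}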

The main sidecar code, at a high level, would be:


import java.io.FileInputStream;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.ChangeConsumer;
import io.debezium.engine.format.Json;

    ...
    // Load the connector configuration from the properties file.
    final Properties props = new Properties();
    try {
        props.load(new FileInputStream(propertiesFile));
    } catch (Exception e) {
        ...
    }
    // The batcher collects the change events, sends them to cdc-sink,
    // and signals the committer on success.
    ChangeConsumer<ChangeEvent<String, String>> batcher = new Batcher(url);
    try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
            .using(props)
            .notifying((records, committer) -> {
                try {
                    batcher.handleBatch(records, committer);
                } catch (Exception e) {
                    ...
                }
            })
            .build()) {
        // Run the engine on its own thread and block until it terminates.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        while (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            ...
        }
    } catch (Exception e) {
        ...
    }
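For completeness, here is a minimal sketch of what the Batcher referenced above might look like. It is illustrative only: the JSON assembly is naive string concatenation, error handling is reduced to unchecked exceptions, and honoring cdcsink.skip.verify (i.e., relaxing TLS verification) is omitted.

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.StringJoiner;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

// Collects a batch of Debezium JSON events, posts them to cdc-sink as a
// single JSON array, and marks the batch processed only after cdc-sink
// acknowledges it.
class Batcher implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
    private final HttpClient client = HttpClient.newHttpClient();
    private final URI target;

    Batcher(String url) {
        this.target = URI.create(url);
    }

    @Override
    public void handleBatch(List<ChangeEvent<String, String>> records,
            DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer)
            throws InterruptedException {
        // Wrap each event as {"key": ..., "value": ...}; with the Json
        // format, key() and value() are already JSON strings.
        StringJoiner batch = new StringJoiner(",", "[", "]");
        for (ChangeEvent<String, String> record : records) {
            batch.add("{\"key\":" + record.key() + ",\"value\":" + record.value() + "}");
        }
        try {
            HttpRequest request = HttpRequest.newBuilder(target)
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(batch.toString()))
                    .build();
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() != 200) {
                throw new RuntimeException("cdc-sink returned " + response.statusCode());
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        // Signal success so the engine can advance its stored offsets.
        for (ChangeEvent<String, String> record : records) {
            committer.markProcessed(record);
        }
        committer.markBatchFinished();
    }
}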

Example of properties to connect to a SQL Server instance:

# Database information
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=localhost
database.password=password
database.port=1433
database.user=sa
database.names=myDB
database.encrypt=false

name=engine
decimal.handling.mode=string
topic.prefix=cdc-connector

# Where to keep the state
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=./tmp/mysql/offsets.dat
schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
schema.history.internal.file.filename=./tmp/mysql/schemahistory.dat

# cdc-sink connection
cdcsink.url=https://localhost:30004/immediate
cdcsink.skip.verify=true
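
Note that the cdcsink.* keys are not Debezium settings; the sidecar would read them itself before handing the remaining properties to the engine, along these (illustrative) lines:

    // Sidecar-specific settings; everything else is passed through to Debezium.
    String cdcSinkUrl = props.getProperty("cdcsink.url");
    boolean skipVerify = Boolean.parseBoolean(
            props.getProperty("cdcsink.skip.verify", "false"));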

cdc-sink endpoint

On the cdc-sink server side, new endpoints will consume the change events and apply them to the target database, leveraging the usual modes of operation.

Alternatives considered

In integrating with the Debezium ecosystem, there are a few other design alternatives that we considered.

BramGruneir commented 10 months ago

I wanted to summarize the different possible approaches here. Please let me know if I'm missing something, but I think these do match up with what you've been proposing. I also wanted to point out exactly what work would need to be completed.

sravotto commented 10 months ago

The summary is correct. Minor edit: this is what I started to work on: Debezium as a library --(Debezium JSON format)--> cdc-sink --(SQL)--> crdb

I'm in the process of adding some additional transformations in the sidecar that uses the library, so Debezium as a library --(cockroach CDC format)--> cdc-sink --(SQL)--> crdb is also supported. It's fairly straightforward, and won't prevent us from extending the previous approach, should we need it.