redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

Implement CDC for MySQL using binlog #984

Open Locustv2 opened 2 years ago

Locustv2 commented 2 years ago

MySQL is a very common and reliable relational database. However, listening to events as CDC (change data capture) from MySQL still has room to grow, as there are not many solutions readily available.

In order to create an add-on for benthos, we need to understand how to handle the different row events from MySQL: inserts, updates, and deletes.

To use the binlog from MySQL, it has to be enabled on the server side. Once enabled, the mysqlbinlog tool can read the binlog files, which look as follows:

# mysqlbinlog -vvv /var/lib/mysql/master.000001 

BINLOG '
JxiqVxMBAAAALAAAAI7LegAAAHQAAAAAAAEABHRlc3QAAWEAAQMAAUTAFAY=
JxiqVx4BAAAAKAAAALbLegAAAHQAAAAAAAEAAgAB//5kAAAAedRLHg==
'/*!*/;
### INSERT INTO `test`.`a`
### SET
###   @1=100 /* INT meta=0 nullable=1 is_null=0 */
# at 8047542
#160809 17:51:35 server id 1  end_log_pos 8047573 CRC32 0x56b36ca5      Xid = 24453
COMMIT/*!*/;

This is basically a historical log of the changes, which can be replayed against a new database to restore its state.
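
As noted above, the binlog has to be enabled on the MySQL server before anything can consume it. A minimal sketch of the usual my.cnf settings (the file location and server id below are assumptions; row-based logging is what CDC tooling generally relies on):

# /etc/mysql/my.cnf (location varies by distribution)
[mysqld]
server-id        = 1          # must be unique among servers in a replication setup
log_bin          = mysql-bin  # enables the binary log
binlog_format    = ROW        # row events carry the actual column values
binlog_row_image = FULL       # log full before/after row images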

Idea:

With the binlog file, we have several options for moving forward.

However, regardless of the solution we choose, we will still need to come up with an output that handles the 3 CDC events.

If we take the common use case of mysql-kafka, there are only 2 events: an upsert (a record keyed by the primary key) and a delete (a tombstone, i.e. a keyed record with a null value).

So we should be able to know at all times within a benthos pipeline what the key of the current message/data is.

Maybe it would be good to have a benthos-generated id/key for each message in the pipeline (and this should apply to all available benthos inputs). If the input supports an id/key, like mysql, kafka or any other such input, we can override the benthos id/key with it. This would make sure that all messages carry the same meta pk in benthos terms.

The reason for this is that if we don't have it, we need to handle it separately. Example of a CSV-to-kafka benthos config:

input:
  gcp_cloud_storage:
    bucket: ${GCS_BUCKET}
    prefix: ${GCS_PREFIX}
    codec: csv
    delete_objects: true

pipeline:
  processors:
    - bloblang: |
        meta pk = this.person_id
        root.person_id = this.person_id
        root.name = this.name
        root.address = this.address
        # Check if we need to send a tombstone based on the is_deletion flag.
        # To send a tombstone, the root should be set to null.
        root = if this.is_deletion == "true" {
            null
        }

output:
  kafka:
    addresses: [ ${KAFKA_BROKER_ADDRESS} ]
    topic: ${KAFKA_TOPIC}
    client_id: ${KAFKA_CLIENT_ID}
    target_version: 2.4.0
    key: ${! meta("pk") }
    max_in_flight: 10
    batching:
      count: 1000
      period: "60s"

In this example you can see that I had to handle the key and tombstone in the bloblang. However, if we always have 2 objects to work with (i.e. 1 for the key and 1 for the value) this can be much simpler. (Of course there is no way to determine a delete from a CSV file; this is purely dependent on the type of input that actually supports deletes.)

The MySQL Binlog perspective

The new input add-on should handle the 3 events mentioned above. Let's assume we can have a key and a value for the message in each step of the pipeline of a benthos app. The add-on could process the events as follows (taking a basic person object with basic fields as an example):

# key object
{
  "person_id": 100
}

# value object
{
  "person_id": 100,
  "name": "Tom,
  "address": "Some Street"
}
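
Under that assumption, the three row events could map onto key/value messages roughly like this (just a sketch; the delete case producing a null value, i.e. a tombstone, follows the same convention as the CSV example earlier):

# insert / update
key:   { "person_id": 100 }
value: { "person_id": 100, "name": "Tom", "address": "Some Street" }

# delete
key:   { "person_id": 100 }
value: null   # tombstone, no row data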

Example of a benthos pipeline:

pipeline:
  processors:
    - bloblang: |
        # here we can play with key and value instead of root
        # by default it would be something like (what we receive from the binlog):
        key.person_id = 100
        value.person_id = 100
        value.name = "Tom"
        value.address = "Some Street"

        # And if we want to process the values, we can use bloblang as we did already
        # renaming the key field for example:
        key.person_id = deleted()
        key.id = this.person_id
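
For completeness, here is a rough sketch of what a dedicated binlog input's config could look like. The input name mysql_binlog and all of its fields are hypothetical, purely to illustrate the shape such an add-on might take:

input:
  mysql_binlog:                # hypothetical input name, does not exist yet
    host: ${MYSQL_HOST}
    port: 3306
    user: ${MYSQL_USER}
    password: ${MYSQL_PASSWORD}
    server_id: 1001            # unique id used when connecting as a replication client
    tables:
      - test.person            # only stream row events for these tables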

Examples of binlog json parsers:

I'll add more updates later. If you have more ideas or questions, feel free to add.

gedw99 commented 2 years ago

https://github.com/wal-g/wal-g is for postgres.

jakthom commented 2 years ago

Speaking of postgres, a pg logical slot input would be fantastic.

https://www.postgresql.org/docs/10/logicaldecoding-explanation.html#LOGICALDECODING-REPLICATION-SLOTS
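
For reference, a logical replication slot can be created and read with plain SQL; a minimal sketch using the built-in test_decoding output plugin (the slot name is arbitrary):

-- create a logical replication slot with the built-in test_decoding plugin
SELECT * FROM pg_create_logical_replication_slot('benthos_slot', 'test_decoding');

-- consume pending changes from the slot (each row describes an insert/update/delete)
SELECT * FROM pg_logical_slot_get_changes('benthos_slot', NULL, NULL);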

nickchomey commented 4 months ago

Here is Conduit's vitess/mysql connector. Being a golang tool that is very similar to benthos, it may provide some good inspiration for this.

https://github.com/conduitio-labs/conduit-connector-vitess