redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

SQL Change Data Capture #82

Open marceloboeira opened 5 years ago

marceloboeira commented 5 years ago

We have a project at the company where I work that connects to PgSQL using a logical replication slot to receive all data changes (insert, update, delete) for a configurable set of tables, and streams that data directly from the database to streams (in our case Kinesis). We do that using the PgSQL WAL.

That's used for multiple purposes: fanning out to an ES instance from multiple sources, metrics/statistics collection, as well as data warehousing, e.g. using Kinesis Firehose...

So, we have implemented things similar to benthos with regard to acknowledgement and sinks. Yet, since our code is also written in Go, moving forward it would be easier for us to contribute to benthos by adding support for PgSQL, rather than re-implementing lots of things you have already created.
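As a rough illustration of the pattern described above (table filtering, a sink, and acknowledgement), here is a minimal Go sketch. It is not the poster's code and not the Benthos API: `Change`, `Sink`, `memSink` and `Forward` are hypothetical names, and the LSN is simplified to a plain counter. The idea it shows is that the confirmed position only advances past a change once the sink has accepted it (or the change was filtered out), so a failed write is re-delivered after a restart.

```go
package main

import (
	"errors"
	"fmt"
)

// Op is the kind of row change carried by an event.
type Op string

const (
	Insert Op = "insert"
	Update Op = "update"
	Delete Op = "delete"
)

// Change is a hypothetical, simplified change event.
type Change struct {
	LSN   uint64 // position in the change stream (simplified)
	Table string
	Op    Op
	Row   map[string]any
}

// Sink is a hypothetical destination such as a stream producer.
type Sink interface {
	Write(c Change) error
}

// memSink is an in-memory sink for demonstration; it fails at failAt if set.
type memSink struct {
	got    []Change
	failAt uint64
}

func (s *memSink) Write(c Change) error {
	if s.failAt != 0 && c.LSN == s.failAt {
		return errors.New("sink unavailable")
	}
	s.got = append(s.got, c)
	return nil
}

// Forward writes changes for the configured tables to the sink and returns
// the last position that is safe to acknowledge. It stops at the first
// failed write so that the failed change is not acknowledged.
func Forward(changes []Change, tables map[string]bool, sink Sink) (uint64, error) {
	var confirmed uint64
	for _, c := range changes {
		if tables[c.Table] {
			if err := sink.Write(c); err != nil {
				return confirmed, fmt.Errorf("write %s at LSN %d: %w", c.Table, c.LSN, err)
			}
		}
		// Skipped tables still advance the position: nothing is owed for them.
		confirmed = c.LSN
	}
	return confirmed, nil
}

func main() {
	changes := []Change{
		{LSN: 1, Table: "users", Op: Insert, Row: map[string]any{"id": 1}},
		{LSN: 2, Table: "audit_log", Op: Insert, Row: map[string]any{"id": 7}},
		{LSN: 3, Table: "users", Op: Delete, Row: map[string]any{"id": 1}},
	}
	sink := &memSink{}
	confirmed, err := Forward(changes, map[string]bool{"users": true}, sink)
	fmt.Println(confirmed, len(sink.got), err)
}
```

In a real input the returned position would be reported back to the replication slot; how that is done depends on the client library and is left out here.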

It could be an interesting addition, since we could stream both events and data from the database.

I would like to hear the maintainers' opinion on this. If it's something you're interested in, I could work on a PR.

Jeffail commented 5 years ago

Hey @marceloboeira, this sounds very similar to another MySQL based input we're looking at: https://github.com/Jeffail/benthos/pull/76

The current plan is to add it in as a plugin, the API for which is ready to use but some plumbing is still pending for generating configuration examples and documentation etc. I'd be interested in adding this as another plugin so your configuration would look something like this:

input:
  type: pgsql
  plugin:
    ... pgsql config fields here ...

If you can wait a few days for me to get the MySQL input merged you'll end up with a good example to base yours on.

Jeffail commented 5 years ago

Blocked on: https://github.com/Jeffail/benthos/issues/83

Jeffail commented 5 years ago

@marceloboeira I have an example plugin you can check out here: https://github.com/Jeffail/benthos/blob/feature/plugin-example/lib/input/plugins/mysql.go#L48

Let me know what you think of the RegisterPlugin and DocumentPlugin calls, it's fairly experimental right now so I'm looking for general feedback.

ghost commented 5 years ago

Hey

I am trying to use Benthos to parse database logs (WAL) for Oracle, MySQL and PostgreSQL.

> @marceloboeira I have an example plugin you can check out here: https://github.com/Jeffail/benthos/blob/feature/plugin-example/lib/input/plugins/mysql.go#L48

The link above is dead. "She's dead, Jim, she's dead"

> Let me know what you think of the RegisterPlugin and DocumentPlugin calls, it's fairly experimental right now so I'm looking for general feedback.

Jeffail commented 5 years ago

Hey @gedw99, the plugin examples can now be found here: https://github.com/benthosdev/benthos-plugin-example, but there isn't one for WAL I'm afraid.

marceloboeira commented 5 years ago

I guess we'll probably figure out the WAL part with wal-g and similar tools, and then have a plugin to hook it into benthos. Thanks @Jeffail for the update.

ghost commented 5 years ago

thanks all for the update.

Yeah playing with the following stuff to do this over at https://github.com/14-bits/cdc. The plan is to get it working with benthos, but lots of database parsing to do first.

Playing around with these libs to do CDC (Change Data Capture) of the databases to feed into Benthos.

https://github.com/xo/usql

https://github.com/wal-g/wal-g

https://github.com/moiot/gravity

Jeffail commented 3 years ago

Since this is a fairly high demand topic I'm repurposing this issue for a general revisit of SQL CDC within Benthos. CDC is handled very differently across databases and therefore we have two options:

Ideally I wouldn't want to burden the Benthos project with actively maintaining these inputs, and would therefore prefer to utilize a library that is intended to be consumed similar to a typical stream client library.

To start this one off I'd like for us to do another search for Go libraries in this space since a fair amount of time has passed.

tomsej commented 3 years ago

I think implementing CDC in Benthos would be a great step forward and would attract new users. I was looking at some projects and here are some I liked:

Jeffail commented 3 years ago

Another repo to check out (from https://github.com/Jeffail/benthos/issues/570): https://github.com/tmc/pqstream

mihaitodor commented 3 years ago

I don't need support for this (yet), but did some quick digging and thought that mixing Benthos with https://github.com/prest/prest might be interesting.

ekeric13 commented 2 years ago

In the direction of per database connections, here is another lib that handles postgres logical decoding well: https://github.com/kyleconroy/pgoutput

I liked what pgdeltastream is doing as well.

The main difference between the two is that pgoutput creates the replication slot with the pgoutput plugin, the standard logical replication decoder, while pgdeltastream uses the wal2json plugin. wal2json has the benefit of being immediately human readable, but the downside is that it needs to be installed (though many postgres providers like aws rds have it installed out of the box).
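To illustrate the "human readable" point: pgoutput emits binary protocol messages that need a dedicated decoder, whereas a wal2json payload is plain JSON that the standard library can decode. Below is a minimal Go sketch, assuming wal2json's format version 1 (a top-level `change` array with `kind`, `schema`, `table`, `columnnames` and `columnvalues`). The type and function names are hypothetical, the sample payload is hand-written rather than captured from a database, and the `oldkeys` object that update and delete events carry is ignored.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wal2jsonMessage mirrors the top-level object of wal2json format version 1.
type wal2jsonMessage struct {
	Change []wal2jsonChange `json:"change"`
}

// wal2jsonChange holds the fields this sketch cares about for one row change.
type wal2jsonChange struct {
	Kind         string   `json:"kind"` // "insert", "update" or "delete"
	Schema       string   `json:"schema"`
	Table        string   `json:"table"`
	ColumnNames  []string `json:"columnnames"`
	ColumnValues []any    `json:"columnvalues"`
}

// Row pairs column names with their values. JSON numbers arrive as float64.
func (c wal2jsonChange) Row() map[string]any {
	row := make(map[string]any, len(c.ColumnNames))
	for i, name := range c.ColumnNames {
		if i < len(c.ColumnValues) {
			row[name] = c.ColumnValues[i]
		}
	}
	return row
}

// parseWal2JSON decodes one wal2json payload into its list of changes.
func parseWal2JSON(payload []byte) ([]wal2jsonChange, error) {
	var msg wal2jsonMessage
	if err := json.Unmarshal(payload, &msg); err != nil {
		return nil, fmt.Errorf("decode wal2json payload: %w", err)
	}
	return msg.Change, nil
}

// samplePayload is hand-written for illustration only.
const samplePayload = `{"change":[{"kind":"insert","schema":"public","table":"users","columnnames":["id","name"],"columntypes":["integer","text"],"columnvalues":[1,"ada"]}]}`

func main() {
	changes, err := parseWal2JSON([]byte(samplePayload))
	if err != nil {
		panic(err)
	}
	for _, c := range changes {
		fmt.Println(c.Kind, c.Schema+"."+c.Table, c.Row())
	}
}
```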

jbergstroem commented 2 months ago

I've been testing out wal-listener quite extensively the last 24h. I think it would be an incredibly nice feature in benthos to have a similar interface.

artemklevtsov commented 2 months ago

Also look at the Conduit project and their Postgres connector.

nickchomey commented 1 month ago

Likewise conduit's vitess/mysql connector

https://github.com/conduitio-labs/conduit-connector-vitess