akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka/current/

Debezium connector #2110

Open gabfssilva opened 4 years ago

gabfssilva commented 4 years ago

Short description

Implement Debezium connector

Details

Debezium is a Change Data Capture (CDC) Kafka connector that integrates with several databases. Although Debezium is a Kafka connector, it also works as a standalone tool: https://debezium.io/documentation/reference/1.0/operations/embedded.html

The idea here is to use Debezium as a CDC source. The only open question is how backpressure would work.

I also noticed there's already a PR with a CDC connector for PostgreSQL (#891), but being able to use CDC with other databases would be nice to have.

At the time of this request, the latest final version supports the following databases:

Database      Versions
MySQL         5.7, 8.0.13
MongoDB       3.2, 3.4, 3.6, 4.0, 4.2
PostgreSQL    9.6, 10, 11, 12
Oracle        11g, 12c
SQL Server    2017, 2019
Cassandra     3.11.4

Apache Camel already has a component for it (actually, there are several: one for each database). We could check how they implement it and see whether that approach is viable for Alpakka.

seglo commented 4 years ago

A Debezium connector sounds like a great idea. I took a look at the docs you mentioned. The "Advanced Record Consuming" section describes creating your own implementation to handle batches of source records. If calling the RecordCommitter controls when the next batch is sent, then a Debezium Source stage could commit (and thereby request the next batch) only when there is downstream demand.

https://debezium.io/documentation/reference/1.0/operations/embedded.html#advanced-consuming
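The demand-driven commit idea can be sketched with the standard library alone. This is a minimal sketch, not Debezium's API: `Committer` is a hypothetical stand-in modeled on the RecordCommitter's `markProcessed` / `markBatchFinished` calls, records are plain strings, and `handleBatch` plays the role of the batch handler running on the engine's thread. A `SynchronousQueue` has no capacity, so each record is committed only after the stream side has actually pulled it, and the batch is finished only once every record has been demanded.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

public class DemandDrivenBridge {

    // Hypothetical stand-in modeled on Debezium's RecordCommitter
    // (markProcessed / markBatchFinished); not the real Debezium type.
    interface Committer<R> {
        void markProcessed(R record);
        void markBatchFinished();
    }

    // No capacity: put() returns only once a consumer has taken the element,
    // i.e. only when downstream has signalled demand.
    private final SynchronousQueue<String> handoff = new SynchronousQueue<>();

    // Runs on the engine's thread for each batch of change events.
    public void handleBatch(List<String> records, Committer<String> committer)
            throws InterruptedException {
        for (String record : records) {
            handoff.put(record);              // blocks until downstream pulls
            committer.markProcessed(record);  // record has been handed over
        }
        committer.markBatchFinished();        // engine may now fetch the next batch
    }

    // Called by the stream stage once per element of downstream demand.
    public String pull() throws InterruptedException {
        return handoff.take();
    }

    static final class DemoResult {
        final int committedBeforeDemand;
        final List<String> pulled;
        final List<String> commitLog;

        DemoResult(int committedBeforeDemand, List<String> pulled, List<String> commitLog) {
            this.committedBeforeDemand = committedBeforeDemand;
            this.pulled = pulled;
            this.commitLog = commitLog;
        }
    }

    // Simulates one engine batch and a downstream that pulls every record.
    static DemoResult runDemo(List<String> batch) throws InterruptedException {
        DemandDrivenBridge bridge = new DemandDrivenBridge();
        List<String> commitLog = Collections.synchronizedList(new ArrayList<>());
        Committer<String> committer = new Committer<>() {
            public void markProcessed(String r) { commitLog.add("processed:" + r); }
            public void markBatchFinished() { commitLog.add("batchFinished"); }
        };

        Thread engine = new Thread(() -> {
            try {
                bridge.handleBatch(batch, committer);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        engine.start();

        Thread.sleep(50);                     // no demand yet: nothing may be committed
        int committedBeforeDemand = commitLog.size();

        List<String> pulled = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            pulled.add(bridge.pull());
        }
        engine.join();
        return new DemoResult(committedBeforeDemand, pulled, new ArrayList<>(commitLog));
    }

    public static void main(String[] args) throws InterruptedException {
        DemoResult r = runDemo(List.of("a", "b", "c"));
        System.out.println(r.committedBeforeDemand + " " + r.pulled + " " + r.commitLog);
    }
}
```

The trade-off is that the engine's thread is parked while there is no demand, so in a real stage this handler would need its own thread rather than a stream dispatcher thread.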

I don't know enough about Debezium to know how failure recovery and/or restarts would work. How do you resume from some last known position? Is the state managed in whatever state store you configure with database.history?

gabfssilva commented 4 years ago

I think it has to do with the offset.storage properties. There's another thing we have to look into to see whether it fits well with Akka Streams: I'm afraid it relies on blocking RPC calls. That's not a big problem, but it has to be very well documented.
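For reference, this is roughly how the embedded engine is configured for resumption, using the property names from the 1.0 embedded-engine documentation linked above. As far as that documentation goes, `offset.storage` is where the engine persists the last processed source position (so a restart resumes from the last flushed offset), while `database.history` stores the schema (DDL) history that relational connectors need. The MySQL connector choice, host, credentials, and file paths are placeholder assumptions, and some of these property names were renamed in later Debezium releases.

```java
import java.util.Properties;

public class EmbeddedEngineConfig {

    // Builds a configuration for Debezium's embedded engine using property
    // names from the 1.0 embedded-engine docs. Host, credentials and paths
    // are placeholders, not recommended values.
    static Properties embeddedConfig(String offsetFile, String historyFile) {
        Properties props = new Properties();
        props.setProperty("name", "engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

        // Where the engine persists the last processed source position.
        props.setProperty("offset.storage",
                "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", offsetFile);
        props.setProperty("offset.flush.interval.ms", "60000");

        // Connection settings for the monitored database (placeholders).
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "debezium");
        props.setProperty("database.password", "secret");
        props.setProperty("database.server.id", "85744");
        props.setProperty("database.server.name", "my-app-connector");

        // Schema (DDL) history of the captured tables, used by relational connectors.
        props.setProperty("database.history",
                "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename", historyFile);
        return props;
    }

    public static void main(String[] args) {
        Properties props = embeddedConfig("/tmp/offsets.dat", "/tmp/dbhistory.dat");
        props.list(System.out);
    }
}
```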

gabfssilva commented 4 years ago

It seems RecordCommitter behaves just as you said, which should make backpressure easy to implement. I'll run some tests myself to be sure, but it looks promising.

seglo commented 4 years ago

Sounds good. Looking forward to your analysis.

gabfssilva commented 4 years ago

As I suspected, Debezium has no asynchronous backpressure mechanism. I opened a PR and commented there with a possible solution, though I'm not sure whether it's viable.
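Since the engine side only offers blocking calls, one general workaround (not necessarily what the PR mentioned above does) is to confine the blocking wait to a dedicated thread and expose an asynchronous pull, which an async stream source could then consume, for example something along the lines of Akka Streams' `Source.unfoldAsync`. The sketch below is an assumption-laden illustration: `AsyncPullAdapter` and `pullAsync` are hypothetical names, the "engine" is simulated by a thread feeding a `SynchronousQueue`, and records are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;

public class AsyncPullAdapter implements AutoCloseable {

    private final BlockingQueue<String> handoff;
    // Dedicated thread for the blocking take(), so callers (e.g. stream
    // dispatcher threads) are never blocked while waiting for a record.
    private final ExecutorService blockingPool = Executors.newSingleThreadExecutor();

    AsyncPullAdapter(BlockingQueue<String> handoff) {
        this.handoff = handoff;
    }

    // Completes when the blocking side has produced the next record.
    public CompletableFuture<String> pullAsync() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return handoff.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
        }, blockingPool);
    }

    @Override
    public void close() {
        blockingPool.shutdownNow();
    }

    // A simulated engine thread pushes records; the consumer issues the next
    // pull only after the previous one has completed (one element of demand).
    static List<String> runDemo(List<String> records) throws Exception {
        SynchronousQueue<String> handoff = new SynchronousQueue<>();
        Thread engine = new Thread(() -> {
            try {
                for (String r : records) handoff.put(r);  // blocks without demand
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        engine.start();

        List<String> received = new ArrayList<>();
        try (AsyncPullAdapter adapter = new AsyncPullAdapter(handoff)) {
            CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
            for (int i = 0; i < records.size(); i++) {
                chain = chain.thenCompose(v -> adapter.pullAsync())
                             .thenAccept(received::add);
            }
            chain.get();
        }
        engine.join();
        return received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo(List.of("x", "y", "z")));
    }
}
```

The cost of this design is one parked thread per running engine, which is why the blocking behavior would need to be documented clearly.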

Maatary commented 2 years ago

Hi, I'm wondering what the status of this issue is, and whether there is currently a workaround, a recommended do-it-yourself implementation, and so on.