apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0
7.78k stars 1.74k forks source link

[Feature][CDC] Support Flink CDC #1461

Open JesseAtSZ opened 2 years ago

JesseAtSZ commented 2 years ago

Search before asking

Description

Having been following the Seatunnel project for a long time, I am very interested in this project and have downloaded and tried this project. Unfortunately, due to the unfamiliarity with Fink and Java, my attempt to implement Flink CDC for this project encountered some problems. After spending more effort on study, I have come up with a few ideas on the implementation of CDC. The following is some of my ideas I would like to discuss with you and hope you could give some suggestions:

Seatunnel currently supports two plug-in systems: Flink and Spark. However, there is still room for improvement in the existing plug-in system's support for CDC. I suggest adding CDC system for the following reasons:

  1. For the existing Flink system, in order to facilitate the processing of the transform stage, the Source is output in the form of DataStream, but the CDC does not need transform, only Source and Sink.
  2. CDC may need to synchronize schema changes, and the row contains only one row of data and corresponding operations. Using DataStream cannot carry schema information. Schema information needs to be carried in the Source stage.

So I think the new CDC system consists of two phases: Source and Sink. Source outputs DataStream; Sink receives DataStream, parses the fields, and modifies the target database through Flink JDBC.

Taking MySQL synchronization to MySQL as an example, the key points of data processing are as follows:

  1. The custom serializer SeatunnelDeserializationSchema parses the SourceRecord (Debezium ChangeLogEvent) into a JSON String.
  2. The op field identifies the operation of the row, the value field identifies the change of the data field, and the schema field identifies the change of the table structure.
  3. The SQL statements of Insert, Delete and Update are spliced according to the schema field, and the data synchronization is realized through Flink JDBC.

Remaining problems:

  1. Is it necessary to convert SourceRecord to DataStream (JSON)? Is there any other type of Stream more suitable for SourceRecord conversion?
  2. Can JDBC on the sink side only Insert, Delete and Update data by splicing SQL?
  3. Is the license allowed? Flink CDC connectors relies on MySQL JDBC, which is a GPL license, but Flink CDC connectors is an Apache 2.0 license.

The above is my simple design of seatunnel supporting CDC. I'm really not familiar with Flink (I've only studied it for about one week). I hope you can give me more suggestions, Thank you very much!

Usage Scenario

No response

Related issues

No response

Are you willing to submit a PR?

Code of Conduct

ruanwenjun commented 2 years ago

I am also newly in seatunnel and flink, I think this feature might be interesting and useful.

The current seatunnel-connector-flink-jdbc is just used for offline batch job, it doesn't support realtime capture the data change of database. And flink-cdc is good at capturing the data change, if we add flink-cdc into seatunnel, this can help seatunnel do the realtime capture job.

And I try to answer your concern:

  1. I think is ok to transform sourcerecord into JSON,
  2. This depends on which system we want to sink, if we want to sink into database we can just use SQL, if we want to into other system like kafka, we can use clienk. I think the flink-cpc is also focus on get the data change event, it doesn't care about how to use the change event.
  3. I am not sure the license is allowed, this may depend on whether the binary contains mysql-connector

This is a good begin, wait for other community member give more suggestions.

JesseAtSZ commented 2 years ago

@ruanwenjun Thank you very much for your advice, but I still have the following questions:

  1. Is it inefficient to convert SourceRecord to DataStream frequently? I wonder if Flink has any more suitable API to implement this conversion. For example, I found a structure called ChangeLogStream (just an example), although I don't know what it is used for, it seems to be very suitable for conversion from the perspective of naming. Is there any more efficient method to do conversion? In addition, I also saw the description of Avro on the Debezium official website, is there any method to store Schema and deal with schema changes through Avro? I think this is very helpful to improve efficiency.
  2. In fact, I'm confused about how to write into mysql, the official website only introduces the use of Flink SQL, and I have seen many examples by writing SQL and then executing JDBC as Sink, I wonder if there is a more elegant way, or I think writing to MySQL is a very common way, Is there any library that can directly process SourceRecord?
  3. I have also tried to use RowDataDebeziumdeSerializationSchema, hoping to reduce the conversion overhead, but I found that using this serialization method requires manually specifying the schema. I think this means that I cannot cope with the scenario of schema change, so it is not suitable for me; And I found that the RowData converted in this way carries very little information. Is my understanding correct? In addition, I'd like to ask your opinion on the difference between RowData and Row, I think Row carries more information than RowData.
Hisoka-X commented 2 years ago

In my view, CDC just one special source. This source including data's change. User can use these data do anything. I think we should do the thing is create special cdc series connector (both source and sink). So user can transfer data change use seatunnel.

Hisoka-X commented 2 years ago

@CalvinKirs Can you share some information about how support flink cdc?

CalvinKirs commented 2 years ago

ping @CalvinKirs

Hisoka-X commented 2 years ago

@JesseAtSZ Any progress? If you want, I want to contribute this feature with you

JesseAtSZ commented 2 years ago

@BenJFan Recently, I haven't started to write code, mainly to understand how to ensure strict consistency of transactions when using Flink CDC: Can Flink CDC guarantee MySQL transactions

In addition, for the specific code level, I still have the above three problems to be solved.

Hisoka-X commented 2 years ago

@BenJFan Recently, I haven't started to write code, mainly to understand how to ensure strict consistency of transactions when using Flink CDC: Can Flink CDC guarantee MySQL transactions

In addition, for the specific code level, I still have the above three problems to be solved.

Guaranteed strict consistency of transactions is not just one component can complete, the transaction that cdc can guarantee requires not only that the data source can be replayed (binlog can be replayed), but also the sink side to support transactions (traditional transaction or distributed transaction) or write idempotency

JesseAtSZ commented 2 years ago

@BenJFan Recently, I haven't started to write code, mainly to understand how to ensure strict consistency of transactions when using Flink CDC: Can Flink CDC guarantee MySQL transactions In addition, for the specific code level, I still have the above three problems to be solved.

Guaranteed transaction is not only one component that can be completed, the transaction that cdc can guarantee requires not only that the data source can be replayed (binlog can be replayed), but also the sink side to support transactions (traditional transaction or distributed transaction) or write idempotency

Flink CDC supports binlog replay. The problem I want to solve is that the sink side can strictly guarantee the transactions on the source side, rather than simply inserting and modifying them line by line through SQL ( it only replays SQL, but it can not guarantee transactions. For example, if a sink side transaction suddenly goes down in the middle of execution, there is a problem with the data on the sink side at this time). I think there are several key points to this problem:

  1. The source side can obtain transaction information and ensure the order
  2. The sink side can ensure the sequential insertion of transactions and the idempotency during fault recovery

I have some understanding of these two questions:

  1. I found that the changelog event in debezium contains transaction information, but the transaction information in Flink's SourceRecord is not complete. I'm considering whether to improve the transaction information of Flink CDC, then construct different queues through different transaction id, and finally submit in the order of gtids?
  2. The idempotency of fault recovery is mainly reflected in ensuring that the transaction will not be executed repeatedly, so it may be necessary to introduce checkpoints to record the transaction id, which I haven't thought about yet.
Hisoka-X commented 2 years ago

Maybe we can support cdc first, then add exactly-once support.

Hisoka-X commented 2 years ago

@BenJFan Recently, I haven't started to write code, mainly to understand how to ensure strict consistency of transactions when using Flink CDC: Can Flink CDC guarantee MySQL transactions In addition, for the specific code level, I still have the above three problems to be solved.

Guaranteed transaction is not only one component that can be completed, the transaction that cdc can guarantee requires not only that the data source can be replayed (binlog can be replayed), but also the sink side to support transactions (traditional transaction or distributed transaction) or write idempotency

Flink CDC supports binlog replay. The problem I want to solve is that the sink side can strictly guarantee the transactions on the source side, rather than simply inserting and modifying them line by line through SQL ( it only replays SQL, but it can not guarantee transactions. For example, if a sink side transaction suddenly goes down in the middle of execution, there is a problem with the data on the sink side at this time). I think there are several key points to this problem:

  1. The source side can obtain transaction information and ensure the order
  2. The sink side can ensure the sequential insertion of transactions and the idempotency during fault recovery

I have some understanding of these two questions:

  1. I found that the changelog event in debezium contains transaction information, but the transaction information in Flink's SourceRecord is not complete. I'm considering whether to improve the transaction information of Flink CDC, then construct different queues through different transaction id, and finally submit in the order of gtids?
  2. The idempotency of fault recovery is mainly reflected in ensuring that the transaction will not be executed repeatedly, so it may be necessary to introduce checkpoints to record the transaction id, which I haven't thought about yet.
  1. The order of transactions is determined by the transaction id. Idempotency needs to be supported by the design of data writing methods, and has nothing to do with fault recovery.
  2. CDC should already support checkpoint.
JesseAtSZ commented 2 years ago

@BenJFan Recently, I haven't started to write code, mainly to understand how to ensure strict consistency of transactions when using Flink CDC: Can Flink CDC guarantee MySQL transactions In addition, for the specific code level, I still have the above three problems to be solved.

Guaranteed transaction is not only one component that can be completed, the transaction that cdc can guarantee requires not only that the data source can be replayed (binlog can be replayed), but also the sink side to support transactions (traditional transaction or distributed transaction) or write idempotency

Flink CDC supports binlog replay. The problem I want to solve is that the sink side can strictly guarantee the transactions on the source side, rather than simply inserting and modifying them line by line through SQL ( it only replays SQL, but it can not guarantee transactions. For example, if a sink side transaction suddenly goes down in the middle of execution, there is a problem with the data on the sink side at this time). I think there are several key points to this problem:

  1. The source side can obtain transaction information and ensure the order
  2. The sink side can ensure the sequential insertion of transactions and the idempotency during fault recovery

I have some understanding of these two questions:

  1. I found that the changelog event in debezium contains transaction information, but the transaction information in Flink's SourceRecord is not complete. I'm considering whether to improve the transaction information of Flink CDC, then construct different queues through different transaction id, and finally submit in the order of gtids?
  2. The idempotency of fault recovery is mainly reflected in ensuring that the transaction will not be executed repeatedly, so it may be necessary to introduce checkpoints to record the transaction id, which I haven't thought about yet.
  1. The order of transactions is determined by the transaction id. Idempotency needs to be supported by the design of data writing methods, and has nothing to do with fault recovery.
  2. CDC should already support checkpoint.

The combination of Flink CDC and Flink JDBC has achieved idempotency. There are checkpoints on the Source side and upsert on the Sink side, however, this combination can only meet the final consistency, but can not meet the real-time consistency, (as I said above, Flink CDC and Flink JDBC will split the operations in a transaction into many SQL). The transaction order and checkpoint I mentioned here refer to the implementation under the condition of ensuring transactions.

If we just want to ensure the final consistency, I don't think it's difficult to realize data synchronization. However, if strict consistency is required, there will be transaction problems, which depends on the transaction information provided by Flink CDC. However, at present, the transaction information is incomplete. I'm not sure to what extent we want to achieve, maybe we just need to ensure the final consistency.

Hisoka-X commented 2 years ago

@BenJFan Recently, I haven't started to write code, mainly to understand how to ensure strict consistency of transactions when using Flink CDC: Can Flink CDC guarantee MySQL transactions In addition, for the specific code level, I still have the above three problems to be solved.

Guaranteed transaction is not only one component that can be completed, the transaction that cdc can guarantee requires not only that the data source can be replayed (binlog can be replayed), but also the sink side to support transactions (traditional transaction or distributed transaction) or write idempotency

Flink CDC supports binlog replay. The problem I want to solve is that the sink side can strictly guarantee the transactions on the source side, rather than simply inserting and modifying them line by line through SQL ( it only replays SQL, but it can not guarantee transactions. For example, if a sink side transaction suddenly goes down in the middle of execution, there is a problem with the data on the sink side at this time). I think there are several key points to this problem:

  1. The source side can obtain transaction information and ensure the order
  2. The sink side can ensure the sequential insertion of transactions and the idempotency during fault recovery

I have some understanding of these two questions:

  1. I found that the changelog event in debezium contains transaction information, but the transaction information in Flink's SourceRecord is not complete. I'm considering whether to improve the transaction information of Flink CDC, then construct different queues through different transaction id, and finally submit in the order of gtids?
  2. The idempotency of fault recovery is mainly reflected in ensuring that the transaction will not be executed repeatedly, so it may be necessary to introduce checkpoints to record the transaction id, which I haven't thought about yet.
  1. The order of transactions is determined by the transaction id. Idempotency needs to be supported by the design of data writing methods, and has nothing to do with fault recovery.
  2. CDC should already support checkpoint.

The combination of Flink CDC and Flink JDBC has achieved idempotency. There are checkpoints on the Source side and upsert on the Sink side, however, this combination can only meet the final consistency, but can not meet the real-time consistency, (as I said above, Flink CDC and Flink JDBC will split the operations in a transaction into many SQL). The transaction order and checkpoint I mentioned here refer to the implementation under the condition of ensuring transactions.

If we just want to ensure the final consistency, I don't think it's difficult to realize data synchronization. However, if strict consistency is required, there will be transaction problems, which depends on the transaction information provided by Flink CDC. However, at present, the transaction information is incomplete. I'm not sure to what extent we want to achieve, maybe we just need to ensure the final consistency.

In my opinon,step by step. The fisrt thing is support cdc, then consider about consistency

JesseAtSZ commented 2 years ago

@BenJFan I still have these questions that I hope can be answered: https://github.com/apache/incubator-seatunnel/issues/1461#issuecomment-1064156814

cswangzheng commented 2 years ago

Hi, we have similar question for this. If seatunnel support flink cdc, there should be a flink cdc source plugin, is it in the plan of the roadmap?