apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Feature] Support CDC in framework (not only Flink CDC, also for Spark) #963

Open · CalvinKirs opened this issue 2 years ago

CalvinKirs commented 2 years ago

Support change data capture (CDC) in SeaTunnel in both the Flink and Spark backends.

xtr1993 commented 2 years ago

CDC is for real-time computation. Flink is compatible with Debezium to support CDC, and maybe others. My understanding is that you want to achieve it some other way, without Debezium?
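For reference, Flink's Debezium-based CDC is commonly consumed through the `flink-connector-mysql-cdc` source. A minimal Scala sketch, assuming that dependency; the hostname, credentials, and table names are placeholders, not anything decided in this thread:

```scala
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object FlinkCdcSketch {
  def main(args: Array[String]): Unit = {
    // Debezium-backed MySQL source from the flink-connector-mysql-cdc artifact;
    // emits each change event as a Debezium-style JSON string.
    val source = MySqlSource.builder[String]()
      .hostname("localhost")
      .port(3306)
      .databaseList("test")
      .tableList("test.ts_cdc_test")
      .username("root")
      .password("123456")
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "mysql-cdc")
      .print()
    env.execute("flink-cdc-sketch")
  }
}
```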

chenhu commented 2 years ago

It would be great for Spark!

yuangjiang commented 2 years ago

Spark CDC is feasible; we only need to define a Spark source and do a simple test. CDC can be developed based on Debezium, similar to this:

```scala
import java.util

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.StructType

class DefaultSource extends StreamSourceProvider with DataSourceRegister with Logging {

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) = {
    (shortName(), schema.get)
  }

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    val debeziumOffset: DebeziumOffset = new DebeziumOffset
    val handover: Handover = new Handover
    val changeConsumer: DebeziumChangeConsumer = new DebeziumChangeConsumer(handover)
    val debeziumEngine: SparkDebeziumEngine =
      new SparkDebeziumEngine(debeziumOffset, handover, changeConsumer)

    // Copy the Scala options map into a Java map for the engine.
    val javaParameters: java.util.Map[String, String] = new util.HashMap[String, String]()
    for (parameter <- parameters) {
      javaParameters.put(parameter._1, parameter._2)
    }
    debeziumEngine.EngineInit(javaParameters)

    new DebeziumSource(sqlContext, schema.get, debeziumOffset, debeziumEngine)
  }

  override def shortName(): String = "debezium"
}
```
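Note that `DebeziumOffset`, `Handover`, `DebeziumChangeConsumer`, `SparkDebeziumEngine`, and `DebeziumSource` above are custom classes, not Spark or Debezium APIs. The engine class presumably wraps Debezium's embedded engine; a minimal sketch of that underlying API, with illustrative property values and connector-specific settings omitted:

```scala
import java.util.Properties
import java.util.function.Consumer

import io.debezium.engine.format.Json
import io.debezium.engine.{ChangeEvent, DebeziumEngine}

object EmbeddedEngineSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("name", "engine")
    props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector")
    props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
    props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat")
    props.setProperty("database.hostname", "localhost")
    props.setProperty("database.port", "3306")
    props.setProperty("database.user", "root")
    props.setProperty("database.password", "123456")
    // Further connector-specific properties (server id/name, schema history, ...) omitted.

    // The engine pushes change events to a consumer; a Handover-style buffer
    // would sit here so a Spark Source can poll the buffered events.
    val engine: DebeziumEngine[ChangeEvent[String, String]] =
      DebeziumEngine.create(classOf[Json])
        .using(props)
        .notifying(new Consumer[ChangeEvent[String, String]] {
          override def accept(record: ChangeEvent[String, String]): Unit =
            println(record.value())
        })
        .build()

    engine.run() // blocks; real code runs this on a dedicated thread and closes it on stop
  }
}
```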

obobj commented 2 years ago

Is there any progress?

UUIDUsername commented 2 years ago

Can we introduce a BloomFilter, given the time-series characteristics?
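One plausible reading of this is approximate deduplication of change events in a stream. A minimal sketch with Guava's `BloomFilter`; the key format and sizing are assumptions for illustration, not anything from this thread:

```scala
import java.nio.charset.StandardCharsets

import com.google.common.hash.{BloomFilter, Funnels}

object DedupSketch {
  def main(args: Array[String]): Unit = {
    // ~1M expected keys at a 1% false-positive rate.
    val seen: BloomFilter[CharSequence] =
      BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1000000, 0.01)

    // Hypothetical event keys, e.g. "<table>:<primary key>:<ts_ms>".
    val events = Seq("ts_cdc_test:1:1645592400000", "ts_cdc_test:1:1645592400000")
    for (key <- events) {
      if (!seen.mightContain(key)) {
        seen.put(key)
        println(s"process $key") // definitely the first sighting
      } // else: probably a duplicate; BloomFilters can false-positive, never false-negative
    }
  }
}
```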

yuangjiang commented 2 years ago

This is a Spark data source to be used by a SeaTunnel connector. Test code looks like this:

```scala
import java.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._

object DebeziumTest {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .master("local[3]")
      .appName("debezium")
      .getOrCreate()

    // Schema of the change records emitted by the Debezium source,
    // including the Debezium metadata columns `op` and `ts_ms`.
    val structType =
      StructType(
        StructField("a", StringType, nullable = true) ::
        StructField("b", DecimalType(6, 2), nullable = true) ::
        StructField("c", DecimalType(22, 0), nullable = true) ::
        StructField("d", DecimalType(6, 0), nullable = true) ::
        StructField("e", DecimalType(22, 0), nullable = true) ::
        StructField("f", DecimalType(6, 0), nullable = true) ::
        StructField("g", DecimalType(22, 0), nullable = true) ::
        StructField("h", StringType, nullable = true) ::
        StructField("op", StringType, nullable = true) ::
        StructField("ts_ms", StringType, nullable = true) ::
        Nil
      )

    // Debezium MySQL connector properties, passed straight through as source options.
    val parameters = new util.HashMap[String, String]()
    parameters.put("database.hostname", "localhost")
    parameters.put("database.port", "3306")
    parameters.put("database.user", "root")
    parameters.put("database.password", "123456")
    parameters.put("database.include.list", "test")
    parameters.put("snapshot.mode", "schema_only")
    parameters.put("table.include.list", "temp.ts_cdc_test")
    parameters.put("connector.class", "io.debezium.connector.mysql.MySqlConnector")

    val dataFrame = sparkSession
      .readStream
      .options(parameters)
      .schema(structType)
      .format("org.apache.spark.sql.execution.streaming.debezium.DefaultSource")
      .load()

    // Print the change stream to the console.
    dataFrame.createOrReplaceTempView("test")
    sparkSession.sql("select * from test")
      .writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .start()
      .awaitTermination()
  }
}
```

This could support all Debezium connectors through SeaTunnel's streaming mode.
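For example, switching to another Debezium connector should only require swapping the connector properties. A hedged sketch for PostgreSQL; all values here are illustrative, and the Postgres connector also needs logical decoding enabled on the server (e.g. the `pgoutput` plugin):

```scala
import java.util

// Same DebeziumTest harness as above; only the Debezium connector properties change.
object PostgresParams {
  def build(): util.HashMap[String, String] = {
    val parameters = new util.HashMap[String, String]()
    parameters.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
    parameters.put("database.hostname", "localhost")
    parameters.put("database.port", "5432")
    parameters.put("database.user", "postgres")
    parameters.put("database.password", "123456")
    parameters.put("database.dbname", "test")
    parameters.put("table.include.list", "public.ts_cdc_test")
    parameters.put("plugin.name", "pgoutput")
    parameters
  }
}
```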

yuangjiang commented 2 years ago

Regarding the BloomFilter question above: I don't know how to implement that function.

dijiekstra commented 2 years ago

I think we need to upgrade all `Row` to `RowData` first in the Flink module.
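For context, `Row` (org.apache.flink.types.Row) is Flink's external, boxed row type, while `RowData` (org.apache.flink.table.data.RowData) is the internal representation used by table connectors and the runtime; both carry a `RowKind`, which is what encodes CDC insert/update/delete semantics. A minimal sketch, assuming a Flink 1.13+ classpath:

```scala
import org.apache.flink.table.data.{GenericRowData, RowData, StringData}
import org.apache.flink.types.{Row, RowKind}

object RowVsRowData {
  def main(args: Array[String]): Unit = {
    // External Row: boxed values, used by user-facing APIs.
    val row: Row = Row.ofKind(RowKind.INSERT, "alice", Int.box(42))

    // Internal RowData: binary-friendly values (e.g. StringData instead of String).
    val rowData: RowData = GenericRowData.ofKind(
      RowKind.INSERT, StringData.fromString("alice"), Int.box(42))

    println(row.getKind)          // INSERT
    println(rowData.getString(0)) // alice
    println(rowData.getInt(1))    // 42
  }
}
```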