apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Feature] [Connector-Redis] Redis connector support delete data #7994

Closed: lm-ylj closed this 1 week ago

lm-ylj commented 2 weeks ago

Purpose of this pull request

close #7977

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

Hisoka-X commented 2 weeks ago

Please add a test case.

lm-ylj commented 2 weeks ago

I'll finish this later.

lm-ylj commented 1 week ago

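@Hisoka-X Hi, I've run into a problem and would like your advice. When MySQL CDC reads JSON-type data in real time, the value it reads does not match the original data in the database. How is this handled? In the original data, f_json is {"key": "value"}, with a space after the colon. I checked the MySQL binlog and the space is there too: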

### DELETE FROM `mysql_cdc`.`mysql_cdc_e2e_source_table`
### WHERE
###   @1=3 /* INT meta=0 nullable=0 is_null=0 */
###   @2='abct' /* STRING(64) meta=65088 nullable=1 is_null=0 */
###   @3='hello' /* BLOB/TEXT meta=2 nullable=1 is_null=0 */
###   @4='\x18\x00\x00\x00x▒\x0b▒▒,V\x00▒D▒▒▒t▒▒Ģ▒̼ĢJ\x00is\b▒' /* MEDIUMBLOB/MEDIUMTEXT meta=3 nullable=1 is_null=0 */
###   @5=NULL /* LONGBLOB/LONGTEXT meta=4 nullable=1 is_null=1 */
###   @6='tinyblob' /* TINYBLOB/TINYTEXT meta=1 nullable=1 is_null=0 */
###   @7='Hello world' /* VARSTRING(100) meta=100 nullable=1 is_null=0 */
###   @8=12345 /* SHORTINT meta=0 nullable=1 is_null=0 */
###   @9=-11215 (54321) /* SHORTINT meta=0 nullable=1 is_null=0 */
###   @10=123456 /* MEDIUMINT meta=0 nullable=1 is_null=0 */
###   @11=654321 /* MEDIUMINT meta=0 nullable=1 is_null=0 */
###   @12=1234567 /* INT meta=0 nullable=1 is_null=0 */
###   @13=7654321 /* INT meta=0 nullable=1 is_null=0 */
###   @14=1234567 /* INT meta=0 nullable=1 is_null=0 */
###   @15=7654321 /* INT meta=0 nullable=1 is_null=0 */
###   @16=123456789 /* LONGINT meta=0 nullable=1 is_null=0 */
###   @17=987654321 /* LONGINT meta=0 nullable=1 is_null=0 */
###   @18=123 /* DECIMAL(10,0) meta=2560 nullable=1 is_null=0 */
###   @19=789 /* DECIMAL(10,0) meta=2560 nullable=1 is_null=0 */
###   @20=12.34                /* FLOAT meta=4 nullable=1 is_null=0 */
###   @21=56.780000000000001137 /* DOUBLE meta=8 nullable=1 is_null=0 */
###   @22=90.120000000000004547 /* DOUBLE meta=8 nullable=1 is_null=0 */
###   @23='This is a long text field' /* LONGBLOB/LONGTEXT meta=4 nullable=1 is_null=0 */
###   @24='This is a medium text field' /* MEDIUMBLOB/MEDIUMTEXT meta=3 nullable=1 is_null=0 */
###   @25='This is a text field' /* BLOB/TEXT meta=2 nullable=1 is_null=0 */
###   @26='This is a tiny text field' /* TINYBLOB/TINYTEXT meta=1 nullable=1 is_null=0 */
###   @27='This is a varchar field' /* VARSTRING(200) meta=200 nullable=1 is_null=0 */
###   @28='2022:04:27' /* DATE meta=0 nullable=1 is_null=0 */
###   @29='2022-04-27 14:30:00' /* DATETIME(0) meta=0 nullable=1 is_null=0 */
###   @30=1682593720 /* TIMESTAMP(0) meta=0 nullable=1 is_null=0 */
###   @31=b'1' /* BIT(1) meta=1 nullable=1 is_null=0 */
###   @32=b'0101010101010101010101010101010101010101010101010101010101010101' /* BIT(64) meta=2048 nullable=1 is_null=0 */
###   @33='C' /* STRING(4) meta=65028 nullable=1 is_null=0 */
###   @34=2 /* ENUM(1 byte) meta=63233 nullable=1 is_null=0 */
###   @35='\x1b\x00\x00\x00x▒\x0b▒▒,V\x00▒D▒▒Ԕ▒▒\\▒▒▒▒$▒▒▒Ԝ\x14\x00▒▒\t▒' /* MEDIUMBLOB/MEDIUMTEXT meta=3 nullable=1 is_null=0 */
###   @36='This is a long varchar field' /* MEDIUMBLOB/MEDIUMTEXT meta=3 nullable=1 is_null=0 */
###   @37=12.345000000000000639 /* DOUBLE meta=8 nullable=1 is_null=0 */
###   @38='14:30:00' /* TIME(0) meta=0 nullable=1 is_null=0 */
###   @39=-128 (128) /* TINYINT meta=0 nullable=1 is_null=0 */
###   @40=-1 (255) /* TINYINT meta=0 nullable=1 is_null=0 */
###   @41='{"key": "value"}' /* JSON meta=4 nullable=1 is_null=0 */   Note: the space is present here
###   @42=1993 /* YEAR meta=0 nullable=1 is_null=0 */

However, when the row is deleted, the value that is read is {"key":"value"} without the space, so the Redis sink cannot delete this entry from the list. I logged the record that was read:

org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter [] - emitRecord record: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1731461820, file=mysql-bin.000003, pos=6490, gtids=ba95c6ad-a15e-11ef-8bc7-0242c0a8d002:1-19, row=1, server_id=223344, event=2}} ConnectRecord{topic='mysql_binlog_source.mysql_cdc.mysql_cdc_e2e_source_table', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.mysql_cdc.mysql_cdc_e2e_source_table.Key:STRUCT}, value=Struct{before=Struct{id=1,f_binary=java.nio.HeapByteBuffer[pos=0 lim=64 cap=64],f_blob=java.nio.HeapByteBuffer[pos=0 lim=5 cap=5],f_long_varbinary=java.nio.HeapByteBuffer[pos=0 lim=34 cap=34],f_tinyblob=java.nio.HeapByteBuffer[pos=0 lim=8 cap=8],f_varbinary=java.nio.HeapByteBuffer[pos=0 lim=11 cap=11],f_smallint=12345,f_smallint_unsigned=54321,f_mediumint=123456,f_mediumint_unsigned=654321,f_int=1234567,f_int_unsigned=7654321,f_integer=1234567,f_integer_unsigned=7654321,f_bigint=123456789,f_bigint_unsigned=987654321,f_numeric=123,f_decimal=789,f_float=12.34000015258789,f_double=56.78,f_double_precision=90.12,f_longtext=This is a long text field,f_mediumtext=This is a medium text field,f_text=This is a text field,f_tinytext=This is a tiny text field,f_varchar=This is a varchar field,f_date=19109,f_datetime=1651069800000,f_timestamp=2023-04-27T11:08:40Z,f_bit1=true,f_bit64=[B@52869ec9,f_char=C,f_enum=enum2,f_mediumblob=java.nio.HeapByteBuffer[pos=0 lim=37 cap=37],f_long_varchar=This is a long varchar field,f_real=12.345,f_time=52200000000,f_tinyint=-128,f_tinyint_unsigned=255,f_json={"key":"value"},f_year=1991},source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1731461820000,db=mysql_cdc,table=mysql_cdc_e2e_source_table,server_id=223344,gtid=ba95c6ad-a15e-11ef-8bc7-0242c0a8d002:20,file=mysql-bin.000003,pos=6759,row=0,thread=24},op=d,ts_ms=1731461820320}, valueSchema=Schema{mysql_binlog_source.mysql_cdc.mysql_cdc_e2
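Why the whitespace difference breaks the delete: a Redis list removes elements only on an exact value match (`LREM key 0 element` removes every element equal to `element`), so two JSON texts that differ only in insignificant whitespace count as different elements. Below is a minimal Java sketch, not the connector's code. It assumes the list element was written with the spaced form and the delete event carries the compact form, models the Redis list as a `java.util.List`, and shows one hypothetical mitigation: stripping whitespace outside string literals before both writing and deleting. `stripJsonWhitespace` and `removeAllEqual` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not SeaTunnel code): shows why exact-match removal misses
 * when JSON text differs only in whitespace, and how normalizing both sides helps.
 */
public class JsonWhitespaceDemo {

    /**
     * Removes whitespace that appears outside JSON string literals.
     * Assumes valid JSON input. It does not reorder keys or rewrite numbers
     * or escapes, so it is NOT full JSON canonicalization.
     */
    static String stripJsonWhitespace(String json) {
        StringBuilder out = new StringBuilder(json.length());
        boolean inString = false;
        boolean escaped = false;
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                // Inside a string literal every character is significant.
                out.append(c);
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
                out.append(c);
            } else if (c != ' ' && c != '\t' && c != '\n' && c != '\r') {
                // JSON permits only these four whitespace characters between tokens.
                out.append(c);
            }
        }
        return out.toString();
    }

    /** Models LREM with count 0: removes every element equal to value, returns how many. */
    static int removeAllEqual(List<String> list, String value) {
        int before = list.size();
        list.removeIf(value::equals);
        return before - list.size();
    }

    public static void main(String[] args) {
        String written = "{\"key\": \"value\"}"; // assumed form stored in the list
        String deleted = "{\"key\":\"value\"}";  // form carried by the CDC delete event

        List<String> raw = new ArrayList<>(List.of(written));
        System.out.println("raw removed: " + removeAllEqual(raw, deleted));

        List<String> normalized = new ArrayList<>(List.of(stripJsonWhitespace(written)));
        System.out.println("normalized removed: "
                + removeAllEqual(normalized, stripJsonWhitespace(deleted)));
    }
}
```

Running `main` should print `raw removed: 0` followed by `normalized removed: 1`. Because this only strips whitespace, a real fix would more likely parse and re-serialize the value with a JSON library so that both the write path and the delete path produce the same text.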

Hisoka-X commented 1 week ago

Please create an issue for this. Looks like it is a bug.

Hisoka-X commented 1 week ago

Also, you can use the fake source to produce delete data. Please refer to https://github.com/apache/seatunnel/blob/af4fd8b9336cd511b8affdf540068d066de78ff4/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf#L40
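A delete-data test along these lines amounts to replaying a fixed changelog (some inserts, then a delete) and checking what remains in the sink. The plain-Java model below is a hypothetical sketch of that check only: `Kind`, `Change`, and `apply` are illustrative names rather than SeaTunnel classes, and the list semantics (INSERT appends, DELETE removes every equal element) are an assumption about how a list-typed Redis sink might handle deletes, not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: replay a fixed INSERT/DELETE changelog into a list-backed sink. */
public class ChangelogListSinkSketch {

    // Local stand-in for a change-event kind; not SeaTunnel's own RowKind class.
    enum Kind { INSERT, DELETE }

    // One change event: its kind plus the serialized row value written to the list.
    static final class Change {
        final Kind kind;
        final String value;

        Change(Kind kind, String value) {
            this.kind = kind;
            this.value = value;
        }
    }

    /** Assumed sink behavior: INSERT appends the value, DELETE removes every equal element. */
    static List<String> apply(List<Change> changes) {
        List<String> list = new ArrayList<>();
        for (Change change : changes) {
            if (change.kind == Kind.INSERT) {
                list.add(change.value);
            } else {
                list.removeIf(change.value::equals);
            }
        }
        return list;
    }

    public static void main(String[] args) {
        List<Change> changes = List.of(
                new Change(Kind.INSERT, "{\"id\":1,\"name\":\"A\"}"),
                new Change(Kind.INSERT, "{\"id\":2,\"name\":\"B\"}"),
                new Change(Kind.DELETE, "{\"id\":1,\"name\":\"A\"}"));
        // A test would assert that only the row with id 2 is left.
        System.out.println(apply(changes));
    }
}
```

Because removal is by exact value, the same model also reproduces the JSON-whitespace problem above: a DELETE whose value differs from the stored element only in spacing leaves the element in place.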

lm-ylj commented 1 week ago

ok

lm-ylj commented 1 week ago

done