apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Connector-MySQL CDC] Inconsistent data format when deleting JSON type data #8041

Open: lm-ylj opened this issue 1 week ago

lm-ylj commented 1 week ago

Search before asking

What happened

I found that when MySQL CDC processes JSON-type data, the value it reads is inconsistent with the original data in the database. The original value of f_json is {"key": "value"}, with a space after the colon. I checked the MySQL binlog, and it also contains the space, as shown below:

### DELETE FROM `mysql_cdc`.`mysql_cdc_e2e_source_table`
### WHERE
###   @1=3 /* INT meta=0 nullable=0 is_null=0 */
###   @2='abct' /* STRING(64) meta=65088 nullable=1 is_null=0 */
###   @3='hello' /* BLOB/TEXT meta=2 nullable=1 is_null=0 */
###   @4='\x18\x00\x00\x00x▒\x0b▒▒,V\x00▒D▒▒▒t▒▒Ģ▒̼ĢJ\x00is\b▒' /* MEDIUMBLOB/MEDIUMTEXT meta=3 nullable=1 is_null=0 */
###   @5=NULL /* LONGBLOB/LONGTEXT meta=4 nullable=1 is_null=1 */
###   @6='tinyblob' /* TINYBLOB/TINYTEXT meta=1 nullable=1 is_null=0 */
###   @7='Hello world' /* VARSTRING(100) meta=100 nullable=1 is_null=0 */
###   @8=12345 /* SHORTINT meta=0 nullable=1 is_null=0 */
###   @9=-11215 (54321) /* SHORTINT meta=0 nullable=1 is_null=0 */
###   @10=123456 /* MEDIUMINT meta=0 nullable=1 is_null=0 */
###   @11=654321 /* MEDIUMINT meta=0 nullable=1 is_null=0 */
###   @12=1234567 /* INT meta=0 nullable=1 is_null=0 */
###   @13=7654321 /* INT meta=0 nullable=1 is_null=0 */
###   @14=1234567 /* INT meta=0 nullable=1 is_null=0 */
###   @15=7654321 /* INT meta=0 nullable=1 is_null=0 */
###   @16=123456789 /* LONGINT meta=0 nullable=1 is_null=0 */
###   @17=987654321 /* LONGINT meta=0 nullable=1 is_null=0 */
###   @18=123 /* DECIMAL(10,0) meta=2560 nullable=1 is_null=0 */
###   @19=789 /* DECIMAL(10,0) meta=2560 nullable=1 is_null=0 */
###   @20=12.34                /* FLOAT meta=4 nullable=1 is_null=0 */
###   @21=56.780000000000001137 /* DOUBLE meta=8 nullable=1 is_null=0 */
###   @22=90.120000000000004547 /* DOUBLE meta=8 nullable=1 is_null=0 */
###   @23='This is a long text field' /* LONGBLOB/LONGTEXT meta=4 nullable=1 is_null=0 */
###   @24='This is a medium text field' /* MEDIUMBLOB/MEDIUMTEXT meta=3 nullable=1 is_null=0 */
###   @25='This is a text field' /* BLOB/TEXT meta=2 nullable=1 is_null=0 */
###   @26='This is a tiny text field' /* TINYBLOB/TINYTEXT meta=1 nullable=1 is_null=0 */
###   @27='This is a varchar field' /* VARSTRING(200) meta=200 nullable=1 is_null=0 */
###   @28='2022:04:27' /* DATE meta=0 nullable=1 is_null=0 */
###   @29='2022-04-27 14:30:00' /* DATETIME(0) meta=0 nullable=1 is_null=0 */
###   @30=1682593720 /* TIMESTAMP(0) meta=0 nullable=1 is_null=0 */
###   @31=b'1' /* BIT(1) meta=1 nullable=1 is_null=0 */
###   @32=b'0101010101010101010101010101010101010101010101010101010101010101' /* BIT(64) meta=2048 nullable=1 is_null=0 */
###   @33='C' /* STRING(4) meta=65028 nullable=1 is_null=0 */
###   @34=2 /* ENUM(1 byte) meta=63233 nullable=1 is_null=0 */
###   @35='\x1b\x00\x00\x00x▒\x0b▒▒,V\x00▒D▒▒Ԕ▒▒\\▒▒▒▒$▒▒▒Ԝ\x14\x00▒▒\t▒' /* MEDIUMBLOB/MEDIUMTEXT meta=3 nullable=1 is_null=0 */
###   @36='This is a long varchar field' /* MEDIUMBLOB/MEDIUMTEXT meta=3 nullable=1 is_null=0 */
###   @37=12.345000000000000639 /* DOUBLE meta=8 nullable=1 is_null=0 */
###   @38='14:30:00' /* TIME(0) meta=0 nullable=1 is_null=0 */
###   @39=-128 (128) /* TINYINT meta=0 nullable=1 is_null=0 */
###   @40=-1 (255) /* TINYINT meta=0 nullable=1 is_null=0 */
###   @41='{"key": "value"}' /* JSON meta=4 nullable=1 is_null=0 */   <- note the space after the colon
###   @42=1993 /* YEAR meta=0 nullable=1 is_null=0 */

However, when I delete the row, the f_json value in the SeaTunnel record is {"key":"value"}, without any spaces. Because the two formats do not match, the Redis sink cannot delete the data. The log output is as follows:

org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter [] - emitRecord record: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1731461820, file=mysql-bin.000003, pos=6490, gtids=ba95c6ad-a15e-11ef-8bc7-0242c0a8d002:1-19, row=1, server_id=223344, event=2}} ConnectRecord{topic='mysql_binlog_source.mysql_cdc.mysql_cdc_e2e_source_table', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.mysql_cdc.mysql_cdc_e2e_source_table.Key:STRUCT}, value=Struct{before=Struct{id=1,f_binary=java.nio.HeapByteBuffer[pos=0 lim=64 cap=64],f_blob=java.nio.HeapByteBuffer[pos=0 lim=5 cap=5],f_long_varbinary=java.nio.HeapByteBuffer[pos=0 lim=34 cap=34],f_tinyblob=java.nio.HeapByteBuffer[pos=0 lim=8 cap=8],f_varbinary=java.nio.HeapByteBuffer[pos=0 lim=11 cap=11],f_smallint=12345,f_smallint_unsigned=54321,f_mediumint=123456,f_mediumint_unsigned=654321,f_int=1234567,f_int_unsigned=7654321,f_integer=1234567,f_integer_unsigned=7654321,f_bigint=123456789,f_bigint_unsigned=987654321,f_numeric=123,f_decimal=789,f_float=12.34000015258789,f_double=56.78,f_double_precision=90.12,f_longtext=This is a long text field,f_mediumtext=This is a medium text field,f_text=This is a text field,f_tinytext=This is a tiny text field,f_varchar=This is a varchar field,f_date=19109,f_datetime=1651069800000,f_timestamp=2023-04-27T11:08:40Z,f_bit1=true,f_bit64=[B@52869ec9,f_char=C,f_enum=enum2,f_mediumblob=java.nio.HeapByteBuffer[pos=0 lim=37 cap=37],f_long_varchar=This is a long varchar field,f_real=12.345,f_time=52200000000,f_tinyint=-128,f_tinyint_unsigned=255,f_json={"key":"value"},f_year=1991},source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1731461820000,db=mysql_cdc,table=mysql_cdc_e2e_source_table,server_id=223344,gtid=ba95c6ad-a15e-11ef-8bc7-0242c0a8d002:20,file=mysql-bin.000003,pos=6759,row=0,thread=24},op=d,ts_ms=1731461820320}, valueSchema=Schema{mysql_binlog_source.mysql_cdc.mysql_cdc_e2
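The two values differ only in insignificant whitespace; both encode the same JSON object. A likely factor (an assumption, not confirmed in this issue) is that MySQL stores JSON columns in an internal binary format rather than as the original text, so each reader renders its own textual form: mysqlbinlog prints `{"key": "value"}`, while the CDC decoder emits `{"key":"value"}`. The sketch below uses a hypothetical helper `compactJson` (not part of SeaTunnel or Debezium) to show that a raw string comparison fails while a comparison after stripping whitespace outside string literals succeeds. It only normalizes whitespace; it does not reorder keys or validate the JSON.

```java
public class JsonWhitespace {

    // Hypothetical helper: drops whitespace (space, tab, CR, LF) that appears
    // outside JSON string literals. Characters inside quoted strings are kept,
    // and backslash escapes are honored so that \" does not end a string.
    // It does not validate the input or reorder object keys.
    static String compactJson(String json) {
        StringBuilder out = new StringBuilder(json.length());
        boolean inString = false;
        boolean escaped = false;
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                out.append(c);
                if (escaped) {
                    escaped = false;          // this char was escaped; resume normal scanning
                } else if (c == '\\') {
                    escaped = true;           // next char is part of an escape sequence
                } else if (c == '"') {
                    inString = false;         // closing quote of the string literal
                }
            } else if (c == '"') {
                inString = true;              // opening quote of a string literal
                out.append(c);
            } else if (c != ' ' && c != '\t' && c != '\n' && c != '\r') {
                out.append(c);                // structural char, number, literal, etc.
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String fromBinlog = "{\"key\": \"value\"}"; // as rendered by mysqlbinlog
        String fromRecord = "{\"key\":\"value\"}";  // as seen in the SeaTunnel log

        System.out.println(fromBinlog.equals(fromRecord));                           // false
        System.out.println(compactJson(fromBinlog).equals(compactJson(fromRecord))); // true
        System.out.println(compactJson("{\"a b\": [1, 2]}"));                        // {"a b":[1,2]}
    }
}
```

Normalizing on both the write path and the delete path would make the two forms comparable, but whether SeaTunnel should do this in the CDC source or in the Redis sink is a design question this sketch does not answer.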

Does anyone know the reason?

SeaTunnel Version

dev

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second = 7000000
  read_limit.rows_per_second = 400
}

source {
  MySQL-CDC {
    username = "st_user_source"
    password = "mysqlpw"
    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
  }
}

sink {
  Redis {
    host = "redis-e2e"
    port = 6379
    auth = "U2VhVHVubmVs"
    key = "list_check"
    data_type = list
    batch_size = 33
  }
}
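With `data_type = list`, removing an entry from a Redis list by value (as the `LREM` command does) only matches elements that are exactly equal to the given value. The sketch below simulates that on an in-memory list; it assumes, without confirmation from the issue, that the sink deletes list entries by exact value, and the `id=...,f_json=...` row serialization is hypothetical. One possible way the stored entry could keep the space (also an assumption) is if it was written from a snapshot read, where MySQL itself renders the JSON, while the delete comes from the binlog.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ListRemoveSketch {

    // Simulates Redis "LREM key 0 element" on an in-memory list: every element
    // exactly equal to the given value is removed, and the number removed is
    // returned. This is a stand-in for illustration, not a Redis client call.
    static int lrem(List<String> list, String value) {
        int removed = 0;
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            if (it.next().equals(value)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        List<String> listCheck = new ArrayList<>();
        // Hypothetical serialized row already in the list, f_json with a space.
        listCheck.add("id=1,f_json={\"key\": \"value\"}");

        // Hypothetical serialized row built from the delete event, f_json compacted.
        String deleted = "id=1,f_json={\"key\":\"value\"}";

        System.out.println(lrem(listCheck, deleted)); // 0: no element matched
        System.out.println(listCheck.size());         // 1: the stale entry remains
    }
}
```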

Running Command

Run as part of an e2e test case:
/tmp/seatunnel/bin/start-seatunnel-flink-13-connector-v2.sh --config /tmp/mysqlcdc-to-redis-test-delete-list.conf --name fake-to-redis-test-delete-zset.conf

Error Exception

No exception.

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct