apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

【BUG】When synchronizing data from MySQL CDC to Kafka, certain fields are automatically converted to scientific notation #7010

Open zhangxx0123 opened 2 weeks ago

zhangxx0123 commented 2 weeks ago

What happened

When MySQL-CDC synchronizes data to Kafka, some fields are automatically converted to scientific notation.

SeaTunnel Version

seatunnel-2.3.4

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 15000
}

source {
  MySQL-CDC {
    result_table_name = "xxx"
    username = "xxx"
    password = "xxx"
    hostname = xxx
    base-url="jdbc:mysql://xxxxx/xxx"
    "startup.mode"=INITIAL
    table-names=[
        "xxx"
    ]

    # compatible_debezium_json options
    format = compatible_debezium_json
    debezium = {
        # include schema into kafka message
        key.converter.schemas.enable = false
        value.converter.schemas.enable = false
        # include ddl
        include.schema.changes = true
        # topic prefix
        database.server.name = "xxxx"
    }
  }
}

sink {
  Kafka {
    source_table_name = "xxxx"

    bootstrap.servers = "xxxxx:9092"

    # compatible_debezium_json options
    format = compatible_debezium_json
  }
}

Running Command

sh bin/seatunnel.sh --config config/mysql2kafka

Error Exception

The value of the Column2 field is displayed in scientific notation:
{"before":null,"after":{"Column1":2,"Column2":0E-12},"source":{"version":"1.6.4.Final","connector":"mysql","name":"dm","ts_ms":1718680791000,"snapshot":"false","db":"dm","sequence":null,"table":"zxxxe","server_id":1,"gtid":null,"file":"binlog.000009","pos":559866317,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1718680791858,"transaction":null}
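For consumers of the topic, it may help to note that `0E-12` is a valid JSON number and is numerically the same value as `0.000000000000`. The following is a minimal sketch using only `java.math.BigDecimal`; the class name `ScientificZero` and the helper `toPlain` are made up for illustration and are not part of SeaTunnel or Debezium.

```java
import java.math.BigDecimal;

public class ScientificZero {
    /** Parses a JSON number token and renders it without an exponent. */
    static String toPlain(String jsonNumberToken) {
        return new BigDecimal(jsonNumberToken).toPlainString();
    }

    public static void main(String[] args) {
        BigDecimal parsed = new BigDecimal("0E-12");
        // The scale survives the round trip through scientific notation.
        System.out.println("scale = " + parsed.scale());
        // compareTo ignores scale, so this checks the numeric value only.
        System.out.println("is zero = " + (parsed.compareTo(BigDecimal.ZERO) == 0));
        System.out.println("plain = " + toPlain("0E-12"));
    }
}
```

A consumer that parses the field into a `BigDecimal` should therefore see the correct value; the problem reported here concerns the textual form of the message.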

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Carl-Zhou-CN commented 2 weeks ago

Can you provide the MySQL CREATE TABLE statement and some test data so that we can reproduce the problem?

zhangxx0123 commented 2 weeks ago

CREATE TABLE zxxxe (
  Column1 int NOT NULL,
  Column2 decimal(20,12) DEFAULT NULL,
  Column3 decimal(10,8) DEFAULT NULL,
  Column4 decimal(10,7) DEFAULT NULL,
  Column5 decimal(10,6) DEFAULT NULL,
  Column6 decimal(10,5) DEFAULT NULL,
  PRIMARY KEY (Column1)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

INSERT INTO dm.zxxxe (Column1,Column2,Column3,Column4,Column5,Column6) VALUES (1,0.000000000000,0.00000000,0.0000000,0.000000,0.00000);

Carl-Zhou-CN commented 2 weeks ago

I will try to reproduce it.

Carl-Zhou-CN commented 1 week ago

I have located the cause: the JsonConverter used inside DebeziumJsonConverter does not correctly convert Decimal values that are represented in scientific notation.

Carl-Zhou-CN commented 1 week ago

@Test
public void testSerializeDecimalToNumber()
        throws InvocationTargetException, IllegalAccessException, JsonProcessingException {
    String key = "k";
    String value = "v";
    // 1 x 10^-12 with scale 12; BigDecimal.toString() renders this as "1E-12".
    BigDecimal decimal = BigDecimal.valueOf(1L, 12);

    Struct keyStruct =
            new Struct(SchemaBuilder.struct().field(key, Decimal.builder(12).build()).build());
    keyStruct.put(key, decimal);
    Struct valueStruct =
            new Struct(SchemaBuilder.struct().field(value, Decimal.builder(12).build()).build());
    valueStruct.put(value, decimal);

    SourceRecord sourceRecord =
            new SourceRecord(
                    Collections.emptyMap(),
                    Collections.emptyMap(),
                    null,
                    keyStruct.schema(),
                    keyStruct,
                    valueStruct.schema(),
                    valueStruct);

    DebeziumJsonConverter converter = new DebeziumJsonConverter(false, false);
    // The serialized number should use plain notation, not scientific notation.
    Assertions.assertEquals("{\"k\":0.000000000001}", converter.serializeKey(sourceRecord));
    Assertions.assertEquals("{\"v\":0.000000000001}", converter.serializeValue(sourceRecord));
}

Carl-Zhou-CN commented 1 week ago

@hailin0 This seems to need a big adjustment to the JSON schema, don't you think?

Carl-Zhou-CN commented 1 week ago

@zhangxx0123 Try adding this to the source config:
debezium { decimal.handling.mode = string }

zhangxx0123 commented 1 week ago

Thank you very much. I tried adding decimal.handling.mode = string, but scientific notation still seems to appear. I then changed it to decimal.handling.mode = double, and that looks OK so far; I need to test further.
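To make the representations mentioned here concrete, the following plain-Java sketch shows how the JDK itself renders the same decimal value as a `BigDecimal`, as a plain string, and as a `double`. It is not Debezium or SeaTunnel code, the class and method names are hypothetical, and whether a given `decimal.handling.mode` actually goes through any of these paths inside the connector is an assumption that this thread does not confirm.

```java
import java.math.BigDecimal;

public class DecimalRenderings {
    // Default BigDecimal rendering; may use an exponent.
    static String viaToString(BigDecimal v) {
        return v.toString();
    }

    // Rendering that never uses an exponent.
    static String viaPlainString(BigDecimal v) {
        return v.toPlainString();
    }

    // Rendering after converting to a double; precision and scale are lost.
    static String viaDouble(BigDecimal v) {
        return Double.toString(v.doubleValue());
    }

    public static void main(String[] args) {
        BigDecimal zero = new BigDecimal("0.000000000000");
        BigDecimal tiny = new BigDecimal("0.000000000001");
        for (BigDecimal v : new BigDecimal[] {zero, tiny}) {
            System.out.println(viaToString(v) + " | " + viaPlainString(v) + " | " + viaDouble(v));
        }
    }
}
```

In this sketch a zero value looks fine as a double (`0.0`), while a very small non-zero value is still printed with an exponent by `Double.toString`, so testing with non-zero data may give a different impression than testing with zeros only.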

Carl-Zhou-CN commented 1 week ago

OK, looking forward to more feedback.

zhangxx0123 commented 3 days ago

After a new round of testing, I found that adding decimal.handling.mode = double did not work. With the initial default configuration, scientific notation occurs only when the field type is decimal, the scale is greater than 6 decimal places, and the value is 0.
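This observation matches the documented behavior of `java.math.BigDecimal.toString()`, which switches to an exponent when the scale is negative or the adjusted exponent (`precision - scale - 1`) is below -6. The sketch below restates that rule in code; it assumes the value reaches the JSON output through `BigDecimal.toString()`, which the thread suggests but does not prove, and the class name `NotationRule` is hypothetical.

```java
import java.math.BigDecimal;

public class NotationRule {
    /** Predicts whether BigDecimal.toString() will use an exponent for v. */
    static boolean usesScientificNotation(BigDecimal v) {
        // The precision of zero is 1, so for zero this reduces to -scale.
        int adjustedExponent = v.precision() - v.scale() - 1;
        return v.scale() < 0 || adjustedExponent < -6;
    }

    public static void main(String[] args) {
        // Zero values as inserted into Column2..Column6 of the test table,
        // followed by a non-zero value with scale 7.
        String[] samples = {
            "0.000000000000", "0.00000000", "0.0000000", "0.000000", "0.00000", "123.4500000"
        };
        for (String s : samples) {
            BigDecimal v = new BigDecimal(s);
            System.out.println(s + " -> " + v + " (scientific: " + usesScientificNotation(v) + ")");
        }
    }
}
```

Under this rule a zero with scale 7 or more is printed as `0E-7`, `0E-8`, and so on, while a non-zero value such as `123.4500000` keeps plain notation because its adjusted exponent is well above -6. Very small non-zero values (for example `0.0000001`) would also be affected.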