apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Bug] fix `float` type convert error in DebeziumSchemaUtils#fromDebeziumType #3846

Closed: chunji96 closed this issue 3 months ago

chunji96 commented 3 months ago

Search before asking

Paimon version

commit id : 6da33cb6072ce90ad311450e0143aa9bf069d624

Compute Engine

Flink

Minimal reproduce step

When using KafkaSyncDatabaseAction, if a field's type in the debezium-json schema is float, e.g.

{"type":"float","optional":false,"default":1,"field":"c1"}

the job fails with an error.

What doesn't meet your expectations?

We use Kafka CDC to synchronize data from Kafka to Paimon, and the Kafka message format is debezium-json.

In the Paimon table the type of field c1 is FLOAT, and in Kafka the type of c1 is float as well, yet the job fails with the following error:

Caused by: java.lang.UnsupportedOperationException: Cannot convert field c1 from type FLOAT to STRING of Paimon table testdb.test_table.
    at org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunctionBase.applySchemaChange(UpdatedDataFieldsProcessFunctionBase.java:125) ~[?:?]
    at org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction.processElement(UpdatedDataFieldsProcessFunction.java:60) ~[?:?]
    at org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction.processElement(UpdatedDataFieldsProcessFunction.java:39) ~[?:?]
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist-1.17.0-ctrip-RELEASE.jar:1.17.0-ctrip-RELEASE]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-dist-1.17.0-ctrip-RELEASE.jar:1.17.0-ctrip-RELEASE]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-dist-1.17.0-ctrip-RELEASE.jar:1.17.0-ctrip-RELEASE]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.17.0-ctrip-RELEASE.jar:1.17.0-ctrip-RELEASE]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.0-ctrip-RELEASE.jar:1.17.0-ctrip-RELEASE]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.0-ctrip-RELEASE.jar:1.17.0-ctrip-RELEASE]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.0-ctrip-RELEASE.jar:1.17.0-ctrip-RELEASE]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.0-ctrip-RELEASE.jar:1.17.0-ctrip-RELEASE]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.0-ctrip-RELEASE.jar:1.17.0-ctrip-RELEASE]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.0-ctrip-RELEASE.jar:1.17.0-ctrip-RELEASE]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-dist-1.17.0-ctrip-RELEASE.jar:1.17.0-ctrip-RELEASE]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.0-ctrip-RELEASE.jar:1.17.0-ctrip-RELEASE]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.0-ctrip-RELEASE.jar:1.17.0-ctrip-RELEASE]
    at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_121]

Fix

In org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils#fromDebeziumType there is no case for "float", so when dbzType is float it falls through to the default branch and the DataType is converted to DataTypes.STRING():

    private static DataType fromDebeziumType(String dbzType) {
        switch (dbzType) {
            case "int8":
                return DataTypes.TINYINT();
            case "int16":
                return DataTypes.SMALLINT();
            case "int32":
                return DataTypes.INT();
            case "int64":
                return DataTypes.BIGINT();
            case "float32":
            case "float64":
                return DataTypes.FLOAT();
            case "double":
                return DataTypes.DOUBLE();
            case "boolean":
                return DataTypes.BOOLEAN();
            case "bytes":
                return DataTypes.BYTES();
            case "string":
            default:
                return DataTypes.STRING();
        }
    }

So the fix is simply to add a "float" case that maps to DataTypes.FLOAT(), as follows:

    private static DataType fromDebeziumType(String dbzType) {
        switch (dbzType) {
            case "int8":
                return DataTypes.TINYINT();
            case "int16":
                return DataTypes.SMALLINT();
            case "int32":
                return DataTypes.INT();
            case "int64":
                return DataTypes.BIGINT();
            case "float":
            case "float32":
            case "float64":
                return DataTypes.FLOAT();
            case "double":
                return DataTypes.DOUBLE();
            case "boolean":
                return DataTypes.BOOLEAN();
            case "bytes":
                return DataTypes.BYTES();
            case "string":
            default:
                return DataTypes.STRING();
        }
    }
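
To make the fall-through easier to see without a Paimon build, here is a minimal standalone sketch of the same switch logic. It is not the Paimon code: the class name DebeziumTypeMappingSketch and the methods mapBeforeFix / mapAfterFix are hypothetical, and plain strings stand in for the DataTypes factory calls. Only the floating-point and string branches are reproduced.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch; plain strings stand in for Paimon's DataTypes.
public class DebeziumTypeMappingSketch {

    // Same branch layout as the switch before the fix: there is no "float" case.
    static String mapBeforeFix(String dbzType) {
        switch (dbzType) {
            case "float32":
            case "float64":
                return "FLOAT";
            case "double":
                return "DOUBLE";
            case "string":
            default:
                // Any unlisted type name, including "float", ends up here.
                return "STRING";
        }
    }

    // Same branch layout as the switch after the fix: "float" is listed explicitly.
    static String mapAfterFix(String dbzType) {
        switch (dbzType) {
            case "float":
            case "float32":
            case "float64":
                return "FLOAT";
            case "double":
                return "DOUBLE";
            case "string":
            default:
                return "STRING";
        }
    }

    public static void main(String[] args) {
        List<String> samples = Arrays.asList("float", "float32", "double", "string");
        for (String dbzType : samples) {
            System.out.println(
                    dbzType
                            + ": before=" + mapBeforeFix(dbzType)
                            + ", after=" + mapAfterFix(dbzType));
        }
    }
}
```

With this sketch, "float" resolves to STRING before the change and to FLOAT after it, while the other sample types resolve the same way in both versions. That matches the reported error, where the incoming column was treated as STRING while the existing Paimon column was FLOAT.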

Anything else?

No response

Are you willing to submit a PR?

chunji96 commented 3 months ago

merged