SAP / kafka-connect-sap

Kafka Connect SAP is a set of connectors, based on the Apache Kafka Connect framework, for reliably connecting Kafka with SAP systems.
Apache License 2.0

Decimal type doesn't work well #71

Closed chendrp closed 2 years ago

chendrp commented 3 years ago

Column KZWI4 is DECIMAL(13,2) in the HANA DB. When the value is 0, it is converted to 'AA=='.

{"type":"bytes","optional":false,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2"},"field":"KZWI4"}

payload: {..."KZWI2":"AA==","KZWI3":"AA==","KZWI4":"AA==","KZWI5":"AA==","KZWI6":"AA==",...}
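The "AA==" string is the JSON converter's base64 encoding of the Connect Decimal logical type's payload: the unscaled value as a big-endian two's-complement byte array, with the scale carried in the schema parameters. As a sketch, the value can be decoded back in Python like this:

```python
import base64
from decimal import Decimal

def decode_connect_decimal(b64: str, scale: int) -> Decimal:
    """Reverse Kafka Connect's Decimal encoding: base64 -> big-endian
    two's-complement unscaled integer -> Decimal shifted by the scale."""
    raw = base64.b64decode(b64)
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)

print(decode_connect_decimal("AA==", 2))   # the zero from the payload above
print(decode_connect_decimal("BNI=", 2))   # 0x04D2 = 1234 with scale 2 -> 12.34
```

So "AA==" is not garbage; it is a correctly encoded zero that the consumer must decode (or that an SMT must cast) before use.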

In the Debezium connector for MySQL, we use "decimal.handling.mode": "double" to convert decimals to doubles, but I cannot find a similar config in the README of the HANA connector. How can we get the correct decimal values with the HANA connector? Thanks.

elakito commented 3 years ago

@chendrp We don't have an option similar to Debezium's decimal.handling.mode. But could you use the standard Cast SMT (Single Message Transform) to convert your decimal values? If you know the column names, you could use something like this on the HANA source connector:

        "transforms": "cast",
        "transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.cast.spec": "KZWI1:float64,KZWI2:float64,KZWI3:float64,...",

The above connector configuration casts the listed columns to double. If you know the column names in advance, this is a workable workaround.
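Conceptually, the Cast$Value SMT rewrites each record's value so the listed fields arrive as plain floating-point numbers. A rough Python sketch of the effect (not the connector's actual code; field names are from the example above):

```python
from decimal import Decimal

def cast_fields_to_float64(record: dict, fields: set) -> dict:
    # mimic the effect of Cast$Value with a spec like "KZWI4:float64":
    # listed fields become floats, everything else passes through unchanged
    return {k: float(v) if k in fields else v for k, v in record.items()}

row = {"KZWI4": Decimal("0.00"), "MATNR": "A1"}
print(cast_fields_to_float64(row, {"KZWI4"}))  # {'KZWI4': 0.0, 'MATNR': 'A1'}
```

Note that casting to float64 trades exact decimal semantics for readability, the same trade-off as Debezium's "double" handling mode.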

chendrp commented 3 years ago


Thanks, it works properly now, but I have run into another problem. For a column of type VARBINARY, the cast parameter was set to "transforms.cast.spec": "RUUID:string", but the result is not right. I checked the reference at https://docs.confluent.io/platform/current/connect/transforms/cast.html, but string is the only valid cast target for binary fields.
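If the stringified bytes are unusable, one consumer-side workaround is to skip the Cast for the VARBINARY column and instead base64-decode the raw bytes from the JSON record and hex-encode them. A minimal Python sketch with a made-up payload (the value below is illustrative, not a real RUUID):

```python
import base64

# hypothetical base64-encoded VARBINARY payload from the JSON record
b64_value = "3q2+7w=="

raw = base64.b64decode(b64_value)  # recover the original bytes
print(raw.hex().upper())           # prints DEADBEEF, a hex rendering of the bytes
```

This keeps the binary data lossless while still producing a readable string on the consumer side.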

chendrp commented 3 years ago

Another question: we get the same message twice for each row change. The two messages are exactly the same, which makes it difficult for the consumer program to handle.

        {
          "connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
          "mode": "incrementing",
          "connection.password": "",
          "HANA253.SAPHANADB.VBFA.table.name": "\"SAPHANADB\".\"**\"",
          "tasks.max": "1",
          "topics": "HANA253.SAPHANADB.****",
          "connection.user": "saphanadb",
          "HANA253.SAPHANADB.VBFA.incrementing.column.name": "KAFKA_ID",
          "name": "HANA253.SAPHANADB.VBFA",
          "connection.url": "jdbc:sap://10...:30215/",
          "transforms": "cast",
          "transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
          "transforms.cast.spec": "RUUID:string,RFMNG:float64,RFWRT:float64,BRGEW:float64,VOLUM:float64,NTGEW:float64"
        }

elakito commented 3 years ago

@chendrp For your VARBINARY question, what do you mean by "the result is not right"? The converter uses Java's standard toString conversion for the byte array, which results in something like "[B@xxxxxxx". Why not just process the bytes as bytes? For the duplicate question, what do you mean by "each row change"? In incrementing mode there are no changes: new rows are added with an increasing value in the incrementing column, and at each polling cycle the connector compares that column against the current offset to retrieve only the new records.
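If duplicates still appear on the consumer side (for example, after a connector restart replays records from the last committed offset), a common remedy is to deduplicate on the incrementing column. A minimal sketch, assuming KAFKA_ID is that column as in the configuration above:

```python
def deduplicate(messages, key="KAFKA_ID"):
    """Keep only the first message seen for each incrementing-column value."""
    seen = set()
    unique = []
    for msg in messages:
        if msg[key] not in seen:
            seen.add(msg[key])
            unique.append(msg)
    return unique

batch = [{"KAFKA_ID": 1}, {"KAFKA_ID": 1}, {"KAFKA_ID": 2}]
print(deduplicate(batch))  # [{'KAFKA_ID': 1}, {'KAFKA_ID': 2}]
```

In a real consumer the seen-set would be bounded (e.g., only the last committed KAFKA_ID needs to be tracked, since the column is monotonically increasing).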

elakito commented 2 years ago

I am closing this ticket. I hope the above comment clarified the behavior.