SAP / kafka-connect-sap

Kafka Connect SAP is a set of connectors, using the Apache Kafka Connect framework, for reliably connecting Kafka with SAP systems.
Apache License 2.0

Increment mode is not working properly for large string incremental columns #138

Closed nithins1989 closed 1 year ago

nithins1989 commented 1 year ago

Hi, I am using the connector configuration below:

```sh
curl -i -X PUT http://localhost:8083/connectors/sap_inv_avro_inc_source_04/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": true,
    "topics": "inventoryavroinc04",
    "quickstart": "inventoryavroinc04",
    "iterations": 10000000,
    "tasks.max": "1",
    "connection.url": "jdbc:sap://host:30015",
    "connection.user": "",
    "connection.password": "",
    "inventoryavroinc04.table.name": "\"SAPABAP1\".\"ZEUODFIVSSTRV4\"",
    "mode": "incrementing",
    "inventoryavroinc04.incrementing.column.name": "CONCATKEY",
    "db.timezone": "Europe/Paris",
    "batch.max.rows": "100000",
    "transforms": "copyFieldToKey,extractKeyFromStruct",
    "transforms.copyFieldToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.copyFieldToKey.fields": "CONCATKEY",
    "transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKeyFromStruct.field": "CONCATKEY",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true"
  }'
```

My incrementing column looks like this, for example: `CONCATKEY = 20220822162214000000000XW0XW0059510000100000871925753986800THAS0600000`

The source CDS view contains about 42 million rows in total. When loading into Kafka, around 50k messages are lost or duplicated.
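Since `CONCATKEY` is a fixed-width, zero-padded string, incrementing mode has to rely on plain lexicographic comparison for ordering. That comparison itself is not the problem here: a quick sketch (with shortened, made-up key values) shows that fixed-width keys of this shape do sort consistently as strings:

```python
# Fixed-width, zero-padded keys sort lexicographically in the same order
# as their chronological meaning. The sample keys below are hypothetical,
# shortened versions of the CONCATKEY format from the issue.
keys = [
    "20220822162214000000000XW0",
    "20220821093010000000000XW0",
    "20220822162214000000001XW0",
]

ordered = sorted(keys)  # plain string comparison
print(ordered[0])  # the chronologically earliest key comes first
```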

Any idea how this can be mitigated?

Thanks, Nithin

elakito commented 1 year ago

There are a couple of open tickets complaining about the incremental mode. Unfortunately, it is very difficult to identify the cause. Can you repeat the ingestion with the same data? From your description, it sounds like different records are lost or duplicated on each run, and you observe no errors? Thanks.

nithins1989 commented 1 year ago

We were able to identify the root cause.

We are using CDS snapshot views as the source, and incrementing mode was reading the data with LIMIT/OFFSET SQL. Since the snapshot views did not always return rows in the same order, the LIMIT/OFFSET queries produced duplicate and missing records.
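This failure mode is easy to reproduce in a few lines: if the underlying query returns rows in a different order from one page read to the next (as with a snapshot view and no stable ORDER BY), LIMIT/OFFSET pagination will duplicate some rows and silently drop others. A minimal sketch, with made-up row names and orderings:

```python
# Simulate LIMIT/OFFSET pagination over a source whose row order changes
# between queries, as with a snapshot view lacking a stable ORDER BY.
rows = ["r1", "r2", "r3", "r4"]

# Hypothetical orders the source happens to return for two page reads.
order_page_0 = ["r1", "r2", "r3", "r4"]
order_page_1 = ["r3", "r1", "r2", "r4"]

page_size = 2
page0 = order_page_0[0:page_size]              # OFFSET 0 LIMIT 2 -> r1, r2
page1 = order_page_1[page_size:2 * page_size]  # OFFSET 2 LIMIT 2 -> r2, r4

collected = page0 + page1
print(collected)                    # ['r1', 'r2', 'r2', 'r4']: r2 read twice
print(set(rows) - set(collected))   # {'r3'}: r3 never read at all
```

Because r3 moved from the second page's range into the first page's range between the two queries, it is skipped, while r2 moves the other way and is read twice.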