SAP / kafka-connect-sap

Kafka Connect SAP is a set of connectors, based on the Apache Kafka Connect framework, for reliably connecting Kafka with SAP systems.
Apache License 2.0

Unable to read delete/update data #74

Open · Wangddawei opened this issue 3 years ago

Wangddawei commented 3 years ago

Kafka-connect-sap synchronizes data from HANA to Kafka, but in my tests only data from INSERT operations is read; after DELETE and UPDATE operations, no data reaches Kafka. Looking at the code, there is a query mode that seems to only read the data already in HANA, so other operations cannot be picked up. Is that right, or am I missing something?

elakito commented 3 years ago

@Wangddawei You are right. The HANA source connector can only handle bulk fetching (the entire table at once) or incrementing fetching (only newly inserted records). In other words, there is no CDC version of the source connector (i.e., one that fetches insert, update, and delete events) available at the moment, due to some open questions that still need to be clarified. In contrast, the HANA sink connector can handle both updates and deletes.

We will keep this ticket open to track the status of the source connector's update/delete handling.
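
For reference, here is a minimal sketch of an incrementing-mode source configuration. The property names follow those used elsewhere in this thread; the connection values and topic/table names are placeholders, not a tested configuration.

# minimal sketch (placeholder values); mode=bulk would instead
# copy the entire table on every poll
name=hana-source-example
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=mytopic
connection.url=jdbc:sap://<host>:<port>
connection.user=<user>
connection.password=<password>
mode=incrementing
mytopic.table.name="MYSCHEMA"."MYTABLE"
mytopic.incrementing.column.name=MYKEY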

Wangddawei commented 3 years ago

Can the data flow from HANA to Kafka be configured with only a source config file? I didn't configure a sink file; I assume the sink file only configures the Kafka -> HANA direction, i.e., one-way synchronization.

elakito commented 3 years ago

@Wangddawei Yes. The source and sink connectors are independent of each other; each can be configured on its own and combined with other source or sink connectors.

Wangddawei commented 3 years ago

thank you very much!

garethdebcor commented 3 years ago

We have the connector set up; however, incrementing mode is not allowing us to read new records. For example, take the MARA table, which has all the material/SKU records. When a new record is created, how do you set up the incrementing column to pick up the new record? The incrementing column needs to be a numeric/integer-based column, but the key field on MARA is MATNR, which is CHAR40. Is there any trick to getting a table to download the deltas, or do I need to create a CDS view that auto-creates an index column?

elakito commented 3 years ago

@garethdebcor The incrementing column can be of a string-based type. The only requirement is that your source table is modified only by inserts.

{topic}.incrementing.column.name - ... The type of the column can be numeric types such as INTEGER, FLOAT, DECIMAL, datetime types such as DATE, TIME, TIMESTAMP, and character types VARCHAR, NVARCHAR containing alpha-numeric characters. ...

Do you see some error when you specify this column as the incrementing column in your connector configuration?
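
For context, incrementing mode conceptually works by remembering the highest column value it has seen and fetching only rows above it; with a character column the comparison is lexicographic. A sketch of the kind of query involved (this is not the connector's literal SQL; MARA and MATNR are taken from the question above):

-- each poll conceptually fetches only rows whose incrementing value
-- exceeds the highest value seen so far; for CHAR/VARCHAR columns
-- such as MATNR, '>' compares lexicographically
SELECT * FROM MARA
  WHERE MATNR > :last_seen_value
  ORDER BY MATNR ASC;

This also shows why the table must be insert-only: an UPDATE does not move a row past the stored offset, and a DELETE leaves no row to fetch, so neither change is ever emitted.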

garethdebcor commented 3 years ago

We actually don't see any error. We just see the initial load happen (14k records). Then we add a material and the logs show that the connector is working - but nothing happens (no new records show up).

Thanks, Gareth

garethdebcor commented 3 years ago

Here is our config file for reference. Do you see anything out of the ordinary? This is a small custom table with only 9 entries.

name=test-topic-source
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=ztest_last
connection.url=jdbc:sap://sap.xxxxx..xxxx.xxxx.xxxx
connection.user=USER
connection.password=PASSWORD
mode=incrementing
ztest_last.incrementing.column.name=CARRIER_SCAC
ztest_last.table.name="SAPABAP1"."ZCVY_LAST_MILE"
numeric.mapping=best_fit

Here is a picture of the table

[Screenshot: Screen Shot 2021-06-11 at 3 12 43 PM, showing the table contents]

garethdebcor commented 3 years ago

My one pet peeve is when someone posts a problem but never posts the solution. So, we ended up solving this with a slight config change, which makes logical sense after the fact. We swapped the lines declaring the incrementing column and the table name, and the new records now show up. It makes sense from a programming standpoint as well: declare the structure first, then declare the field of the structure.

Now we move on to the delete / update task.

(working config)

name=test-topic-source
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=ztest_last
connection.url=jdbc:sap://sap.xxxxx..xxxx.xxxx.xxxx
connection.user=USER
connection.password=PASSWORD
mode=incrementing
ztest_last.table.name="SAPABAP1"."ZCVY_LAST_MILE"
ztest_last.incrementing.column.name=CARRIER_SCAC
numeric.mapping=best_fit

elakito commented 3 years ago

@garethdebcor Thanks for investigating the problem. It is strange that the order of those two lines changed the behavior, because the properties are read sequentially into a map and then looked up by name when needed, so their declaration order should not matter.

Mahi4089 commented 2 years ago

We have the HANA connector set up and the initial (full) load is successful; however, incrementing mode is not allowing us to read new records. For example, with the ZCVY_LAST_MILE table: when a new record is created, the connector does not pick it up via the incrementing column. It just polls, reports no incrementing-column updates, and produces no error logs.

Please find below the properties currently being used.

name=dev-topic-source_zcvy_1
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=dev_zcvy_1
connection.url=jdbc:sap://XXXXXXXXXXX
connection.user=XXXXXX
connection.password=XXXXXXXX
mode=incrementing
dev_zcvy_1.table.name="SAPABAP1"."ZCVY_LAST_MILE"
dev_zcvy_1.incrementing.column.name=CARRIER_SCAC
numeric.mapping=best_fit

elakito commented 2 years ago

@Mahi4089 That sounds like the initial load contained data with a higher incrementing value than the newly added data. Maybe you could run an SQL query, where 'abcdef' is the current highest incrementing value:

SELECT * FROM SAPABAP1.ZCVY_LAST_MILE WHERE CARRIER_SCAC > 'abcdef';

Then insert some data with an incrementing value higher than 'abcdef' and run the same query again. This should return the data you just inserted. If it does not, the table already contains rows whose incrementing value is higher than the rows you inserted, which is why the connector skips them.
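
As a side note, a quick way to find the current highest value to use in place of 'abcdef' (a sketch using the table and column names from this thread):

-- current highest incrementing value seen by a '>' comparison
SELECT MAX(CARRIER_SCAC) FROM "SAPABAP1"."ZCVY_LAST_MILE";

If newly inserted rows sort lexicographically below this maximum, incrementing mode will never fetch them; in that case a monotonically growing column (for example a sequence-backed ID or an insert TIMESTAMP, which the documentation quoted above also allows) is a more reliable choice of incrementing column.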