SAP / kafka-connect-sap

Kafka Connect SAP is a set of connectors, using the Apache Kafka Connect framework for reliably connecting Kafka with SAP systems
Apache License 2.0

HANA source connector with Incrementing mode is missing some messages #106

Open srkpers opened 2 years ago

srkpers commented 2 years ago

We ran several tests replicating rows from a HANA table into a Kafka topic with the HANA source connector in incrementing mode, using a timestamp column with microsecond precision as the incrementing column. Intermittently, the number of rows in the HANA table does not match the number of messages in the Kafka topic, and the gap grows as more rows are inserted into HANA. It appears that the SELECT statement the connector runs to fetch data from the HANA table skips some rows, but we are not sure where exactly the issue is.

For this testing we are using a HANA table that has 22 partitions and more than 4 billion rows. We create the connector offset ahead of time, before launching the connector, so that we only get messages from a certain date/timestamp onwards; otherwise it would start replicating the entire table. When there is no activity or very low activity, the rows in HANA and the messages in the topic match, but over time, when there is more activity, the counts diverge.

We tested with 22 Kafka partitions and 22 tasks, and in another test with just 1 partition and 1 task. We tried multiple combinations of tasks, partitions, polling interval, batch max rows, etc., but the issue is still there. Any input on what can be done?

elakito commented 2 years ago

@srkpers I may be wrong, but I suspect the problem occurs because the timestamp values are not strictly increasing and may contain duplicates. A series of records with the same timestamp value may be inserted into the source table at different times rather than all at once. In that case, a first poll by the connector may fetch n records whose last timestamp value is inc_col = t, and the following fetch with the where-clause where inc_col > t will miss the remaining records with inc_col = t.
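To make the suspected mechanism concrete, here is a minimal Python simulation. It is only a sketch under assumptions: the table is an in-memory list, `poll_strictly_greater` is a hypothetical stand-in for a query of the form `where inc_col > t`, and none of this is the connector's actual code.

```python
def poll_strictly_greater(table, boundary):
    """Hypothetical stand-in for: SELECT ... WHERE inc_col > boundary ORDER BY inc_col."""
    rows = [r for r in table if r["inc_col"] > boundary]
    return sorted(rows, key=lambda r: r["inc_col"])


# Source table with two rows; inc_col plays the role of the timestamp column.
table = [
    {"id": 1, "inc_col": 100},
    {"id": 2, "inc_col": 200},
]
boundary = 0
delivered = []

# First poll: fetches both rows, boundary moves to the last value seen (t = 200).
batch = poll_strictly_greater(table, boundary)
delivered.extend(batch)
boundary = batch[-1]["inc_col"]

# A further row with the SAME timestamp t = 200 arrives after the first poll,
# followed by a row with a newer timestamp.
table.append({"id": 3, "inc_col": 200})
table.append({"id": 4, "inc_col": 300})

# Second poll: inc_col > 200 only matches id 4, so id 3 is never fetched.
batch = poll_strictly_greater(table, boundary)
delivered.extend(batch)
boundary = batch[-1]["inc_col"]

delivered_ids = [r["id"] for r in delivered]
missed_ids = [r["id"] for r in table if r["id"] not in delivered_ids]
print("delivered:", delivered_ids)
print("missed:", missed_ids)
```

In this toy run the row with id 3 stays in the table but never reaches the delivered list, which is the kind of count mismatch described in the report.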

If this is indeed what is happening and the incrementing column's values are not strictly increasing, we could think about the following options.

  1. drop some records having the same incrementing values
  2. allow some duplicate records having the same incrementing values
  3. when primary keys are provided, use the key values to filter out the duplicates among those having the same incrementing values
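
As an illustration of option 3 only, the following Python sketch re-reads rows at the boundary value with `>=` and uses primary keys to drop the ones already delivered. The function `poll_with_key_filter`, the `id` key column, and the in-memory table are all hypothetical; this is not how the connector is implemented.

```python
def poll_with_key_filter(table, boundary, seen_keys):
    """Fetch rows with inc_col >= boundary, skipping keys already delivered at the boundary.

    Returns (fresh_rows, new_boundary, new_seen_keys).
    """
    rows = sorted(
        (r for r in table if r["inc_col"] >= boundary),
        key=lambda r: (r["inc_col"], r["id"]),
    )
    fresh = [
        r for r in rows
        if not (r["inc_col"] == boundary and r["id"] in seen_keys)
    ]
    if not fresh:
        return [], boundary, seen_keys
    new_boundary = fresh[-1]["inc_col"]
    if new_boundary == boundary:
        # Still at the same boundary value: remember the additional keys.
        new_seen = seen_keys | {r["id"] for r in fresh}
    else:
        # Boundary advanced: only keys at the new boundary value need remembering.
        new_seen = {r["id"] for r in fresh if r["inc_col"] == new_boundary}
    return fresh, new_boundary, new_seen


table = [{"id": 1, "inc_col": 100}, {"id": 2, "inc_col": 200}]
boundary, seen_keys, delivered = 0, set(), []

fresh, boundary, seen_keys = poll_with_key_filter(table, boundary, seen_keys)
delivered.extend(fresh)

# A late row with the same incrementing value 200, plus a newer row.
table.append({"id": 3, "inc_col": 200})
table.append({"id": 4, "inc_col": 300})

fresh, boundary, seen_keys = poll_with_key_filter(table, boundary, seen_keys)
delivered.extend(fresh)

delivered_ids = [r["id"] for r in delivered]
print("delivered:", delivered_ids)
```

The trade-off in this sketch is that the poller has to keep the set of keys seen at the current boundary value as part of its offset state, in exchange for neither dropping nor duplicating rows that share an incrementing value.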

srkpers commented 2 years ago

@elakito Regarding option one, we did check for duplicates with the same timestamp, but the difference between the number of rows in the HANA table and the number of messages in the Kafka topic is much higher than the number of duplicates. So it appears that while the connector is reading the HANA table from a certain offset onwards and returning a certain number of rows, additional inserts/updates with older timestamps may be happening in between, and those records are missed completely.

elakito commented 2 years ago

Each poll by the source connector updates its incrementing boundary value. Therefore, if records with older timestamp values are inserted into the table afterwards, those records won't be read. So none of the three options will work for such a source table.

In other words, if you don't have a column that has values monotonically increasing with the physical time, you cannot use the incrementing mode. Your option would be https://github.com/SAP/kafka-connect-sap/issues/105, which will be updated with more info.
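The effect of a late-arriving older value on the boundary can be shown with a small Python simulation. This is a sketch under assumptions: `poll_incrementing` is a hypothetical stand-in for a boundary-based incremental query, the table is an in-memory list, and the values are made up for illustration.

```python
def poll_incrementing(table, boundary):
    """Hypothetical poll: return rows with inc_col > boundary and the updated boundary."""
    rows = sorted(
        (r for r in table if r["inc_col"] > boundary),
        key=lambda r: r["inc_col"],
    )
    new_boundary = rows[-1]["inc_col"] if rows else boundary
    return rows, new_boundary


source_table = [{"id": 1, "inc_col": 100}, {"id": 2, "inc_col": 300}]
boundary = 0
topic = []

# First poll reads both rows and advances the boundary to 300.
rows, boundary = poll_incrementing(source_table, boundary)
topic.extend(rows)

# A row whose value was computed earlier (250) becomes visible only now,
# together with a genuinely newer row (400).
source_table.append({"id": 3, "inc_col": 250})
source_table.append({"id": 4, "inc_col": 400})

# Second poll only sees inc_col > 300, so the row with 250 is skipped for good.
rows, boundary = poll_incrementing(source_table, boundary)
topic.extend(rows)

missing_ids = sorted({r["id"] for r in source_table} - {r["id"] for r in topic})
print("rows in table:", len(source_table), "messages in topic:", len(topic))
print("missing ids:", missing_ids)
```

Because the boundary only ever moves forward in this model, a row that appears below it is skipped by every later poll, regardless of how duplicates at the boundary are handled.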

srkpers commented 2 years ago

@elakito We have tested by introducing a HANA identity column (sequence) and using it as the incrementing column, which produces monotonically increasing values. There are no duplicates with this approach, but we still noticed missing rows. Could you please update #105 with a tentative timeline for when it will be available? It would help us a lot in planning our testing of the connector. Thank you.

elakito commented 2 years ago

@srkpers Maybe the above fix regarding the incremental query using timestamp values has solved this problem. Could you try it again?

srkpers commented 2 years ago

> @srkpers Maybe the above fix regarding the incremental query using timestamp values has solved this problem. Could you try it again?

@elakito That issue does not apply to us. In our case the Timestamp field is added by SLT as part of replicating data from SAP ECC to the HANA database, and the Timestamp value is calculated in the SLT instance, not in the HANA database.

elakito commented 2 years ago

@srkpers The mentioned problem affects timestamp-based incremental queries in general, and it definitely affected your scenario as well unless your system's timezone was set to UTC. But since you also mentioned that you observed the problem when using a plain sequence column, there could be another cause for the missing records.