SAP / kafka-connect-sap

Kafka Connect SAP is a set of connectors, based on the Apache Kafka Connect framework, for reliably connecting Kafka with SAP systems.
Apache License 2.0

HANA sink connector is not working #123

Closed srkpers closed 2 years ago

srkpers commented 2 years ago

Tested the below scenario and was unable to get the HANA Sink connector to work.

Sample message in Kafka topic (file attached)

Got the below error when the connector tried to create the HANA table. This was with the auto create parameter in the connector config set to true. The SQL command the HANA Sink connector formulated to create the table appears to be broken, as it does not contain any field names.

    [INFO] 2022-02-10 16:25:40,571 [task-thread-bw1-hana-source-and-sink-test2-0] com.sap.kafka.client.hana.HANAJdbcClient $anonfun$createTable$1 - Creating table:ZSKTEST2 with SQL: CREATE COLUMN TABLE "KAFKA"."ZSKTEST2" ()
    [ERROR] 2022-02-10 16:25:40,619 [task-thread-bw1-hana-source-and-sink-test2-0] com.sap.kafka.client.hana.HANAJdbcClient $anonfun$createTable$1 - Error during table creation com.sap.db.jdbc.exceptions.JDBCDriverException: SAP DBTech JDBC: [257] (at 41): sql syntax error: incorrect syntax near ")": line 1 col 41 (at pos 41)
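The empty column list in the CREATE statement suggests the sink derived no fields from the record's value schema. As a rough illustration only (the field names here are hypothetical, and this setup turned out to use Avro with the Schema Registry, but the principle is the same), a message whose value carries an explicit Connect schema, e.g. the JsonConverter envelope with schemas.enable=true, gives the sink the named fields it needs to build the DDL:

    {
      "schema": {
        "type": "struct",
        "name": "ZSKTEST2",
        "optional": false,
        "fields": [
          { "field": "persons_id", "type": "int32", "optional": false },
          { "field": "first_name", "type": "string", "optional": true }
        ]
      },
      "payload": { "persons_id": 1, "first_name": "John" }
    }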

Also tried creating the target table in HANA manually and got the below error in the connector log:

    [INFO] 2022-02-10 16:29:17,386 [task-thread-bw1-hana-source-and-sink-test2-0] com.sap.kafka.connect.sink.hana.HANASinkTask put - PHASE - 1 ended for task, with assigned partitions [bw1-hana-source-test2-0]
    [ERROR] 2022-02-10 16:29:17,386 [task-thread-bw1-hana-source-and-sink-test2-0] org.apache.kafka.connect.runtime.WorkerSinkTask deliverMessages - WorkerSinkTask{id=bw1-hana-source-and-sink-test2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Table "KAFKA"."ZSKTEST2" has an incompatible schema to the record Schema. Auto Evolution of schema is not supported
    com.sap.kafka.utils.SchemaNotMatchedException: Table "KAFKA"."ZSKTEST2" has an incompatible schema to the record Schema. Auto Evolution of schema is not supported

Sample message.txt

srkpers commented 2 years ago

Did some troubleshooting and noticed an issue with the Schema Registry. Got the Sink connector working. Will do further testing. Thank you.

srkpers commented 2 years ago

Did further testing using the example at https://github.com/SAP/kafka-connect-sap/blob/master/examples/inventory7db/README.md, where the Debezium MySQL Source connector replicates from a MySQL database table to a Kafka topic and the HANA Sink connector replicates the Kafka topic to a HANA database table. Test scenario:

  1. Initial replication worked. The table got created in HANA and the number of messages in the Kafka topic matched the number of rows in the HANA table.
  2. Inserted a row in the MySQL table and it made it to the HANA table.
  3. Updated a row in the MySQL table and saw the same row get updated in the HANA table.
  4. Deleted a row in the MySQL table and noticed the __deleted field in the HANA table did not get updated to "true". It is still showing "false", so the delete did not work. Here are the logs from the HANA Sink connector corresponding to the delete.

    [INFO] 2022-02-11 02:19:39,331 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test1-0] com.sap.kafka.connect.sink.hana.HANASinkTask put - Number of Records read for Sink: 2
    [INFO] 2022-02-11 02:19:39,331 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test1-0] com.sap.kafka.connect.sink.hana.HANAWriter write - write records to HANA
    [INFO] 2022-02-11 02:19:39,331 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test1-0] com.sap.kafka.connect.sink.hana.HANAWriter write - initialize connection to HANA
    [INFO] 2022-02-11 02:19:39,333 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test1-0] com.sap.kafka.connect.sink.hana.HANASinkTask add - Table "MYSQLDOC2"."ZPERSONS" exists. Validate the schema and check if schema needs to evolve
    [INFO] 2022-02-11 02:19:39,334 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test1-0] com.sap.kafka.connect.sink.hana.HANAWriter flush - flush records into HANA
    [INFO] 2022-02-11 02:19:39,334 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test1-0] com.sap.kafka.client.hana.AbstractHANAPartitionLoader prepareInsertIntoStmt - Creating prepared statement: UPSERT "MYSQLDOC2"."ZPERSONS" ("persons_id", "first_name", "last_name", "__deleted") VALUES (?, ?, ?, ?) WITH PRIMARY KEY
    [INFO] 2022-02-11 02:19:39,336 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test1-0] com.sap.kafka.client.hana.AbstractHANAPartitionLoader $anonfun$loadPartition$3 - Skipping update for the to-be-deleted record
    [WARN] 2022-02-11 02:19:39,336 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test1-0] com.sap.kafka.client.hana.AbstractHANAPartitionLoader $anonfun$loadPartition$3 - Missing schema for the tombstone record
    [INFO] 2022-02-11 02:19:39,337 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test1-0] com.sap.kafka.connect.sink.hana.HANAWriter write - flushing records to HANA successful
    [INFO] 2022-02-11 02:19:39,337 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test1-0] com.sap.kafka.connect.sink.hana.HANASinkTask put - PHASE - 1 ended for task, with assigned partitions [mysqldocker2.mysqldb.zpersons-0]

The above connector logs indicate that the connector skipped the update for the to-be-deleted record.

srkpers commented 2 years ago

Small correction: the above test had the delete enabled parameter set to "true"; I had wrongly stated that it was set to "false". With that parameter set to "true", the record should have been deleted from the HANA table, but it was not. You can see in the connector log that an UPSERT statement was formulated instead of a DELETE statement. I then tested with the delete enabled parameter set to "false", and that worked as per the documentation: the __deleted field value changed from "false" to "true" for the deleted record, and the record itself was not deleted, which is also what the documentation says.

So if the delete enabled parameter is set to "true", the HANA record is not deleted because an UPSERT statement is formed instead of a DELETE statement. Let me know if any additional information is needed to fix this issue. Thank you.

elakito commented 2 years ago

@srkpers Thank you for reporting this problem. Initially, I thought the problem was caused by some changes in debezium 1.7.0, but it seems to be coming from somewhere else. I'll have to look into it...

elakito commented 2 years ago

@srkpers The deletion logic is tied to how the debezium source connector generates records. For each deleted record there are two records: one with __deleted=true, and a tombstone message that has an empty body but has the key schema value set. The first record is ignored and just logged as "Skipping update for the to-be-deleted record"; the second record, the tombstone for this deletion, is used to create a delete statement. But in your case, this tombstone record does not include its key schema, as logged with "Missing schema for the tombstone record". As a result, the delete statement is not created.
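To make the two-record shape concrete, here is a sketch of the pair for one deleted row, rendered as JSON for readability (the key and field values are illustrative, borrowed from the ZPERSONS example; the actual records in this setup are Avro-encoded). The first element is the rewritten record the sink skips; the second is the tombstone it should turn into the DELETE statement:

    [
      {
        "key": { "persons_id": 101 },
        "value": { "persons_id": 101, "first_name": "John", "last_name": "Doe", "__deleted": "true" }
      },
      {
        "key": { "persons_id": 101 },
        "value": null
      }
    ]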

When you execute a single record deletion, you should see "Number of Records read for Sink: 2" because of the two records mentioned above. You should also see one insert or upsert prepared statement (this code is called for each batch, and it is not known in advance whether a batch contains any insert/upsert records), and, in addition, one delete prepared statement.

Could it be that your table has no primary keys?

But in addition, in the current inventory7db version, I observed that the sink connector sometimes gets only 1 record when a deletion is executed. In this case, the connector doesn't see the tombstone record and the deletion is not executed. I haven't found out why only one record is sometimes put to the connector. In that case, we might want to trigger the deletion with the first record rather than the second.

srkpers commented 2 years ago

@elakito I can see that 2 records are produced by the Debezium MySQL Source connector for a delete in the MySQL table, and that 2 records are read by the HANA Sink connector. I have attached a file with the entire HANA Sink connector log; below are just the parameters that are important for how the tombstone and delete records are handled.

"auto.create": "true", "auto.evolve": "true", "mysqldocker2.mysqldb.zpersons.table.name": "\"MYSQLDOC2\".\"ZPERSONS\"", "mysqldocker2.mysqldb.zpersons.table.type": "column", "mysqldocker2.mysqldb.zpersons.insert.mode": "upsert", "mysqldocker2.mysqldb.zpersons.delete.enabled": "true", "mysqldocker2.mysqldb.zpersons.pk.fields": "persons_id", "mysqldocker2.mysqldb.zpersons.pk.mode": "record_key", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "transforms.unwrap.delete.handling.mode": "none",

HANA_Sink_Connector_config.txt

Here are the HANA log entries

    [INFO] 2022-02-14 04:51:52,626 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test2-0] com.sap.kafka.connect.sink.hana.HANASinkTask put - Number of Records read for Sink: 2
    [INFO] 2022-02-14 04:51:52,626 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test2-0] com.sap.kafka.connect.sink.hana.HANAWriter write - write records to HANA
    [INFO] 2022-02-14 04:51:52,626 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test2-0] com.sap.kafka.connect.sink.hana.HANAWriter write - initialize connection to HANA
    [INFO] 2022-02-14 04:51:52,628 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test2-0] com.sap.kafka.connect.sink.hana.HANASinkTask add - Table "MYSQLDOC2"."ZPERSONS" exists. Validate the schema and check if schema needs to evolve
    [INFO] 2022-02-14 04:51:52,628 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test2-0] com.sap.kafka.connect.sink.hana.HANAWriter flush - flush records into HANA
    [INFO] 2022-02-14 04:51:52,629 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test2-0] com.sap.kafka.client.hana.AbstractHANAPartitionLoader prepareInsertIntoStmt - Creating prepared statement: UPSERT "MYSQLDOC2"."ZPERSONS" ("persons_id", "first_name", "last_name") VALUES (?, ?, ?) WITH PRIMARY KEY
    [WARN] 2022-02-14 04:51:52,630 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test2-0] com.sap.kafka.client.hana.AbstractHANAPartitionLoader $anonfun$loadPartition$3 - Missing schema for the tombstone record
    [WARN] 2022-02-14 04:51:52,630 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test2-0] com.sap.kafka.client.hana.AbstractHANAPartitionLoader $anonfun$loadPartition$3 - Missing schema for the tombstone record

So the "Missing schema for the tombstone record" message is there; not sure why. The table has a primary key, and I can see that the tombstone record in the Kafka topic has the key with the primary key. I have taken a screenshot of the tombstone message and provided it in the attached word document Tombstone_record_in_Kafka_topic.docx

If I use "rewrite" for the delete handling mode, an additional column called "__deleted" is created in the HANA table with value "true" or "false". By default it is "false", and if a record is deleted it changes to "true". This is working as it is supposed to.

elakito commented 2 years ago

@srkpers I don't know why there is no key schema included. Are you using the MySQL debezium connector? Could you configure the transformation at the source connector by adding the below parameters to the source connector's configuration

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite"

and dump the messages placed in the kafka topic for a single insert followed by a single delete of that row? For example, you can run this command to dump the messages with their keys:

    kafka-console-consumer.sh --bootstrap-server <bootstrap-server> --topic <topic> --property print.key=true

There should be three messages. Do all of them include the key schema?
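For reference, with print.key=true the console consumer prints each message's key and value on one line, so an insert followed by a delete of the same row would be expected to produce something like the following three messages (shown as JSON for readability; the actual output depends on the configured converters, and the last value is the empty tombstone body):

    {"persons_id": 101}	{"persons_id": 101, "first_name": "John", "last_name": "Doe", "__deleted": "false"}
    {"persons_id": 101}	{"persons_id": 101, "first_name": "John", "last_name": "Doe", "__deleted": "true"}
    {"persons_id": 101}	null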

srkpers commented 2 years ago

@elakito Yes. Noticed that the key schema related config was not done in the Debezium MySQL Source connector, and as a result the schema for the key was not created in the Confluent Schema Registry. Had to incorporate the below additional config parameters in the source connector, after which the schema for the key got created and the messages in the Kafka topic had schemas for both value and key. After this change the record in the HANA table got deleted.

"key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "https://schemaregistry.confluent.svc.cluster.local:8081", "key.converter.schema.registry.basic.auth.credentials.source": "USER_INFO", "key.converter.schema.registry.basic.auth.user.info": "xyz:xyz", "key.converter.schema.registry.ssl.truststore.password": "xyz", "key.converter.schema.registry.ssl.truststore.location": "/mnt/sslcerts/truststore.p12", "key.converter.schemas.enable": "true",

Here are the HANA Sink connector log entries corresponding to the delete operation. The log did not say that 2 messages were received by the sink connector, but I can see both UPSERT and DELETE statements.

    [INFO] 2022-02-14 16:47:02,294 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test3-0] com.sap.kafka.connect.sink.hana.HANASinkTask put - Number of Records read for Sink: 1
    [INFO] 2022-02-14 16:47:02,294 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test3-0] com.sap.kafka.connect.sink.hana.HANAWriter write - write records to HANA
    [INFO] 2022-02-14 16:47:02,294 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test3-0] com.sap.kafka.connect.sink.hana.HANAWriter write - initialize connection to HANA
    [INFO] 2022-02-14 16:47:02,297 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test3-0] com.sap.kafka.connect.sink.hana.HANASinkTask add - Table "MYSQLDOC2"."ZPERSONS" exists. Validate the schema and check if schema needs to evolve
    [INFO] 2022-02-14 16:47:02,297 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test3-0] com.sap.kafka.connect.sink.hana.HANAWriter flush - flush records into HANA
    [INFO] 2022-02-14 16:47:02,298 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test3-0] com.sap.kafka.client.hana.AbstractHANAPartitionLoader prepareInsertIntoStmt - Creating prepared statement: UPSERT "MYSQLDOC2"."ZPERSONS" ("persons_id", "first_name", "last_name") VALUES (?, ?, ?) WITH PRIMARY KEY
    [INFO] 2022-02-14 16:47:02,299 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test3-0] com.sap.kafka.client.hana.AbstractHANAPartitionLoader prepareDeleteFromStmt - Creating prepared statement: DELETE FROM "MYSQLDOC2"."ZPERSONS" WHERE "persons_id" = ?
    [INFO] 2022-02-14 16:47:02,303 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test3-0] com.sap.kafka.connect.sink.hana.HANAWriter write - flushing records to HANA successful
    [INFO] 2022-02-14 16:47:02,303 [task-thread-bw1-mysqldocker-to-hana-sink-avro-test3-0] com.sap.kafka.connect.sink.hana.HANASinkTask put - PHASE - 1 ended for task, with assigned partitions [mysqldocker3.mysqldb.zpersons-0]

elakito commented 2 years ago

@srkpers I think you are using the no-rewrite mode, so I suppose there is no pseudo rewritten record with __deleted=true. In that case, only one record is received, and I suppose the deletion of the record was executed at HANA. Currently, as I mentioned earlier, the code prepares an upsert/insert statement even when it is never used. This part needs refactoring.

srkpers commented 2 years ago

@elakito Right. I have configured "transforms.unwrap.delete.handling.mode": "none", so there is no column called "__deleted". The deletion of the row in the HANA table worked as expected. I will do some additional testing but will go ahead and close this issue. Thank you for all the input to get this sorted out.