hpgrahsl / kryptonite-for-kafka

Kryptonite for Kafka is a client-side 🔒 field level 🔓 cryptography library for Apache Kafka® offering a Kafka Connect SMT, ksqlDB UDFs, and a standalone HTTP API service. It's an ! UNOFFICIAL ! community project

Decryption issue when using JDBC Sink Connector #10

Closed sjm767 closed 1 year ago

sjm767 commented 1 year ago

I'm using the JDBC source/sink connectors to transfer data between different MSSQL Servers, and I want to encrypt a field's values in Kafka, so I'm trying to use "kryptonite-for-kafka".

This is my sample source table schema.

NAME         TYPE          COMMENT
SEQ          BIGINT        PRIMARY_KEY
ITEM_NO      BIGINT
ITEM_NAME    VARCHAR(20)   Encryption target
CHANGE_DATE  DATETIME2

So I set up the JDBC Source Connector using the AvroConverter:

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://{schema}:8081",
    "value.converter.schema.registry.url": "http://{schema}:8081",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",   
    "transforms": "cipher",
    "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
    "transforms.cipher.cipher_mode": "ENCRYPT",
    "transforms.cipher.field_config": "[{\"name\":\"ITEM_NAME\"}]",
    "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
    "transforms.cipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
    "transforms.cipher.predicate":"isTombstone",
    "transforms.cipher.negate":true,
    "predicates": "isTombstone",
    "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
    "connection.url": "jdbc:sqlserver://{MSSQL}:1433",
    "connection.user": "user",
    "connection.password": "pass",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "SEQ",
    "timestamp.column.name": "CHANGE_DATE",
    "validate.non.null": "true",
    "query": "SELECT SEQ, ITEM_NO, ITEM_NAME, CHANGE_DATE FROM source_table WHERE ITEM_NO = 1111",   
    "poll.interval.ms": "5000",
    "topic.prefix": "test-topic"
  }
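
(For reference: the "transforms.cipher.cipher_data_keys" value is externalized through Connect's FileConfigProvider. Below is a minimal sketch of what the referenced entry in /etc/kafka/connect-classified.properties could look like, assuming a single Tink AES-GCM keyset in the structure described in the kryptonite-for-kafka documentation; the key material shown is a placeholder, not a real key.)

# the Connect worker needs a file config provider enabled, e.g.:
#   config.providers=file
#   config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
#
# /etc/kafka/connect-classified.properties (illustrative placeholder values)
cipher_data_keys=[{"identifier":"my-demo-secret-key-123","material":{"primaryKeyId":1000000001,"key":[{"keyData":{"typeUrl":"type.googleapis.com/google.crypto.tink.AesGcmKey","value":"<BASE64_KEY_MATERIAL>","keyMaterialType":"SYMMETRIC"},"status":"ENABLED","keyId":1000000001,"outputPrefixType":"TINK"}]}}]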

I successfully encrypted the field and produced the records to the Kafka topic. Here are the message value and its schema:

VALUE
{  
  "ITEM_NO": 1111,
  "ITEM_NAME": "LQE7msoBMszuPICzYwhPzeKbzDYPzMkAS8gG2wyeZZFDoLeAnRPGfRrTImHfDDCybXktZGVtby1zZWNyZXQta2V5LTEys2ux",
  "CHANGE_DATE": {
    "long": 1686219238466
  },
  "SEQ": 44
}

VALUE SCHEMA

{
  "fields": [    
    {
      "name": "ITEM_NO",
      "type": "long"
    },
    {
      "name": "ITEM_NAME",
      "type": "string"
    },
    {
      "default": null,
      "name": "CHANGE_DATE",
      "type": [
        "null",
        {
          "[connect.name](http://connect.name/)": "org.apache.kafka.connect.data.Timestamp",
          "connect.version": 1,
          "logicalType": "timestamp-millis",
          "type": "long"
        }
      ]
    },
    {
      "name": "SEQ",
      "type": "long"
    }
  ],
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "type": "record"
}

Then I tried to decrypt the messages and insert them into another MSSQL Server using the JDBC Sink Connector, but a problem occurred here. (The target table schema is the same.)

I set up the JDBC Sink Connector like this:


{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://{schema}:8081",
    "value.converter.schema.registry.url": "http://{schema}:8081",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "topics": "test-topic",
    "transforms": "decipher",   
    "transforms.decipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
    "transforms.decipher.cipher_mode": "DECRYPT",
    "transforms.decipher.field_config": "[{\"name\":\"ITEM_NAME\"}]",
    "transforms.decipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
    "transforms.decipher.cipher_data_key_identifier": "my-demo-secret-key-123",
    "transforms.decipher.predicate":"isTombstone",
    "transforms.decipher.negate":true,
    "predicates": "isTombstone",
    "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
    "connection.url": "jdbc:sqlserver://{MSSQL2}:1433",
    "connection.user": "user",
    "connection.password": "pass",
    "table.name.format": "test_sink_table",
    "delete.on.null.values": true
  }

I got this error message:

[2023-06-08 10:07:16,577] ERROR WorkerSinkTask{id=jaeshim_jdbc_sink_connector_cipher-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:208)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:237)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:159)
        at org.apache.kafka.connect.runtime.TransformationChain.transformRecord(TransformationChain.java:70)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:543)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:494)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: schema-aware data needs schema spec for DECRYPT but none was given for field path 'ITEM_NAME'
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.lambda$derivePrimaryType$0(SchemaRewriter.java:170)
        at java.base/java.util.Optional.orElseThrow(Optional.java:408)
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewri

I think the Sink Connector is running in "DECRYPT" mode but there is no schema spec related to DECRYPT for the 'ITEM_NAME' field. I tried using the JsonSchemaConverter and the JsonConverter with schemas enabled, but the problem is the same.

Could you tell me what might be causing these errors?

sjm767 commented 1 year ago

Hello, I traced through the code and tried to debug it.

As a result, I found that I had missed the schema spec in field_config. So I fixed it like this:

AS-IS "[{\"name\":\"ITEM_NAME\"}]"

TO-BE "[{\"name\":\"ITEM_NAME\",\"schema\": {\"type\": \"STRING\"}}]"
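
In the sink connector config above, only the field_config line changes, roughly:

    "transforms.decipher.field_config": "[{\"name\":\"ITEM_NAME\",\"schema\": {\"type\": \"STRING\"}}]",

With the schema spec in place, the schema-aware SMT knows which type the decrypted ITEM_NAME field should have, which is exactly what the "needs schema spec for DECRYPT" error was complaining about.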

It was my mistake!

hpgrahsl commented 1 year ago

Happy to hear you found the necessary configuration change to make it work! 🎉