redis-field-engineering / redis-kafka-connect

Redis Kafka Connector (Source and Sink) by Redis
https://redis-field-engineering.github.io/redis-kafka-connect/
Apache License 2.0

Keys are not deleted from Redis when redis.command is SET #32

Open valentinno7 opened 9 months ago

valentinno7 commented 9 months ago

We have noticed that in v0.9.0, keys are not deleted from Redis when the message value is null and redis.command is SET; instead, the key's value is set to an empty string. (We are currently using redis/redis-enterprise-kafka:6.7.4, which does not have this issue.) The documentation at https://redis-field-engineering.github.io/redis-kafka-connect/#_sink_string says: "String or bytes. If value is null the key is deleted." So we assume key deletion is the expected behaviour. Example reproduced in a Jupyter notebook (screenshot attached).
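The documented semantics can be summarised with a small sketch (plain Python with a dict standing in for Redis; the helper name is illustrative and is not the connector's actual code):

```python
# Sketch of the documented "sink string" behaviour for redis.command=SET:
# a null (tombstone) value should DELETE the key, not write an empty string.
def apply_set_record(store: dict, key: str, value):
    """Apply one Kafka record under redis.command=SET semantics."""
    if value is None:
        store.pop(key, None)   # expected: DEL key
    else:
        store[key] = value     # SET key value

store = {}
apply_set_record(store, "user:1", "alice")   # key written
apply_set_record(store, "user:1", None)      # tombstone: key removed
print("user:1" in store)                     # False
```

The bug report is that v0.9.0 instead behaves as if the tombstone wrote `""` into the key.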

CJPoll commented 1 month ago

I'm experiencing this issue too. In fact, it kills the whole connector.

My connector configuration (using terraform to define):

resource "kafka-connect_connector" "redis-kafka-connect" {
  name = "redis-kafka-connect"

  config = {
    "name": "redis-kafka-connect",
    "connector.class": "com.redis.kafka.connect.RedisSinkConnector",
    "tasks.max": "1",
    "topics": "${var.topic}",
    "redis.uri": "redis://redis:6379",
    "redis.command": "SET",

    # The key is serialized using Avro, but for visibility in tooling we want to convert it to just the entity UUID using a Single-Message Transform
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",

    # The value is also serialized in Avro, but we don't want to make any changes - leave it serialized in Redis
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",

    # Take the `id` field from the key and replace the entire key with just the UUID value. Picture below showing that this transform works as expected for create/update events
    "transforms": "ExtractField",
    "transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.ExtractField.field": "id"
  }
} 
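For context, the `ExtractField$Key` transform in the config above replaces the whole record key with a single field from it. A rough Python equivalent (illustrative only; the real SMT operates on Connect Struct/Map data, not plain dicts):

```python
# Approximation of org.apache.kafka.connect.transforms.ExtractField$Key
# with "transforms.ExtractField.field": "id" -- the structured Avro key
# is replaced by just its "id" field.
def extract_field_key(record_key: dict, field: str = "id"):
    return record_key[field]

avro_key = {"id": "123e4567-e89b-12d3-a456-426614174000", "type": "entity"}
print(extract_field_key(avro_key))
# 123e4567-e89b-12d3-a456-426614174000
```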

This setup works for creates/updates (screenshot attached).

But it kills the connector when the value is null.

Error: org.apache.kafka.connect.errors.DataException: The value for the record must be a string or byte array. Consider using the StringConverter or ByteArrayConverter if the data is stored in Kafka in the format needed in Redis. (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect  | 2024-07-09T05:05:18.013549922Z com.redis.kafka.connect.shaded.io.lettuce.core.RedisException: org.apache.kafka.connect.errors.DataException: The value for the record must be a string or byte array. Consider using the StringConverter or ByteArrayConverter if the data is stored in Kafka in the format needed in Redis. 
connect  | 2024-07-09T05:05:18.013552003Z       at com.redis.kafka.connect.shaded.io.lettuce.core.internal.Exceptions.fromSynchronization(Exceptions.java:106)
connect  | 2024-07-09T05:05:18.013553622Z       at com.redis.kafka.connect.shaded.com.redis.spring.batch.common.AbstractOperationExecutor.process(AbstractOperationExecutor.java:124)                                                                       
connect  | 2024-07-09T05:05:18.013555046Z       at com.redis.kafka.connect.shaded.com.redis.spring.batch.writer.AbstractOperationItemWriter.write(AbstractOperationItemWriter.java:47)
connect  | 2024-07-09T05:05:18.013556414Z       at com.redis.kafka.connect.sink.RedisSinkTask.put(RedisSinkTask.java:328)                                                                                                                                  
connect  | 2024-07-09T05:05:18.013557686Z       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
connect  | 2024-07-09T05:05:18.013558962Z       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)                                                                                                                            
connect  | 2024-07-09T05:05:18.013560234Z       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
connect  | 2024-07-09T05:05:18.013561502Z       at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)                                                                                                                         
connect  | 2024-07-09T05:05:18.013562770Z       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
connect  | 2024-07-09T05:05:18.013564125Z       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)                                                                                                                                     
connect  | 2024-07-09T05:05:18.013565430Z       at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)                                                                                                            
connect  | 2024-07-09T05:05:18.013566721Z       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect  | 2024-07-09T05:05:18.013568017Z       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)                                                                                                                                       
connect  | 2024-07-09T05:05:18.013569932Z       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)                                                                                                                
connect  | 2024-07-09T05:05:18.013571252Z       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect  | 2024-07-09T05:05:18.013572582Z       at java.base/java.lang.Thread.run(Thread.java:829)
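A possible stopgap rather than a fix (untested with this connector, so treat it as an assumption): Kafka Connect's built-in error-handling properties can route failing records to a dead-letter topic instead of failing the task. Whether this catches the exception above depends on the connector reporting put()-stage errors via the errant-record reporter (KIP-610), and it skips the tombstone rather than performing the DEL:

```
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "redis-sink-dlq",
"errors.deadletterqueue.topic.replication.factor": "1"
```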