scylladb / kafka-connect-scylladb

Kafka Connect Scylladb Sink
Apache License 2.0

Is it possible to use kafka message key as primary key for table? #51

Closed clemensk1 closed 3 years ago

clemensk1 commented 3 years ago

Hi,

How can I use the Kafka message key as the primary key in a table?

example:

kafka message key:

{ "id": 1 }

kafka message value:

{ 
   "department": "foo",
   "name": "bar"
}

desired table:

id | department      | name
---+-----------------+-----------
 1 | foo             | bar

Error message:

Caused by: org.apache.kafka.connect.errors.DataException: record.valueSchema() must contain all of the fields in record.keySchema(). record.keySchema() is used by the connector to determine the key for the table. record.valueSchema() is missing field 'id'. record.valueSchema() is used by the connector to persist data to the table in ScyllaDb. Here are the available fields for record.valueSchema(department, name) and record.keySchema(id).

my connector config:

topics: scyllatest2
scylladb.keyspace: test
scylladb.keyspace.create.enabled: true
scylladb.contact.points: scylla-eu-central-1-eu-central-1c-0.scylla
scylladb.consistency.level: ANY
behavior.on.malformed.documents: warn
errors.tolerance: all
errors.log.enable: true
errors.log.include.messages: true
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: http://kafka-schema-registry-cp-schema-registry.my-kafka-project:8081
key.converter: io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: http://kafka-schema-registry-cp-schema-registry.my-kafka-project:8081

I use avro schema for both the key and the value of the topic.

Many Thanks!

avelanarius commented 3 years ago

The connector already uses the Kafka key to determine the primary key for the Scylla table. However, the connector requires that the fields in the Kafka message key also appear in the Kafka message value. For example, this would work correctly:

Kafka key:

{ "id": 1 }

Kafka value:

{ 
   "id": 1,
   "department": "foo",
   "name": "bar"
}
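
With `id` duplicated into the value, and given the keyspace and topic from the config in the question, the table the connector writes to would look roughly like this in CQL (the column types here are assumptions inferred from the example data, not taken from the actual Avro schema):

```sql
CREATE TABLE test.scyllatest2 (
    id int PRIMARY KEY,   -- sourced from the Kafka key (and duplicated in the value)
    department text,      -- sourced from the Kafka value
    name text             -- sourced from the Kafka value
);
```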

We will consider relaxing this requirement of the connector in the future, so that your example would work without changes.

However, you can use Kafka SMTs (Single Message Transforms) to transform your messages into the required format on the fly; the original topic is not affected. Unfortunately, this requires chaining several transforms and installing one additional transform:

First, we will use KeyToValueTransform, available here: https://github.com/jzaralim/KeyToValueTransform (to install it, run `mvn package` and copy the resulting JAR file to the Connect classpath). This transform copies the key into the value as follows:

{ 
   "rowkey": {"id": 1},
   "department": "foo",
   "name": "bar"
}

Next, we will use the Flatten$Value transform, so that id is no longer nested in the value:

{ 
   "rowkey.id": 1,
   "department": "foo",
   "name": "bar"
}

Finally, using the ReplaceField$Value transform, we will rename rowkey.id to id:

{ 
   "id": 1,
   "department": "foo",
   "name": "bar"
}

This is what has to be added to the configuration:

transforms = keyToValue, flatten, rename
transforms.keyToValue.type = com.github.jzaralim.kafka.connect.transform.keytovalue.KeyToValueTransform
transforms.flatten.type = org.apache.kafka.connect.transforms.Flatten$Value
transforms.rename.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rename.renames = rowkey.id:id
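
Putting it all together: the config from the question plus the transform chain above could be submitted to the Kafka Connect REST API as one JSON payload, roughly like this (a sketch; the connector name and `connector.class` are assumptions to be checked against the connector's README, and the errors.* / behavior.on.malformed.documents properties are omitted for brevity):

```json
{
  "name": "scylladb-sink",
  "config": {
    "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
    "topics": "scyllatest2",
    "scylladb.keyspace": "test",
    "scylladb.keyspace.create.enabled": "true",
    "scylladb.contact.points": "scylla-eu-central-1-eu-central-1c-0.scylla",
    "scylladb.consistency.level": "ANY",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://kafka-schema-registry-cp-schema-registry.my-kafka-project:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://kafka-schema-registry-cp-schema-registry.my-kafka-project:8081",
    "transforms": "keyToValue,flatten,rename",
    "transforms.keyToValue.type": "com.github.jzaralim.kafka.connect.transform.keytovalue.KeyToValueTransform",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.rename.renames": "rowkey.id:id"
  }
}
```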
clemensk1 commented 3 years ago

Thank you very much for this detailed answer, I'll give it a try :)

dbolshak commented 1 year ago

Hello @clemensk1 ,

I have managed to solve the issue by using column mapping.
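
The exact settings are not shared here. If the connector supports DataStax-style per-topic mapping, the idea might look roughly like the following, using the topic, keyspace, and fields from the question (the property name and syntax below are assumptions, not confirmed for this connector; check its README):

```
topic.scyllatest2.test.scyllatest2.mapping: id=key.id, department=value.department, name=value.name
```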