ably / kafka-connect-ably

Kafka Connector for publishing data from Kafka to Ably
https://ably.com/solutions/extend-kafka-to-the-edge
Apache License 2.0

Use of 'Dynamic Channel Configuration' is either broken or unclear #153

Closed · MattBabbage closed 10 months ago

MattBabbage commented 1 year ago

Hi,

For reference, I have little experience with Kafka Connect, so this may be a simple issue. I am using the connector via the .zip upload method on Confluent Cloud. Everything works well until I try to use 'Dynamic Channel Configuration' to reference fields from the record within the channel name, e.g. "channel": "chat-#{value.userId}".

The README references the following:

Referencing data within a record key or value is only possible when making use of Kafka Connector Schema Support

After following the guide and using the most common methods, it doesn't seem to work. Here is the example configuration:

{
  "channel": "chat-#{value.teamId}",
  "client.id": "Ably-Kafka-Connector",
  "client.key": "Key",
  "group.id": "ably-connect-cluster",
  "message.name": "chat_message",
  "topics": "inbound-chat-messages-test",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter":"io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url":"http://localhost:8081",
  "value.converter.schemas.enable": "false"
}

I am using the Confluent Schema Registry and, as such, I am not sending the JSON Schema in with each message (value.converter.schemas.enable is false). This is seemingly what the JSON converter documentation recommends.

This is the error message being sent back:

    WorkerSinkTask{id=clcc-7pmz8j-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
Tolerance exceeded in error handler


org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:166)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:528)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:503)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:339)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:227)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:282)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic inbound-chat-messages-test: 
    at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:133)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:528)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)
    ... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:192)
    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:267)
    at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:179)
    at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:116)
    ... 18 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:252)
    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:116)
    ... 21 more

This occurs when sending this message:

{
    "title":"This is a title",
    "message":"This is a message",
    "image":"https://i.natgeofe.com/n/548467d8-c5f1-4551-9f58-6817a8d2c45e/NationalGeographic_2572187_square.jpg",
    "link":"https://www.bbc.co.uk/news/topics/cvjk7vv0w40t",
    "tenantId":"company1",
    "teamId":"team1"
}

with this JSON Schema attached to the topic via the Schema Registry on Confluent Cloud:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "http://example.com/myURI.schema.json",
  "title": "ChatMessage",
  "description": "Sample schema to help you get started.",
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "title": {
      "type": "string",
      "description": "The integer type is used for integral numbers."
    },
    "message": {
      "type": "string",
      "description": "The number type is used for any numeric type, either integers or floating point numbers."
    },
    "image": {
      "type": "string",
      "description": "The string type is used for strings of text."
    },
    "link": {
      "type": "string",
      "description": "The string type is used for strings of text."
    },
    "tenantId": {
      "type": "string",
      "description": "The string type is used for strings of text."
    },
    "teamId": {
      "type": "string",
      "description": "The string type is used for strings of text."
    }
  }
}

Any advice is much appreciated! Cheers, Matt

sync-by-unito[bot] commented 1 year ago

➤ Automation for Jira commented:

The link to the corresponding Jira issue is https://ably.atlassian.net/browse/SDK-3780

jaley commented 11 months ago

Hi @MattBabbage,

Apologies for the long delay; I haven't been working on this for a few months and just happened to notice your message in my GitHub notifications. I wanted to drop you a quick message in case you're still stuck on this, as I saw exactly the same error while testing.

If I remember correctly, the reason this was happening for me had nothing to do with the Ably Kafka Connector configuration, but rather the configuration of the client application sending the messages. That error suggests to me that the client application isn't configured to use a schema-registry-aware value serialiser and is perhaps just packing raw JSON into the value payload. This is exactly what I was doing initially in the loadtest app in this repository. I fixed it by letting the Kafka client do the JSON serialisation for me, which wraps the JSON payload in an envelope with some magic bytes at the beginning. Your stack trace suggests the JSON deserialiser you've (correctly) configured on the Ably Connector side can't find the magic bytes it expects.

The changes I made in the Python loadtest app to fix this were:
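The actual diff isn't reproduced in this thread, but as a rough sketch only, assuming the confluent-kafka Python client and reusing the topic and registry URL from the configuration above, a producer that applies schema-registry JSON serialisation might look like this:

import json

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import StringSerializer

# The JSON Schema registered for the topic (abbreviated here for brevity).
schema_str = json.dumps({
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "ChatMessage",
    "type": "object",
    "properties": {
        "teamId": {"type": "string"},
        "message": {"type": "string"},
    },
})

sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})

producer = SerializingProducer({
    "bootstrap.servers": "localhost:9092",  # assumed broker address
    "key.serializer": StringSerializer("utf_8"),
    # JSONSerializer prepends the magic byte and schema ID envelope that
    # io.confluent.connect.json.JsonSchemaConverter expects to find.
    "value.serializer": JSONSerializer(schema_str, sr_client),
})

producer.produce(
    topic="inbound-chat-messages-test",
    key="team1",
    value={"teamId": "team1", "message": "This is a message"},
)
producer.flush()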

Have you got any other connectors in your deployment reading the values successfully? Note that I'd expect only components in the Kafka Connect ecosystem to require this, because they need keys and values serialised in some known way so they can construct datatypes in the common Kafka Connect format. If you're writing your own consumer code, it generally won't be a problem, as you know how to unpack your own keys and values. (See the sketch below for what that framing looks like on the wire.)
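For reference, the framing those components rely on is the Confluent wire format: one magic byte (0x00), a four-byte big-endian schema ID, then the serialised payload. A hypothetical helper (not part of any library) for checking whether a raw record value carries that framing:

import struct

def schema_registry_schema_id(raw: bytes):
    """Return the schema ID if `raw` starts with the Confluent wire-format
    header (magic byte 0x00 + 4-byte big-endian schema ID), else None."""
    if len(raw) < 5 or raw[0] != 0x00:
        return None  # raw JSON, or some other format entirely
    return struct.unpack(">I", raw[1:5])[0]

A value produced as plain JSON starts with '{' (0x7B), fails the magic-byte check, and triggers exactly the "Unknown magic byte!" error in the stack trace above.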

Hope that helps!

ttypic commented 11 months ago

Hi @MattBabbage,

Is this issue still relevant? I can confirm what @jaley said; it looks like a problem on the client that is sending the message.

ttypic commented 10 months ago

Hi @MattBabbage,

Unfortunately, we are unable to proceed with resolving this, as no specific details were provided. If you encounter this issue again or have additional details to share, please feel free to reopen this issue with the necessary information. We are here to help and will be happy to assist you further.