confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

How to pass avro schema and payload using AvroConverter #1337

Open · sonawanedheeraj6 opened this issue 1 year ago

sonawanedheeraj6 commented 1 year ago

I have implemented the JDBC sink using JsonConverter and it is working as expected. The following is the JsonConverter schema-and-payload envelope I am publishing:

let schema = {
    schema: {
      type: "struct",
      fields: [
        {
          type: "string",
          optional: false,
          field: "name",
        },
        {
          type: "string",
          optional: false,
          field: "email",
        },
        {
          type: "string",
          optional: false,
          field: "address",
        },
        {
          type: "string",
          optional: false,
          field: "uuid",
        },
      ],
      optional: false,
      name: "ACT_EQUIP",
    },
    payload: {
      name: "Alex",
      email: "alex@gmail.com",
      address: "India",
      uuid: "6419aa1c99a138a29db0e48c",
    },
  };
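For context, this envelope only works because my connector uses JsonConverter with schemas enabled. The relevant converter properties look roughly like this (values illustrative):

    # JsonConverter with schemas.enable=true expects the {"schema": ..., "payload": ...} envelope
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=true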

But now I want to use the AvroConverter and pass the payload with a schema like the one above, without using Schema Registry.

I tried updating the schema for the AvroConverter as follows (the only change is type: "record" in place of type: "struct"):

let schema = {
    schema: {
      type: "record",
      fields: [
        {
          type: "string",
          optional: false,
          field: "name",
        },
        {
          type: "string",
          optional: false,
          field: "email",
        },
        {
          type: "string",
          optional: false,
          field: "address",
        },
        {
          type: "string",
          optional: false,
          field: "uuid",
        },
      ],
      optional: false,
      name: "ACT_EQUIP",
    },
    payload: {
      name: "Alex",
      email: "alex@gmail.com",
      address: "India",
      uuid: "6419aa1c99a138a29db0e48c",
    },
  };

But after publishing the above schema I am getting the following error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema type: record
    at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:528)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:371)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
2023-05-09 07:01:40,545 ERROR [kpi-connector|task-0] WorkerSinkTask{id=kpi-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-kpi-connector-0]
2023-05-09 07:01:40,546 INFO [kpi-connector|task-0] Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask) [task-thread-kpi-connector-0]

Can anyone please suggest how I can use the JsonConverter schema with the AvroConverter? Or else, can anyone tell me how to pass the Avro schema along with the payload without using the Avro Schema Registry?

OneCricketeer commented 1 year ago

"without using schema registry"

The AvroConverter class requires Schema Registry.
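For reference, this is roughly what an AvroConverter configuration looks like; schema.registry.url is mandatory (the URL below is a placeholder):

    value.converter=io.confluent.connect.avro.AvroConverter
    # the converter cannot operate without a reachable Schema Registry
    value.converter.schema.registry.url=http://schema-registry:8081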

"use the jsonConverter Schema for AvroConverter"

Not possible. Avro is not JSON. You cannot embed the schema in the record; an Avro message carries only the payload as Avro-serialized bytes. (Your stack trace also shows JsonConverter.asConnectSchema, i.e. the worker is still deserializing with JsonConverter, which only understands Connect schema types such as struct; record is an Avro type it does not recognize.)
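For contrast, a standalone Avro schema for the same record would look like the sketch below. Note that Avro declares fields with name rather than field, and it has no optional flag (optionality is expressed as a union with null):

    {
      "type": "record",
      "name": "ACT_EQUIP",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "address", "type": "string"},
        {"name": "uuid", "type": "string"}
      ]
    }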

"how can we pass the avro schema with payload without using the avro schema registry"

Use a different converter: https://github.com/farmdawgnation/registryless-avro-converter
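A minimal sketch of that converter's sink-side configuration, going by its README (the converter class name and schema.path property come from that project; the path is a placeholder):

    value.converter=me.frmr.kafka.connect.RegistrylessAvroConverter
    # local .avsc file used in place of Schema Registry lookups
    value.converter.schema.path=/path/to/act_equip.avsc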