jcustenborder / kafka-connect-spooldir

Kafka Connect connector for reading CSV files into Kafka.
Apache License 2.0

Unable to write JSON schemaless events #185

Open mroiter-larus opened 3 years ago

mroiter-larus commented 3 years ago

Hi @jcustenborder,

I’m having some trouble trying to use SMT functions with SpoolDirSchemaLessJsonSourceConnector. I would like to simply ingest some schemaless JSON events from a file into a topic, applying the ReplaceField SMT function. Here is the connector configuration:

name=testReplaceField
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector
tasks.max=1
topic=testReplaceField
input.path=/tmp/data
input.file.pattern=test-replaceField-no-schema.json
error.path=/tmp/data/error
finished.path=/tmp/data/finished
halt.on.error=false
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
transforms=RenameField
transforms.RenameField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames=foo:c1,bar:c2

My sample source file is populated as follows:

{"foo": 1, "bar": "test1"}
{"foo": 2, "bar": "test2"}
{"foo": 3, "bar": "test3"}

Although I disabled schemas for both the key and the value, the SMT still seems to interpret my JSON events as if they had a schema. I get the following exception in the connector logs:

[2021-06-16 17:23:36,933] ERROR WorkerSourceTask{id=testReplaceField-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:341)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field replacement], found: java.lang.String
    at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
    at org.apache.kafka.connect.transforms.ReplaceField.applyWithSchema(ReplaceField.java:167)
    at org.apache.kafka.connect.transforms.ReplaceField.apply(ReplaceField.java:146)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    ... 11 more

As you can see from the stack trace, it keeps going into the applyWithSchema method, which obviously fails! As suggested here, I already tried using StringConverter instead of JsonConverter, but with no luck: same error.

Am I doing something wrong?

Thanks in advance!

Regards,

Mauro

jcustenborder commented 3 years ago

Hi @mroiter-larus,

Kafka Connect is a little weird. It has its own type system that is independent of how the data is serialized. For example, a Struct is basically a row-like structure: in JSON it would be an object like your examples, and in Avro it's a Record.

What's happening here is that this connector uses Jackson to stream the file, breaking on each object boundary, and it hands each object off as a string. If you were only using the string converter you would be done. In your example you're running a transformation, so the chain looks like this: SpoolDirSchemaLessJsonSourceConnector -> RenameField -> Converter. It's breaking at the RenameField step: that SMT is saying it doesn't support Strings.

You have a couple of options. You could use something like FromJson to parse the data against a JSON schema and then run your rename. Another option would be to convert it afterwards with something like KSQL.

https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-json-schema/transformations/examples/FromJson.inline.html
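As a rough sketch of that first option, the transform chain could look like the config below. This assumes the FromJson SMT from the kafka-connect-json-schema project with an inline schema; the `json.schema.location` / `json.schema.inline` property names are taken from that project's documentation and may differ by version, so treat this as a starting point rather than a verified config:

```properties
# Run FromJson first so the raw JSON string becomes a Struct,
# then apply the rename (assumed property names, see lead-in).
transforms=FromJson,RenameField

transforms.FromJson.type=com.github.jcustenborder.kafka.connect.json.FromJson$Value
transforms.FromJson.json.schema.location=Inline
transforms.FromJson.json.schema.inline={"type":"object","properties":{"foo":{"type":"integer"},"bar":{"type":"string"}}}

# ReplaceField now receives a Struct instead of a String, so the renames apply.
transforms.RenameField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames=foo:c1,bar:c2
```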

mroiter-larus commented 3 years ago

Hi @jcustenborder ,

Thanks a lot for your answer!

I tried the first approach you suggested (using FromJson). It actually works, but this way the JSON events, which were initially schemaless, are treated as if they had a schema (and I think this is exactly what the FromJson transformation is expected to do). I mean, the RenameField step still runs the applyWithSchema method, and I would like to prevent this.

Is there a way to run the ReplaceField step so that the applySchemaless method is executed instead?

Thanks a lot!

jcustenborder commented 3 years ago

Off the top of my head I'm not sure. I don't know how much I would worry about it being schemaless or with a schema. The converter could be the Apache Kafka JsonConverter, which would strip the schema, or you could use the JsonSchemaConverter I wrote, which attaches the schema as a header. It's in the same project. Either way, that would give you JSON in your topic.

https://github.com/jcustenborder/kafka-connect-json-schema/blob/master/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverter.java
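A hedged sketch of those two converter options. Option A uses only properties documented for the stock Apache Kafka JsonConverter; option B assumes the JsonSchemaConverter class from the linked source, whose exact configuration may vary by version:

```properties
# Option A: stock Apache Kafka JsonConverter with schemas disabled,
# so no schema envelope is written to the topic.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Option B (assumed setup): JsonSchemaConverter from kafka-connect-json-schema,
# which attaches the schema as a record header instead of embedding it in the value.
# value.converter=com.github.jcustenborder.kafka.connect.json.JsonSchemaConverter
```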