streamthoughts / kafka-connect-file-pulse

🔗 A multipurpose Kafka Connect connector that makes it easy to parse, transform and stream any file, in any format, into Apache Kafka
https://streamthoughts.github.io/kafka-connect-file-pulse/
Apache License 2.0

v2.6.0: null values in JSON objects are not evaluated appropriately and throw an error in JSONFilter #269

Closed pri-naik5 closed 1 year ago

pri-naik5 commented 2 years ago

Describe the bug
v2.6.0: null values in JSON objects (not within a struct) are not evaluated correctly and cause a schema-merge error in JSONFilter.

To Reproduce

Expected behavior
The file is processed without issues, and the schema is auto-generated with [null, STRING] and [null, STRUCT] as the types of field1 and field4, respectively.
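(The report does not include the original file. Based on the field names above, a minimal input of the shape that triggers the failure would look like this; the values are illustrative, not the reporter's actual data:)

```json
[
  { "field1": null, "field4": null },
  { "field1": "some-string", "field4": { "field4a": "some-value" } }
]
```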

Screenshots
With the initial file:

```
io.streamthoughts.kafka.connect.filepulse.data.DataException: Failed to merge schemas for field 'field1'.
	at io.streamthoughts.kafka.connect.filepulse.data.StructSchema$StructSchemaMerger.apply(StructSchema.java:341)
	at io.streamthoughts.kafka.connect.filepulse.data.StructSchema.merge(StructSchema.java:260)
	at io.streamthoughts.kafka.connect.filepulse.data.LazyArraySchema.valueSchema(LazyArraySchema.java:55)
	at io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter.apply(JSONFilter.java:90)
	at io.streamthoughts.kafka.connect.filepulse.filter.AbstractMergeRecordFilter.apply(AbstractMergeRecordFilter.java:43)
	at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.apply(DefaultRecordFilterPipeline.java:162)
	at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:134)
	at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:102)
	at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:176)
	at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:199)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:304)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.streamthoughts.kafka.connect.filepulse.data.DataException: Cannot merge incompatible schema type NULL<>STRING
	at io.streamthoughts.kafka.connect.filepulse.data.Schema.merge(Schema.java:203)
	at io.streamthoughts.kafka.connect.filepulse.data.StructSchema$StructSchemaMerger.apply(StructSchema.java:338)
	... 18 more
```

After I replace field1 with a non-null string value:

```
io.streamthoughts.kafka.connect.filepulse.data.DataException: Failed to merge schemas for field 'field4'.
	at io.streamthoughts.kafka.connect.filepulse.data.StructSchema$StructSchemaMerger.apply(StructSchema.java:341)
	at io.streamthoughts.kafka.connect.filepulse.data.StructSchema.merge(StructSchema.java:260)
	at io.streamthoughts.kafka.connect.filepulse.data.LazyArraySchema.valueSchema(LazyArraySchema.java:55)
	at io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter.apply(JSONFilter.java:90)
	at io.streamthoughts.kafka.connect.filepulse.filter.AbstractMergeRecordFilter.apply(AbstractMergeRecordFilter.java:43)
	at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.apply(DefaultRecordFilterPipeline.java:162)
	at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:134)
	at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:102)
	at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:176)
	at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:199)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:304)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.streamthoughts.kafka.connect.filepulse.data.DataException: Failed to merge schemas for field 'field4a'.
	at io.streamthoughts.kafka.connect.filepulse.data.StructSchema$StructSchemaMerger.apply(StructSchema.java:341)
	at io.streamthoughts.kafka.connect.filepulse.data.StructSchema.merge(StructSchema.java:260)
	at io.streamthoughts.kafka.connect.filepulse.data.StructSchema$StructSchemaMerger.apply(StructSchema.java:338)
	... 18 more
Caused by: io.streamthoughts.kafka.connect.filepulse.data.DataException: Cannot merge incompatible schema type NULL<>STRING
	at io.streamthoughts.kafka.connect.filepulse.data.Schema.merge(Schema.java:203)
	at io.streamthoughts.kafka.connect.filepulse.data.StructSchema$StructSchemaMerger.apply(StructSchema.java:338)
	... 20 more
```
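(For context, the stack traces show the failure originates in `Schema.merge` when the schema inferred for a null value is merged with the schema inferred for the same field in another array element. The following is a hypothetical sketch, not the connector's actual `io.streamthoughts...data.Schema` API: it only illustrates the strict merge rule behind the `NULL<>STRING` / `NULL<>STRUCT` errors, and the null-tolerant rule this report expects.)

```java
// Hypothetical sketch of schema-type merging. The type names and methods
// are illustrative and do NOT match File Pulse's real classes.
public class NullTolerantMerge {

    public enum SchemaType { NULL, STRING, STRUCT, ARRAY }

    /** Simplified strict behavior: merging NULL with any concrete type fails,
     *  mirroring the "Cannot merge incompatible schema type" errors above. */
    public static SchemaType mergeStrict(SchemaType left, SchemaType right) {
        if (left == right) return left;
        throw new IllegalArgumentException(
                "Cannot merge incompatible schema type " + left + "<>" + right);
    }

    /** Expected behavior: NULL yields to the concrete type, so the merged
     *  field effectively becomes optional ([null, STRING], [null, STRUCT]). */
    public static SchemaType mergeNullTolerant(SchemaType left, SchemaType right) {
        if (left == right) return left;
        if (left == SchemaType.NULL) return right;   // null value seen first
        if (right == SchemaType.NULL) return left;   // null value seen later
        throw new IllegalArgumentException(
                "Cannot merge incompatible schema type " + left + "<>" + right);
    }
}
```

Under the null-tolerant rule, merging the two array elements' schemas would yield an optional STRING for field1 and an optional STRUCT for field4 instead of throwing.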

Additional context
Config of the connector:

```yaml
fs.scan.interval.ms: "10000"
fs.scan.filters: "io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter"
file.filter.regex.pattern: ".*\.json$"
tasks.reader.class: "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalBytesArrayInputReader"
errors.log.include.messages: true
errors.log.enable: true
offset.strategy: "name"
topic: "json_cspm_topic"
tasks.file.status.storage.bootstrap.servers: "PLAINTEXT://my-cluster-kafka-bootstrap:9092"
internal.kafka.reporter.bootstrap.servers: "PLAINTEXT://my-cluster-kafka-bootstrap:9092"
internal.kafka.reporter.topic: "connect-file-pulse-status"
fs.cleanup.policy.class: "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy"
#cleaner.output.failed.path: "/tmp/jsonexamples/failed/"
#cleaner.output.succeed.path: "/tmp/jsonexamples/succeed/"
fs.listing.class: "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing"
fs.listing.directory.path: "/tmp/jsonexamples/"
filters: "ParseJSON, ExcludeFields, SetTopic"
#filters: "ReplaceEmptyArraysWithNulls, SetTopic"
filters.SetTopic.type: "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter"
filters.SetTopic.value: "{{ replace_all($metadata.name,'.json','') }}"
filters.SetTopic.field: "$topic"
filters.ParseJSON.type: "io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter"
filters.ParseJSON.source: "message"
filters.ParseJSON.merge: "false"
filters.ParseJSON.explode.array: "true"
filters.ExcludeFields.type: "io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter"
filters.ExcludeFields.fields: "Path"
transforms: "Flatten"
transforms.Flatten.type: "org.apache.kafka.connect.transforms.Flatten$Value"
transforms.Flatten.delimiter: "_"
```
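(Note on the config: with `filters.ParseJSON.explode.array: "true"`, my understanding is that JSONFilter emits one record per element of the parsed top-level array, which is why the element schemas get merged in `LazyArraySchema.valueSchema` as seen in the traces. A purely illustrative input such as:)

```json
[ { "field1": null }, { "field1": "test1" } ]
```

would be exploded into two records whose inferred schemas for field1 (NULL vs. STRING) must be merged into a single value schema for the array, and that merge is what fails.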
pri-naik5 commented 2 years ago

Additionally, with this input:

```json
[
  {
    "field1": "",
    "field2": "test5",
    "field3": [
      { "field3_a": "test", "field3_b": "test2", "field3_c": "test3" }
    ],
    "field4": null
  },
  {
    "field1": "test1",
    "field2": "test5",
    "field4": {
      "field4a": "this_value",
      "field4b": "",
      "field4c": "",
      "field4d": "",
      "field4e": "",
      "field4f": "",
      "field4g": "",
      "field4h": [
        { "field4ha": "", "field4hb": "", "field4hc": "" }
      ]
    }
  }
]
```

I see this error:

```
io.streamthoughts.kafka.connect.filepulse.data.DataException: Failed to merge schemas for field 'field4'.
	at io.streamthoughts.kafka.connect.filepulse.data.StructSchema$StructSchemaMerger.apply(StructSchema.java:341)
	at io.streamthoughts.kafka.connect.filepulse.data.StructSchema.merge(StructSchema.java:260)
	at io.streamthoughts.kafka.connect.filepulse.data.LazyArraySchema.valueSchema(LazyArraySchema.java:55)
	at io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter.apply(JSONFilter.java:90)
	at io.streamthoughts.kafka.connect.filepulse.filter.AbstractMergeRecordFilter.apply(AbstractMergeRecordFilter.java:43)
	at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.apply(DefaultRecordFilterPipeline.java:162)
	at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:134)
	at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:102)
	at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:176)
	at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:199)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:304)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.streamthoughts.kafka.connect.filepulse.data.DataException: Cannot merge incompatible schema type NULL<>STRUCT
	at io.streamthoughts.kafka.connect.filepulse.data.Schema.merge(Schema.java:203)
	at io.streamthoughts.kafka.connect.filepulse.data.StructSchema$StructSchemaMerger.apply(StructSchema.java:338)
	... 18 more
```

pri-naik5 commented 2 years ago

@fhussonnois I have added a fix for this. Can I get permissions to create a PR on this repo?

github-actions[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 1 year ago

This issue was closed because it has been stalled for 30 days with no activity.