streamthoughts / kafka-connect-file-pulse

🔗 A multipurpose Kafka Connect connector that makes it easy to parse, transform and stream any file, in any format, into Apache Kafka
https://streamthoughts.github.io/kafka-connect-file-pulse/
Apache License 2.0

Convert Record Timestamps into Avro Logical Date Type #665

Open schrieveslaach opened 2 months ago

schrieveslaach commented 2 months ago

Is your feature request related to a problem? Please describe.

In Avro there is a logical `date` type:

A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch, 1 January 1970 (ISO calendar).
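In other words, the logical type is just a day count. A minimal JDK-only illustration (the date used here is arbitrary):

```java
import java.time.LocalDate;

public class EpochDays {
    public static void main(String[] args) {
        // The Avro `date` logical type stores exactly this value in a plain int:
        // the number of days between the date and 1970-01-01 (the Unix epoch).
        int daysSinceEpoch = (int) LocalDate.of(2024, 9, 9).toEpochDay();
        System.out.println(daysSinceEpoch); // 19975
    }
}
```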

It should be possible to parse fields as dates and convert them into a compliant Avro record.

Describe the solution you'd like

It would be great if DateFilter provided a configuration option exposing temporal fields, so that a date could be parsed into an Avro-compatible logical `date` int.
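Purely for illustration, such an option might look like the sketch below (the `target.type` property does not exist in DateFilter today; it is a hypothetical name for the requested feature):

    "filters.DateParser.type": "io.streamthoughts.kafka.connect.filepulse.filter.DateFilter",
    "filters.DateParser.field": "$.dateAsStr",
    "filters.DateParser.formats": "yyyy-MM-dd",
    "filters.DateParser.target": "$.date",
    "filters.DateParser.target.type": "EPOCH_DAYS"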

Describe alternatives you've considered

I tried to convert the timestamp into the “number of days from the unix epoch, 1 January 1970”, but I couldn't find a way to express that in SCeL.

schrieveslaach commented 2 months ago

I found a way to convert the timestamp to an int with {{ timestamp_diff( 'DAYS', $.dateAsTimestamp, 0 ) }}:

    "filters.DateParser.type": "io.streamthoughts.kafka.connect.filepulse.filter.DateFilter",
    "filters.DateParser.field": "$.dateAsStr",
    "filters.DateParser.formats": "yyyy-MM-dd",
    "filters.DateParser.target": "$.dateAsTimestamp",
    "filters.DateAsDays.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
    "filters.DateAsDays.field": "$.date",
    "filters.DateAsDays.value": "{{ timestamp_diff( 'DAYS', $.dateAsTimestamp, 0 ) }}",
    "filters.DateAsInt.type": "io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter",
    "filters.DateAsInt.field": "date",
    "filters.DateAsInt.to": "INTEGER",

However, the processing then fails with a schema conversion error:


connect  | io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException: Failed to convert data into Kafka Connect record at offset [position=2100, rows=24, timestamp=1725872898509] from object-file: [uri=file:/opt/feeds/d943719c-7866-491e-b4be-b9c8269788c1.csv, name='d943719c-7866-491e-b4be-b9c8269788c1.csv', contentLength=2100, lastModified=1725872893757, contentDigest=[digest=1514060459, algorithm='CRC32'], userDefinedMetadata={system.inode=12725087, system.hostname=connect}]'
connect  |      at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.buildSourceRecord(FilePulseSourceTask.java:331)
connect  |      at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.lambda$poll$0(FilePulseSourceTask.java:211)
connect  |      at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
connect  |      at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1033)
connect  |      at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
connect  |      at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
connect  |      at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
connect  |      at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
connect  |      at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
connect  |      at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
connect  |      at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:212)
connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:481)
connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:354)
connect  |      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
connect  |      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:78)
connect  |      at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
connect  |      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect  |      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect  |      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect  |      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect  |      at java.base/java.lang.Thread.run(Thread.java:829)
connect  | Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema "org.apache.kafka.connect.data.Date" with type INT32: class java.lang.Integer for field: "date"
connect  |      at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
connect  |      at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
connect  |      at io.streamthoughts.kafka.connect.filepulse.source.internal.ConnectSchemaMapper.toConnectStruct(ConnectSchemaMapper.java:261)
connect  |      at io.streamthoughts.kafka.connect.filepulse.source.internal.ConnectSchemaMapper.map(ConnectSchemaMapper.java:202)
connect  |      at io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord.lambda$toSourceRecord$0(TypedFileRecord.java:94)
connect  |      at io.streamthoughts.kafka.connect.filepulse.source.internal.InternalSourceRecordBuilder.build(InternalSourceRecordBuilder.java:42)
connect  |      at io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord.toSourceRecord(TypedFileRecord.java:102)
connect  |      at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.buildSourceRecord(FilePulseSourceTask.java:310)
connect  |      ... 21 more
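The `DataException` at the bottom of the trace is Connect rejecting a plain `Integer` for the logical `org.apache.kafka.connect.data.Date` schema: that schema is INT32 on the wire, but `ConnectSchema.validateValue` expects the in-memory value to be a `java.util.Date`. A JDK-only sketch of the value shape Connect expects (19975 is an arbitrary example day count, not taken from the failing file):

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class ConnectDateValue {
    public static void main(String[] args) {
        // Connect's logical Date schema serializes as INT32 (days since epoch),
        // but the Struct value must be a java.util.Date, not an Integer.
        int daysSinceEpoch = 19975; // arbitrary example value
        Date value = new Date(TimeUnit.DAYS.toMillis(daysSinceEpoch));
        System.out.println(value.toInstant()); // midnight UTC of that day
    }
}
```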