newrelic / kafka-connect-newrelic

Apache License 2.0
5 stars 17 forks source link

Timestamp as date is not allowed to Events Connector #40

Open wolvery opened 2 years ago

wolvery commented 2 years ago

NOTE: # The documentation suggests using timestamp field from SMT but the connector does not allow it.

Description

NOTE: # An error message occurs when the event is suppose to be sent

Steps to Reproduce

Sample message:

server Struct{OPERATION_ID=2021122300111111111124220090800000000000000000000,TIMESTAMP=1640278880,OPERATION_NAME=PBC-EFETIVADA,TIMESTAMP_DK=1640268373925,INTEGRATED_DK=true,INTEGRATED_EB=false,INTEGRATED_CC=false,eventType=alfateste,timestamp=Thu Dec 23 17:06:32 UTC 2021}

Error message:


erver [2021-12-24 12:59:58,331] ERROR Caught exception while processing a message (com.newrelic.telemetry.TelemetrySinkTask)
server java.lang.ClassCastException: class java.util.Date cannot be cast to class java.lang.Long (java.util.Date and java.lang.Long are in module java.base of loader 'bootstrap')
server     at org.apache.kafka.connect.data.Struct.getInt64(Struct.java:130)
server     at com.newrelic.telemetry.events.EventConverter.withSchema(EventConverter.java:76)
server     at com.newrelic.telemetry.events.EventConverter.toNewRelicEvent(EventConverter.java:153)
server     at com.newrelic.telemetry.events.EventsSinkTask.createTelemetry(EventsSinkTask.java:26)
server     at com.newrelic.telemetry.events.EventsSinkTask.createTelemetry(EventsSinkTask.java:13)
server     at com.newrelic.telemetry.TelemetrySinkTask.put(TelemetrySinkTask.java:119)
server     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
server     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
server     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
server     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
server     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
server     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
server     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
server     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
server     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
server     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
server     at java.base/java.lang.Thread.run(Thread.java:834)
server [2021-12-24 12:59:59,702] DEBUG Empty batch.  Doing nothing (com.newrelic.telemetry.TelemetryBatchRunner)

Using the config:

{
    "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimestamp.timestamp.field": "timestamp",
}

Expected Behaviour

Relevant Logs / Console output

Your Environment

Connector 2.1.0 in docker file

Additional context

stalbot15 commented 2 years ago

@wolvery I ran into this issue as well today. Adding another transform to convert the timestamp to a Long gets around the error. Note that the list order of the transforms config matters, as the operations are chained.

"transforms": "InsertTimestamp,ConvertTimestamp",
"transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTimestamp.timestamp.field": "timestamp"

"transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTimestamp.field": "timestamp",
"transforms.ConvertTimestamp.target.type": "unix"