newrelic / kafka-connect-newrelic

Apache License 2.0
5 stars 17 forks source link

Don't commit messages that were not successfully received by the NR API #41

Open ericmittelhammer opened 2 years ago

ericmittelhammer commented 2 years ago

As noted in https://github.com/newrelic/kafka-connect-newrelic/issues/39, there are scenarios where messages can fail on delivery to the NR API, in this case, within the telemetry SDK, and they are still committed.

Recommend reviewing commit path and ensuring active commit is occurring OUTSIDE of any error handling routines.

Relevant Logs / Console output

(copied from https://github.com/newrelic/kafka-connect-newrelic/issues/39)

server Struct{OPERATION_ID=2021122300103230662789491718810000000000000000000,timestamp=1640278878,OPERATION_NAME=TRF-EFETIVADA,TIMESTAMP_DK=1640268698568,INTEGRATED_DK=true,INTEGRATED_EB=false,INTEGRATED_CC=false,eventType=alfateste} (com.newrelic.telemetry.TelemetrySinkTask)
server [2021-12-24 13:07:00,189] ERROR [EventBatch] - Unexpected failure when sending data. (com.newrelic.telemetry.TelemetryClient)
server java.lang.NullPointerException
server     at com.newrelic.telemetry.events.json.EventBatchMarshaller.mapToJson(EventBatchMarshaller.java:73)
server     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
server     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
server     at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
server     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
server     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
server     at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
server     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
server     at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
server     at com.newrelic.telemetry.events.json.EventBatchMarshaller.toJson(EventBatchMarshaller.java:44)
server     at com.newrelic.telemetry.events.EventBatchSender.sendBatch(EventBatchSender.java:56)
server     at com.newrelic.telemetry.TelemetryClient.lambda$sendBatch$2(TelemetryClient.java:167)
server     at com.newrelic.telemetry.TelemetryClient.sendWithErrorHandling(TelemetryClient.java:212)
server     at com.newrelic.telemetry.TelemetryClient.lambda$scheduleBatchSend$4(TelemetryClient.java:201)
server     at com.newrelic.telemetry.LimitingScheduler.lambda$schedule$0(LimitingScheduler.java:52)
server     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
server     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
server     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
server     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
server     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
server     at java.base/java.lang.Thread.run(Thread.java:834)