newrelic / kafka-connect-newrelic


Events are not being sent to New Relic #39

Open wolvery opened 2 years ago

wolvery commented 2 years ago

NOTE: Events from Kafka with timestamp and eventType are not being sent to New Relic.

Description

I have found that this new version cannot send a simple Avro message like this one:

server Struct{OPERATION_ID=2021122300103230662789491718810000000000000000000,timestamp=1640278878,OPERATION_NAME=TRF-EFETIVADA,TIMESTAMP_DK=1640268698568,INTEGRATED_DK=true,INTEGRATED_EB=false,INTEGRATED_CC=false,eventType=alfateste} (com.newrelic.telemetry.TelemetrySinkTask)
server [2021-12-24 13:07:00,189] ERROR [EventBatch] - Unexpected failure when sending data. (com.newrelic.telemetry.TelemetryClient)
server java.lang.NullPointerException
server     at com.newrelic.telemetry.events.json.EventBatchMarshaller.mapToJson(EventBatchMarshaller.java:73)
server     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
server     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
server     at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
server     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
server     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
server     at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
server     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
server     at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
server     at com.newrelic.telemetry.events.json.EventBatchMarshaller.toJson(EventBatchMarshaller.java:44)
server     at com.newrelic.telemetry.events.EventBatchSender.sendBatch(EventBatchSender.java:56)
server     at com.newrelic.telemetry.TelemetryClient.lambda$sendBatch$2(TelemetryClient.java:167)
server     at com.newrelic.telemetry.TelemetryClient.sendWithErrorHandling(TelemetryClient.java:212)
server     at com.newrelic.telemetry.TelemetryClient.lambda$scheduleBatchSend$4(TelemetryClient.java:201)
server     at com.newrelic.telemetry.LimitingScheduler.lambda$schedule$0(LimitingScheduler.java:52)
server     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
server     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
server     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
server     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
server     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
server     at java.base/java.lang.Thread.run(Thread.java:834)
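
The trace shows the NullPointerException coming out of a stream collect inside the SDK's event marshaller. As a rough illustration only (this is not the SDK's actual code), the standard Collectors.toMap collector fails in exactly this way whenever one of the values being collected is null:

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class NullValueNpeDemo {
    public static void main(String[] args) {
        Map<String, Object> fields = new HashMap<>();
        fields.put("OPERATION_NAME", "TRF-EFETIVADA");
        fields.put("INTEGRATED_CC", null); // a HashMap happily holds the null value ...

        // ... but streaming it through Collectors.toMap throws a NullPointerException,
        // because the collector's underlying Map.merge call rejects null values.
        Map<String, Object> copy = fields.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        System.out.println(copy); // never reached
    }
}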

NOTE: Messages are not sent, yet the offset is updated as if they had been. Even with these failures the connector shows no consumer lag, which means it commits the offset without actually sending any messages.

Steps to Reproduce

Config.json:


{
    "api.key": "",
    "batch.size": "10",
    "connector.class": "com.newrelic.telemetry.events.EventsSinkConnector",
    "drop.invalid.message": "false",
    "flush.timeout.ms": "10000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
    "linger.ms": "1",
    "max.buffered.records": "20",
    "max.in.flight.requests": "5",
    "max.retries": "5",
    "nr.flush.max.interval.ms": "5000",
    "nr.flush.max.records": "100",
    "retry.backoff.ms": "100",
    "topics": "TEMP_CONCILIATION",
    "transforms": "eventtype,Cast",
    "transforms.eventtype.static.field": "eventType",
    "transforms.eventtype.static.value": "alfateste",
    "transforms.eventtype.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
    "transforms.Cast.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.Cast.renames": "TIMESTAMP:timestamp"
}

Expected Behaviour

Relevant Logs / Console output

Your Environment

Docker:

FROM confluentinc/cp-kafka-connect:6.1.0

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt newrelic/newrelic-kafka-connector:2.1.0

Additional context

ericmittelhammer commented 2 years ago

@wolvery thanks for submitting this issue. In this case, I believe there are two underlying issues. One, as you pointed out, is that messages are still being committed even though they have failed to send. I have created a separate issue for that here: https://github.com/newrelic/kafka-connect-newrelic/issues/41

The other issue is, of course, that the data you are sending is failing. It seems like some or all of the data is being passed to the New Relic Telemetry SDK with null values. I believe the exercise here would be to determine whether that is occurring within the Avro Converter or within the Sink Connector itself. Your config looks correct to me. Is there anything in the way you are using your schema that would trigger this?

Could you also try the same configuration with another debug sink (perhaps the FileSink), to see whether you get any unexpected nulls or errors with it as well?
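
A minimal FileStreamSink config for that check might look something like the following (the output file path is only an example); it reuses the same converter and transforms, so the records reaching the sink are the same ones the events connector sees:

{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "TEMP_CONCILIATION",
    "file": "/tmp/temp_conciliation_debug.txt",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
    "transforms": "eventtype,Cast",
    "transforms.eventtype.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.eventtype.static.field": "eventType",
    "transforms.eventtype.static.value": "alfateste",
    "transforms.Cast.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.Cast.renames": "TIMESTAMP:timestamp"
}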

wolvery commented 2 years ago

@ericmittelhammer

@wolvery thanks for submitting this issue. In this case, I believe there are two underlying issues. One, as you pointed out, is that messages are still being committed even though they have failed to send. I have created a separate issue for that here: #41

The other issue is, of course, that the data you are sending is failing. It seems like some or all of the data is being passed to the New Relic Telemetry SDK with null values. I believe the exercise here would be to determine whether that is occurring within the Avro Converter or within the Sink Connector itself. Your config looks correct to me. Is there anything in the way you are using your schema that would trigger this?

Could you also try the same configuration with another debug sink (perhaps the FileSink), to see whether you get any unexpected nulls or errors with it as well?

Actually, there are a few records that may contain a null field.

I have confirmed that once I change the null to 0, for example, the connector uploads the events to New Relic.

So it seems the problem is located in the New Relic events API, not in this connector itself. I am going to create an issue there.
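
Until that is fixed upstream, a possible workaround is to drop (or default) null fields before a record ever reaches the events pipeline. A minimal sketch of the idea in plain Java (the class and method names are hypothetical and not part of the connector or the Telemetry SDK):

import java.util.LinkedHashMap;
import java.util.Map;

public final class NullSafeAttributes {

    // Returns a copy of a record's field map with every null-valued entry removed,
    // so nothing with a null value is handed to the events pipeline.
    static Map<String, Object> dropNullValues(Map<String, Object> fields) {
        Map<String, Object> cleaned = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : fields.entrySet()) {
            if (entry.getValue() != null) {
                cleaned.put(entry.getKey(), entry.getValue());
            }
        }
        return cleaned;
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("OPERATION_NAME", "TRF-EFETIVADA");
        fields.put("INTEGRATED_CC", null); // a null like this is what triggers the NPE above
        fields.put("eventType", "alfateste");
        System.out.println(dropNullValues(fields)); // {OPERATION_NAME=TRF-EFETIVADA, eventType=alfateste}
    }
}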