opensearch-project / data-prepper

OpenSearch Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0

[BUG] Potential NullPointerException when converting OpenSearch documents to be sent to the DLQ #3933

Closed: graytaylor0 closed this 3 months ago

graytaylor0 commented 10 months ago

Describe the bug: There is a potential NullPointerException when the document is null at this line, which causes the pipeline to crash (https://github.com/opensearch-project/data-prepper/blob/35a69489c2f8621c8aa258ddd8dda105cd67a9e4/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverter.java#L64).

2024-01-07T01:22:08.271 [dynamodb-pipeline-processor-worker-1-thread-1] ERROR org.opensearch.dataprepper.pipeline.common.FutureHelper - FutureTask failed due to: 
java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at org.opensearch.dataprepper.pipeline.common.FutureHelper.awaitFuturesIndefinitely(FutureHelper.java:29) ~[data-prepper-core-2.6.0.jar:?]
    at org.opensearch.dataprepper.pipeline.ProcessWorker.postToSink(ProcessWorker.java:158) ~[data-prepper-core-2.6.0.jar:?]
    at org.opensearch.dataprepper.pipeline.ProcessWorker.doRun(ProcessWorker.java:139) ~[data-prepper-core-2.6.0.jar:?]
    at org.opensearch.dataprepper.pipeline.ProcessWorker.run(ProcessWorker.java:61) ~[data-prepper-core-2.6.0.jar:?]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.NullPointerException
    at org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperationConverter.convertDocumentToGenericMap(FailedBulkOperationConverter.java:64) ~[opensearch-2.6.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperationConverter.convertToDlqObject(FailedBulkOperationConverter.java:38) ~[opensearch-2.6.0.jar:?]
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) ~[?:?]
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.logFailureForBulkRequests(OpenSearchSink.java:501) ~[opensearch-2.6.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleFailures(BulkRetryStrategy.java:369) ~[opensearch-2.6.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleFailures(BulkRetryStrategy.java:269) ~[opensearch-2.6.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:256) ~[opensearch-2.6.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:291) ~[opensearch-2.6.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.execute(BulkRetryStrategy.java:195) ~[opensearch-2.6.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.lambda$flushBatch$12(OpenSearchSink.java:487) ~[opensearch-2.6.0.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.11.3.jar:1.11.3]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.flushBatch(OpenSearchSink.java:484) ~[opensearch-2.6.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doOutput(OpenSearchSink.java:453) ~[opensearch-2.6.0.jar:?]
    at org.opensearch.dataprepper.model.sink.AbstractSink.lambda$output$0(AbstractSink.java:67) ~[data-prepper-api-2.6.0.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.11.3.jar:1.11.3]
    at org.opensearch.dataprepper.model.sink.AbstractSink.output(AbstractSink.java:67) ~[data-prepper-api-2.6.0.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$publishToSinks$5(Pipeline.java:349) ~[data-prepper-core-2.6.0.jar:?]
    ... 5 more
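The trace shows the failure surfacing in two layers: the NullPointerException is thrown inside a stream collect called from OpenSearchSink.logFailureForBulkRequests, and it is then rethrown as an ExecutionException when FutureHelper calls Future.get() on the worker's task. The Java sketch below reproduces that propagation pattern in isolation. It assumes a failed operation whose serialized document is a byte array that may be null; FailedOperation, convertUnsafe, and runAndCaptureCause are hypothetical names, not Data Prepper classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class NpePropagationDemo {

    // Hypothetical stand-in for a failed bulk operation; the document may be null.
    static final class FailedOperation {
        final byte[] document;

        FailedOperation(final byte[] document) {
            this.document = document;
        }
    }

    // Hypothetical converter with no null check: a null document throws NullPointerException.
    static String convertUnsafe(final FailedOperation op) {
        return new String(op.document, StandardCharsets.UTF_8);
    }

    // Runs stream().map(...).collect(...) on a worker thread, waits with Future.get(),
    // and returns the cause if the task failed (null if it succeeded).
    static Throwable runAndCaptureCause(final List<FailedOperation> ops) {
        final ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            final Future<List<String>> future = pool.submit(() ->
                    ops.stream().map(NpePropagationDemo::convertUnsafe).collect(Collectors.toList()));
            future.get();
            return null;
        } catch (final ExecutionException e) {
            // Future.get() wraps the exception thrown inside the task.
            return e.getCause();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(final String[] args) {
        final List<FailedOperation> ops = Arrays.asList(
                new FailedOperation("{\"id\":1}".getBytes(StandardCharsets.UTF_8)),
                new FailedOperation(null)); // one null document is enough to fail the whole batch
        System.out.println(runAndCaptureCause(ops));
    }
}
```

A single null document aborts the entire collect, so none of the batch's failures are converted, which matches the crash described above.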

To Reproduce: Not sure how to reproduce.

Expected behavior: The pipeline should not crash when the document is null.

dlvenable commented 10 months ago

@graytaylor0, are you able to work on this? It looks like it should be an easy fix. I think writing an empty string (or perhaps a null string) to the DLQ would be appropriate here.
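A minimal sketch of the suggested fix, assuming the converter receives the serialized document as a byte array: check for null before converting, and record an empty string in the DLQ entry instead of dereferencing it. NullSafeDlqConverter, toDlqEntry, and the "document"/"failure" keys are hypothetical; the real convertDocumentToGenericMap in FailedBulkOperationConverter builds a generic map and may be structured differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class NullSafeDlqConverter {

    // Hypothetical DLQ entry builder. A null document is recorded as an empty
    // string instead of being dereferenced, so no NullPointerException is thrown.
    static Map<String, Object> toDlqEntry(final byte[] document, final String failureMessage) {
        final String documentString = (document == null)
                ? ""
                : new String(document, StandardCharsets.UTF_8);
        final Map<String, Object> entry = new LinkedHashMap<>();
        entry.put("document", documentString);
        entry.put("failure", failureMessage);
        return entry;
    }

    public static void main(final String[] args) {
        // The null document still yields an entry, so the batch can be written to the DLQ.
        System.out.println(toDlqEntry(null, "hypothetical failure message"));
        System.out.println(toDlqEntry("{\"id\":1}".getBytes(StandardCharsets.UTF_8), "hypothetical failure message"));
    }
}
```

Because the null case returns an ordinary entry rather than throwing, a surrounding stream collect completes and the failed record still reaches the DLQ instead of crashing the worker thread.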

graytaylor0 commented 10 months ago

Sure I can push a fix for this.