opensearch-project / data-prepper

OpenSearch Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0

[BUG] Data Prepper hung with data stuck in buffer #2598

Open cmanning09 opened 1 year ago

cmanning09 commented 1 year ago

Describe the bug
Data Prepper is no longer processing data. There appears to be data stuck in the buffer. Buffer metrics are reporting a bufferUsage of 75%, recordsInFlight is 250,000, and records in the buffer are 687,000+. The last message in my logs is:

2023-04-25T20:42:43.344 WARN  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Document [******] has failure.

java.lang.RuntimeException: Number of retries reached the limit of max retries(configured value 10)

Data Prepper appears to be hung. Based on the metrics, I no longer see the source polling or any data flowing through the pipeline.

To Reproduce
I have 2 out of 5 pipelines with this issue, all pointing to the same domain. Around the time this issue started there was a brief write block on my domain. It is unclear whether the issue is reproducible, as the other 3 pipelines were able to continue processing data after the write block was lifted.

Expected behavior
Data Prepper does not hang with data in the buffer.

Environment (please complete the following information):

Additional context
Partial pipeline configuration:

version: "2"
my-pipeline:
  source:
    s3:
      notification_type: "sqs"
      buffer_timeout: "60s"
      codec:
        newline:
          skip_lines: 1
      sqs:
        queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/****-queue"
      compression: "gzip"
      aws:
        region: "us-east-1"
        sts_role_arn: "arn:aws:iam::123456789012:role/osis-pipeline-role"
  buffer:
    bounded_blocking:
      batch_size: 125000
      buffer_size: 1000000
...

  sink:
  - opensearch:
      max_retries: 10
      hosts:
      - "https://*****"
      index: "vpc-flow-logs-%{yyyy.MM.dd}"
      bulk_size: 20
      aws:
        region: "us-east-1"
        sts_role_arn: "arn:aws:iam::123456789012:role/*****-role"
  workers: 2
  delay: 0
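
Note: the reported recordsInFlight of 250,000 is consistent with the two configured workers each holding a full batch of 125,000 records read from the buffer (2 × 125,000 = 250,000).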

dlvenable commented 1 year ago

We found that the sink worker is blocked while logging through Log4j's asynchronous logger:

"my-pipeline-sink-worker-2-thread-1" #36 prio=5 os_prio=0 cpu=84181006.84ms elapsed=86619.88s tid=0x0000ffff58250080 nid=0xa7 runnable  [0x0000ffff4c819000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.19/Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.19/LockSupport.java:357)
    at com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:136)
    at com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:105)
    at com.lmax.disruptor.RingBuffer.publishEvent(RingBuffer.java:465)
    at com.lmax.disruptor.dsl.Disruptor.publishEvent(Disruptor.java:326)
    at org.apache.logging.log4j.core.async.AsyncLoggerDisruptor.enqueueLogMessageWhenQueueFull(AsyncLoggerDisruptor.java:247)
    - locked <0x00000006613c6860> (a java.lang.Object)
    at org.apache.logging.log4j.core.async.AsyncLogger.handleRingBufferFull(AsyncLogger.java:246)
    at org.apache.logging.log4j.core.async.AsyncLogger.publish(AsyncLogger.java:230)
    at org.apache.logging.log4j.core.async.AsyncLogger.logWithThreadLocalTranslator(AsyncLogger.java:225)
    at org.apache.logging.log4j.core.async.AsyncLogger.access$000(AsyncLogger.java:67)
    at org.apache.logging.log4j.core.async.AsyncLogger$1.log(AsyncLogger.java:152)
    at org.apache.logging.log4j.core.async.AsyncLogger.log(AsyncLogger.java:136)
    at org.apache.logging.log4j.spi.AbstractLogger.tryLogMessage(AbstractLogger.java:2168)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2122)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2105)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2003)
    at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1870)
    at org.apache.logging.slf4j.Log4jLogger.warn(Log4jLogger.java:280)
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.lambda$logFailure$10(OpenSearchSink.java:311)
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink$$Lambda$1578/0x0000000800970040.accept(Unknown Source)
    at java.util.ArrayList.forEach(java.base@11.0.19/ArrayList.java:1541)
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.logFailure(OpenSearchSink.java:310)
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink$$Lambda$1222/0x0000000800829c40.accept(Unknown Source)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleFailures(BulkRetryStrategy.java:322)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:195)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:235)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:186)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:235)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:186)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:235)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:186)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:235)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:186)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:235)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:186)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:235)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:186)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:235)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:186)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:235)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:186)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:235)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:186)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:235)
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.execute(BulkRetryStrategy.java:154)
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.lambda$flushBatch$6(OpenSearchSink.java:268)
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink$$Lambda$1527/0x000000080094dc40.run(Unknown Source)
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141)
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.flushBatch(OpenSearchSink.java:265)
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doOutput(OpenSearchSink.java:245)
    at org.opensearch.dataprepper.model.sink.AbstractSink.lambda$output$0(AbstractSink.java:54)
    at org.opensearch.dataprepper.model.sink.AbstractSink$$Lambda$1522/0x000000080094e840.run(Unknown Source)
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141)
    at org.opensearch.dataprepper.model.sink.AbstractSink.output(AbstractSink.java:54)
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$publishToSinks$3(Pipeline.java:297)
    at org.opensearch.dataprepper.pipeline.Pipeline$$Lambda$1521/0x000000080094f440.run(Unknown Source)
    at java.util.concurrent.Executors$RunnableAdapter.call(java.base@11.0.19/Executors.java:515)
    at java.util.concurrent.FutureTask.run(java.base@11.0.19/FutureTask.java:264)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.19/ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.19/ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(java.base@11.0.19/Thread.java:829)

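The stack trace shows the thread parked in AsyncLoggerDisruptor.enqueueLogMessageWhenQueueFull, meaning the asynchronous logging ring buffer is full and Log4j's default queue-full policy makes the calling thread wait. As a mitigation sketch (not a confirmed fix for this issue), Log4j 2 has documented settings to discard events instead of blocking when the ring buffer fills; placing them in log4j2.component.properties assumes that file is readable on the Data Prepper classpath:

# Discard events at or below the threshold instead of blocking the caller
# when the async logging ring buffer is full.
log4j2.asyncQueueFullPolicy=Discard
# The blocked call here is a WARN, so the threshold must include WARN for this
# to help; the trade-off is that those per-document failure messages are dropped.
log4j2.discardThreshold=WARN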
Since your pipeline does not have a DLQ enabled, the opensearch sink is logging the failing documents. As you have max_retries set to 10, the opensearch sink gives up quickly (after only a few seconds) and then writes all of these documents to the logs.
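
One way to avoid writing every failed document to the log is to enable a DLQ on the opensearch sink. A minimal sketch, assuming a Data Prepper release that supports the S3 DLQ plugin (older releases offer dlq_file instead), with placeholder bucket, prefix, and role values:

  sink:
  - opensearch:
      max_retries: 10
      hosts:
      - "https://*****"
      index: "vpc-flow-logs-%{yyyy.MM.dd}"
      dlq:
        s3:
          bucket: "my-dlq-bucket"                 # placeholder
          key_path_prefix: "dlq/vpc-flow-logs/"   # placeholder
          region: "us-east-1"
          sts_role_arn: "arn:aws:iam::123456789012:role/osis-pipeline-role"

With a DLQ configured, failed documents go to the DLQ destination instead of being serialized into the application log, which should avoid the large WARN messages that filled the async logging ring buffer.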

To get your pipeline working, please consider:

There may be a few improvements we can make in Data Prepper to help: