opensearch-project / data-prepper

OpenSearch Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0

[BUG] Data Prepper processor workers stop running when an error from the routes occurs #4883

Open · dlvenable opened this issue 3 months ago

dlvenable commented 3 months ago

Describe the bug

If the routes for a sink fail, such as when a route expression is invalid, the process worker will stop running. This leaves Data Prepper running without any process workers.

The buffer will then fill up, and Data Prepper will effectively be shut down.
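
As an illustration of that failure mode, the toy Java program below (not Data Prepper code; the actual mechanism is discussed further in the comments) shows how a single worker thread that dies on an uncaught exception leaves the bounded buffer it drains to fill up, so producers eventually block:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class WorkerStopsDemo {
    public static void main(String[] args) throws InterruptedException {
        // Small bounded buffer, analogous to a bounded_blocking buffer.
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2);

        Thread worker = new Thread(() -> {
            while (true) {
                String record;
                try {
                    record = buffer.take();
                } catch (InterruptedException e) {
                    return;
                }
                if (record.contains("bad")) {
                    // Stands in for route evaluation failing on an invalid expression.
                    throw new IllegalStateException("route evaluation failed for " + record);
                }
                System.out.println("routed: " + record);
            }
        }, "processor-worker");
        worker.start();

        buffer.put("good-1");
        buffer.put("bad-1");   // the uncaught exception terminates the only worker
        buffer.put("good-2");
        buffer.put("good-3");  // the capacity-2 buffer is full again; further puts would block
        System.out.println("records stuck in buffer: " + buffer.size());
    }
}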

To Reproduce

  1. Create a pipeline with conditional routes
  2. Make one of the routes have an invalid expression
  3. Run Data Prepper
  4. Ingest data
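
A minimal single-pipeline configuration along the lines of these steps might look like the following sketch (names are placeholders and the route expression is intentionally invalid; this is not the reporter's actual configuration):

test-pipeline:
  workers: 2
  source:
    http:

  routes:
    - bad_route: 'this is not a valid expression'

  sink:
    - stdout:
        routes: ['bad_route']

In the failing environment, ingesting data produced the following error:
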
2024-08-26T23:13:58.480 [test-pipeline-processor-worker-5-thread-2] ERROR org.opensearch.dataprepper.pipeline.ProcessWorker - Encountered exception during pipeline test-pipeline processing
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@10ec1e4d[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@198669bf[Wrapped task = org.opensearch.dataprepper.pipeline.Pipeline$$Lambda$1477/0x000000080136e230@138d51a1]] rejected from org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor@5af5a6fd[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 2018]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365) ~[?:?]
at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134) ~[?:?]
at org.opensearch.dataprepper.pipeline.Pipeline.lambda$publishToSinks$6(Pipeline.java:347) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.router.Router.lambda$route$0(Router.java:64) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.router.DataFlowComponentRouter.route(DataFlowComponentRouter.java:48) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.router.Router.route(Router.java:58) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.Pipeline.publishToSinks(Pipeline.java:346) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.ProcessWorker.postToSink(ProcessWorker.java:168) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.ProcessWorker.doRun(ProcessWorker.java:150) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.ProcessWorker.run(ProcessWorker.java:68) ~[data-prepper-core.jar:?]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:833) [?:?]

Expected behavior

I expect Data Prepper to continue running. One difficulty is deciding what to do with the affected data: we could drop it, or we could end up sending it to the wrong destination.

Ideally, we can use the new _default route if available.
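
As a rough sketch of that idea (the exact _default route syntax and semantics should be verified against the Data Prepper documentation for the version in use), a catch-all sink could look roughly like this fragment of a pipeline definition:

  sink:
    - stdout:
        routes: ['some_route']
    - stdout:
        routes: ['_default']   # hypothetically receives events not claimed by any other route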

Environment

Data Prepper 2.8

Additional context

This is very similar to #4103, but it manifests through failures in the router and/or sinks.

dlvenable commented 2 months ago

I have tried to reproduce this in order to find the root cause. The failure does not appear to come directly from the exception thrown on the routes, but possibly from some side effect.

I used the following pipeline to attempt to reproduce the issue, but it does not stall.

entry-pipeline:
  workers: 1
  delay: 10
  source:
    http:

  buffer:
    bounded_blocking:
      buffer_size: 2
      batch_size: 10000

  sink:
    - pipeline:
        name: second-pipeline
    - stdout:

second-pipeline:
  workers: 1
  delay: 10
  source:
    pipeline:
      name: entry-pipeline

  buffer:
    bounded_blocking:
      buffer_size: 2
      batch_size: 10000

  processor:

  routes:
    - bad_route: 'this will throw an exception'

  sink:
    - stdout:
        routes: ['bad_route']

kkondaka commented 2 months ago

@dlvenable your example pipeline produces a different kind of exception. The original exception in the description shows that a submission to the executor service was rejected. That usually happens when the executor service is shutting down. Are you sure the original issue did not occur while the pipeline was shutting down?
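
For reference, a ThreadPoolExecutor with the default AbortPolicy throws RejectedExecutionException for any task submitted after shutdown() has been called, which matches the rejection in the stack trace above. A minimal, self-contained illustration using only the JDK (not Data Prepper code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectedAfterShutdownDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // Submitting work to a live executor succeeds.
        executor.submit(() -> System.out.println("first task runs"));

        // After shutdown(), the executor finishes queued work but accepts no new tasks.
        executor.shutdown();

        try {
            executor.submit(() -> System.out.println("never runs"));
        } catch (RejectedExecutionException e) {
            // Same exception type as in the reported stack trace.
            System.out.println("rejected after shutdown: " + e);
        }
    }
}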