opensearch-project / data-prepper

Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0

Process Termination on Pipeline Shutdown #2595

Open rhysxevans opened 1 year ago

rhysxevans commented 1 year ago

Is your feature request related to a problem? Please describe.

We are currently testing Data Prepper in a Kubernetes (k8s) environment for a new piece of work.

The issue we are currently hitting is a heap problem (I will raise a separate issue for this): when we run out of heap space, all our pipelines shut down, which is fine. However, we would like an option to terminate the data-prepper process once the pipelines have shut down. That is not happening at present, so our pods keep running even though they process nothing.

Describe the solution you'd like

An option to kill the process on pipeline shutdown, so that the pod is in turn restarted.
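For the out-of-memory case specifically, one possible stopgap is to have the JVM itself exit, so that Kubernetes restarts the pod. A minimal sketch, assuming a HotSpot-based JVM in the container; the fragment is a hypothetical piece of a pod spec and is not taken from the Data Prepper documentation. It does not help when pipelines shut down for reasons other than an OutOfMemoryError.

```yaml
# Hypothetical container spec fragment for the Data Prepper pod.
env:
  - name: JAVA_TOOL_OPTIONS                 # standard variable read by the JVM at startup
    value: "-XX:+ExitOnOutOfMemoryError"    # HotSpot flag: exit on the first OutOfMemoryError
```

Because `JAVA_TOOL_OPTIONS` is picked up by the JVM directly, this should not depend on how the image's start script passes options to `java`.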

Describe alternatives you've considered (Optional)

We will investigate health checks, but this is a belt-and-braces measure.
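A health check along these lines could be sketched as a Kubernetes liveness probe. The path and port below are assumptions (the Data Prepper core API is assumed to listen on port 4900 and to expose a `/list` endpoint without authentication), and the timings are illustrative. Whether the endpoint actually stops responding once the pipelines have shut down would need to be verified before relying on it.

```yaml
# Hypothetical container spec fragment; values are illustrative, not tested.
livenessProbe:
  httpGet:
    path: /list          # assumption: core API endpoint that lists pipelines
    port: 4900           # assumption: default core API port
    scheme: HTTP         # plain HTTP, matching ssl: false in data-prepper-config.yaml
  initialDelaySeconds: 30
  periodSeconds: 10
  failureThreshold: 3    # restart the container after three consecutive failures
```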

Additional context

This would be an extension to this PR (I think): https://github.com/opensearch-project/data-prepper/pull/2540/files

dlvenable commented 1 year ago

@rhysxevans, thank you for reporting this issue.

What version of Data Prepper are you running? We made this the intended behavior in Data Prepper 2.2: when any pipeline shuts down, all of Data Prepper should shut down. The option in #2540 would allow you to keep it running if any pipeline is still running.

rhysxevans commented 1 year ago

Hi, at present 2.2.0, via opensearchproject/data-prepper:2.2.0

Without pipeline_shutdown: in data-prepper-config.yaml, the pipelines shut down but Data Prepper doesn't seem to terminate (I will double-check this). I am currently testing with pipeline_shutdown: on-any-pipeline-failure to see if that makes a difference; however, I think that is the default?

rhysxevans commented 1 year ago

Note: at present I am only running this to collect metrics from Prometheus-compatible (/metrics) endpoints via an OTEL Collector, using the TA (target allocator), and to put these metrics into OpenSearch.

The below may not be relevant, but for context:

pipelines.yml

metrics-pipeline:
  workers: 2
  delay: "100"
  source:
    otel_metrics_source:
      port: 21890
      ssl: false
  buffer:
    bounded_blocking:
      buffer_size: 4096
      batch_size: 512
  processor:
    - otel_metrics: 
        calculate_histogram_buckets: false
        calculate_exponential_histogram_buckets: false
        exponential_histogram_max_allowed_scale: 10
        flatten_attributes: true
  sink:
    - opensearch:
        hosts: ["https://first-cluster-headless.opensearch-first-cluster.svc.cluster.local:9200"]
        username: "admin"
        password: "admin"
        index_type: custom
        insecure: true
        index: metrics-otel-v1-%{yyyy.MM.dd}
    #- stdout:

data-prepper-config.yaml

ssl: false
pipeline_shutdown: on-any-pipeline-failure
metric_registries: [Prometheus]
circuit_breakers:
  heap:
    usage: 8gb
    reset: 1s
    check_interval: 500ms

Some logs I am seeing

2023-04-25T20:56:31,805 [metrics-pipeline-processor-worker-1-thread-2] WARN  org.opensearch.dataprepper.plugins.processor.otelmetrics.OTelMetricsRawProcessor - Error while processing metrics
java.lang.NullPointerException: value cannot be null
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:907) ~[guava-31.1-jre.jar:?]
    at org.opensearch.dataprepper.model.metric.ParameterValidator.lambda$validate$2(ParameterValidator.java:33) ~[data-prepper-api-2.2.0.jar:?]
    at java.util.Collections$SingletonList.forEach(Collections.java:4966) ~[?:?]
    at org.opensearch.dataprepper.model.metric.ParameterValidator.validate(ParameterValidator.java:31) ~[data-prepper-api-2.2.0.jar:?]
    at org.opensearch.dataprepper.model.metric.JacksonGauge$Builder.build(JacksonGauge.java:90) ~[data-prepper-api-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.processor.otelmetrics.OTelMetricsRawProcessor.lambda$mapGauge$0(OTelMetricsRawProcessor.java:129) ~[otel-metrics-raw-processor-2.2.0.jar:?]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[?:?]
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[?:?]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[?:?]
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) ~[?:?]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) ~[?:?]
    at org.opensearch.dataprepper.plugins.processor.otelmetrics.OTelMetricsRawProcessor.mapGauge(OTelMetricsRawProcessor.java:131) ~[otel-metrics-raw-processor-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.processor.otelmetrics.OTelMetricsRawProcessor.processMetricsList(OTelMetricsRawProcessor.java:87) ~[otel-metrics-raw-processor-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.processor.otelmetrics.OTelMetricsRawProcessor.doExecute(OTelMetricsRawProcessor.java:71) ~[otel-metrics-raw-processor-2.2.0.jar:?]
    at org.opensearch.dataprepper.model.processor.AbstractProcessor.lambda$execute$0(AbstractProcessor.java:54) ~[data-prepper-api-2.2.0.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:69) [micrometer-core-1.10.5.jar:1.10.5]
    at org.opensearch.dataprepper.model.processor.AbstractProcessor.execute(AbstractProcessor.java:54) [data-prepper-api-2.2.0.jar:?]
    at org.opensearch.dataprepper.pipeline.ProcessWorker.doRun(ProcessWorker.java:115) [data-prepper-core-2.2.0.jar:?]
    at org.opensearch.dataprepper.pipeline.ProcessWorker.run(ProcessWorker.java:50) [data-prepper-core-2.2.0.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
Exception in thread "I/O dispatcher 3" Exception in thread "HTTP-Dispatcher" java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
Exception in thread "idle-timeout-task" java.lang.OutOfMemoryError: Java heap space
2023-04-25T21:06:22,130 [metrics-pipeline-processor-worker-1-thread-1] ERROR org.opensearch.dataprepper.pipeline.common.FutureHelper - FutureTask failed due to:
java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Java heap space
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at org.opensearch.dataprepper.pipeline.common.FutureHelper.awaitFuturesIndefinitely(FutureHelper.java:29) [data-prepper-core-2.2.0.jar:?]
    at org.opensearch.dataprepper.pipeline.ProcessWorker.postToSink(ProcessWorker.java:141) [data-prepper-core-2.2.0.jar:?]
    at org.opensearch.dataprepper.pipeline.ProcessWorker.doRun(ProcessWorker.java:121) [data-prepper-core-2.2.0.jar:?]
    at org.opensearch.dataprepper.pipeline.ProcessWorker.run(ProcessWorker.java:50) [data-prepper-core-2.2.0.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: java.lang.OutOfMemoryError: Java heap space
2023-04-25T21:06:22,137 [pool-6-thread-1] ERROR org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1 - I/O reactor terminated abnormally
org.apache.http.nio.reactor.IOReactorException: I/O dispatch worker terminated abnormally
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:359) ~[httpcore-nio-4.4.15.jar:4.4.15]
    at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221) ~[httpasyncclient-4.1.5.jar:4.1.5]
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64) [httpasyncclient-4.1.5.jar:4.1.5]
    at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: java.lang.OutOfMemoryError: Java heap space
2023-04-25T21:06:22,136 [pool-5-thread-2] WARN  io.micrometer.common.util.internal.logging.AbstractInternalLogger - Failed to apply the value function for the gauge 'jvm.threads.states'. Note that subsequent logs will be logged at debug level.
java.lang.OutOfMemoryError: Java heap space
2023-04-25T21:06:22,139 [metrics-pipeline-sink-worker-2-thread-1] ERROR org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor - Pipeline [metrics-pipeline] process worker encountered a fatal exception, cannot proceed further
java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Java heap space
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor.afterExecute(PipelineThreadPoolExecutor.java:70) [data-prepper-core-2.2.0.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1137) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: java.lang.OutOfMemoryError: Java heap space
2023-04-25T21:06:22,154 [metrics-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.pipeline.Pipeline - Pipeline [metrics-pipeline] - Received shutdown signal with processor shutdown timeout PT30S and sink shutdown timeout PT30S. Initiating the shutdown process
2023-04-25T21:06:22,155 [pool-5-thread-1] ERROR org.opensearch.dataprepper.pipeline.server.PrometheusMetricsHandler - Encountered exception scraping prometheus meter registry
java.io.IOException: stream is closed
    at sun.net.httpserver.Request$WriteStream.write(Request.java:382) ~[jdk.httpserver:?]
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81) ~[?:?]
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142) ~[?:?]
    at sun.net.httpserver.ExchangeImpl.sendResponseHeaders(ExchangeImpl.java:280) ~[jdk.httpserver:?]
    at sun.net.httpserver.HttpExchangeImpl.sendResponseHeaders(HttpExchangeImpl.java:85) ~[jdk.httpserver:?]
    at org.opensearch.dataprepper.pipeline.server.PrometheusMetricsHandler.handle(PrometheusMetricsHandler.java:43) [data-prepper-core-2.2.0.jar:?]
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:95) [jdk.httpserver:?]
    at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:82) [jdk.httpserver:?]
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:98) [jdk.httpserver:?]
    at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:851) [jdk.httpserver:?]
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:95) [jdk.httpserver:?]
    at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:818) [jdk.httpserver:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
2023-04-25T21:06:22,188 [pool-5-thread-1] ERROR org.opensearch.dataprepper.pipeline.server.PrometheusMetricsHandler - Encountered exception scraping prometheus meter registry
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[?:?]
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:62) ~[?:?]
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132) ~[?:?]
    at sun.nio.ch.IOUtil.write(IOUtil.java:97) ~[?:?]
    at sun.nio.ch.IOUtil.write(IOUtil.java:53) ~[?:?]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:532) ~[?:?]
    at sun.net.httpserver.Request$WriteStream.write(Request.java:393) ~[jdk.httpserver:?]
    at sun.net.httpserver.FixedLengthOutputStream.write(FixedLengthOutputStream.java:81) ~[jdk.httpserver:?]
    at java.io.FilterOutputStream.write(FilterOutputStream.java:108) ~[?:?]
    at sun.net.httpserver.PlaceholderOutputStream.write(ExchangeImpl.java:459) ~[jdk.httpserver:?]
    at org.opensearch.dataprepper.pipeline.server.PrometheusMetricsHandler.handle(PrometheusMetricsHandler.java:44) [data-prepper-core-2.2.0.jar:?]
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:95) [jdk.httpserver:?]
    at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:82) [jdk.httpserver:?]
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:98) [jdk.httpserver:?]
    at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:851) [jdk.httpserver:?]
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:95) [jdk.httpserver:?]
    at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:818) [jdk.httpserver:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]

....

2023-04-25T21:06:26,302 [metrics-pipeline-sink-worker-2-thread-2] WARN  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Document [FailedDlqData{index='metrics-otel-v1-2023.04.25', indexId='null', status='0', message='I/O reactor has been shut down', document={kind=SUM, flags=0, description=Total number of connections attempted by the given dialer a given name., serviceName=kube-prometheus-stack-prometheus, schemaUrl=, isMonotonic=true, unit=, aggregationTemporality=AGGREGATION_TEMPORALITY_CUMULATIVE, exemplars=[], name=net_conntrack_dialer_conn_attempted_total, startTime=2023-04-23T08:12:54.344Z, time=2023-04-25T20:51:24.344Z, value=39.0, resource.attributes.k8s@namespace@name=kube-prometheus-stack, resource.attributes.k8s@container@name=prometheus, resource.attributes.net@host@name=100.64.139.18, metric.attributes.service=kube-prometheus-stack-prometheus, resource.attributes.service@instance@id=100.64.139.18:9090, resource.attributes.service@name=kube-prometheus-stack-prometheus, resource.attributes.http@scheme=http, resource.attributes.k8s@pod@name=prometheus-kube-prometheus-stack-0, resource.attributes.net@host@port=9090, metric.attributes.endpoint=http-web, resource.attributes.k8s@node@name=ip-10-154-174-97.eu-west-1.compute.internal, resource.attributes.k8s@pod@uid=0bcd52c2-7994-4444-bea3-a0d4062e18ec, metric.attributes.container=prometheus, metric.attributes.pod=prometheus-kube-prometheus-stack-0, metric.attributes.namespace=kube-prometheus-stack, resource.attributes.k8s@statefulset@name=prometheus-kube-prometheus-stack, metric.attributes.dialer_name=serviceMonitor/promtail/promtail/0}}] has failure.
java.lang.RuntimeException: I/O reactor has been shut down
    at org.opensearch.client.RestClient.extractAndWrapCause(RestClient.java:961) ~[opensearch-rest-client-2.4.1.jar:2.4.1]
    at org.opensearch.client.RestClient.performRequest(RestClient.java:332) ~[opensearch-rest-client-2.4.1.jar:2.4.1]
    at org.opensearch.client.RestClient.performRequest(RestClient.java:320) ~[opensearch-rest-client-2.4.1.jar:2.4.1]
    at org.opensearch.client.transport.rest_client.RestClientTransport.performRequest(RestClientTransport.java:142) ~[opensearch-java-2.2.0.jar:?]
    at org.opensearch.client.opensearch.OpenSearchClient.bulk(OpenSearchClient.java:211) ~[opensearch-java-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.lambda$doInitializeInternal$1(OpenSearchSink.java:179) ~[opensearch-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:213) ~[opensearch-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.execute(BulkRetryStrategy.java:154) ~[opensearch-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.lambda$flushBatch$6(OpenSearchSink.java:278) ~[opensearch-2.2.0.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.10.5.jar:1.10.5]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.flushBatch(OpenSearchSink.java:275) ~[opensearch-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doOutput(OpenSearchSink.java:252) ~[opensearch-2.2.0.jar:?]
    at org.opensearch.dataprepper.model.sink.AbstractSink.lambda$output$0(AbstractSink.java:54) ~[data-prepper-api-2.2.0.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.10.5.jar:1.10.5]
    at org.opensearch.dataprepper.model.sink.AbstractSink.output(AbstractSink.java:54) ~[data-prepper-api-2.2.0.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$publishToSinks$4(Pipeline.java:312) ~[data-prepper-core-2.2.0.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: java.lang.IllegalStateException: I/O reactor has been shut down
    at org.apache.http.util.Asserts.check(Asserts.java:34) ~[httpcore-4.4.16.jar:4.4.16]
    at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.connect(DefaultConnectingIOReactor.java:227) ~[httpcore-nio-4.4.15.jar:4.4.15]
    at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:481) ~[httpcore-nio-4.4.15.jar:4.4.15]
    at org.apache.http.nio.pool.AbstractNIOConnPool.lease(AbstractNIOConnPool.java:280) ~[httpcore-nio-4.4.15.jar:4.4.15]
    at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.requestConnection(PoolingNHttpClientConnectionManager.java:295) ~[httpasyncclient-4.1.5.jar:4.1.5]
    at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.requestConnection(AbstractClientExchangeHandler.java:381) ~[httpasyncclient-4.1.5.jar:4.1.5]
    at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.start(DefaultClientExchangeHandlerImpl.java:130) ~[httpasyncclient-4.1.5.jar:4.1.5]
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.execute(CloseableHttpAsyncClientBase.java:116) ~[httpasyncclient-4.1.5.jar:4.1.5]
    at org.apache.http.impl.nio.client.InternalHttpAsyncClient.execute(InternalHttpAsyncClient.java:138) ~[httpasyncclient-4.1.5.jar:4.1.5]
    at org.opensearch.client.RestClient.performRequest(RestClient.java:328) ~[opensearch-rest-client-2.4.1.jar:2.4.1]
    ... 19 more

rhysxevans commented 1 year ago

OK, so with that setting I am seeing what I was hoping for. I will monitor.

The next issue is to figure out the heap out-of-memory error, but that is for another ticket.

2023-04-25T21:39:12,655 [metrics-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.pipeline.Pipeline - Pipeline [metrics-pipeline] - Received shutdown signal with processor shutdown timeout PT30S and sink shutdown timeout PT30S. Initiating the shutdown process
2023-04-25T21:39:12,656 [pool-5-thread-1] ERROR org.opensearch.dataprepper.pipeline.server.PrometheusMetricsHandler - Encountered exception scraping prometheus meter registry
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[?:?]
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:62) ~[?:?]
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132) ~[?:?]
    at sun.nio.ch.IOUtil.write(IOUtil.java:97) ~[?:?]
    at sun.nio.ch.IOUtil.write(IOUtil.java:53) ~[?:?]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:532) ~[?:?]
    at sun.net.httpserver.Request$WriteStream.write(Request.java:393) ~[jdk.httpserver:?]
    at sun.net.httpserver.FixedLengthOutputStream.write(FixedLengthOutputStream.java:81) ~[jdk.httpserver:?]
    at java.io.FilterOutputStream.write(FilterOutputStream.java:108) ~[?:?]
    at sun.net.httpserver.PlaceholderOutputStream.write(ExchangeImpl.java:459) ~[jdk.httpserver:?]
    at org.opensearch.dataprepper.pipeline.server.PrometheusMetricsHandler.handle(PrometheusMetricsHandler.java:44) [data-prepper-core-2.2.0.jar:?]
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:95) [jdk.httpserver:?]
    at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:82) [jdk.httpserver:?]
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:98) [jdk.httpserver:?]
    at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:851) [jdk.httpserver:?]
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:95) [jdk.httpserver:?]
    at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:818) [jdk.httpserver:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
2023-04-25T21:39:12,686 [pool-5-thread-2] ERROR org.opensearch.dataprepper.pipeline.server.PrometheusMetricsHandler - Encountered exception scraping prometheus meter registry
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[?:?]
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:62) ~[?:?]
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132) ~[?:?]
    at sun.nio.ch.IOUtil.write(IOUtil.java:97) ~[?:?]
    at sun.nio.ch.IOUtil.write(IOUtil.java:53) ~[?:?]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:532) ~[?:?]
    at sun.net.httpserver.Request$WriteStream.write(Request.java:393) ~[jdk.httpserver:?]
    at sun.net.httpserver.FixedLengthOutputStream.write(FixedLengthOutputStream.java:81) ~[jdk.httpserver:?]
    at java.io.FilterOutputStream.write(FilterOutputStream.java:108) ~[?:?]
    at sun.net.httpserver.PlaceholderOutputStream.write(ExchangeImpl.java:459) ~[jdk.httpserver:?]
    at org.opensearch.dataprepper.pipeline.server.PrometheusMetricsHandler.handle(PrometheusMetricsHandler.java:44) [data-prepper-core-2.2.0.jar:?]
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:95) [jdk.httpserver:?]
    at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:82) [jdk.httpserver:?]
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:98) [jdk.httpserver:?]
    at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:851) [jdk.httpserver:?]
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:95) [jdk.httpserver:?]
    at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:818) [jdk.httpserver:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
2023-04-25T21:39:14,738 [metrics-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsSource - Stopped otel_metrics_source.
2023-04-25T21:39:14,739 [metrics-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.pipeline.Pipeline - Pipeline [metrics-pipeline] - Shutting down processor process workers.
2023-04-25T21:39:14,909 [metrics-pipeline-processor-worker-1-thread-2] WARN  org.opensearch.dataprepper.plugins.processor.otelmetrics.OTelMetricsRawProcessor - Error while processing metrics
java.lang.NullPointerException: value cannot be null
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:907) ~[guava-31.1-jre.jar:?]
    at org.opensearch.dataprepper.model.metric.ParameterValidator.lambda$validate$2(ParameterValidator.java:33) ~[data-prepper-api-2.2.0.jar:?]
    at java.util.Collections$SingletonList.forEach(Collections.java:4966) ~[?:?]
    at org.opensearch.dataprepper.model.metric.ParameterValidator.validate(ParameterValidator.java:31) ~[data-prepper-api-2.2.0.jar:?]
    at org.opensearch.dataprepper.model.metric.JacksonGauge$Builder.build(JacksonGauge.java:90) ~[data-prepper-api-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.processor.otelmetrics.OTelMetricsRawProcessor.lambda$mapGauge$0(OTelMetricsRawProcessor.java:129) ~[otel-metrics-raw-processor-2.2.0.jar:?]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[?:?]
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[?:?]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[?:?]
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) ~[?:?]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) ~[?:?]
    at org.opensearch.dataprepper.plugins.processor.otelmetrics.OTelMetricsRawProcessor.mapGauge(OTelMetricsRawProcessor.java:131) ~[otel-metrics-raw-processor-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.processor.otelmetrics.OTelMetricsRawProcessor.processMetricsList(OTelMetricsRawProcessor.java:87) ~[otel-metrics-raw-processor-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.processor.otelmetrics.OTelMetricsRawProcessor.doExecute(OTelMetricsRawProcessor.java:71) ~[otel-metrics-raw-processor-2.2.0.jar:?]
    at org.opensearch.dataprepper.model.processor.AbstractProcessor.lambda$execute$0(AbstractProcessor.java:54) ~[data-prepper-api-2.2.0.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:69) [micrometer-core-1.10.5.jar:1.10.5]
    at org.opensearch.dataprepper.model.processor.AbstractProcessor.execute(AbstractProcessor.java:54) [data-prepper-api-2.2.0.jar:?]
    at org.opensearch.dataprepper.pipeline.ProcessWorker.doRun(ProcessWorker.java:115) [data-prepper-core-2.2.0.jar:?]
    at org.opensearch.dataprepper.pipeline.ProcessWorker.run(ProcessWorker.java:50) [data-prepper-core-2.2.0.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
2023-04-25T21:39:44,739 [metrics-pipeline-sink-worker-2-thread-1] WARN  org.opensearch.dataprepper.pipeline.Pipeline - Pipeline [metrics-pipeline] - Workers did not terminate in time, forcing termination of processor workers.
2023-04-25T21:39:44,740 [metrics-pipeline-processor-worker-1-thread-2] ERROR org.opensearch.dataprepper.pipeline.common.FutureHelper - FutureTask is interrupted or timed out
2023-04-25T21:39:44,740 [metrics-pipeline-processor-worker-1-thread-1] ERROR org.opensearch.dataprepper.pipeline.common.FutureHelper - FutureTask is interrupted or timed out
2023-04-25T21:39:44,740 [metrics-pipeline-processor-worker-1-thread-2] INFO  org.opensearch.dataprepper.pipeline.ProcessWorker - Processor shutdown phase 1 complete.
2023-04-25T21:39:44,740 [metrics-pipeline-processor-worker-1-thread-1] INFO  org.opensearch.dataprepper.pipeline.ProcessWorker - Processor shutdown phase 1 complete.
2023-04-25T21:39:44,740 [metrics-pipeline-processor-worker-1-thread-2] INFO  org.opensearch.dataprepper.pipeline.ProcessWorker - Beginning processor shutdown phase 2, iterating until buffers empty.
2023-04-25T21:39:44,740 [metrics-pipeline-processor-worker-1-thread-1] INFO  org.opensearch.dataprepper.pipeline.ProcessWorker - Beginning processor shutdown phase 2, iterating until buffers empty.
2023-04-25T21:39:44,743 [metrics-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.pipeline.Pipeline - Pipeline [metrics-pipeline] - Shutting down sink process workers.
2023-04-25T21:39:44,956 [metrics-pipeline-processor-worker-1-thread-2] WARN  org.opensearch.dataprepper.plugins.processor.otelmetrics.OTelMetricsRawProcessor - Error while processing metrics
java.lang.NullPointerException: value cannot be null
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:907) ~[guava-31.1-jre.jar:?]
    at org.opensearch.dataprepper.model.metric.ParameterValidator.lambda$validate$2(ParameterValidator.java:33) ~[data-prepper-api-2.

....

    ... 21 more
2023-04-25T21:39:52,163 [metrics-pipeline-sink-worker-2-thread-2] WARN  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Document [FailedDlqData{index='metrics-otel-v1-2023.04.25', indexId='null', status='0', message='Request execution cancelled', document={kind=SUM, flags=0, description=Total number of log messages created by Loki itself., serviceName=loki/loki-read, schemaUrl=, isMonotonic=true, unit=, aggregationTemporality=AGGREGATION_TEMPORALITY_CUMULATIVE, exemplars=[], name=loki_internal_log_messages_total, startTime=2023-04-23T08:12:47.075Z, time=2023-04-25T21:31:02.075Z, value=78753.0, resource.attributes.k8s@namespace@name=loki, resource.attributes.k8s@container@name=loki, resource.attributes.net@host@name=100.64.143.151, metric.attributes.service=loki-read, resource.attributes.service@instance@id=100.64.143.151:3100, resource.attributes.service@name=loki/loki-read, metric.attributes.cluster=loki, resource.attributes.http@scheme=http, resource.attributes.k8s@pod@name=loki-read-2, resource.attributes.net@host@port=3100, metric.attributes.endpoint=http-metrics, resource.attributes.k8s@pod@uid=a9184a39-07a3-43b2-9e4b-3c13b2adba84, resource.attributes.k8s@node@name=ip-10-154-174-232.eu-west-1.compute.internal, metric.attributes.container=loki, metric.attributes.pod=loki-read-2, metric.attributes.level=error, metric.attributes.namespace=loki, resource.attributes.k8s@statefulset@name=loki-read}}] has failure.
java.lang.RuntimeException: Request execution cancelled
    at org.opensearch.client.RestClient.extractAndWrapCause(RestClient.java:961) ~[opensearch-rest-client-2.4.1.jar:2.4.1]
    at org.opensearch.client.RestClient.performRequest(RestClient.java:332) ~[opensearch-rest-client-2.4.1.jar:2.4.1]
    at org.opensearch.client.RestClient.performRequest(RestClient.java:320) ~[opensearch-rest-client-2.4.1.jar:2.4.1]
    at org.opensearch.client.transport.rest_client.RestClientTransport.performRequest(RestClientTransport.java:142) ~[opensearch-java-2.2.0.jar:?]
    at org.opensearch.client.opensearch.OpenSearchClient.bulk(OpenSearchClient.java:211) ~[opensearch-java-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.lambda$doInitializeInternal$1(OpenSearchSink.java:179) ~[opensearch-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:213) ~[opensearch-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:186) ~[opensearch-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:231) ~[opensearch-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.execute(BulkRetryStrategy.java:154) ~[opensearch-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.lambda$flushBatch$6(OpenSearchSink.java:278) ~[opensearch-2.2.0.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.10.5.jar:1.10.5]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.flushBatch(OpenSearchSink.java:275) ~[opensearch-2.2.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doOutput(OpenSearchSink.java:252) ~[opensearch-2.2.0.jar:?]
    at org.opensearch.dataprepper.model.sink.AbstractSink.lambda$output$0(AbstractSink.java:54) ~[data-prepper-api-2.2.0.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.10.5.jar:1.10.5]
    at org.opensearch.dataprepper.model.sink.AbstractSink.output(AbstractSink.java:54) ~[data-prepper-api-2.2.0.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$publishToSinks$4(Pipeline.java:312) ~[data-prepper-core-2.2.0.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: java.util.concurrent.CancellationException: Request execution cancelled
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.execute(CloseableHttpAsyncClientBase.java:114) ~[httpasyncclient-4.1.5.jar:4.1.5]
    at org.apache.http.impl.nio.client.InternalHttpAsyncClient.execute(InternalHttpAsyncClient.java:138) ~[httpasyncclient-4.1.5.jar:4.1.5]
    at org.opensearch.client.RestClient.performRequest(RestClient.java:328) ~[opensearch-rest-client-2.4.1.jar:2.4.1]
    ... 21 more

The above is the last log output before the restart.

Log output after the restart:

dc985
Reading pipelines and data-prepper configuration files from Data Prepper home directory.
/opt/java/openjdk/bin/java
Found openjdk version  of 17.0
2023-04-25T21:40:31,049 [main] INFO  org.opensearch.dataprepper.DataPrepperArgumentConfiguration - Command line args: /usr/share/data-prepper/pipelines,/usr/share/data-prepper/config/data-prepper-config.yaml
2023-04-25T21:40:31,051 [main] INFO  org.opensearch.dataprepper.DataPrepperArgs - Using /usr/share/data-prepper/pipelines configuration file
2023-04-25T21:40:31,075 [main] INFO  org.hibernate.validator.internal.util.Version - HV000001: Hibernate Validator 8.0.0.Final
2023-04-25T21:40:31,438 [main] INFO               org.reflections.Reflections - Reflections took 116 ms to scan 31 urls, producing 114 keys and 323 values
2023-04-25T21:40:31,743 [main] INFO  org.opensearch.dataprepper.breaker.HeapCircuitBreaker - Heap circuit breaker with usage of 8589934592 bytes.
2023-04-25T21:40:31,770 [main] INFO  org.opensearch.dataprepper.parser.PipelineParser - Reading pipeline configuration from pipelines.yml
2023-04-25T21:40:31,809 [main] INFO  org.opensearch.dataprepper.parser.PipelineParser - Building pipeline [metrics-pipeline] from provided configuration
2023-04-25T21:40:31,809 [main] INFO  org.opensearch.dataprepper.parser.PipelineParser - Building [otel_metrics_source] as source component for the pipeline [metrics-pipeline]
2023-04-25T21:40:31,883 [main] WARN  org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsSource - Creating otel-metrics-source without authentication. This is not secure.
2023-04-25T21:40:31,884 [main] WARN  org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsSource - In order to set up Http Basic authentication for the otel-metrics-source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-metrics-source#authentication-configurations