Describe the bug
The process workers crash when the pipeline role lacks sufficient permissions to access the source cluster of an OpenSearch source pipeline. This causes the pipeline to shut down.
2023-11-28T20:19:42.499 [opensearch-migration-pipeline-sink-worker-2-thread-1] ERROR org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor - Pipeline [****] process worker encountered a fatal exception, cannot proceed further
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Unable to call info API using the elasticsearch client
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
at org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor.afterExecute(PipelineThreadPoolExecutor.java:70) ~[data-prepper-core-2.6.0-SNAPSHOT.jar:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1129) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: Unable to call info API using the elasticsearch client
at org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy.getDistributionAndVersionNumber(SearchAccessorStrategy.java:191) ~[opensearch-2.6.0-SNAPSHOT.jar:?]
at org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy.getSearchAccessor(SearchAccessorStrategy.java:107) ~[opensearch-2.6.0-SNAPSHOT.jar:?]
at org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSource.startProcess(OpenSearchSource.java:74) ~[opensearch-2.6.0-SNAPSHOT.jar:?]
at org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSource.start(OpenSearchSource.java:64) ~[opensearch-2.6.0-SNAPSHOT.jar:?]
at org.opensearch.dataprepper.pipeline.Pipeline.startSourceAndProcessors(Pipeline.java:215) ~[data-prepper-core-2.6.0-SNAPSHOT.jar:?]
at org.opensearch.dataprepper.pipeline.Pipeline.lambda$execute$2(Pipeline.java:260) ~[data-prepper-core-2.6.0-SNAPSHOT.jar:?]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
... 2 more
Caused by: co.elastic.clients.elasticsearch._types.ElasticsearchException: [es/info] failed: [security_exception] no permissions for [cluster:monitor/main] and User [name=<IAM role ARN>, backend_roles=[<IAM role ARN>], requestedTenant=null]
at co.elastic.clients.transport.rest_client.RestClientTransport.getHighLevelResponse(RestClientTransport.java:281) ~[elasticsearch-java-7.17.0.jar:?]
at co.elastic.clients.transport.rest_client.RestClientTransport.performRequest(RestClientTransport.java:147) ~[elasticsearch-java-7.17.0.jar:?]
at co.elastic.clients.elasticsearch.ElasticsearchClient.info(ElasticsearchClient.java:983) ~[elasticsearch-java-7.17.0.jar:?]
at org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy.getDistributionAndVersionNumber(SearchAccessorStrategy.java:188) ~[opensearch-2.6.0-SNAPSHOT.jar:?]
at org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy.getSearchAccessor(SearchAccessorStrategy.java:107) ~[opensearch-2.6.0-SNAPSHOT.jar:?]
at org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSource.startProcess(OpenSearchSource.java:74) ~[opensearch-2.6.0-SNAPSHOT.jar:?]
at org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSource.start(OpenSearchSource.java:64) ~[opensearch-2.6.0-SNAPSHOT.jar:?]
at org.opensearch.dataprepper.pipeline.Pipeline.startSourceAndProcessors(Pipeline.java:215) ~[data-prepper-core-2.6.0-SNAPSHOT.jar:?]
at org.opensearch.dataprepper.pipeline.Pipeline.lambda$execute$2(Pipeline.java:260) ~[data-prepper-core-2.6.0-SNAPSHOT.jar:?]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
... 2 more
To Reproduce
Steps to reproduce the behavior:
1. Create an OpenSearch or Elasticsearch cluster with fine-grained access control (FGAC) enabled.
2. Do not grant the pipeline role any FGAC permissions on that cluster.
3. Create a pipeline that uses the opensearch source.
Expected behavior
As with the OpenSearch sink, the pipeline should keep retrying until the permissions issue is resolved rather than crashing.
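To illustrate the requested behavior, here is a minimal sketch of a retry loop with capped exponential backoff that could wrap the startup call that currently throws. This is not the actual Data Prepper implementation: the class name `RetryingStartup`, the method `retryUntilSuccess`, and the delay parameters are all hypothetical, and the real fix may instead reuse whatever retry mechanism the sink already has.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Data Prepper.
public class RetryingStartup {

    /**
     * Calls the action until it returns without throwing.
     * Sleeps between attempts, doubling the delay up to maxDelayMillis.
     */
    public static <T> T retryUntilSuccess(final Callable<T> action,
                                          final long initialDelayMillis,
                                          final long maxDelayMillis) throws InterruptedException {
        long delay = initialDelayMillis;
        while (true) {
            try {
                return action.call();
            } catch (final InterruptedException e) {
                // Let shutdown requests stop the loop instead of retrying forever.
                throw e;
            } catch (final Exception e) {
                // For example, a security_exception from the info API call.
                System.err.println("Unable to reach source cluster, retrying in "
                        + delay + " ms: " + e.getMessage());
                Thread.sleep(delay);
                delay = Math.min(delay * 2, maxDelayMillis);
            }
        }
    }
}
```

With something like this, the call to the info API in `SearchAccessorStrategy.getDistributionAndVersionNumber` could be retried until the role is granted `cluster:monitor/main`, instead of surfacing a `RuntimeException` that terminates the worker. A production version would likely retry only on permission or connectivity errors rather than on every exception.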