opensearch-project / data-prepper

Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0
259 stars 190 forks source link

[BUG] Can't connect to OpenSearch if Build Flavor is not returned #4654

Open kclinden opened 3 months ago

kclinden commented 3 months ago

Describe the bug Pipeline failing to connect to OpenSearch when BuildFlavor is not returned from root OpenSearch api. Source seems to point to this line - https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java#L198

Do we need to get buildFlavor?

To Reproduce Steps to reproduce the behavior: Pipeline:

version: '2'
opensearch-source-pipeline:
  source:
    opensearch:
      hosts: ['https://192.168.1.100:9200']
      username: 'admin'
      password: 'somepass'
      indices:
        include:
          - index_name_regex: 'wazuh-alerts-4.x*'
      scheduling:
        interval: 'PT5M'
      connection:
        insecure: true
  sink:
    - stdout:

Expected behavior Pipeline connects

Additional context OpenSearch API Return Data:

{
  "name": "node-1",
  "cluster_name": "wazuh-cluster",
  "cluster_uuid": "EzI18fqRRsqyp_bqBT0xyQ",
  "version": {
    "number": "7.10.2",
    "build_type": "rpm",
    "build_hash": "eee49cb340edc6c4d489bcd9324dda571fc8dc03",
    "build_date": "2023-09-20T23:54:29.889267151Z",
    "build_snapshot": false,
    "lucene_version": "9.7.0",
    "minimum_wire_compatibility_version": "7.10.0",
    "minimum_index_compatibility_version": "7.0.0"
  },
  "tagline": "The OpenSearch Project: https://opensearch.org/"
}

Error:

2024-06-21T18:14:35,773 [opensearch-source-pipeline-sink-worker-2-thread-1] ERROR org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor - Pipeline [opensearch-source-pipeline] process worker encountered a fatal exception, cannot proceed further
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Unable to call info API using the elasticsearch client
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor.afterExecute(PipelineThreadPoolExecutor.java:70) [data-prepper-core-2.8.0.jar:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1137) [?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
Caused by: java.lang.RuntimeException: Unable to call info API using the elasticsearch client
    at org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy.getDistributionAndVersionNumber(SearchAccessorStrategy.java:199) ~[opensearch-2.8.0.jar:?]
    at org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy.getSearchAccessor(SearchAccessorStrategy.java:115) ~[opensearch-2.8.0.jar:?]
    at org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSource.startProcess(OpenSearchSource.java:75) ~[opensearch-2.8.0.jar:?]
    at org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSource.start(OpenSearchSource.java:65) ~[opensearch-2.8.0.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.startSourceAndProcessors(Pipeline.java:215) ~[data-prepper-core-2.8.0.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$execute$2(Pipeline.java:260) ~[data-prepper-core-2.8.0.jar:?]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    ... 2 more
Caused by: co.elastic.clients.util.MissingRequiredPropertyException: Missing required property 'ElasticsearchVersionInfo.buildFlavor'
    at co.elastic.clients.util.ApiTypeHelper.requireNonNull(ApiTypeHelper.java:76) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.elasticsearch._types.ElasticsearchVersionInfo.<init>(ElasticsearchVersionInfo.java:74) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.elasticsearch._types.ElasticsearchVersionInfo.<init>(ElasticsearchVersionInfo.java:50) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.elasticsearch._types.ElasticsearchVersionInfo$Builder.build(ElasticsearchVersionInfo.java:300) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.elasticsearch._types.ElasticsearchVersionInfo$Builder.build(ElasticsearchVersionInfo.java:200) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.json.ObjectBuilderDeserializer.deserialize(ObjectBuilderDeserializer.java:80) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.json.DelegatingDeserializer$SameType.deserialize(DelegatingDeserializer.java:43) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.json.ObjectDeserializer$FieldObjectDeserializer.deserialize(ObjectDeserializer.java:72) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.json.ObjectDeserializer.deserialize(ObjectDeserializer.java:176) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.json.ObjectDeserializer.deserialize(ObjectDeserializer.java:137) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.json.JsonpDeserializer.deserialize(JsonpDeserializer.java:75) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.json.ObjectBuilderDeserializer.deserialize(ObjectBuilderDeserializer.java:79) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.json.DelegatingDeserializer$SameType.deserialize(DelegatingDeserializer.java:43) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.transport.rest_client.RestClientTransport.decodeResponse(RestClientTransport.java:328) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.transport.rest_client.RestClientTransport.getHighLevelResponse(RestClientTransport.java:294) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.transport.rest_client.RestClientTransport.performRequest(RestClientTransport.java:147) ~[elasticsearch-java-7.17.0.jar:?]
    at co.elastic.clients.elasticsearch.ElasticsearchClient.info(ElasticsearchClient.java:983) ~[elasticsearch-java-7.17.0.jar:?]
    at org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy.getDistributionAndVersionNumber(SearchAccessorStrategy.java:196) ~[opensearch-2.8.0.jar:?]
    at org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy.getSearchAccessor(SearchAccessorStrategy.java:115) ~[opensearch-2.8.0.jar:?]
    at org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSource.startProcess(OpenSearchSource.java:75) ~[opensearch-2.8.0.jar:?]
    at org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSource.start(OpenSearchSource.java:65) ~[opensearch-2.8.0.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.startSourceAndProcessors(Pipeline.java:215) ~[data-prepper-core-2.8.0.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$execute$2(Pipeline.java:260) ~[data-prepper-core-2.8.0.jar:?]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    ... 2 more
kclinden commented 3 months ago

Looks like elasticsearch returns this value but opensearch does not. https://www.elastic.co/guide/en/elasticsearch/reference/current/rest-api-root.html

Elasticsearch Source: https://github.com/elastic/elasticsearch/blob/e47ec4efc99ca4e643b7e6195b86c1ecdcdfac45/modules/rest-root/src/main/java/org/elasticsearch/rest/root/MainResponse.java#L71

OpenSearch Source: https://github.com/opensearch-project/OpenSearch/blob/f5dbbb04b96bb8866255e4f967241db8bed16141/server/src/main/java/org/opensearch/action/main/MainResponse.java#L121

kclinden commented 3 months ago

Similar to - https://github.com/opensearch-project/data-prepper/issues/3640

dlvenable commented 3 months ago

@kclinden , We do allow for configuring the distribution_version as a fallback when the cluster does not provide it. You can see the logic here.

https://github.com/opensearch-project/data-prepper/blob/980b0ec50f0e7a905fab9a79896b413d9e23230e/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java#L92

Here is the configuration parameter:

https://github.com/opensearch-project/data-prepper/blob/980b0ec50f0e7a905fab9a79896b413d9e23230e/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java#L66-L67

And this shows the valid values:

https://github.com/opensearch-project/data-prepper/blob/980b0ec50f0e7a905fab9a79896b413d9e23230e/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java#L141

For example:

source:
  opensearch:
    distribution_version: opensearch
dlvenable commented 3 months ago

We need to add this to the documentation. It is not present now.

kclinden commented 2 months ago

@dlvenable thanks. That gets met further. When I set that though my pipeline hangs at the following

dataprepper-1  | 2024-07-02T15:41:36,559 [opensearch-source-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.pipeline.Pipeline - Pipeline [opensearch-source-pipeline] - Submitting request to initiate the pipeline processing