opensearch-project / data-prepper

OpenSearch Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0

[BUG] Unhelpful error message initializing OpenSearch Ingestion, OpenSearch sink #4195

Status: Open. Jon-AtAWS opened this issue 8 months ago.

Jon-AtAWS commented 8 months ago

See also: https://github.com/opensearch-project/opensearch-java/issues/473

Looks like we may have fixed this in the Java client, but not in the Python client? Or maybe this is a different code path?

I haven't been able to diagnose exactly what's going on or where the failure occurs. Here's what CloudWatch Logs shows for the OpenSearch sink initialization:

2024-02-27T19:37:23.594 [log-pipeline-sink-worker-2-thread-1] WARN  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Failed to initialize OpenSearch sink with a retryable exception. 
org.opensearch.client.opensearch._types.OpenSearchException: Request failed: [security_exception] authentication/authorization failure
    at org.opensearch.client.transport.aws.AwsSdk2Transport.parseResponse(AwsSdk2Transport.java:473) ~[opensearch-java-2.8.1.jar:?]
    at org.opensearch.client.transport.aws.AwsSdk2Transport.executeSync(AwsSdk2Transport.java:392) ~[opensearch-java-2.8.1.jar:?]
    at org.opensearch.client.transport.aws.AwsSdk2Transport.performRequest(AwsSdk2Transport.java:192) ~[opensearch-java-2.8.1.jar:?]
    at org.opensearch.client.opensearch.indices.OpenSearchIndicesClient.exists(OpenSearchIndicesClient.java:507) ~[opensearch-java-2.8.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.index.NoIsmPolicyManagement.checkIfIndexExistsOnServer(NoIsmPolicyManagement.java:50) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.checkAndCreateIndex(AbstractIndexManager.java:268) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.setupIndex(AbstractIndexManager.java:225) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitializeInternal(OpenSearchSink.java:231) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitialize(OpenSearchSink.java:193) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.model.sink.AbstractSink.initialize(AbstractSink.java:52) ~[data-prepper-api-2.6.1.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.isReady(Pipeline.java:200) ~[data-prepper-core-2.6.1.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$execute$2(Pipeline.java:252) ~[data-prepper-core-2.6.1.jar:?]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:829) [?:?]

FWIW, my pipeline role has:

        {
            "Action": [
                "es:DescribeDomain",
                "es:*"
            ],
            "Resource": "arn:aws:es:us-west-2:OBSCURED:domain/OBSCURED",
            "Effect": "Allow"
        },
Jon-AtAWS commented 8 months ago

Actually, this may have nothing to do with the Python client; it's either the Java client or Data Prepper. Can you re-route this issue?

Utkarsh-Aga commented 8 months ago

Hello @Jon-AtAWS, can you please update the pipeline role with the following permissions and check whether you still face the issue?

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "es:DescribeDomain",
            "Resource": "arn:aws:es:us-west-2:{your-account-id}:domain/{domain-name}"
        },
        {
            "Effect": "Allow",
            "Action": "es:ESHttp*",
            "Resource": "arn:aws:es:us-west-2:{your-account-id}:domain/{domain-name}/*"
        }
    ]
}
Jon-AtAWS commented 8 months ago

Thanks for the fast response!

My pipeline role has the following (pardon the CDK code):

        pipeline_policy_doc.add_statements(iam.PolicyStatement(**{
            "effect": iam.Effect.ALLOW,
            "resources": [f"{domain.domain_arn}"],
            "actions": [
                "es:ESHttp*",
                "es:DescribeDomain"
            ]
        }))

And I have verified that the statement is correct in the generated policy. It's not deployed right now, so I can't screenshot the IAM console.

The pipeline works if I map the role to FGAC's all_access role, or if I add "*" and "*" for the index-level and cluster-level permissions of the OpenSearch role I create. So some OpenSearch FGAC permission that I'm missing is causing this error.

Looking at the code for checkIfIndexExistsOnServer (the failing call), it appears to call HEAD /index_name. I have tried cluster_composite_ops and a variety of other cluster-level permissions for FGAC. At the index level, I have indices_all, and I've also tried crud, read, and write.

My next attempt will be to add indices_all to the cluster-level permissions. That doesn't really make sense, but maybe it will work?

My goal is to find the minimum permissions the FGAC role needs to work with Data Prepper/OpenSearch Ingestion. If you already know them, please let me know!
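The existence check described above comes down to a HEAD request, and a HEAD response carries no body, so the status code is the main signal. The sketch below is a hypothetical helper (IndexExistsCheck, classify, and shouldCreateIndex are illustrative names, not Data Prepper or opensearch-java code). It assumes the usual convention that 200 means the index exists, 404 means it does not, and 401/403 mean authentication or authorization failed. What it illustrates is that an access-denied status should surface as an error naming the request, never be read as "index missing".

```java
/**
 * Hypothetical helper (not Data Prepper code) that interprets the HTTP status
 * of a HEAD /<index> request the way an "index exists" check typically would.
 */
final class IndexExistsCheck {

    /** Possible outcomes of the existence check. */
    enum Outcome { EXISTS, MISSING, ACCESS_DENIED, UNEXPECTED }

    private IndexExistsCheck() {}

    static Outcome classify(final int httpStatus) {
        switch (httpStatus) {
            case 200: return Outcome.EXISTS;        // index (or alias) found
            case 404: return Outcome.MISSING;       // index not found
            case 401:
            case 403: return Outcome.ACCESS_DENIED; // authentication/authorization failure
            default:  return Outcome.UNEXPECTED;
        }
    }

    /** Returns true when the caller should go on to create the index. */
    static boolean shouldCreateIndex(final String index, final int httpStatus) {
        switch (classify(httpStatus)) {
            case EXISTS:
                return false;
            case MISSING:
                return true;
            case ACCESS_DENIED:
                // Name the request so the log says which call was denied.
                throw new IllegalStateException("Access denied (HTTP " + httpStatus
                        + ") on HEAD /" + index + "; check the IAM policy and the FGAC role mapping");
            default:
                throw new IllegalStateException("Unexpected HTTP " + httpStatus + " on HEAD /" + index);
        }
    }
}
```

For example, `shouldCreateIndex("logs", 403)` throws with a message that names `HEAD /logs`, instead of reporting that the index is missing.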

Jon-AtAWS commented 8 months ago

Well, shoot. As I was re-reading that, I realized that my resource is missing the wildcard that the es:ESHttp* actions need. So I changed it to:

        pipeline_policy_doc.add_statements(iam.PolicyStatement(**{
            "effect": iam.Effect.ALLOW,
            "resources": [f"{domain.domain_arn}"],
            "actions": [
                "es:DescribeDomain"
            ]
        }))
        pipeline_policy_doc.add_statements(iam.PolicyStatement(**{
            "effect": iam.Effect.ALLOW,
            "resources": [f"{domain.domain_arn}/*"],
            "actions": [
                "es:ESHttp*",
            ]
        }))

And it worked. I still don't understand why it also worked when I switched to all_access for FGAC; I will try that again. I also still think we need a better error message, one that includes the entity that presented the credentials and the API that was called. Even better would be an error message that specifies the permissions I need (whether IAM or FGAC).
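The difference between the two CDK policies is the resource ARN. As Utkarsh-Aga's suggested policy reflects, the HTTP actions (es:ESHttp*) are granted on resources under the domain (domain/{domain-name}/*), which the bare domain ARN does not cover, while es:DescribeDomain is granted on the domain ARN itself. The next comment shows this was not the root cause here, but the wildcard still matters. Below is a simplified, hypothetical IAM-style matcher (`*` matches any run of characters, `?` exactly one). It ignores Deny statements, conditions, and policy variables, so it only illustrates resource wildcards; the account ID 111122223333 and the names my-domain and my-index are placeholders.

```java
/**
 * Simplified IAM-style ARN matcher: '*' matches any run of characters and '?'
 * matches exactly one. Ignores Deny, conditions, and policy variables.
 */
final class ArnGlob {
    private ArnGlob() {}

    static boolean matches(final String pattern, final String arn) {
        int p = 0, s = 0;          // current positions in pattern and arn
        int star = -1, mark = 0;   // last '*' seen in pattern, and where its match began in arn
        while (s < arn.length()) {
            if (p < pattern.length()
                    && (pattern.charAt(p) == '?' || pattern.charAt(p) == arn.charAt(s))) {
                p++;
                s++;
            } else if (p < pattern.length() && pattern.charAt(p) == '*') {
                star = p++;
                mark = s;
            } else if (star != -1) {
                p = star + 1;      // backtrack: let the last '*' absorb one more character
                s = ++mark;
            } else {
                return false;
            }
        }
        while (p < pattern.length() && pattern.charAt(p) == '*') {
            p++;                   // trailing '*' may match the empty string
        }
        return p == pattern.length();
    }
}
```

With this model, `domain/my-domain` matches only the domain ARN itself, while `domain/my-domain/*` matches `domain/my-domain/my-index` but not the bare domain ARN, which is why the suggested policy grants the two actions on two different resources.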

Jon-AtAWS commented 8 months ago

Nope, something else made it work.

Here's the error:

2024-03-01T17:38:09.531 [Thread-11] WARN  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Failed to initialize OpenSearch sink with a retryable exception. 
org.opensearch.client.opensearch._types.OpenSearchException: Request failed: [security_exception] authentication/authorization failure
    at org.opensearch.client.transport.aws.AwsSdk2Transport.parseResponse(AwsSdk2Transport.java:473) ~[opensearch-java-2.8.1.jar:?]
    at org.opensearch.client.transport.aws.AwsSdk2Transport.executeSync(AwsSdk2Transport.java:392) ~[opensearch-java-2.8.1.jar:?]
    at org.opensearch.client.transport.aws.AwsSdk2Transport.performRequest(AwsSdk2Transport.java:192) ~[opensearch-java-2.8.1.jar:?]
    at org.opensearch.client.opensearch.indices.OpenSearchIndicesClient.exists(OpenSearchIndicesClient.java:507) ~[opensearch-java-2.8.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.index.NoIsmPolicyManagement.checkIfIndexExistsOnServer(NoIsmPolicyManagement.java:50) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.checkAndCreateIndex(AbstractIndexManager.java:268) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.setupIndex(AbstractIndexManager.java:225) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitializeInternal(OpenSearchSink.java:231) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitialize(OpenSearchSink.java:193) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.model.sink.SinkThread.run(SinkThread.java:25) ~[data-prepper-api-2.6.1.jar:?]
    at java.base/java.lang.Thread.run(Thread.java:829) [?:?]

Here is the role's trust relationship:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "osis-pipelines.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

Here are the role's permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": "es:DescribeDomain",
            "Resource": "arn:aws:es:us-west-2:XXXXXXXXXXX:domain/X",
            "Effect": "Allow"
        },
        {
            "Action": "es:ESHttp*",
            "Resource": "arn:aws:es:us-west-2:XXXXXXXXXXX:domain/X/*",
            "Effect": "Allow"
        },
        {
            "Action": "dynamodb:DescribeExport",
            "Resource": "arn:aws:dynamodb:us-west-2:XXXXXXXXXXXX:table/X/export/*",
            "Effect": "Allow"
        },
        {
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator"
            ],
            "Resource": "arn:aws:dynamodb:us-west-2:XXXXXXXXXXXX:table/X/stream/*",
            "Effect": "Allow"
        },
        {
            "Action": [
                "dynamodb:DescribeContinuousBackups",
                "dynamodb:DescribeTable",
                "dynamodb:ExportTableToPointInTime"
            ],
            "Resource": "arn:aws:dynamodb:us-west-2:XXXXXXXXXXXX:table/X",
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": "arn:aws:s3:::XXXXXXXXX/X/export/*",
            "Effect": "Allow"
        }
    ]
}

Here are the FGAC permissions:

{
  "pipeline_write_role": {
    "reserved": false,
    "hidden": false,
    "cluster_permissions": [
      "cluster_all",
      "indices_all"
    ],
    "index_permissions": [
      {
        "index_patterns": [
          "*"
        ],
        "dls": "",
        "fls": [],
        "masked_fields": [],
        "allowed_actions": [
          "crud",
          "create_index"
        ]
      }
    ],
    "tenant_permissions": [],
    "static": false
  }
}
Jon-AtAWS commented 8 months ago

This:

PUT _plugins/_security/api/roles/pipeline_write_role
{
  "cluster_permissions": ["cluster_monitor", "indices_all"],
  "index_permissions": [
  {
    "index_patterns": [ "*" ],
    "dls": "",
    "fls": [],
    "masked_fields": [],
    "allowed_actions": [
      "indices_all"
    ]
  }]
}

allowed the sink to initialize.

Summarizing:

  1. It's NOT the permissions on the IAM role.
  2. crud and create_index are not enough for FGAC.

What is the additional permission I need, and can we fix the error message?

kkondaka commented 8 months ago

Data Prepper can only show the message provided by OpenSearch.

Jon-AtAWS commented 8 months ago

It would be good for Data Prepper to add context, for example: "Called the client's IndexExists method, attempting to validate the non-existence of index 'foo' in order to set the template." Or something like that.

dlvenable commented 8 months ago

@Jon-AtAWS, yes, I agree with this.

@kkondaka , I think we could accomplish this by adding a try-catch when making the call. Maybe we can even have a special exception for failed requests?

try {
  final BooleanResponse booleanResponse = openSearchClient.indices().exists(
                new ExistsRequest.Builder().index(indexAlias).build());
} catch(Exception ex) {
  throw new OpenSearchRequestException("checking that index exists.", ex);
}

And

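class OpenSearchRequestException extends RuntimeException {
  private final String operation; // e.g. "checking that index exists."

  OpenSearchRequestException(final String operation, final Throwable cause) {
    super(cause);
    this.operation = operation;
  }

  @Override
  public String getMessage() {
    return "Failed while " + operation + " with error: " + getCause().getMessage();
  }
}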
Jon-AtAWS commented 8 months ago

throw new OpenSearchRequestException("checking that index exists.", ex);

Thanks @dlvenable !

Nitpicking a bit: let's pack as much information into these messages as possible. For instance, in this message we can add the name of the index we're checking. If we know the authenticated entity and its roles, we should add those as well. More information is better!
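Building on the OpenSearchRequestException proposal, a minimal sketch of an exception that also carries the index name and, when known, the authenticated principal could look like the following. IndexRequestException and its constructor are hypothetical, not an existing Data Prepper API, and whether the principal can actually be obtained is still open in this thread, so it is optional here.

```java
import java.util.Objects;

/**
 * Hypothetical exception (not part of Data Prepper) that packs the operation,
 * the target index, and, when known, the authenticated principal into its message.
 */
class IndexRequestException extends RuntimeException {
    private final String operation;
    private final String index;
    private final String principal; // null when the caller identity is unknown

    IndexRequestException(final String operation, final String index,
                          final String principal, final Throwable cause) {
        super(cause);
        this.operation = Objects.requireNonNull(operation);
        this.index = Objects.requireNonNull(index);
        this.principal = principal;
    }

    @Override
    public String getMessage() {
        final StringBuilder sb = new StringBuilder("Failed while ")
                .append(operation)
                .append(" [index=").append(index);
        if (principal != null) {
            sb.append(", principal=").append(principal);
        }
        sb.append("]");
        if (getCause() != null) {
            sb.append(": ").append(getCause().getMessage()); // keep the server's reason
        }
        return sb.toString();
    }
}
```

In the catch block of dlvenable's snippet, this might be thrown as `new IndexRequestException("checking that the index exists", indexAlias, null, ex)`, passing null for the principal until there is a reliable way to look it up.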

dlvenable commented 8 months ago

@Jon-AtAWS, this is great feedback. We could easily include the index name. The role name is more challenging, but it may be returned in the OpenSearch response.

krishna-ggk commented 8 months ago

@Jon-AtAWS, based on the stack trace, the failure happened while checking whether the index exists, which requires the indices:admin/exists FGAC permission. I don't see this permission explicitly included in any action group.

Jon-AtAWS commented 8 months ago

I now have this policy:

{
  "pipeline_write_role": {
    "reserved": false,
    "hidden": false,
    "cluster_permissions": [
      "indices:admin/exists",
      "cluster_monitor",
      "cluster_composite_ops"
    ],
    "index_permissions": [
      {
        "index_patterns": [
          "*"
        ],
        "dls": "",
        "fls": [],
        "masked_fields": [],
        "allowed_actions": [
          "indices:admin/exists",
          "crud"
        ]
      }
    ],
    "tenant_permissions": [],
    "static": false
  }
}

And receiving this error:

2024-03-07T23:16:55.564 [log-pipeline-sink-worker-2-thread-1] WARN  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Failed to initialize OpenSearch sink with a retryable exception. 
org.opensearch.client.opensearch._types.OpenSearchException: Request failed: [security_exception] authentication/authorization failure
    at org.opensearch.client.transport.aws.AwsSdk2Transport.parseResponse(AwsSdk2Transport.java:473) ~[opensearch-java-2.8.1.jar:?]
    at org.opensearch.client.transport.aws.AwsSdk2Transport.executeSync(AwsSdk2Transport.java:392) ~[opensearch-java-2.8.1.jar:?]
    at org.opensearch.client.transport.aws.AwsSdk2Transport.performRequest(AwsSdk2Transport.java:192) ~[opensearch-java-2.8.1.jar:?]
    at org.opensearch.client.opensearch.indices.OpenSearchIndicesClient.exists(OpenSearchIndicesClient.java:507) ~[opensearch-java-2.8.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.index.NoIsmPolicyManagement.checkIfIndexExistsOnServer(NoIsmPolicyManagement.java:50) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.checkAndCreateIndex(AbstractIndexManager.java:268) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.setupIndex(AbstractIndexManager.java:225) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitializeInternal(OpenSearchSink.java:231) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitialize(OpenSearchSink.java:193) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.model.sink.AbstractSink.initialize(AbstractSink.java:52) ~[data-prepper-api-2.6.1.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.isReady(Pipeline.java:200) ~[data-prepper-core-2.6.1.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$execute$2(Pipeline.java:252) ~[data-prepper-core-2.6.1.jar:?]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
Jon-AtAWS commented 8 months ago

This policy succeeds (changing to "indices_all" at the index level):

PUT _plugins/_security/api/roles/pipeline_write_role
{
    "cluster_permissions": [
        "indices:admin/exists",
        "cluster_monitor",
        "cluster_composite_ops"
    ],
    "index_permissions": [
    {
        "index_patterns": [
          "*"
        ],
        "dls": "",
        "fls": [],
        "masked_fields": [],
        "allowed_actions": [
          "indices_all",
          "crud"
        ]
    }
    ],
    "tenant_permissions": []
}

The docs (https://opensearch.org/docs/latest/security/access-control/default-action-groups/) are not very helpful here. The indices_all action group is documented as "Grants all permissions on the index. Equates to indices:*". So I can't tell which permission I added that enabled the sink to initialize, and the error doesn't tell me which permission I need.
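Because indices_all equates to indices:*, every action whose name starts with indices: is allowed, which explains why that setting works without revealing which single action was missing. The toy model below illustrates that reasoning: it expands action-group names into wildcard patterns and checks one action name against them. The contents assumed for crud (indices:data/read* and indices:data/write*) are a simplification chosen for illustration, not the actual OpenSearch default definition, and the real security plugin's resolution is more involved. FgacCheck and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Toy model of checking a role's allowed_actions against one action name.
 * The action-group contents are simplified assumptions, not OpenSearch's defaults.
 */
final class FgacCheck {
    private FgacCheck() {}

    // Hypothetical, abbreviated action groups used only for this illustration.
    static final Map<String, List<String>> ACTION_GROUPS = Map.of(
            "indices_all", List.of("indices:*"),                            // as documented
            "crud", List.of("indices:data/read*", "indices:data/write*"));  // assumed simplification

    /** Replaces action-group names with the patterns they stand for; other entries pass through. */
    static List<String> expand(final List<String> allowedActions) {
        final List<String> patterns = new ArrayList<>();
        for (final String entry : allowedActions) {
            patterns.addAll(ACTION_GROUPS.getOrDefault(entry, List.of(entry)));
        }
        return patterns;
    }

    /** Treats '*' as "any characters" and requires the whole action name to match. */
    static boolean matches(final String pattern, final String action) {
        final String regex = Arrays.stream(pattern.split("\\*", -1))
                .map(Pattern::quote)
                .collect(Collectors.joining(".*"));
        return Pattern.matches(regex, action);
    }

    static boolean isAllowed(final List<String> allowedActions, final String action) {
        return expand(allowedActions).stream().anyMatch(p -> matches(p, action));
    }
}
```

In this model indices_all allows indices:admin/exists while crud does not. The thread shows that adding indices:admin/exists next to crud was still not enough, which suggests the failing request is checked against some other action that indices:* happens to cover.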

krishna-ggk commented 8 months ago

If you have indices_all, you should not need crud. Are you trying to break indices_all down further to scope it for ingestion?

Jon-AtAWS commented 8 months ago

Yes, I am trying to figure out the minimum permissions. I understand that I don't need crud; I just added indices_all there (in place of indices:admin/exists) to verify that the exists permission alone is not sufficient.
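One systematic way to search for minimum permissions is one-at-a-time reduction: start from a set that works (such as the successful role above), remove each entry in turn, and keep the removal whenever the pipeline still initializes. The sketch below assumes a hypothetical `works` check that, in practice, would apply the role (for example with PUT _plugins/_security/api/roles/pipeline_write_role) and wait to see whether the sink initializes; that part is not shown. It also assumes that adding a permission never breaks a working role, and under that assumption the result is 1-minimal. PermissionMinimizer is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/**
 * One-at-a-time reduction of a permission list. Assuming that adding permissions
 * never breaks a working set, the result is 1-minimal: removing any single
 * remaining entry makes the check fail.
 */
final class PermissionMinimizer {
    private PermissionMinimizer() {}

    static List<String> minimize(final List<String> candidates, final Predicate<List<String>> works) {
        if (!works.test(candidates)) {
            throw new IllegalArgumentException("The full candidate set does not work; nothing to minimize");
        }
        final List<String> current = new ArrayList<>(candidates);
        for (final String permission : candidates) {
            final List<String> trial = new ArrayList<>(current);
            trial.remove(permission);       // try the set without this entry
            if (works.test(trial)) {
                current.remove(permission); // not needed, given the others that remain
            }
        }
        return current;
    }
}
```

With n candidates this costs n + 1 runs of the check. It can only drop entries from the list it is given, so because indices_all is a wildcard group, narrowing it to specific indices:* actions would require listing candidate action names individually.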