opensearch-project / ml-commons

ml-commons provides a set of common machine learning algorithms, e.g. k-means, or linear regression, to help developers build ML related features within OpenSearch.
Apache License 2.0
96 stars 133 forks source link

[BUG] Neural search: 4xx error ingesting data with Sagemaker external model #2249

Closed tiagoshin closed 5 months ago

tiagoshin commented 7 months ago

What is the bug? I'm trying to use a model hosted in a Sagemaker endpoint in the same AWS Account as the Opensearch cluster to perform a Neural search. The issue that I observe is that, while ingesting data into the index, I observe the following error for many documents:

        {
            "index": {
                "_index": "my-index",
                "_id": "id",
                "status": 400,
                "error": {
                    "type": "status_exception",
                    "reason": "Error from remote service: {\"message\":null}"
                }
            }
        }

I don't see any logs in OpenSearch error logs, and I don't see any 4xx or 5xx requests in Sagemaker. This error only happens with a reasonable amount of data in bulk ingestion, which in this case is 250 records. When I ingest only 20 records, it works. I already tested getting some documents that failed and tried to ingest them separately, and it worked. So, the issue is not with the document or with the Sagemaker model.

How can one reproduce the bug?

  1. First, deploy the bge-base-en-v1.5 embedding model in Sagemaker using this python script:
    
    from sagemaker.jumpstart.model import JumpStartModel

model_id = "huggingface-sentencesimilarity-bge-base-en-v1-5" env = { 'MMS_JOB_QUEUE_SIZE': '100000', } text_embedding_model = JumpStartModel( model_id=model_id, env=env, role="", )

predictor = text_embedding_model.deploy( initial_instance_count=1, instance_type='ml.g5.xlarge' )

2. Once it's deployed, get the SageMaker endpoint.
3. Create a Sagemaker connector in OpenSearch:

POST {{host}}/_plugins/_ml/connectors/_create { "name": "Amazon Sagemaker connector", "description": "The connector to Sagemaker", "version": 1, "protocol": "aws_sigv4", "credential": { "roleArn": "" }, "parameters": { "region": "", "service_name": "sagemaker" }, "actions": [ { "action_type": "predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "", "request_body": "{ \"text_inputs\": \"${parameters.text_inputs}\", \"mode\": \"embedding\" }", "pre_process_function": "\n StringBuilder builder = new StringBuilder();\n builder.append(\"\\"\");\n String first = params.text_docs[0];\n builder.append(first);\n builder.append(\"\\"\");\n def parameters = \"{\" +\"\\"text_inputs\\":\" + builder + \"}\";\n return \"{\" +\"\\"parameters\\":\" + parameters + \"}\";", "post_process_function": "\n def name = \"sentence_embedding\";\n def dataType = \"FLOAT32\";\n if (params.embedding == null || params.embedding.length == 0) {\n return params.message;\n }\n def shape = [params.embedding.length];\n def json = \"{\" +\n \"\\"name\\":\\"\" + name + \"\\",\" +\n \"\\"data_type\\":\\"\" + dataType + \"\\",\" +\n \"\\"shape\\":\" + shape + \",\" +\n \"\\"data\\":\" + params.embedding +\n \"}\";\n return json;\n " } ] }

4. Get the connector id
5. Create a model group:

POST {{host}}/_plugins/_ml/model_groups/_register { "name": "sagemaker-model-group", "description": "Semantic search model group sagemaker" }

6. Get the model group id
7. Upload the model into OpenSearch:

POST {{host}}/_plugins/_ml/models/_register { "name": "bge-base", "function_name": "remote", "model_group_id": "", "description": "test model", "connector_id": "" }

8. Get the model_id
9. Load the model

POST {{host}}/_plugins/_ml/models//_load

10. Create an ingestion pipeline:

PUT {{host}}/_ingest/pipeline/{{pipeline_name}} { "description": "pipeline", "processors": [ { "set": { "field": "passage_text", "value": "{{{field1}}}, {{{field2}}}" } }, { "text_embedding": { "model_id": "", "field_map": { "passage_text": "passage_embedding" } } } ] }

11. Create an index:

PUT {{host}}/

{"mappings": , "settings": ..., "passage_embedding": { "type": "knn_vector", "dimension": 768, "method": { "engine": "nmslib", "space_type": "cosinesimil", "name": "hnsw", "parameters": { "ef_construction": 512, "m": 16 } } }, "passage_text": { "type": "text" }, }

12. Bulk ingest the data

PUT {{host}}//_bulk


**What is the expected behavior?**
It's expected that all documents have the following status in ingestion:

{ "index": { "_index": "index", "_id": "id", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 2, "failed": 0 }, "_seq_no": 1, "_primary_term": 1, "status": 201 }



**What is your host/environment?**
I'm running it in the AWS OpenSearch managed version 2.11.

**Do you have any screenshots?**
Not applicable

**Do you have any additional context?**
Not applicable
sean-zheng-amazon commented 7 months ago

@tiagoshin there was a known issue in ml-commons with 2.11 handling batch inference traffic, which was fixed in 2.12. possible for you to upgrade to 2.12 and retry?

ylwu-amzn commented 7 months ago

@tiagoshin Your connector request boday and pre/post process function are wrong. Can you try this ?

POST {{host}}/_plugins/_ml/connectors/_create
{
  "name": "Amazon Sagemaker connector",
  "description": "The connector to Sagemaker",
  "version": 1,
  "protocol": "aws_sigv4",
  "credential": {
    "roleArn": "<YOUR-ROLE>"
  },
  "parameters": {
    "region": "<YOUR-REGION>",
    "service_name": "sagemaker"
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "headers": {
        "content-type": "application/json"
      },
      "url": "<YOUR-SAGEMAKER-ENDPOINT>",
      "request_body": "{ \"text_inputs\": \"${parameters.text_inputs}\", \"mode\": \"embedding\" }",
      "pre_process_function": "\n    StringBuilder builder = new StringBuilder();\n    builder.append(\"\\\"\");\n    String first = params.text_docs[0];\n    builder.append(first);\n    builder.append(\"\\\"\");\n    def parameters = \"{\" +\"\\\"text_inputs\\\":[\" + builder + \"]}\";\n    return  \"{\" +\"\\\"parameters\\\":\" + parameters + \"}\";",
      "post_process_function": "\n      def name = \"sentence_embedding\";\n      def dataType = \"FLOAT32\";\n      if (params.embedding == null || params.embedding.length == 0) {\n        return params.message;\n      }\n      def embedding = params.embedding[0];\n      def shape = [embedding.length];\n      def json = \"{\" +\n                 \"\\\"name\\\":\\\"\" + name + \"\\\",\" +\n                 \"\\\"data_type\\\":\\\"\" + dataType + \"\\\",\" +\n                 \"\\\"shape\\\":\" + shape + \",\" +\n                 \"\\\"data\\\":\" + embedding +\n                 \"}\";\n      return json;\n    "
    }
  ]
}
tiagoshin commented 7 months ago

Hi @ylwu-amzn, I tried to run it with the code you provided but I got, for all documents in ingestion:

            "index": {
                "_index": "index",
                "_id": "my-id",
                "status": 400,
                "error": {
                    "type": "illegal_argument_exception",
                    "reason": "Invalid JSON in payload"
                }
            }
        },

So I think it's not able to parse the data correctly. The other code that I was using was parsing the data successfully for some cases, so I wonder, why do you think the pre-process and post-process scripts are wrong?

ylwu-amzn commented 7 months ago

@tiagoshin Can you share your sample predict request ?

Zhangxunmt commented 7 months ago

@tiagoshin ,can you check the sample workflows results that I provided below? The below model and connector work fine from my side. Are you having problems when the _bulk request takes more than say 200 records using the same model/workflow?


POST /_plugins/_ml/connectors/_create
{
  "name": "Amazon Sagemaker connector",
  "description": "The connector to Sagemaker",
  "version": 1,
  "protocol": "aws_sigv4",
  "credential": {
    "roleArn": "<YOUR-ROLE>"
  },
  "parameters": {
    "region": "us-east-1",
    "service_name": "sagemaker"
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "headers": {
        "content-type": "application/json"
      },
      "url": <You endpoint url>,
      "request_body": "{ \"text_inputs\": \"${parameters.text_inputs}\", \"mode\": \"embedding\" }",
      "pre_process_function": "\n    StringBuilder builder = new StringBuilder();\n    builder.append(\"\\\"\");\n    String first = params.text_docs[0];\n    builder.append(first);\n    builder.append(\"\\\"\");\n    def parameters = \"{\" +\"\\\"text_inputs\\\":\" + builder + \"}\";\n    return  \"{\" +\"\\\"parameters\\\":\" + parameters + \"}\";",
      "post_process_function": "\n      def name = \"sentence_embedding\";\n      def dataType = \"FLOAT32\";\n      if (params.embedding == null || params.embedding.length == 0) {\n        return params.message;\n      }\n      def shape = [params.embedding.length];\n      def json = \"{\" +\n                 \"\\\"name\\\":\\\"\" + name + \"\\\",\" +\n                 \"\\\"data_type\\\":\\\"\" + dataType + \"\\\",\" +\n                 \"\\\"shape\\\":\" + shape + \",\" +\n                 \"\\\"data\\\":\" + params.embedding +\n                 \"}\";\n      return json;\n    "
    }
  ]
}

POST /_plugins/_ml/models/_register
{
    "name": "bge-base",
    "function_name": "remote",
    "description": "test model",
    "connector_id": "0vfppI4B_1v71XsqvLwp"
}
GET /_plugins/_ml/models/2PfqpI4B_1v71XsqjbwX
POST /_plugins/_ml/models/2PfqpI4B_1v71XsqjbwX/_load

POST /_plugins/_ml/models/2PfqpI4B_1v71XsqjbwX/_predict
{
  "parameters": {
    "text_inputs" : "tests"
  }
}

PUT /_ingest/pipeline/mypipeline
{
    "description": "pipeline",
    "processors": [
        {
            "text_embedding": {
                "model_id": "2PfqpI4B_1v71XsqjbwX",
                "field_map": {
                    "passage_text": "passage_embedding"
                }
            }
        }
    ]
}

PUT /my-nlp-index
{
  "settings": {
    "index.knn": true,
    "default_pipeline": "mypipeline"
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "text"
      },
      "passage_embedding": {
        "type": "knn_vector",
        "dimension": 768,
        "method": {
          "engine": "nmslib",
          "space_type": "cosinesimil",
          "name": "hnsw",
          "parameters": {
            "ef_construction": 512,
            "m": 16
          }
        }
      },
      "passage_text": {
        "type": "text"
      }
    }
  }
}

PUT /my-nlp-index/_doc/1
{
  "passage_text": "A West Virginia university women 's basketball team , officials , and a small gathering of fans are in a West Virginia arena .",
  "id": "4319130149.jpg"
}

POST _bulk
{"index": {"_index": "my-nlp-index", "_id": "1"}}
{"passage_text": "Chart and table of population level and growth rate for the Ogden-Layton metro area from 1950 to 2023. United Nations population projections are also included through the year 2035.\nThe current metro area population of Ogden-Layton in 2023 is 750,000, a 1.63% increase from 2022.\nThe metro area population of Ogden-Layton in 2022 was 738,000, a 1.79% increase from 2021.\nThe metro area population of Ogden-Layton in 2021 was 725,000, a 1.97% increase from 2020.\nThe metro area population of Ogden-Layton in 2020 was 711,000, a 2.16% increase from 2019."}
{"index": {"_index": "my-nlp-index", "_id": "2"}}
{"passage_text": "Chart and table of population level and growth rate for the New York City metro area from 1950 to 2023. United Nations population projections are also included through the year 2035.\\nThe current metro area population of New York City in 2023 is 18,937,000, a 0.37% increase from 2022.\\nThe metro area population of New York City in 2022 was 18,867,000, a 0.23% increase from 2021.\\nThe metro area population of New York City in 2021 was 18,823,000, a 0.1% increase from 2020.\\nThe metro area population of New York City in 2020 was 18,804,000, a 0.01% decline from 2019."}
{"index": {"_index": "my-nlp-index", "_id": "3"}}
{"passage_text": "Chart and table of population level and growth rate for the Chicago metro area from 1950 to 2023. United Nations population projections are also included through the year 2035.\\nThe current metro area population of Chicago in 2023 is 8,937,000, a 0.4% increase from 2022.\\nThe metro area population of Chicago in 2022 was 8,901,000, a 0.27% increase from 2021.\\nThe metro area population of Chicago in 2021 was 8,877,000, a 0.14% increase from 2020.\\nThe metro area population of Chicago in 2020 was 8,865,000, a 0.03% increase from 2019."}
{"index": {"_index": "my-nlp-index", "_id": "4"}}
{"passage_text": "Chart and table of population level and growth rate for the Miami metro area from 1950 to 2023. United Nations population projections are also included through the year 2035.\\nThe current metro area population of Miami in 2023 is 6,265,000, a 0.8% increase from 2022.\\nThe metro area population of Miami in 2022 was 6,215,000, a 0.78% increase from 2021.\\nThe metro area population of Miami in 2021 was 6,167,000, a 0.74% increase from 2020.\\nThe metro area population of Miami in 2020 was 6,122,000, a 0.71% increase from 2019."}
{"index": {"_index": "my-nlp-index", "_id": "5"}}
{"passage_text": "Chart and table of population level and growth rate for the Austin metro area from 1950 to 2023. United Nations population projections are also included through the year 2035.\\nThe current metro area population of Austin in 2023 is 2,228,000, a 2.39% increase from 2022.\\nThe metro area population of Austin in 2022 was 2,176,000, a 2.79% increase from 2021.\\nThe metro area population of Austin in 2021 was 2,117,000, a 3.12% increase from 2020.\\nThe metro area population of Austin in 2020 was 2,053,000, a 3.43% increase from 2019."}
{"index": {"_index": "my-nlp-index", "_id": "6"}}
{"passage_text": "Chart and table of population level and growth rate for the Seattle metro area from 1950 to 2023. United Nations population projections are also included through the year 2035.\\nThe current metro area population of Seattle in 2023 is 3,519,000, a 0.86% increase from 2022.\\nThe metro area population of Seattle in 2022 was 3,489,000, a 0.81% increase from 2021.\\nThe metro area population of Seattle in 2021 was 3,461,000, a 0.82% increase from 2020.\\nThe metro area population of Seattle in 2020 was 3,433,000, a 0.79% increase from 2019."}
ylwu-amzn commented 7 months ago

Seems model input text_inputs could be array of string or just a string. My test is using array of string. @tiagoshin Since you are using string , you can follow @Zhangxunmt 's example

Zhangxunmt commented 7 months ago

I just verified with the above test case with 120 records and results show that all 120 records ingested correctly through _bulk.

tiagoshin commented 7 months ago

Hi @Zhangxunmt, I noticed that you're using the same pre-process and post-process scripts as I provided in the description. I tested the workflow you provided and it works for me, but that's a small scale. My tests with the exact same code that you provided but with 500 elements failed for 26 elements. However, usually, we ingest around 10k documents, so the fact that it works for 120 documents doesn't mean it will work for our use case. I think the same error still persists.

tiagoshin commented 7 months ago

@ylwu-amzn I understand your approach and I tested that the model in the Sagemaker endpoint can indeed receive an input like:

{
    "text_inputs": ["element1", "element2"], 
    "mode": "embedding"
}

However, with the code you provided, I get Invalid JSON in payload when I try to ingest. So, I tried to hit the model predict endpoint directly using an input like:

{
  "parameters": {
    "text_inputs" : ["element1", "element2"]
  }
}

And I got the same error. In theory, this should be a valid json. So it looks like there's a validation in OpenSearch ingestor that doesn't allow us to use a list as text inputs

Zhangxunmt commented 7 months ago

@tiagoshin , How did you run the bulk ingestion with the 500 records? Was it through Python code, Lambda function or Postman, or OS dashboard? Also is it possible to share your ingested data so we can reproduce it easier.

I verified ingesting 260 records both in AOS 2.11 latest version, and open source 2.11/2.13, I didn't get a single error. From the performance I don't feel that the error will happen even I add more data. I tested them through OS dashboard though.

tiagoshin commented 7 months ago

I'm running this test using a file in Postman in AOS 2.11. I believe the issue is exactly about the performance when adding more data. I'll send you the data privately

Zhangxunmt commented 7 months ago

@tiagoshin , I have used exactly the same dataset you shared with us (a set processor to generate passage_text, total 251 records), but still couldn't duplicate the error from my side. Actually all elements are ingested correctly with this response.

{
      "index": {
        "_index": "index",
        "_id": "my_id",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 3,
          "successful": 3,
          "failed": 0
        },
        "_seq_no": 1,
        "_primary_term": 1,
        "status": 201
      }
    }

Is it possible to setup a call so we can go over the scenario and find out the difference?

tiagoshin commented 7 months ago

@Zhangxunmt Sure, let's set up a call. Just to make sure, let me check a few details:

Zhangxunmt commented 7 months ago

@tiagoshin The sagemaker endpoint uses ml.g5.xlarge instance type. I used model_id = "huggingface-sentencesimilarity-bge-base-en-v1-5" in the notebook for deploying the model with the same script in your description.

The OpenSearch cluster is a 3 date-node cluster with instance type r6g.large
Yes the response from _bulk return False in "errors". Here is the header of the response.

{
  "took": 1504,
  "errors": false,
  "items": [
    {
      "index": {
      ... ....
      }
Zhangxunmt commented 7 months ago

@tiagoshin As we discussed over the call, please either send me the ticket/issue for the patch upgrade or try a standard 2.11 version from your side.

tiagoshin commented 7 months ago

@Zhangxunmt I've opened these:

Zhangxunmt commented 7 months ago

@tiagoshin have you already verified using a 2.11 version that that's only the issue in the patch?

tiagoshin commented 7 months ago

@Zhangxunmt Not yet, I'm still pending some answer from Nikhil regarding this topic

Zhangxunmt commented 7 months ago

@tiagoshin, I used a 2.11 patched version but still didn't duplicate the error. The primary shard number of the index is 5, and the replica shards number is 2.

tiagoshin commented 7 months ago

Thanks for the investigation @Zhangxunmt. I'll wait for the cluster upgrade to test it on 2.11 standard. I also already tested in the OpenSearch dev tools with the exact code that you provided and it didn't work, so I strongly believe there's some relation to the OpenSearch version.

tiagoshin commented 6 months ago

Hi @Zhangxunmt, I got the cluster update here to 2.11. With the setup that you provided here, it works now. So I'm able to reproduce your setup, that's great news! However, I see something wrong with the setup because in the data that I sent you privately that has 250 records, I don't have a field called passage_text, because I generate it using the ingestion pipeline using this:

PUT /_ingest/pipeline/mypipeline
{
    "description": "pipeline",
    "processors": [
        {
            "set": {
                "field": "passage_text",
                "value": "'{{{name}}}'. '{{{genre}}}'. '{{{type}}}'. '{{{categories}}}'"
            }
        },
        {
            "text_embedding": {
                "model_id": "2PfqpI4B_1v71XsqjbwX",
                "field_map": {
                    "passage_text": "passage_embedding"
                }
            }
        }
    ]
}

When I define the ingestion pipeline like this and use the same setup to try to ingest the data that I sent you with 250 records, I get the same errors that I was getting before. So, to check if the cause is the ingestion pipeline or not, instead of using the passage_text as the text field for embedding, I changed to use name for this, because that's a field that I have on my data. So, I just ran your code redefining the ingestion pipeline and index as:

PUT /_ingest/pipeline/mypipeline
{
    "description": "pipeline",
    "processors": [
        {
            "text_embedding": {
                "model_id": "2PfqpI4B_1v71XsqjbwX",
                "field_map": {
                    "name": "passage_embedding"
                }
            }
        }
    ]
}

And index as:

PUT /my-nlp-index
{
  "settings": {
    "index.knn": true,
    "default_pipeline": "mypipeline"
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "text"
      },
      "passage_embedding": {
        "type": "knn_vector",
        "dimension": 768,
        "method": {
          "engine": "nmslib",
          "space_type": "cosinesimil",
          "name": "hnsw",
          "parameters": {
            "ef_construction": 512,
            "m": 16
          }
        }
      },
      "name": {
        "type": "text"
      }
    }
  }
}

Could you test with any of these setups on your side to see if you're able to reproduce it? Thank you!

Zhangxunmt commented 6 months ago

Thanks @tiagoshin . I think we are getting close to the root cause. At least our env are identical now. But actually My pipeline is the same as yours. The one in the earlier message was old. I haven't got errors so far.

PUT /_ingest/pipeline/newpipeline
{
    "description": "pipeline",
    "processors": [
      {
        "set": {
            "field": "passage_text",
            "value": "{{{xxx}}}. {{{xxx}}}. {{{xxx}}}. {{{xxx}}}. {{{xxx}}}. {{{xxx}}}.{{{xxx}}}"
        }
      },
      {
        "text_embedding": {
            "model_id": "_ABVpo4BAIll6zTi4CsE",
            "field_map": {
                "passage_text": "passage_embedding"
            }
        }
      }
    ]
}

I can test with your latest setup later. Will update soon.

tiagoshin commented 6 months ago

Hi @Zhangxunmt When using the ingestion pipeline, please make sure that the fields used in the values exist in the data. Thank you for the update! Waiting for your experiments

Zhangxunmt commented 6 months ago

I did the below two experiments with the same connector. These are the data from the dashboard. But none of the case returned error from my side. Can you check if your connector is the same? The credential is hidden but it's just a roleArn defined inside. I used the same 250 data records that you sent me.

{
  "name": "Amazon Sagemaker connector",
  "version": "1",
  "description": "The connector to Sagemaker",
  "protocol": "aws_sigv4",
  "parameters": {
    "service_name": "sagemaker",
    "region": "us-east-1"
  },
  "actions": [
    {
      "action_type": "PREDICT",
      "method": "POST",
      "url": "your url",
      "headers": {
        "content-type": "application/json"
      },
      "request_body": """{ "text_inputs": "${parameters.text_inputs}", "mode": "embedding" }""",
      "pre_process_function": """
    StringBuilder builder = new StringBuilder();
    builder.append("\"");
    String first = params.text_docs[0];
    builder.append(first);
    builder.append("\"");
    def parameters = "{" +"\"text_inputs\":" + builder + "}";
    return  "{" +"\"parameters\":" + parameters + "}";""",
      "post_process_function": """
      def name = "sentence_embedding";
      def dataType = "FLOAT32";
      if (params.embedding == null || params.embedding.length == 0) {
        return params.message;
      }
      def shape = [params.embedding.length];
      def json = "{" +
                 "\"name\":\"" + name + "\"," +
                 "\"data_type\":\"" + dataType + "\"," +
                 "\"shape\":" + shape + "," +
                 "\"data\":" + params.embedding +
                 "}";
      return json;
    """
    }
  ]
}

Pipeline 1: (I also tried using the exact pipeline that you shared with me last week)

PUT /_ingest/pipeline/mypipeline
{
    "description": "pipeline",
    "processors": [
        {
            "set": {
                "field": "passage_text",
                "value": "'{{{name}}}'. '{{{type}}}'. '{{{categories}}}'"
            }
        },
        {
            "text_embedding": {
                "model_id": "_ABVpo4BAIll6zTi4CsE",
                "field_map": {
                    "passage_text": "passage_embedding"
                }
            }
        }
    ]
}

Create the index 1

PUT /myindex
{ 
    "mappings": { ... }, 
    "settings": {
            "index": {
                "default_pipeline": "mypipeline",
                "knn": "true", 
                ...    
             }
}

Ingest with _bulk

PUT /myindex/_bulk

Pipeline 2:

PUT /_ingest/pipeline/mypipeline
{
    "description": "pipeline",
    "processors": [
        {
            "text_embedding": {
                "model_id": "_ABVpo4BAIll6zTi4CsE",
                "field_map": {
                    "name": "passage_embedding"
                }
            }
        }
    ]
}

Index 2:

PUT /my-nlp-index
{
  "settings": {
    "index.knn": true,
    "default_pipeline": "mypipeline"
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "text"
      },
      "passage_embedding": {
        "type": "knn_vector",
        "dimension": 768,
        "method": {
          "engine": "nmslib",
          "space_type": "cosinesimil",
          "name": "hnsw",
          "parameters": {
            "ef_construction": 512,
            "m": 16
          }
        }
      },
      "name": {
        "type": "text"
      }
    }
  }
}

Ingest with _bulk

PUT /my-nlp-index/_bulk

Both experiments returned:

{
  "took": 315,
  "ingest_took": 3081,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "my-nlp-index",
        "_id": "51c75f7bb6f26ba1cd00002f",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 3,
          "successful": 3,
          "failed": 0
        },
        "_seq_no": 0,
        "_primary_term": 1,
        "status": 201
      }
    },
......
tiagoshin commented 6 months ago

So you weren't able to reproduce the bug using it, right? My connector is defined as:

{
  "name": "Amazon Sagemaker connector",
  "version": "1",
  "description": "The connector to Sagemaker",
  "protocol": "aws_sigv4",
  "credential": {
    "roleArn": "my role arn"
  },
  "parameters": {
    "service_name": "sagemaker",
    "region": "us-east-1"
  },
  "actions": [
    {
      "action_type": "PREDICT",
      "method": "POST",
      "url": "my sagemaker url",
      "headers": {
        "content-type": "application/json"
      },
      "request_body": "{ \"text_inputs\": \"${parameters.text_inputs}\", \"mode\": \"embedding\" }",
      "pre_process_function": "\n    StringBuilder builder = new StringBuilder();\n    builder.append(\"\\\"\");\n    String first = params.text_docs[0];\n    builder.append(first);\n    builder.append(\"\\\"\");\n    def parameters = \"{\" +\"\\\"text_inputs\\\":\" + builder + \"}\";\n    return  \"{\" +\"\\\"parameters\\\":\" + parameters + \"}\";",
      "post_process_function": "\n      def name = \"sentence_embedding\";\n      def dataType = \"FLOAT32\";\n      if (params.embedding == null || params.embedding.length == 0) {\n        return params.message;\n      }\n      def shape = [params.embedding.length];\n      def json = \"{\" +\n                 \"\\\"name\\\":\\\"\" + name + \"\\\",\" +\n                 \"\\\"data_type\\\":\\\"\" + dataType + \"\\\",\" +\n                 \"\\\"shape\\\":\" + shape + \",\" +\n                 \"\\\"data\\\":\" + params.embedding +\n                 \"}\";\n      return json;\n    "
    }
  ]
}

I cannot query using the way you defined; json doesn't recognize triple double quotes. But they look the same, right? Other than that, I don't see any difference in our setups. Are you using any special cluster configuration?

Zhangxunmt commented 6 months ago

@tiagoshin , No I still didn't get any errors. It's the same connector that I shared in the earlier comment. I didn't use any special cluster configuration. I didn't specify the primary shards and replicas so they're all auto set when the index is created. Do you want to setup another call to cross check again? We can go over from the very beginning step again one by one.

tiagoshin commented 6 months ago

@Zhangxunmt I think so. Let's go on a call and check again. That's really weird because our setup looks pretty much the same!

Zhangxunmt commented 6 months ago

@tiagoshin , I see you have 6 data nodes in your domain. I only have 3. So one explain is that you have 6 nodes sending traffic to sageMaker host in parallel, which is double the throughput that I send in my cluster. The sageMaker model service throttled some of you requests and that's why you see this "error from remote service" error.

Zhangxunmt commented 6 months ago

I have kicked off a cluster config change to 6 data nodes using the same instance type as yours. When the new data nodes are ready I will test again with 6 nodes.

Zhangxunmt commented 6 months ago

@tiagoshin , I was able to reproduce the same error as below using the scaled up cluster with 6 data nodes. So this proves the hypothesis that those errors come from the SageMaker throttling.

  "took": 353,
  "ingest_took": 2995,
  "errors": true,
  "items": [
    {
      "index": {
        "_index": "plutotvindex",
        "_id": "5d9492c77ea6f99188738ff1",
        "status": 400,
        "error": {
          "type": "status_exception",
          "reason": """Error from remote service: {"message":null}"""
        }
      }
    },
......
Zhangxunmt commented 6 months ago

Can we try this autoscaling in SageMaker to mitigate this issue? https://docs.aws.amazon.com/sagemaker/latest/dg/endpoint-auto-scaling.html

ylwu-amzn commented 6 months ago

Another way: reduce the bulk size to not exceed Sagemaker model throughput limit. The documents in bulk request will be processed in parallel on multiple data nodes. If you cluster has multiple data nodes, then these data nodes will send request to Sagemaker model in parallel. That may exceed the Sagemaker model capability.

For example, your Sagemaker model can handle 10 requests per second. Now you have 200 docs in one bulk request and these documents will be processed by 6 data nodes. Assume each data node sends out 3 requests per second, then the Sagemaker model will receive 3 * 6 = 18 requests per seconds. That exceeds Sagemaker model throughput limit of 10 requests/second.

tiagoshin commented 6 months ago

Hi @Zhangxunmt , thanks for the investigations and I'm glad you were able to reproduce it. So, I tried autoscaling in Sagemaker but it seems that it doesn't work well when we have machines with GPU, because it can only scale based on CPU usage, so it doesn't work well. The better would be to bump up the number of machines if that's the case, but I'm not sure if that would solve the issue. However, to @ylwu-amzn's point, I understand, but if that's the case, shouldn't we see errors in Sagemaker? Or at least any high peak of resource usage? I don't observe anything suspicious at the Sagemaker monitoring, and it has, in theory, 4xx and 5xx errors metrics. Did you observe these errors? When you say that the Sagemaker have model throughput limit of 10 requests/second, where is this information? Is this a Service Quota? I didn't find anything like that in the Sagemaker docs

Zhangxunmt commented 6 months ago

@tiagoshin , my perception is that SageMaker metrics are not reliable at all. In my testing account, the SageMaker metrics doesn't show any 4xx errors either. It only has spikes to "1" in the "invocation per instance" when the model is invoked, but it still doesn't make sense because invocation number is absolutely more than 1 per instance.

I think the 10 requests/second is an example to explain, which is likely not the exact real throttling limit.

Zhangxunmt commented 6 months ago

https://github.com/opensearch-project/ml-commons/blob/2.11/ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/AwsConnectorExecutor.java#L106. The error is from this line, which indicates the remote service returned error code and returns the error messages. From the response posted earlier, we can see that SageMaker returned 400 status code with ""message":null" in the response body.

Zhangxunmt commented 6 months ago

@tiagoshin another workaround is using a different model like the bedrock text embedding model. Tutorial: shttps://docs.aws.amazon.com/opensearch-service/latest/developerguide/cfn-template.html, https://github.com/opensearch-project/ml-commons/blob/main/docs/tutorials/aws/semantic_search_with_bedrock_cohere_embedding_model.md

tiagoshin commented 6 months ago

@Zhangxunmt I tried it before, but Bedrock has really low quotas. For the Cohere embedding model for example, they only support 2k requests per minute. That's not enough for a full ingestion.

Zhangxunmt commented 6 months ago

@tiagoshin SageMaker team has identified that the throttling from their side caused your ingestion error. I think we can close this issue once the concurrency limit is increased for you.