opensearch-project / ml-commons

ml-commons provides a set of common machine learning algorithms, e.g. k-means or linear regression, to help developers build ML-related features within OpenSearch.
Apache License 2.0

[BUG] Error using Bedrock connector for Claude V2 #2969

Open Jon-AtAWS opened 1 day ago

Jon-AtAWS commented 1 day ago

What is the bug? This query:

response = os_client.search(index='converse', body={
  "query": {
    "simple_query_string": {
      "query": "old faded blue jeans",
      "fields": ["question_text"]
    }
  },
  "ext": {
    "generative_qa_parameters": {
      "llm_model": "anthropic.claude-v2",
      "llm_question": "old faded blue jeans",
      "memory_id": memory_id,
      "context_size": 5,
      "message_size": 5,
      "timeout": 15
    }
  }
})

Gives the following error:

Traceback (most recent call last):
  File "/Users/handler/code/gdata/", line 180, in <module>
    response ='converse', body=
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/client/", line 176, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/client/", line 2364, in search
    return self.transport.perform_request(
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/", line 455, in perform_request
    raise e
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/", line 416, in perform_request
    status, headers_response, data = connection.perform_request(
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/connection/", line 308, in perform_request
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/connection/", line 315, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
opensearchpy.exceptions.RequestError: RequestError(400, 'status_exception', 'Error validating input schema: Validation failed: [$.parameters: required property \'inputs\' not found] for instance: {"algorithm":"REMOTE","parameters":{"messages":"[{\\"role\\":\\"system\\",\\"content\\":\\"You are a helpful assistant\\"},{\\"role\\":\\"user\\",\\"content\\":\\"Generate a concise and informative answer in less than 100 words for the given question\\"},{\\"role\\":\\"user\\",\\"content\\":\\"QUESTION: faded old blue jeans\\"},{\\"role\\":\\"user\\",\\"content\\":\\"ANSWER:\\"}]","model":"anthropic.claude-v2"},"action_type":null} with schema: {\n    "type": "object",\n    "properties": {\n        "parameters": {\n            "type": "object",\n            "properties": {\n                "inputs": {\n                    "type": "string"\n                }\n            },\n            "required": [\n                "inputs"\n            ]\n        }\n    },\n    "required": [\n        "parameters"\n    ]\n}')
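The 400 error is a plain JSON Schema check: the predict request carries parameters.messages and parameters.model, while the schema attached to the model requires parameters.inputs. A minimal, hand-rolled sketch that applies only the required-property rule from the quoted schema reproduces the same message. This is not ml-commons' validator, and the abbreviated payload is illustrative.

```python
import json

# Schema quoted verbatim (re-indented) in the error message above.
SCHEMA = {
    "type": "object",
    "properties": {
        "parameters": {
            "type": "object",
            "properties": {"inputs": {"type": "string"}},
            "required": ["inputs"],
        }
    },
    "required": ["parameters"],
}


def missing_required(instance: dict, schema: dict, path: str = "$") -> list[str]:
    """Return messages for required properties that are absent.

    Only handles nested 'required'/'properties', which is all this schema
    uses; it is a sketch, not a general JSON Schema validator.
    """
    errors = []
    for key in schema.get("required", []):
        if key not in instance:
            errors.append(f"{path}: required property '{key}' not found")
    for key, sub_schema in schema.get("properties", {}).items():
        value = instance.get(key)
        if isinstance(value, dict):
            errors.extend(missing_required(value, sub_schema, f"{path}.{key}"))
    return errors


# The predict request the RAG processor sent (messages abbreviated).
payload = {
    "algorithm": "REMOTE",
    "parameters": {
        "messages": json.dumps([{"role": "user", "content": "QUESTION: faded old blue jeans"}]),
        "model": "anthropic.claude-v2",
    },
    "action_type": None,
}

print(missing_required(payload, SCHEMA))
```

The check fails for the same reason the server's does: nothing in parameters is named inputs, no matter how well-formed messages is.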

I followed the documentation here: and created the following code:

import boto3
import json
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk
import os
import uuid

OPENSEARCH_HOST = os.environ.get('OPENSEARCH_HOST', 'localhost')
OPENSEARCH_PORT = int(os.environ.get('OPENSEARCH_PORT', 9200))
OPENSEARCH_AUTH = (os.environ.get('OPENSEARCH_ADMIN_USER', 'admin'),
                   os.environ.get('OPENSEARCH_ADMIN_PASSWORD', ''))
REGION = os.environ.get('AWS_REGION', 'us-west-2')

# Step 0: set up the client and enable the ML Commons features used below
os_client = OpenSearch(
  hosts = [{'host': OPENSEARCH_HOST, 'port': OPENSEARCH_PORT}],
  http_auth = OPENSEARCH_AUTH,
  use_ssl = True,
  verify_certs = False,
  ssl_assert_hostname = False,
  ssl_show_warn = False,
)
os_client.cluster.put_settings(body={
  "persistent": {
    "plugins.ml_commons.memory_feature_enabled": True,
    "plugins.ml_commons.rag_pipeline_feature_enabled": True,
    "plugins.ml_commons.trusted_connector_endpoints_regex": [
      # (regex entries elided)
    ]
  }
})
# Temporary STS credentials for the Bedrock connector
session = boto3.client('sts', region_name=REGION).get_session_token()

## Step 1: Create a connector to a model
response = os_client.transport.perform_request('POST', '/_plugins/_ml/connectors/_create', body={
    "name": "Amazon Bedrock",
    "description": "Test connector for Amazon Bedrock",
    "version": 1,
    "protocol": "aws_sigv4",
    "credential": {
        "access_key": session['Credentials']['AccessKeyId'],
        "secret_key": session['Credentials']['SecretAccessKey'],
        "session_token": session['Credentials']['SessionToken']
    },
    "parameters": {
        "region": REGION,
        "service_name": "bedrock",
        "model": "anthropic.claude-v2"
    },
    "actions": [
        {
            "action_type": "predict",
            "method": "POST",
            "headers": {
                "content-type": "application/json"
            },
            "url": "https://bedrock-runtime.${parameters.region}${parameters.model}/invoke",
            "request_body": "{\"prompt\":\"\\n\\nHuman: ${parameters.inputs}\\n\\nAssistant:\",\"max_tokens_to_sample\":300,\"temperature\":0.5,\"top_k\":250,\"top_p\":1,\"stop_sequences\":[\"\\\\n\\\\nHuman:\"]}"
        }
    ]
})
connector_id = response['connector_id']
print(f"Connector ID: {connector_id}")

## Step 2: Register and deploy the model
response = os_client.transport.perform_request('POST', '/_plugins/_ml/models/_register', body={
    "name": "Amazon Bedrock",
    "function_name": "remote",
    "description": "bedrock",
    "connector_id": connector_id
})
task_id = response['task_id']
status = response['status']
# Poll the registration task until it completes; stop if it fails
while status != 'COMPLETED':
    response = os_client.transport.perform_request('GET', f'/_plugins/_ml/tasks/{task_id}')
    status = response['state']
    print(f"Task status: {status}")
    if status == 'FAILED':
        raise RuntimeError(f"Model registration failed: {response}")
model_id = response['model_id']
print(f'Model ID: {model_id}')

response = os_client.transport.perform_request('POST', f'/_plugins/_ml/models/{model_id}/_deploy', body='')
print(f"Model deploy response: {response}")

## Step 3: Create a search pipeline
response = os_client.transport.perform_request('PUT', '/_search/pipeline/rag_pipeline', body={
    "response_processors": [
        {
            "retrieval_augmented_generation": {
              "tag": "conversation demo",
              "description": "Demo pipeline Using Bedrock Connector",
              "model_id": model_id,
              "context_field_list": ["text"],
              "system_prompt": "You are a helpful assistant",
              "user_instructions": "Generate a concise and informative answer in less than 100 words for the given question"
            }
        }
    ]
})

## Step 4: Ingest RAG data into an index
os_client.indices.delete(index='converse', ignore=[400, 404])
os_client.indices.create(index='converse', body={
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1,
    "" : "rag_pipeline"
  },
  "mappings": {
    "properties": {
      "question_id": {"type": "keyword"},
      "title": { "type": "text" },
      "question_text": {"type": "text"},
      "asin": {"type": "keyword"},
      "bullet_point1": {"type": "text"},
      "bullet_point2": {"type": "text"},
      "bullet_point3": {"type": "text"},
      "bullet_point4": {"type": "text"},
      "bullet_point5": {"type": "text"},
      "product_description": {"type": "text"},
      "brand_name": {"type": "keyword"},
      "item_name": {"type": "text"},
      "question_type": {"type": "keyword"},
      "answer_aggregated": {"type": "keyword"},
      "answers": {
          "properties": {
            "answer_text": {"type": "text"}
          }
      }
    }
  }
})

rag_source = './datasets/amazon_pqa/amazon_pqa_jeans.json'
with open(rag_source, 'r') as f:
  nline = 0
  buffer = []
  for line in f:
    data = json.loads(line)
    buffer.append({
          "_op_type": "create",
          "_index": 'converse',
          "_source": data
    })
    nline += 1
    # Send to OpenSearch in batches of 5,000 documents
    if nline % 5000 == 0:
      print(nline, ' lines processed')
      bulk(os_client, buffer)
      buffer = []
  # Send the final partial batch
  if buffer:
    print(nline, ' lines processed')
    bulk(os_client, buffer)

## RAG pipeline

## Step 5: Create a conversation memory
conversation_name = f'conversation-{str(uuid.uuid1())[:8]}'
response = os_client.transport.perform_request('POST', '/_plugins/_ml/memory/',
  body={"name": conversation_name})
memory_id = response['memory_id']
print(f'Memory ID: {memory_id}')

## Step 6: Use the pipeline for RAG
response = os_client.search(index='converse', body={
  "query": {
    "simple_query_string": {
      "query": "old faded blue jeans",
      "fields": ["question_text"]
    }
  },
  "ext": {
    "generative_qa_parameters": {
      "llm_model": "anthropic.claude-v2",
      "llm_question": "faded old blue jeans",
      "memory_id": memory_id,
      "context_size": 5,
      "message_size": 5,
      "timeout": 15
    }
  }
})
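The Step 2 loop polls the task endpoint back-to-back with no delay and no timeout. A more defensive poller, sketched under the assumption that tasks end in either COMPLETED or FAILED; the helper name wait_for_task is hypothetical, and get_task is injected so the logic can be exercised without a cluster:

```python
import time
from typing import Callable


def wait_for_task(get_task: Callable[[str], dict], task_id: str,
                  timeout_s: float = 120.0, interval_s: float = 1.0) -> dict:
    """Poll an ML Commons task until its state is COMPLETED.

    get_task returns the task document for a task id. Raises RuntimeError on
    FAILED and TimeoutError once timeout_s has elapsed, sleeping interval_s
    between polls instead of spinning.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        task = get_task(task_id)
        state = task.get("state")
        if state == "COMPLETED":
            return task
        if state == "FAILED":
            raise RuntimeError(f"Task {task_id} failed: {task.get('error')}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task {task_id} still {state} after {timeout_s}s")
        time.sleep(interval_s)
```

With the client from the script, usage would look like `model_id = wait_for_task(lambda tid: os_client.transport.perform_request('GET', f'/_plugins/_ml/tasks/{tid}'), task_id)['model_id']`.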

How can one reproduce the bug? Steps to reproduce the behavior: The above code will reproduce the error. The data came from

What is the expected behavior? I should get a text response from the model

What is your host/environment?

Do you have any screenshots? N/A

Additional context: If I run the following in Dev Tools, I get a different error:

GET converse/_search
{
  "query": {
    "simple_query_string": {
      "query": "old faded blue jeans",
      "fields": ["question_text"]
    }
  },
  "ext": {
    "generative_qa_parameters": {
      "llm_model": "anthropic.claude-v2",
      "llm_question": "faded old blue jeans",
      "memory_id": "v21hA5IBrpFbJSuYur7o",
      "context_size": 5,
      "message_size": 5,
      "timeout": 15
    }
  }
}

{
  "error": {
    "root_cause": [
      {
        "type": "runtime_exception",
        "reason": "Context text not found in search hit {\n  \"_index\" : \"converse\",\n  \"_id\" : \"o21qA5IBrpFbJSuY4uy6\",\n  \"_score\" : 13.451588,\n  \"_source\" : {\n    \"question_id\" : \"Tx1S3K9MJ9OVLJI\",\n    \"question_text\" : \"Is andi faded blue\",\n    \"asin\" : \"B004G9Q83E\",\n    \"bullet_point1\" : \"Sits below waist\",\n    \"bullet_point2\" : \"Slim through seat and thigh\",\n    \"bullet_point3\" : \"Boot cut leg\",\n    \"bullet_point4\" : \"\",\n    \"bullet_point5\" : \"\",\n    \"product_description\" : \"\",\n    \"brand_name\" : \"Levi's\",\n    \"item_name\" : \"Levi's Mens 527 Bootcut Jean, Andi, 30-32\",\n    \"question_type\" : \"yes-no\",\n    \"answer_aggregated\" : \"no\",\n    \"answers\" : [\n      {\n        \"answer_text\" : \"It is more of a navy. It looks a little faded, but is not light.\"\n      },\n      {\n        \"answer_text\" : \"no\"\n      },\n      {\n        \"answer_text\" : \"No, these are a dark wash\"\n      }\n    ]\n  }\n}"
      }
    ],
    "type": "runtime_exception",
    "reason": "Context text not found in search hit {\n  \"_index\" : \"converse\",\n  \"_id\" : \"o21qA5IBrpFbJSuY4uy6\",\n  \"_score\" : 13.451588,\n  \"_source\" : {\n    \"question_id\" : \"Tx1S3K9MJ9OVLJI\",\n    \"question_text\" : \"Is andi faded blue\",\n    \"asin\" : \"B004G9Q83E\",\n    \"bullet_point1\" : \"Sits below waist\",\n    \"bullet_point2\" : \"Slim through seat and thigh\",\n    \"bullet_point3\" : \"Boot cut leg\",\n    \"bullet_point4\" : \"\",\n    \"bullet_point5\" : \"\",\n    \"product_description\" : \"\",\n    \"brand_name\" : \"Levi's\",\n    \"item_name\" : \"Levi's Mens 527 Bootcut Jean, Andi, 30-32\",\n    \"question_type\" : \"yes-no\",\n    \"answer_aggregated\" : \"no\",\n    \"answers\" : [\n      {\n        \"answer_text\" : \"It is more of a navy. It looks a little faded, but is not light.\"\n      },\n      {\n        \"answer_text\" : \"no\"\n      },\n      {\n        \"answer_text\" : \"No, these are a dark wash\"\n      }\n    ]\n  }\n}"
  },
  "status": 500
}
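The 500 error says the RAG processor looked up each field named in context_field_list in a hit's _source, and this hit has no text field (the mapping defines question_text, answers.answer_text and so on, but nothing called text). A minimal sketch of that lookup, written to mirror the error message rather than ml-commons' actual code; extract_context is a hypothetical name and the hit is abbreviated from the error:

```python
def extract_context(hits: list[dict], context_fields: list[str]) -> list[str]:
    """Collect one context string per (hit, field), in hit order.

    A field listed in context_fields that is missing from a hit's _source
    raises, matching the failure mode in the error above.
    """
    contexts = []
    for hit in hits:
        source = hit.get("_source", {})
        for field in context_fields:
            if field not in source:
                raise RuntimeError(f"Context {field} not found in search hit {hit.get('_id')}")
            contexts.append(str(source[field]))
    return contexts


# Abbreviated from the hit quoted in the error.
hit = {
    "_index": "converse",
    "_id": "o21qA5IBrpFbJSuY4uy6",
    "_source": {"question_id": "Tx1S3K9MJ9OVLJI", "question_text": "Is andi faded blue"},
}
```

Listing a field that actually exists in _source, such as question_text, lets the lookup succeed.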
Jon-AtAWS commented 1 day ago

One mystery solved. If I add a sleep statement after loading the data and after creating the memory, then I get the same error as I get in Dev Tools:

Traceback (most recent call last):
  File "/Users/handler/code/gdata/", line 184, in <module>
    response ='converse', body=
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/client/", line 176, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/client/", line 2364, in search
    return self.transport.perform_request(
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/", line 455, in perform_request
    raise e
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/", line 416, in perform_request
    status, headers_response, data = connection.perform_request(
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/connection/", line 308, in perform_request
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/connection/", line 315, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
opensearchpy.exceptions.TransportError: TransportError(500, 'runtime_exception', 'Context text not found in search hit {\n  "_index" : "converse",\n  "_id" : "oW58A5IBrpFbJSuYGAC0",\n  "_score" : 13.451588,\n  "_source" : {\n    "question_id" : "Tx1S3K9MJ9OVLJI",\n    "question_text" : "Is andi faded blue",\n    "asin" : "B004G9Q83E",\n    "bullet_point1" : "Sits below waist",\n    "bullet_point2" : "Slim through seat and thigh",\n    "bullet_point3" : "Boot cut leg",\n    "bullet_point4" : "",\n    "bullet_point5" : "",\n    "product_description" : "",\n    "brand_name" : "Levi\'s",\n    "item_name" : "Levi\'s Mens 527 Bootcut Jean, Andi, 30-32",\n    "question_type" : "yes-no",\n    "answer_aggregated" : "no",\n    "answers" : [\n      {\n        "answer_text" : "It is more of a navy. It looks a little faded, but is not light."\n      },\n      {\n        "answer_text" : "no"\n      },\n      {\n        "answer_text" : "No, these are a dark wash"\n      }\n    ]\n  }\n}')
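The sleep helps because documents loaded through _bulk only become visible to search after the index refreshes, which OpenSearch does periodically (every second by default). Calling the refresh API right after loading makes this deterministic. A sketch covering the data load only (not the memory creation), assuming you pass opensearchpy.helpers.bulk in as bulk_fn; it is injected so the helper runs without a cluster, and the function names are hypothetical:

```python
import json
from typing import Callable, Iterable, Iterator


def make_actions(lines: Iterable[str], index: str) -> Iterator[dict]:
    """Turn JSON-lines records into bulk 'create' actions, skipping blank lines."""
    for line in lines:
        if line.strip():
            yield {"_op_type": "create", "_index": index, "_source": json.loads(line)}


def ingest_and_refresh(client, index: str, lines: Iterable[str], bulk_fn: Callable) -> int:
    """Bulk-load the lines, then refresh so the documents are searchable immediately.

    bulk_fn should behave like opensearchpy.helpers.bulk, which returns
    (success_count, errors) and forwards chunk_size to the streaming helper.
    """
    success, _errors = bulk_fn(client, make_actions(lines, index), chunk_size=5000)
    client.indices.refresh(index=index)
    return success
```

In the script this would be `with open(rag_source) as f: ingest_and_refresh(os_client, 'converse', f, bulk)`.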
Jon-AtAWS commented 1 day ago

Second mystery solved. I changed the definition of the search pipeline to use 'question_text' in the context_field_list.

response = os_client.transport.perform_request('PUT', '/_search/pipeline/rag_pipeline', body={
    "response_processors": [
        {
            "retrieval_augmented_generation": {
              "tag": "conversation demo",
              "description": "Demo pipeline Using Bedrock Connector",
              "model_id": model_id,
              "context_field_list": ["question_text"],
              "system_prompt": "You are a helpful assistant",
              "user_instructions": "Generate a concise and informative answer in less than 100 words for the given question"
            }
        }
    ]
})

Now I get this error:

Traceback (most recent call last):
  File "/Users/handler/code/gdata/", line 184, in <module>
    response ='converse', body=
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/client/", line 176, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/client/", line 2364, in search
    return self.transport.perform_request(
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/", line 455, in perform_request
    raise e
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/", line 416, in perform_request
    status, headers_response, data = connection.perform_request(
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/connection/", line 308, in perform_request
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/connection/", line 315, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
opensearchpy.exceptions.RequestError: RequestError(400, 'status_exception', 'Error validating input schema: Validation failed: [$.parameters: required property \'inputs\' not found] for instance: {"algorithm":"REMOTE","parameters":{"messages":"[{\\"role\\":\\"system\\",\\"content\\":\\"You are a helpful assistant\\"},{\\"role\\":\\"user\\",\\"content\\":\\"Generate a concise and informative answer in less than 100 words for the given question\\"},{\\"role\\":\\"user\\",\\"content\\":\\"SEARCH RESULT 1: Is andi faded blue\\"},{\\"role\\":\\"user\\",\\"content\\":\\"SEARCH RESULT 2: I can\'t tell from the photos if these jeans are a faded black or a faded blue?\\"},{\\"role\\":\\"user\\",\\"content\\":\\"SEARCH RESULT 3: Looking for slightly faded jeans for casual Friday at the office - not dark blue. Any suggestions?\\"},{\\"role\\":\\"user\\",\\"content\\":\\"SEARCH RESULT 4: Which color is the darkest without faded streaks or faded  wrinkles \\\\\\\\\\\\\\"?\\\\\\\\\\\\\\"\\"},{\\"role\\":\\"user\\",\\"content\\":\\"SEARCH RESULT 5: Are any of the denim blue colors solid rather than faded in spots like is so popular now?\\"},{\\"role\\":\\"user\\",\\"content\\":\\"QUESTION: faded old blue jeans\\"},{\\"role\\":\\"user\\",\\"content\\":\\"ANSWER:\\"}]","model":"anthropic.claude-v2"},"action_type":null} with schema: {\n    "type": "object",\n    "properties": {\n        "parameters": {\n            "type": "object",\n            "properties": {\n                "inputs": {\n                    "type": "string"\n                }\n            },\n            "required": [\n                "inputs"\n            ]\n        }\n    },\n    "required": [\n        "parameters"\n    ]\n}')

So it's correctly pulling the context. Looks like I need inputs.
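Why inputs matters for the blueprint-based connector: its request_body is a template, and the ${parameters.inputs} placeholder can only be filled if the predict request carries an inputs parameter, whereas the RAG processor sends only messages and model. A simplified stand-in for the placeholder substitution follows. It is a regex sketch, not ml-commons' real implementation, and failing with KeyError on an unfilled placeholder is an assumption (the thread does not show what ml-commons does there, because schema validation rejects the request first):

```python
import re

# Matches ${parameters.NAME} placeholders in a connector request_body.
PLACEHOLDER = re.compile(r"\$\{parameters\.([A-Za-z0-9_]+)\}")


def render_request_body(template: str, parameters: dict) -> str:
    """Replace ${parameters.NAME} placeholders; fail loudly on unknown names."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in parameters:
            raise KeyError(f"no value for parameters.{name}")
        return str(parameters[name])
    return PLACEHOLDER.sub(substitute, template)


# Prompt template from the Claude v2 connector in the original report (abbreviated).
template = '{"prompt":"\\n\\nHuman: ${parameters.inputs}\\n\\nAssistant:","max_tokens_to_sample":300}'
```

Rendering with `{"inputs": ...}` produces a complete body; rendering with the processor's `{"messages": ..., "model": ...}` leaves nothing to put in the prompt.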

Jon-AtAWS commented 1 day ago

One more turn on this... It's confusing: I used the Bedrock connector blueprint from here, but that one is wrong for conversational search. Anyway, there's no parameters.inputs there. So I changed the connector to look like the one on the docs page by changing the request body to:

request_body = json.dumps({
  "model": "${parameters.model}",
  "messages": "${parameters.messages}",
  "temperature": "${parameters.temperature}"
})
And the connector to:

response = os_client.transport.perform_request('POST', '/_plugins/_ml/connectors/_create', body={
    "name": "Amazon Bedrock",
    "description": "Test connector for Amazon Bedrock",
    "version": 1,
    "protocol": "aws_sigv4",
    "credential": {
        "access_key": session['Credentials']['AccessKeyId'],
        "secret_key": session['Credentials']['SecretAccessKey'],
        "session_token": session['Credentials']['SessionToken']
    },
    "parameters": {
        "region": REGION,
        "service_name": "bedrock",
        "model": "anthropic.claude-v2"
    },
    "actions": [
        {
            "action_type": "predict",
            "method": "POST",
            "headers": {
                "content-type": "application/json"
            },
            "url": "https://bedrock-runtime.${parameters.region}${parameters.model}/invoke",
            "request_body": request_body
        }
    ]
})

But that gives:

opensearchpy.exceptions.RequestError: RequestError(400, 'status_exception', 'Error validating input schema: Validation failed: [$.parameters: required property \'inputs\' not found] for instance: {"algorithm":"REMOTE","parameters":{"messages":"[{\\"role\\":\\"system\\",\\"content\\":\\"You are a helpful assistant\\"},{\\"role\\":\\"user\\",\\"content\\":\\"Generate a concise and informative answer in less than 100 words for the given question\\"},{\\"role\\":\\"user\\",\\"content\\":\\"SEARCH RESULT 1: Is andi faded blue\\"},{\\"role\\":\\"user\\",\\"content\\":\\"SEARCH RESULT 2: I can\'t tell from the photos if these jeans are a faded black or a faded blue?\\"},{\\"role\\":\\"user\\",\\"content\\":\\"SEARCH RESULT 3: Looking for slightly faded jeans for casual Friday at the office - not dark blue. Any suggestions?\\"},{\\"role\\":\\"user\\",\\"content\\":\\"SEARCH RESULT 4: Which color is the darkest without faded streaks or faded  wrinkles \\\\\\\\\\\\\\"?\\\\\\\\\\\\\\"\\"},{\\"role\\":\\"user\\",\\"content\\":\\"SEARCH RESULT 5: Are any of the denim blue colors solid rather than faded in spots like is so popular now?\\"},{\\"role\\":\\"user\\",\\"content\\":\\"QUESTION: faded old blue jeans\\"},{\\"role\\":\\"user\\",\\"content\\":\\"ANSWER:\\"}]","model":"anthropic.claude-v2"},"action_type":null} with schema: {\n    "type": "object",\n    "properties": {\n        "parameters": {\n            "type": "object",\n            "properties": {\n                "inputs": {\n                    "type": "string"\n                }\n            },\n            "required": [\n                "inputs"\n            ]\n        }\n    },\n    "required": [\n        "parameters"\n    ]\n}')
yuye-aws commented 6 hours ago

Hi @Jon-AtAWS! You are receiving this error because the model interface for the anthropic.claude-v2 model requires the inputs parameter. I ran in DEBUG mode and found that the actual payload is:

{
  "algorithm" : "REMOTE",
  "parameters" : {
    "messages" : "[{\"role\":\"system\",\"content\":[{\"type\":\"text\",\"text\":\"You are a helpful assistant\"}]},{\"role\":\"user\",\"content\":[{\"type\":\"text\",\"text\":\"Generate a concise and informative answer in less than 100 words for the given question\"},{\"type\":\"text\",\"text\":\"QUESTION: faded old blue jeans\\n\"},{\"type\":\"text\",\"text\":\"ANSWER:\"}]}]",
    "model" : "anthropic.claude-v2"
  },
  "action_type" : null
}

This payload is built from generative_qa_parameters by the Java class GenerativeQAResponseProcessor.

yuye-aws commented 6 hours ago

According to, it is correct to require inputs for the Claude v2 model. I'm now taking a deep dive into GenerativeQAResponseProcessor.

yuye-aws commented 5 hours ago

I found the root cause. The request is converted from chatCompletionInput into a Map in the getInputParameters function. Because OpenAI models and other model APIs use different request formats, getInputParameters handles them differently depending on modelProvider. Unfortunately, anthropic.claude-v2 is classified as an OpenAI model.
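A hypothetical Python analog of the branching described above, to make the failure mode concrete. Two assumptions, neither confirmed in this comment: the provider is inferred from an llm_model prefix such as bedrock/ (ylwu's suggestion of bedrock/claude hints at this), and the Bedrock branch flattens the prompt into a single inputs string. The prompt pieces follow the order visible in the error payloads; the real prompt text ml-commons builds may differ.

```python
import json

BEDROCK_PREFIX = "bedrock/"  # assumption: provider inferred from this prefix


def build_prompt_parts(system_prompt, instructions, contexts, question):
    """(role, text) pairs in the order seen in the error payloads above."""
    parts = [("system", system_prompt), ("user", instructions)]
    parts += [("user", f"SEARCH RESULT {i}: {c}") for i, c in enumerate(contexts, start=1)]
    parts += [("user", f"QUESTION: {question}"), ("user", "ANSWER:")]
    return parts


def get_input_parameters(llm_model, system_prompt, instructions, contexts, question):
    """Hypothetical analog of getInputParameters: choose a format per provider."""
    parts = build_prompt_parts(system_prompt, instructions, contexts, question)
    if llm_model.startswith(BEDROCK_PREFIX):
        # Bedrock-style (assumed): one flat prompt string under 'inputs'
        return {"inputs": "\n".join(text for _, text in parts)}
    # Anything else, including "anthropic.claude-v2", falls through to the
    # OpenAI-style chat format: 'model' plus JSON-encoded 'messages'
    messages = [{"role": role, "content": text} for role, text in parts]
    return {"model": llm_model, "messages": json.dumps(messages)}
```

With llm_model set to anthropic.claude-v2 the result has no inputs key, which is exactly what the schema check rejects.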

ylwu-amzn commented 5 hours ago

@Jon-AtAWS , I think you should use

    "generative_qa_parameters": {
      "llm_model": "bedrock/claude",
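A small helper that builds the full search body with ylwu's suggested llm_model value. Whether bedrock/claude alone clears the error is not confirmed in the thread; it presumably also needs a connector whose request_body reads ${parameters.inputs}, such as the blueprint-based connector in the original report. The helper name and defaults are illustrative.

```python
def rag_search_body(query_text, question, llm_model="bedrock/claude",
                    memory_id=None, context_size=5, message_size=5, timeout=15):
    """Build a conversational-search body for the RAG pipeline (hypothetical helper)."""
    params = {
        "llm_model": llm_model,
        "llm_question": question,
        "context_size": context_size,
        "message_size": message_size,
        "timeout": timeout,
    }
    # memory_id is optional; include it only when continuing a conversation
    if memory_id is not None:
        params["memory_id"] = memory_id
    return {
        "query": {"simple_query_string": {"query": query_text, "fields": ["question_text"]}},
        "ext": {"generative_qa_parameters": params},
    }
```

Usage: `os_client.search(index='converse', body=rag_search_body('old faded blue jeans', 'faded old blue jeans', memory_id=memory_id))`.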
yuye-aws commented 5 hours ago

@Jon-AtAWS I would suggest a quick workaround:

response = os_client.search(index='converse', body={
  "query": {
    "match": {
      "question_text": "old faded blue jeans"
    }
  },
  "ext": {
    "generative_qa_parameters": {
      "llm_model": "anthropic.claude-v2",
      "llm_question": "faded old blue jeans",
      "context_size": 5,
      "message_size": 5,
      "timeout": 15,
      "llm_response_field": "completion"
    }
  }
})
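With llm_response_field set, the processor reads the generated text from that field of the model's raw output; completion is the field Claude v2's text-completion API returns it in. The answer then comes back in the search response's ext section. A sketch for pulling it out, assuming the ext.retrieval_augmented_generation layout (answer, plus message_id when a memory is used) described in the OpenSearch conversational search docs; the helper name is hypothetical and the test response is made up:

```python
from typing import Optional, Tuple


def rag_answer(search_response: dict) -> Tuple[Optional[str], Optional[str]]:
    """Return (answer, message_id) from a RAG search response, or None for missing parts."""
    rag = search_response.get("ext", {}).get("retrieval_augmented_generation", {})
    return rag.get("answer"), rag.get("message_id")
```

Usage: `answer, message_id = rag_answer(response)`.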