opensearch-project / ml-commons

ml-commons provides a set of common machine learning algorithms, e.g. k-means, or linear regression, to help developers build ML related features within OpenSearch.
Apache License 2.0
99 stars 136 forks source link

[RFC] ML Inference Processors #2173

Open mingshl opened 9 months ago

mingshl commented 9 months ago

Problem statement

Currently, there are different implementations of search processors/ingest processors that use a machine learning model, e.g, TextEmbeddingProcessor for text-embedding models , GenerativeQAResponseProcessor for large language models, PersonalizeRankingResponseProcessor for reranking models housing in AWS personalized Service. Looking forward, when each type of machine learning models has a separate type of processor, the number of processors will grow to be enormous. It will be in-convenient for users when configuring different processors. However, ML Commons plugin supports connecting to a foundation model hosted on an external platform and uploading your own pre-trained model to the OpenSearch cluster, users can utilize the model_id from ml-commons plugins to apply in search/ingest process. We can simplified the multiple implementations of search processors/ingest processors that use a machine learning model.

Motivation:

To Improve the ease of using machine learning models to process ingest/search requests, we are introducing a Machine Learning Inference Processor to OpenSearch-ml-common to uses a machine learning model to read from the data and add the prediction outcome to the data that is being ingested through the ingest pipeline, or return the prediction outcomes along with the search response that is returned through the search pipeline.

Scope:

Out of Scope:

-ML inference Processor is focusing on model inferences and does not handle data transformations. Developers would consider data formatting methods before using ML Inference Processors if the documents and search hit does not match the format of model input. For example, adding the preprocess methods to model connectors (example), utilizing data transforming processing (e.g Split Processors, JsonProcessor ).

Proposed Design:

Create ML Inference processors(ingest side), ML Inference search requests processors, ML Inference search response processors that share the same parameters and extend from same interface that handles getModelInferenceResult.

ML Inference Processors parameters:

parameters Required Default Description
model_id yes - (String) The ID for the model
function_name Optional for externally hosted models, Required for local models remote (String) The function name of the ML model configured in the processor. For local models, valid values are sparse_encoding, sparse_tokenize, text_embedding, and text_similarity. For externally hosted models, valid value is remote.
model_input Optional for externally hosted models, Required for local models (String) A template that defines the input field format expected by the model. Each local model type might use a different set of inputs. For externally hosted models, default is "{ \"parameters\": ${ml_inference.parameters} }
input_map Optional for externally hosted models, Required for local models (List of Map) maps the fields from documents to model input, if no input mapping specified, default to use all fields from documents as model input
output_map Optional for externally hosted models, Required for local models (List of Map) maps the fields from model out to ingest documents, if no output mapping specified, will return all model outputs in a ‘inference_result' field
inference_parameters no The default settings defined in the model (Object) flexible configurations needed for different model predictions can be added in model_config. For example response_filter.
full_response_path Optional for externally hosted models, Required for local models true for local models and false for externally hosted models (Boolean) Set this parameter to true if the model_output_field contains a full JSON path to the field instead of the field name. The model output will then be fully parsed to get the value of the field.
override no false (Boolean) Relevant if an ingested document already contains a field with the name specified in . If override is false, then the input field is skipped. If true, then the existing field value is overridden by the new model output.
ignore_missing no false (Boolean) If true and any of the input fields defined in input_map are missing then those missing fields are quietly ignored, otherwise a missing field causes a failure.
description no - Description of the processor. Useful for describing the purpose of the processor or its configuration.
ignore_failure no false Ignore failures for the processor.
tag no - Identifier for the processor. Useful for debugging and metrics.

Sample Process:

using the following example for a text embedding remote model wupL7Y0Bm1mYgYg_PasK that is connected in ml-common,

curl -XPUT localhost:9200/_ingest/pipeline/test-ingest -d '{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "wupL7Y0Bm1mYgYg_PasK",
        "input_map": [
          {
            "dairy": "input"
          }
        ],

        "output_map": [
          {
            "response": "dairy_embedding"
          }
        ]
      }
    }
  ]
}' -H "Content-Type:Application/json"     

curl -XPUT localhost:9200/daily_index -d ' {
  "settings": {
    "index": {
      "default_pipeline": "test-ingest"
    }
  },
  "mappings": {
    "dynamic": false,
    "properties": {
      "id": {
        "type": "integer"
      },
      "dairy": {
        "type": "text"
      },
      "weather": {
        "type": "text", 
        "fields": {
          "standard": { "type": "text" },  
          "raw": { "type": "keyword" }     
        }
      }
    }
  }
}' -H "Content-Type:Application/json"        
curl -XPUT localhost:9200/daily_index/_doc/1 -d '{
  "id": 1,
  "dairy": ["happy"],
  "weather": "rainy"
  }' -H "Content-Type:Application/json"  
curl -XGET localhost:9200/daily_index/_doc/1 

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          {
  {
  "_index": "daily_index",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "dairy": ["happy"],
    "weather": "rainy",
    "dairy_embedding": [
      -0.052491702,
      0.041711915,
      0.08673346,
      0.0020010993,
      -0.0081961695,
      -0.10907775,
      0.10094219,
      -0.07203556,
      0.037287816
    ]
  }
}

Added after gathering feedbacks for different use cases,

0. using multiple rounds of predictions

Sometimes, a model only accept one model input fields, and we would like to predicts on multiple fields, we need to run the model multiple times. The inference processors can run one model with multiple inference.

curl -XPUT localhost:9200/_ingest/pipeline/test-ingest -d '{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "S7Uk_I0Bgdza-v2klZ72",
        "input_map": [
          {
            "dairy": "input"
          },
          { 
          "weather": "input"
          }
        ],

        "output_map": [
          {
            "response": "dairy_embedding"
          },
                    {
            "response": "weather_embedding"
          }

        ]
      }
    }
  ]
}' -H "Content-Type:Application/json"                                                                                                                                                                                                                                           

in this setting, it will run the model twice and mapping the output accordingly to two document fields.

the sample response would be

   curl -XGET localhost:9200/daily_index/_doc/1

   {
  "_index": "daily_index",
  "_id": "1",
  "_version": 2,
  "_seq_no": 1,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "dairy_embedding": [
      [
        -0.083478354,
        0.05323705,
        -0.005245433
      ]
    ],
    "weather_embedding": [
      [
        0.017304314,
        -0.021530833,
        0.050184276,
        0.08962978
      ],
      [
        -0.049097132,
        -0.032323096,
        0.014746797,
        -0.06299502,
        0.05543841
      ]
    ],
    "weather": [
      "rainy",
      "cloudy"
    ],
    "id": 1,
    "dairy": [
      "happy"
    ]
  }
} 

Handling object type model input:

for example, I want to use a language classification model , this model's predict function is expecting an object, in the format of {"input": ["text"]}, this can be a good complicated case, it's object with a map of list.

model.predict(
{
  "inputs": [
    "opensearch introduce ml inference processor"
  ]
}
)

1. Using the one field as ml input

then in the ml connector, if the use case is using the one field from the document to identify we can define the connectors and in the response body

##connector_1
POST /_plugins/_ml/connectors/_create
{
 "name": "Sagemaker language identification model connector",
 "description": "Connector for classification model",
 "version": 1,
 "protocol": "aws_sigv4",
 "parameters": {
 "region": "us-east-1",
 "service_name": "sagemaker"
 },
 "credential": {
 "access_key": "your_access_key",
 "secret_key": "your_secret_key",
 "session_token": "your_session_token"},
 "actions": [
 {
 "action_type": "predict",
 "method": "POST",
 "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/your_url/invocations",
 "headers": { 
 "content-type": "application/json" 
 },
 "request_body": "{\"inputs\":[\"${parameters.inputs}\"]}" } ]
}

in this request body, it helps formatting the parameters.input field into the desire model input format. In using this model to predict in ml common, we don't need to worry about the format of an object of a map with list, instead, we use parameters.input as we defined in the connectors.

POST /_plugins/_ml/models/3NDCHI4Bwy4GdbSIgXcY/_predict
{
  "parameters": {
    "inputs": "Say this is a test"
  }
}

##returning 
{
  "inference_results": [
    {
      "output": [
        {
          "name": "response",
          "dataAsMap": {
            "response": [
              {
                "label": "en",
                "score": 0.9411176443099976
              }
            ]
          }
        }
      ],
      "status_code": 200
    }
  ]
}

Let's use the inference processors during ingestions for reviews field:

PUT /_ingest/pipeline/test-ingest-language-one
{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "you_model_id",
        "input_map": [
          {
            "reviews": "inputs"
          }
        ],

        "output_map": [
          {
            "response": "reviews_language_classification"
          }
        ]
      }
    }
  ]
} 

## binding the ingest pipeline with the index: 

PUT /product_review_index_1
 {
  "settings": {
    "index": {
      "default_pipeline": "test-ingest-language-one"
    }
  },
  "mappings": {
    "dynamic": true,
    "properties": {
      "id": {
        "type": "integer"
      },
      "reviews": {
        "type": "text"
      },
      "products": {
        "type": "text", 
        "fields": {
          "standard": { "type": "text" },  
          "raw": { "type": "keyword" }     
        }
      }
    }
  }
}

##during ingestion, it auto triggers to the ingest pipeline with ml_inference processors
PUT /product_review_index_1/_doc/1
{
  "id": 1,
  "reviews": "happy purchase, love it!",
  "products": "opensearch hoodies"
  } 

Now the documents getting ingest already has the model output field named reviews_language_classification

{
  "_index": "product_review_index_1",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "reviews": "happy purchase, love it!",
    "reviews_language_classification": [
      {
        "score": 0.9937509894371033,
        "label": "en"
      }
    ],
    "id": 1,
    "products": "opensearch hoodies"
  }
}

Since I turned on dynamic mapping, to use the language label field, we can search using doc path reviews_language_classification.label in search queries .

GET _search
{
  "query": {
    "match": {
      "reviews_language_classification.label": "en"
    }
  }
}

2. Using the multiple fields as ml input

Another use case, @zhichao-aws also mentioned that many models now accepts multiple model input fields, nowadays, text_embedding models and classification models accepts multiple model inputs, we just need to config the connectors properly to meet this multiple input fields requirements. Similarly in the response body, we config two input fields.

##connector_2
POST /_plugins/_ml/connectors/_create
{
 "name": "Sagemaker language identification model connector",
 ...same as above .. 
 "request_body": "{\"inputs\":[\"${parameters.inputs1}\",\"${parameters.inputs2}\"]}" } ]
}

In this case, it's looking for two input fields to the document, and format properly, the ml_inference processor will handle the mappings for reviews -> input1, products -> input2.

PUT /_ingest/pipeline/test-ingest-language-two
{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "19BlHI4Bwy4GdbSI43dk",
        "input_map": [
          {
            "reviews": "inputs1",
            "products": "inputs2"
          }
        ],

        "output_map": [
          {
            "response": "reviews_products_language_classification"
          }
        ]
      }
    }
  ]
}

###returnning 
{
  "_index": "product_review_index_1",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "reviews": "happy purchase, love it!",
    "reviews_products_language_classification": [
      {
        "score": 0.9937509894371033,
        "label": "en"
      },
   {
        "score": 0.9063221654224541,
        "label": "en"
      }
    ],
    "id": 1,
    "products": "opensearch hoodies"
  }
}

3. Formatting with other processors.

in the connectors, it's supported writing post_process_function, and also we can use other processors before ml_inference_processor to handle model input field format or after to handle model output format.

I am continuing the for the second step, and would like parse the model output field reviews_products_language_classification into two fields. this is to seperate an array and append to new fields, we can use a script processor and remove processor to bundle with it. In the future, we can also add a new type of processor maybe called "seperate_append" processor then it would be easier to use.

let's modify the ingest pipeline for the same index

PUT /_ingest/pipeline/test-ingest-language-two
{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "79CAH44Bwy4GdbSIoXeq",
        "input_map": [
          {
            "reviews": "inputs1",
            "products": "inputs2"
          }
        ],

        "output_map": [
          {
            "response": "reviews_products_language_classification"
          }
        ]
      }
    },
    {
     "script": {
          "source": """
          def headers = ["reviews_language", "products_language"];
          for (int i = 0; i < ctx.reviews_products_language_classification.length; i++) {
            ctx[headers[i]] = ctx.reviews_products_language_classification[i];
          }
        """
     }
    },
    {
      "remove": {
        "field": "reviews_products_language_classification"  
      }
    }
  ]
} 

then when ingesting the same document, it returns

{
  "_index": "product_index_3",
  "_id": "1",
  "_version": 2,
  "_seq_no": 3,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "reviews": "happy purchase, highly recommended",
    "products_language": {
      "score": 0.6440341472625732,
      "label": "en"
    },
    "reviews_language": {
      "score": 0.9933781027793884,
      "label": "en"
    },
    "id": 1,
    "products": "opensearch hoodies"
  }
}
zhichao-aws commented 8 months ago

Different processors may have different pre-processing or post-processing logics. Can we customize these in the new processor? For example, when there are multiple inference fields in one document, sparse_encoding processors will obtain different response as follows:

# case remote inference
[{
     "response":{
         [
         { TOKEN_WEIGHT_MAP},
         { TOKEN_WEIGHT_MAP}
         ]
     }
}]

# case local deploy
[{
     "response":{
         [
         { TOKEN_WEIGHT_MAP}
         ]
     }
},{
     "response":{
         [
         { TOKEN_WEIGHT_MAP}
         ]
     }
}]

The processor should fetch those TOKEN_WEIGHT_MAP and assign them to the target field. Can you give an example of how to configure a ml_inference processor with customized logics? Thanks!

zhichao-aws commented 8 months ago

BTW, in neural-search plugin we have implemented an abstract class for ml-inference processor: https://github.com/opensearch-project/neural-search/blob/ea49d3c5006efff9dfa36e69791ae9a8e468d25a/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java#L35. It can be a reference

mingshl commented 8 months ago

Different processors may have different pre-processing or post-processing logics. Can we customize these in the new processor? For example, when there are multiple inference fields in one document, sparse_encoding processors will obtain different response as follows:

# case remote inference
[{
     "response":{
         [
         { TOKEN_WEIGHT_MAP},
         { TOKEN_WEIGHT_MAP}
         ]
     }
}]

# case local deploy
[{
     "response":{
         [
         { TOKEN_WEIGHT_MAP}
         ]
     }
},{
     "response":{
         [
         { TOKEN_WEIGHT_MAP}
         ]
     }
}]

The processor should fetch those TOKEN_WEIGHT_MAP and assign them to the target field. Can you give an example of how to configure a ml_inference processor with customized logics? Thanks!

currently, the remote model and local model are returning different formats, which makes the post-processing works complicated. We are also planning of standardizing the local model predict output with the same format as the remote model predict output, then it will ease the pain of post processing.

But to the point of multiple inference input fields, it depends on whether the users would like to run one round of prediction call or multiple rounds of prediction call.

for example, the inference processors can run one model with multiple inference.

curl -XPUT localhost:9200/_ingest/pipeline/test-ingest -d '{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "S7Uk_I0Bgdza-v2klZ72",
        "input_map": [
          {
            "dairy": "input"
          },
          { 
          "weather": "input"
          }
        ],

        "output_map": [
          {
            "response": "dairy_embedding"
          },
                    {
            "response": "weather_embedding"
          }

        ]
      }
    }
  ]
}' -H "Content-Type:Application/json"                                                                                                                                                                                                                                           

in this setting, it will run the model twice and mapping the output accordingly to two document fields.

the sample response would be

   curl -XGET localhost:9200/daily_index/_doc/1

   {
  "_index": "daily_index",
  "_id": "1",
  "_version": 2,
  "_seq_no": 1,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "dairy_embedding": [
      [
        -0.083478354,
        0.05323705,
        -0.005245433
      ]
    ],
    "weather_embedding": [
      [
        0.017304314,
        -0.021530833,
        0.050184276,
        0.08962978
      ],
      [
        -0.049097132,
        -0.032323096,
        0.014746797,
        -0.06299502,
        0.05543841
      ]
    ],
    "weather": [
      "rainy",
      "cloudy"
    ],
    "id": 1,
    "dairy": [
      "happy"
    ]
  }
} 
zhichao-aws commented 8 months ago

Different processors may have different pre-processing or post-processing logics. Can we customize these in the new processor? For example, when there are multiple inference fields in one document, sparse_encoding processors will obtain different response as follows:

# case remote inference
[{
     "response":{
         [
         { TOKEN_WEIGHT_MAP},
         { TOKEN_WEIGHT_MAP}
         ]
     }
}]

# case local deploy
[{
     "response":{
         [
         { TOKEN_WEIGHT_MAP}
         ]
     }
},{
     "response":{
         [
         { TOKEN_WEIGHT_MAP}
         ]
     }
}]

The processor should fetch those TOKEN_WEIGHT_MAP and assign them to the target field. Can you give an example of how to configure a ml_inference processor with customized logics? Thanks!

currently, the remote model and local model are returning different formats, which makes the post-processing works complicated. We are also planning of standardizing the local model predict output with the same format as the remote model predict output, then it will ease the pain of post processing.

But to the point of multiple inference input fields, it depends on whether the users would like to run one round of prediction call or multiple rounds of prediction call.

I think for now it's not depend on the preference of users, but the cluster is using remote connector or local deployment. For local deployment we're always running prediction for single document (This may change in the future version because of batch ingestion feature (RFC @chishui ) ). For remote connector we'll send all input docs in one batch and recieve the results in one batch, and the response format is different from local deployment for now. If we want to implement this new processor I think we should take these into consideration. The processor should be able to recognize the number/type of inference results for different deployment types and different ml use cases.

zane-neo commented 8 months ago

Different processors have different inputs and output data structure, e.g. text_embedding processor can accept object type and extract specific fields user configured to inference, but generativeQA input might be a single string, other processors might different with them as well based on their functionality. Also their output data structure can be different: text_embedding is a dense vector and a QA might be a string. So different processors might have totally different configuration, we need to consider if it make sense to simply put all different configuration together. Did we investigate on the current processor configuration or do we have an example that can match all processor configurations? Also we need to think of complex data structure like nested objects, it would be better if the example has complex data structure support.

HenryL27 commented 8 months ago

We are also planning of standardizing the local model predict output with the same format as the remote model predict output, then it will ease the pain of post processing

Remote models don't all have the same output schema, do they? Not sure how this can be accomplished.

I'll also bring up the rerank processor since I haven't seen it mentioned anywhere. 1 particular reranker (ml-opensearch) uses an ml inference. Would the plan for that be to move it to an inference processor or to point it at the InferenceProcessorInterface (or whatev that'll be called)?

chishui commented 8 months ago

If processors are cohesive, similar by nature, it totally makes sense to merge them into one. But if they differ a lot, coupling them together may not bring convenient to users but confusion to them. Although, there is "tag" and "description" parameters user can use to call out the purpose of the processor, but they are optional, and it'll be confusing to users when they use multiple such processors here and there. Additionally, processors will use "inference_parameters" to pass parameters, I'm not sure if it's enough to support all potential use cases and how we enforce certain parameter to be required for certain processor.

It's like having a single OpenAI API for all ML tasks, it's doable, but we need to evaluate the pros and cons to see which option outcompetes the other.

xinyual commented 8 months ago

Different processors may have different pre-processing or post-processing logics. Can we customize these in the new processor? For example, when there are multiple inference fields in one document, sparse_encoding processors will obtain different response as follows:

# case remote inference
[{
     "response":{
         [
         { TOKEN_WEIGHT_MAP},
         { TOKEN_WEIGHT_MAP}
         ]
     }
}]

# case local deploy
[{
     "response":{
         [
         { TOKEN_WEIGHT_MAP}
         ]
     }
},{
     "response":{
         [
         { TOKEN_WEIGHT_MAP}
         ]
     }
}]

The processor should fetch those TOKEN_WEIGHT_MAP and assign them to the target field. Can you give an example of how to configure a ml_inference processor with customized logics? Thanks!

currently, the remote model and local model are returning different formats, which makes the post-processing works complicated. We are also planning of standardizing the local model predict output with the same format as the remote model predict output, then it will ease the pain of post processing.

But to the point of multiple inference input fields, it depends on whether the users would like to run one round of prediction call or multiple rounds of prediction call.

for example, the inference processors can run one model with multiple inference.

curl -XPUT localhost:9200/_ingest/pipeline/test-ingest -d '{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "S7Uk_I0Bgdza-v2klZ72",
        "input_map": [
          {
            "dairy": "input"
          },
          { 
          "weather": "input"
          }
        ],

        "output_map": [
          {
            "response": "dairy_embedding"
          },
                    {
            "response": "weather_embedding"
          }

        ]
      }
    }
  ]
}' -H "Content-Type:Application/json"                                                                                                                                                                                                                                           

in this setting, it will run the model twice and mapping the output accordingly to two document fields.

the sample response would be

   curl -XGET localhost:9200/daily_index/_doc/1

   {
  "_index": "daily_index",
  "_id": "1",
  "_version": 2,
  "_seq_no": 1,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "dairy_embedding": [
      [
        -0.083478354,
        0.05323705,
        -0.005245433
      ]
    ],
    "weather_embedding": [
      [
        0.017304314,
        -0.021530833,
        0.050184276,
        0.08962978
      ],
      [
        -0.049097132,
        -0.032323096,
        0.014746797,
        -0.06299502,
        0.05543841
      ]
    ],
    "weather": [
      "rainy",
      "cloudy"
    ],
    "id": 1,
    "dairy": [
      "happy"
    ]
  }
} 

Why we need to run the model twice? Currently the logic like text embedding will gather all input field texts and send to ml model together. Is there any scenario we need to call model twice instead of together in one processor?

mingshl commented 8 months ago

Different processors may have different pre-processing or post-processing logics. Can we customize these in the new processor? For example, when there are multiple inference fields in one document, sparse_encoding processors will obtain different response as follows:

# case remote inference
[{
     "response":{
         [
         { TOKEN_WEIGHT_MAP},
         { TOKEN_WEIGHT_MAP}
         ]
     }
}]

# case local deploy
[{
     "response":{
         [
         { TOKEN_WEIGHT_MAP}
         ]
     }
},{
     "response":{
         [
         { TOKEN_WEIGHT_MAP}
         ]
     }
}]

The processor should fetch those TOKEN_WEIGHT_MAP and assign them to the target field. Can you give an example of how to configure a ml_inference processor with customized logics? Thanks!

currently, the remote model and local model are returning different formats, which makes the post-processing works complicated. We are also planning of standardizing the local model predict output with the same format as the remote model predict output, then it will ease the pain of post processing. But to the point of multiple inference input fields, it depends on whether the users would like to run one round of prediction call or multiple rounds of prediction call. for example, the inference processors can run one model with multiple inference.

curl -XPUT localhost:9200/_ingest/pipeline/test-ingest -d '{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "S7Uk_I0Bgdza-v2klZ72",
        "input_map": [
          {
            "dairy": "input"
          },
          { 
          "weather": "input"
          }
        ],

        "output_map": [
          {
            "response": "dairy_embedding"
          },
                    {
            "response": "weather_embedding"
          }

        ]
      }
    }
  ]
}' -H "Content-Type:Application/json"                                                                                                                                                                                                                                           

in this setting, it will run the model twice and mapping the output accordingly to two document fields. the sample response would be

   curl -XGET localhost:9200/daily_index/_doc/1

   {
  "_index": "daily_index",
  "_id": "1",
  "_version": 2,
  "_seq_no": 1,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "dairy_embedding": [
      [
        -0.083478354,
        0.05323705,
        -0.005245433
      ]
    ],
    "weather_embedding": [
      [
        0.017304314,
        -0.021530833,
        0.050184276,
        0.08962978
      ],
      [
        -0.049097132,
        -0.032323096,
        0.014746797,
        -0.06299502,
        0.05543841
      ]
    ],
    "weather": [
      "rainy",
      "cloudy"
    ],
    "id": 1,
    "dairy": [
      "happy"
    ]
  }
} 

Why we need to run the model twice? Currently the logic like text embedding will gather all input field texts and send to ml model together. Is there any scenario we need to call model twice instead of together in one processor?

It depends on the model input and also use case. Some models only accept one input field, then two input fields require two rounds of prediction. And yes if a model accepts multiple input fields, we can call model and feed multiple input fields in one prediction. That's a common case.

Please keep in the mind that all remote models are deployed with a connector, where the model input field name can be defined, the pre-processing function and post processing function will help with the transformation of the data format for model input and output as well.

For the example, this blueprint:

"request_body": "{ \"input\": ${parameters.input}}", --> is expected a model input field input, but if the model can accept two input fields, then we can make a connector in with this response body, "request_body": "{ \"input1\": ${parameters.input1}, \"input2\": ${parameters.input2}}". Then we can map two document fields into input1 and input2 to send to one model prediction.

Similarly, the "pre_process_function": "connector.pre_process.openai.embedding", "post_process_function": "connector.post_process.openai.embedding" are responsible for pre-processing data to model input and post processing data to become desired ingest document.

mingshl commented 8 months ago

Different processors have different inputs and output data structure, e.g. text_embedding processor can accept object type and extract specific fields user configured to inference, but generativeQA input might be a single string, other processors might different with them as well based on their functionality. Also their output data structure can be different: text_embedding is a dense vector and a QA might be a string. So different processors might have totally different configuration, we need to consider if it make sense to simply put all different configuration together. Did we investigate on the current processor configuration or do we have an example that can match all processor configurations? Also we need to think of complex data structure like nested objects, it would be better if the example has complex data structure support.

I want to emphasize that in the design of using ml connectors, ml_inference processors and other processors, it provides flexibilities for users to handle various models,

in your first concern about different input format, for example, object type, ml connectors will help handling different input format,

Handling object type model input:

for example, I want to use a language classification model , this model's predict function is expecting an object, in the format of {"input": ["text"]}, this can be a good complicated case, it's object with a map of list.

model.predict(
{
  "inputs": [
    "opensearch introduce ml inference processor"
  ]
}
)

1. Using the one field as ml input

then in the ml connector, if the use case is using the one field from the document to identify we can define the connectors and in the response body

##connector_1
POST /_plugins/_ml/connectors/_create
{
 "name": "Sagemaker language identification model connector",
 "description": "Connector for classification model",
 "version": 1,
 "protocol": "aws_sigv4",
 "parameters": {
 "region": "us-east-1",
 "service_name": "sagemaker"
 },
 "credential": {
 "access_key": "your_access_key",
 "secret_key": "your_secret_key",
 "session_token": "your_session_token"},
 "actions": [
 {
 "action_type": "predict",
 "method": "POST",
 "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/your_url/invocations",
 "headers": { 
 "content-type": "application/json" 
 },
 "request_body": "{\"inputs\":[\"${parameters.inputs}\"]}" } ]
}

in this request body, it helps formatting the parameters.input field into the desire model input format. In using this model to predict in ml common, we don't need to worry about the format of an object of a map with list, instead, we use parameters.input as we defined in the connectors.

POST /_plugins/_ml/models/3NDCHI4Bwy4GdbSIgXcY/_predict
{
  "parameters": {
    "inputs": "Say this is a test"
  }
}

##returning 
{
  "inference_results": [
    {
      "output": [
        {
          "name": "response",
          "dataAsMap": {
            "response": [
              {
                "label": "en",
                "score": 0.9411176443099976
              }
            ]
          }
        }
      ],
      "status_code": 200
    }
  ]
}

Let's use the inference processors during ingestions for reviews field:

PUT /_ingest/pipeline/test-ingest-language-one
{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "you_model_id",
        "input_map": [
          {
            "reviews": "inputs"
          }
        ],

        "output_map": [
          {
            "response": "reviews_language_classification"
          }
        ]
      }
    }
  ]
} 

## binding the ingest pipeline with the index: 

PUT /product_review_index_1
 {
  "settings": {
    "index": {
      "default_pipeline": "test-ingest-language-one"
    }
  },
  "mappings": {
    "dynamic": true,
    "properties": {
      "id": {
        "type": "integer"
      },
      "reviews": {
        "type": "text"
      },
      "products": {
        "type": "text", 
        "fields": {
          "standard": { "type": "text" },  
          "raw": { "type": "keyword" }     
        }
      }
    }
  }
}

##during ingestion, it auto triggers to the ingest pipeline with ml_inference processors
PUT /product_review_index_1/_doc/1
{
  "id": 1,
  "reviews": "happy purchase, love it!",
  "products": "opensearch hoodies"
  } 

Now the documents getting ingest already has the model output field named reviews_language_classification

{
  "_index": "product_review_index_1",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "reviews": "happy purchase, love it!",
    "reviews_language_classification": [
      {
        "score": 0.9937509894371033,
        "label": "en"
      }
    ],
    "id": 1,
    "products": "opensearch hoodies"
  }
}

Since I turned on dynamic mapping, to use the language label field, we can search using doc path reviews_language_classification.label in search queries .

GET _search
{
  "query": {
    "match": {
      "reviews_language_classification.label": "en"
    }
  }
}

2. Using the multiple fields as ml input

Another use case, @zhichao-aws also mentioned that many models now accepts multiple model input fields, nowadays, text_embedding models and classification models accepts multiple model inputs, we just need to config the connectors properly to meet this multiple input fields requirements. Similarly in the response body, we config two input fields.

##connector_2
POST /_plugins/_ml/connectors/_create
{
 "name": "Sagemaker language identification model connector",
 ...same as above .. 
 "request_body": "{\"inputs\":[\"${parameters.inputs1}\",\"${parameters.inputs2}\"]}" } ]
}

In this case, it's looking for two input fields to the document, and format properly, the ml_inference processor will handle the mappings for reviews -> input1, products -> input2.

PUT /_ingest/pipeline/test-ingest-language-two
{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "19BlHI4Bwy4GdbSI43dk",
        "input_map": [
          {
            "reviews": "inputs1",
            "products": "inputs2"
          }
        ],

        "output_map": [
          {
            "response": "reviews_products_language_classification"
          }
        ]
      }
    }
  ]
}

###returnning 
{
  "_index": "product_review_index_1",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "reviews": "happy purchase, love it!",
    "reviews_products_language_classification": [
      {
        "score": 0.9937509894371033,
        "label": "en"
      },
   {
        "score": 0.9063221654224541,
        "label": "en"
      }
    ],
    "id": 1,
    "products": "opensearch hoodies"
  }
}

3. Formatting with other processors.

in the connectors, it's supported writing post_process_function, and also we can use other processors before ml_inference_processor to handle model input field format or after to handle model output format.

I am continuing the for the second step, and would like parse the model output field reviews_products_language_classification into two fields. this is to seperate an array and append to new fields, we can use a script processor and remove processor to bundle with it. In the future, we can also add a new type of processor maybe called "seperate_append" processor then it would be easier to use.

let's modify the ingest pipeline for the same index

PUT /_ingest/pipeline/test-ingest-language-two
{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "79CAH44Bwy4GdbSIoXeq",
        "input_map": [
          {
            "reviews": "inputs1",
            "products": "inputs2"
          }
        ],

        "output_map": [
          {
            "response": "reviews_products_language_classification"
          }
        ]
      }
    },
    {
     "script": {
          "source": """
          def headers = ["reviews_language", "products_language"];
          for (int i = 0; i < ctx.reviews_products_language_classification.length; i++) {
            ctx[headers[i]] = ctx.reviews_products_language_classification[i];
          }
        """
     }
    },
    {
      "remove": {
        "field": "reviews_products_language_classification"  
      }
    }
  ]
} 

then when ingesting the same document, it returns

{
  "_index": "product_index_3",
  "_id": "1",
  "_version": 2,
  "_seq_no": 3,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "reviews": "happy purchase, highly recommended",
    "products_language": {
      "score": 0.6440341472625732,
      "label": "en"
    },
    "reviews_language": {
      "score": 0.9933781027793884,
      "label": "en"
    },
    "id": 1,
    "products": "opensearch hoodies"
  }
}
xinyual commented 8 months ago

Different processors have different inputs and output data structure, e.g. text_embedding processor can accept object type and extract specific fields user configured to inference, but generativeQA input might be a single string, other processors might different with them as well based on their functionality. Also their output data structure can be different: text_embedding is a dense vector and a QA might be a string. So different processors might have totally different configuration, we need to consider if it make sense to simply put all different configuration together. Did we investigate on the current processor configuration or do we have an example that can match all processor configurations? Also we need to think of complex data structure like nested objects, it would be better if the example has complex data structure support.

I want to emphasize that in the design of using ml connectors, ml_inference processors and other processors, it provides flexibilities for users to handle various models,

in your first concern about different input format, for example, object type, ml connectors will help handling different input format,

Handling object type model input:

for example, I want to use a language classification model , this model's predict function is expecting an object, in the format of {"input": ["text"]}, this can be a good complicated case, it's object with a map of list.

model.predict(
{
  "inputs": [
    "opensearch introduce ml inference processor"
  ]
}
)

1. Using the one field as ml input

then in the ml connector, if the use case is using the one field from the document to identify we can define the connectors and in the response body

##connector_1
POST /_plugins/_ml/connectors/_create
{
 "name": "Sagemaker language identification model connector",
 "description": "Connector for classification model",
 "version": 1,
 "protocol": "aws_sigv4",
 "parameters": {
 "region": "us-east-1",
 "service_name": "sagemaker"
 },
 "credential": {
 "access_key": "your_access_key",
 "secret_key": "your_secret_key",
 "session_token": "your_session_token"},
 "actions": [
 {
 "action_type": "predict",
 "method": "POST",
 "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/your_url/invocations",
 "headers": { 
 "content-type": "application/json" 
 },
 "request_body": "{\"inputs\":[\"${parameters.inputs}\"]}" } ]
}

in this request body, it helps formatting the parameters.input field into the desire model input format. In using this model to predict in ml common, we don't need to worry about the format of an object of a map with list, instead, we use parameters.input as we defined in the connectors.

POST /_plugins/_ml/models/3NDCHI4Bwy4GdbSIgXcY/_predict
{
  "parameters": {
    "inputs": "Say this is a test"
  }
}

##returning 
{
  "inference_results": [
    {
      "output": [
        {
          "name": "response",
          "dataAsMap": {
            "response": [
              {
                "label": "en",
                "score": 0.9411176443099976
              }
            ]
          }
        }
      ],
      "status_code": 200
    }
  ]
}

Let's use the inference processors during ingestions for reviews field:

PUT /_ingest/pipeline/test-ingest-language-one
{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "you_model_id",
        "input_map": [
          {
            "reviews": "inputs"
          }
        ],

        "output_map": [
          {
            "response": "reviews_language_classification"
          }
        ]
      }
    }
  ]
} 

## binding the ingest pipeline with the index: 

PUT /product_review_index_1
 {
  "settings": {
    "index": {
      "default_pipeline": "test-ingest-language-one"
    }
  },
  "mappings": {
    "dynamic": true,
    "properties": {
      "id": {
        "type": "integer"
      },
      "reviews": {
        "type": "text"
      },
      "products": {
        "type": "text", 
        "fields": {
          "standard": { "type": "text" },  
          "raw": { "type": "keyword" }     
        }
      }
    }
  }
}

##during ingestion, it auto triggers to the ingest pipeline with ml_inference processors
PUT /product_review_index_1/_doc/1
{
  "id": 1,
  "reviews": "happy purchase, love it!",
  "products": "opensearch hoodies"
  } 

Now the documents getting ingest already has the model output field named reviews_language_classification

{
  "_index": "product_review_index_1",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "reviews": "happy purchase, love it!",
    "reviews_language_classification": [
      {
        "score": 0.9937509894371033,
        "label": "en"
      }
    ],
    "id": 1,
    "products": "opensearch hoodies"
  }
}

Since I turned on dynamic mapping, to use the language label field, we can search using doc path reviews_language_classification.label in search queries .

GET _search
{
  "query": {
    "match": {
      "reviews_language_classification.label": "en"
    }
  }
}

2. Using the multiple fields as ml input

Another use case, @zhichao-aws also mentioned that many models now accepts multiple model input fields, nowadays, text_embedding models and classification models accepts multiple model inputs, we just need to config the connectors properly to meet this multiple input fields requirements. Similarly in the response body, we config two input fields.

##connector_2
POST /_plugins/_ml/connectors/_create
{
 "name": "Sagemaker language identification model connector",
 ...same as above .. 
 "request_body": "{\"inputs\":[\"${parameters.inputs1}\",\"${parameters.inputs2}\"]}" } ]
}

In this case, it's looking for two input fields to the document, and format properly, the ml_inference processor will handle the mappings for reviews -> input1, products -> input2.

PUT /_ingest/pipeline/test-ingest-language-two
{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "19BlHI4Bwy4GdbSI43dk",
        "input_map": [
          {
            "reviews": "inputs1",
            "products": "inputs2"
          }
        ],

        "output_map": [
          {
            "response": "reviews_products_language_classification"
          }
        ]
      }
    }
  ]
}

###returnning 
{
  "_index": "product_review_index_1",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "reviews": "happy purchase, love it!",
    "reviews_products_language_classification": [
      {
        "score": 0.9937509894371033,
        "label": "en"
      },
   {
        "score": 0.9063221654224541,
        "label": "en"
      }
    ],
    "id": 1,
    "products": "opensearch hoodies"
  }
}

3. Formatting with other processors.

in the connectors, it's supported writing post_process_function, and also we can use other processors before ml_inference_processor to handle model input field format or after to handle model output format.

I am continuing the for the second step, and would like parse the model output field reviews_products_language_classification into two fields. this is to seperate an array and append to new fields, we can use a script processor and remove processor to bundle with it. In the future, we can also add a new type of processor maybe called "seperate_append" processor then it would be easier to use.

let's modify the ingest pipeline for the same index

PUT /_ingest/pipeline/test-ingest-language-two
{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "79CAH44Bwy4GdbSIoXeq",
        "input_map": [
          {
            "reviews": "inputs1",
            "products": "inputs2"
          }
        ],

        "output_map": [
          {
            "response": "reviews_products_language_classification"
          }
        ]
      }
    },
    {
     "script": {
          "source": """
          def headers = ["reviews_language", "products_language"];
          for (int i = 0; i < ctx.reviews_products_language_classification.length; i++) {
            ctx[headers[i]] = ctx.reviews_products_language_classification[i];
          }
        """
     }
    },
    {
      "remove": {
        "field": "reviews_products_language_classification"  
      }
    }
  ]
} 

then when ingesting the same document, it returns

{
  "_index": "product_index_3",
  "_id": "1",
  "_version": 2,
  "_seq_no": 3,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "reviews": "happy purchase, highly recommended",
    "products_language": {
      "score": 0.6440341472625732,
      "label": "en"
    },
    "reviews_language": {
      "score": 0.9933781027793884,
      "label": "en"
    },
    "id": 1,
    "products": "opensearch hoodies"
  }
}

Does it support multiply output field? For example, I want to map field A result to field Aout, field B result to field Bout.

mingshl commented 8 months ago

@xinyual yes, it supports multiple output fields' mapping. Because input_map and output_map are both (List of Map) in the parameters.

for example, if a model returns

{
  "response": {
    "text_embedding": [0.9874324,0.234324],
     "token_embedding": [0.5765724,0.234324]
  }
}

you can define the output_map as


        "output_map": [
          {
            "response.text_embedding": "sentence_embedding",
             "response.token_embedding": "token_embedding"
          }
        ]

As long as the response is a Map, the dot path notation is also supported to find subfields in the model output.

zane-neo commented 8 months ago

Sometimes, a model only accept one model input fields, and we would like to predicts on multiple fields, we need to run the model multiple times. The inference processors can run one model with multiple inference.

Is this mean that if multiple fields need embedding, we will run the predict mutilple times?

model.predict( { "inputs": [ "opensearch introduce ml inference processor" ] } )

Looks like this is specifically for remote model, will we implement same for local model?

austintlee commented 8 months ago

Not sure I see this in the PR, but will the user be able to set up ML ingest nodes separately from other ML (inference) nodes?

mingshl commented 7 months ago

Sometimes, a model only accept one model input fields, and we would like to predicts on multiple fields, we need to run the model multiple times. The inference processors can run one model with multiple inference.

Is this mean that if multiple fields need embedding, we will run the predict mutilple times?

model.predict( { "inputs": [ "opensearch introduce ml inference processor" ] } )

Looks like this is specifically for remote model, will we implement same for local model?

so multiple fields are used for embedding and would like to conduct a mini batch in one prediction, we can do

PUT /_ingest/pipeline/test-ingest-language-two

{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "19BlHI4Bwy4GdbSI43dk",
        "input_map": [
          {
            "reviews": "inputs",
            "products": "inputs"
          }
        ],

        "output_map": [
          {
            "response.0": "reviews_language_classification",
            "response.1": "products_language_classification",
          }
        ]
      }
    }
  ]
}

if two document fields mapped to the same model input fields, the processors would concatenate into a list ["this is review1","product 1"] and send to one prediction call.

I will include this example in the IT tests.

mingshl commented 7 months ago

Not sure I see this in the PR, but will the user be able to set up ML ingest nodes separately from other ML (inference) nodes?

Hi @austintlee , we didn't consider this into requirement when we come up with the design. But can you address your use case, for the reason why you want to separate the ML ingest nodes and ML inference nodes?

mingshl commented 7 months ago

Sometimes, a model only accept one model input fields, and we would like to predicts on multiple fields, we need to run the model multiple times. The inference processors can run one model with multiple inference.

Is this mean that if multiple fields need embedding, we will run the predict mutilple times?

model.predict( { "inputs": [ "opensearch introduce ml inference processor" ] } )

Looks like this is specifically for remote model, will we implement same for local model?

We will support local model incrementally. The reason behind that is the local model now has different inpuDataset for predictions. And we would like to unify the inputDataset for local models then enable the ml inference processors for local models.

zane-neo commented 7 months ago

Sometimes, a model only accept one model input fields, and we would like to predicts on multiple fields, we need to run the model multiple times. The inference processors can run one model with multiple inference.

Is this mean that if multiple fields need embedding, we will run the predict mutilple times?

model.predict( { "inputs": [ "opensearch introduce ml inference processor" ] } )

Looks like this is specifically for remote model, will we implement same for local model?

We will support local model incrementally. The reason behind that is the local model now has different inpuDataset for predictions. And we would like to unify the inputDataset for local models then enable the ml inference processors for local models.

We can implement incrementally, but I prefer we have the design in the beginning, so that we can know if current implementation for remote case is best or not, otherwise we may need to change the current implementation when implementing for local model.

br3no commented 7 months ago

Please also consider the use-case of asymmetric embedding models (e.g. https://huggingface.co/intfloat/multilingual-e5-small). These models require the content to be embedded to be prefixed by "signal strings" that give the model the information whether it is embedding passages or queries.

I haven't seen this use-case reflected in the discussion, but I might just have missed it in the comments.

The support for asymmetric embedding models has been newly introduced to ml-commons (cf. #1799).

ylwu-amzn commented 7 months ago

Please also consider the use-case of asymmetric embedding models (e.g. https://huggingface.co/intfloat/multilingual-e5-small). These models require the content to be embedded to be prefixed by "signal strings" that give the model the information whether it is embedding passages or queries.

I haven't seen this use-case reflected in the discussion, but I might just have missed it in the comments.

The support for asymmetric embedding models has been newly introduced to ml-commons (cf. #1799).

Thanks @br3no , we will test this case. BTW, in 2.14 we are going to release ingest processor, the search processor will be in 2.15.

mingshl commented 6 months ago

updated timeline: the search response processor will be released in 2.16 open search version.

mingshl commented 4 months ago

For ML inference response processors, it involves a list of documents in the search response under "_source" 's value.

For inference scenario, there are two scenarios:

  1. Many-to-one: taking the field from every document and compile a list of input to be one round of prediction. N documents will make one prediction and return a list of output and added accordingly back to the documents based on the index.

2.One-to-one: taking one field from one document as model input and send one prediction call. N document will make N prediction call, and every prediction output will add back to the document.

Many to one can be the default setting of ML inference response processors. How to support One-to-one inference?

Here are two proposed solutions:

using this sample response for discussion

Sample response in hits:

[
  {
    "_index": "daily_index",
    "_id": "1",
    "_score": 1,
    "_source": {
      "diary": [
        "happy"
      ],
      "weather": {
        "forcast": "rainy",
        "last_year": "cloudy"
      },
      "activities": [
        {
          "sleep": {
            "Date": "2024-07-05",
            "Location": "stay home",
            "Person": "me"
          }
        }
      ]
    }
  },
  {
    "_index": "daily_index",
    "_id": "2",
    "_score": 1,
    "_source": {
      "diary": [
        "excited",
        "productive"
      ],
      "weather": {
        "forcast": "sunny",
        "last_year": "sunny"
      },
      "activities": [
        {
          "swimming": {
            "Date": "2024-07-04",
            "Location": "swimming pool",
            "Person": "Jane"
          }
        }
      ]
    }
  },
  {
    "_index": "daily_index",
    "_id": "3",
    "_score": 1,
    "_source": {
      "diary": [
        "tired",
        "stressed"
      ],
      "weather": {
        "forcast": "cloudy",
        "last_year": "rainy"
      },
      "activities": [
        {
          "yoga": {
            "Date": "2024-07-03",
            "Location": "yoga studio",
            "Person": "Ella"
          }
        }
      ]
    }
  }
]

Option 1: port foreach processor to search pipeline,

for each processor required to take a field parameters that supports:

  1. object — maybe only support objects in the P0
  2. array
  3. array of objects
{
  "foreach": {
    "field": "weather",
    "processor":  {
      "ml_inference": {
        "model_id": "wupL7Y0Bm1mYgYg_PasK",
        "input_map": [
          {
            "input": "forcast"
          }
        ],

        "output_map": [
          {
            "forcast_embedding": "response"
          }
        ]
      }
    }
  }
}
{
  "foreach": {
    "field": "diary",
    "processor":  {
      "ml_inference": {
        "model_id": "wupL7Y0Bm1mYgYg_PasK",
        "input_map": [
          {
            "input": "_search._value"
          }
        ],

        "output_map": [
          {
            "diary_embedding": "response"
          }
        ]
      }
    }
  }
}

Pros:

Cons:

Option 2:

add flag in ML inference processor to let user define the mode (many to 1, 1:1).

curl -XPUT localhost:9200/_search/pipeline/test-response -d '{
  "description": "test ml model search response processor",
  "processors": [
    { "response_processors" : 
    [
      "ml_inference": {
        "model_id": "wupL7Y0Bm1mYgYg_PasK",
        "input_map": [
          {
            "input": "diary"
          }
        ],

        "output_map": [
          {
            "diary_embedding": "response"
          }
        ],
        "one_to_one_inference": true
      }
    }
  ] 
}' -H "Content-Type:Application/json" 

Pros:

Cons: