opensearch-project / ml-commons

ml-commons provides a set of common machine learning algorithms, e.g. k-means or linear regression, to help developers build ML-related features within OpenSearch.
Apache License 2.0

[BUG] Batch ingestion API bugs #2930

Closed by ylwu-amzn 1 month ago

ylwu-amzn commented 1 month ago

Tested with OS 2.17 RC4:

POST /_plugins/_ml/_batch_ingestion
{
  "index_name": "my-test1",
  "field_map": {
    "_id": "$.recordId",
    "embedding": "source[0].$.modelOutput.embedding"
  },
  "credential": {
    "region": "us-east-1",
    "access_key": "xxx",
    "secret_key": "xxx",
    "session_token": "xxx"
  },
  "data_source": {
    "type": "s3",
    "source": ["s3://ylwu-model-test-output/cszce2bsex07/my_batch2.jsonl.out"]
  }
}

Sample data from my_batch2.jsonl.out:

{"modelInput":{"inputText":"hello word 1"},"modelOutput":{"embedding":[-0.034975495,0.072906666],"inputTextTokenCount":5},"recordId":"CALL0000001"}
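To illustrate what the field_map entries above are asking for, here is a minimal sketch (not the ml-commons implementation) of resolving a simple `$.a.b` style path against one parsed JSONL record:

```java
import java.util.List;
import java.util.Map;

public class FieldMapSketch {
    // Illustrative only, not the ml-commons code: resolve a simple
    // "$.a.b" style path against one parsed JSONL record.
    public static Object resolve(Map<?, ?> record, String jsonPath) {
        // Strip the leading "$." and walk the remaining dot-separated segments.
        String[] segments = jsonPath.replaceFirst("^\\$\\.", "").split("\\.");
        Object current = record;
        for (String segment : segments) {
            if (!(current instanceof Map)) {
                return null; // the path names a property the record does not have
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> record = Map.of(
            "modelInput", Map.of("inputText", "hello word 1"),
            "modelOutput", Map.of("embedding", List.of(-0.034975495, 0.072906666), "inputTextTokenCount", 5),
            "recordId", "CALL0000001");
        System.out.println(resolve(record, "$.recordId"));              // CALL0000001
        System.out.println(resolve(record, "$.modelOutput.embedding")); // [-0.034975495, 0.072906666]
        // "source" is not a property of the record itself, which is why a
        // "source[0].$..." path fails with "Missing property in path $['source']".
    }
}
```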

It returns task id xHk64pEBG9EkCQDLzc-I.

But the task stays in the CREATED state forever. Checking the log shows the error below.

Removing source[0]. from the embedding field map makes it work.

[2024-09-11T17:58:07,399][ERROR][o.o.m.e.i.S3DataIngestion] [client1] Missing property in path $['source']
[2024-09-11T17:58:07,400][ERROR][o.o.b.OpenSearchUncaughtExceptionHandler] [client1] uncaught exception in thread [opensearch[client1][opensearch_ml_train][T#2]]
org.opensearch.OpenSearchStatusException: Failed to batch ingest: Missing property in path $['source']
    at org.opensearch.ml.engine.ingest.S3DataIngestion.ingestSingleSource(S3DataIngestion.java:148) ~[?:?]
    at org.opensearch.ml.engine.ingest.S3DataIngestion.ingest(S3DataIngestion.java:66) ~[?:?]
    at org.opensearch.ml.action.batch.TransportBatchIngestionAction.lambda$doExecute$0(TransportBatchIngestionAction.java:96) ~[?:?]
    at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:946) ~[opensearch-2.17.0.jar:2.17.0]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]

Suggestion:

  1. Support the source[0] prefix even when there is only one source file.
  2. Update the task status to FAILED when ingestion fails.
  3. I remember you were going to add a new dedicated thread pool for batch ingestion. Why does the log still show the train thread pool opensearch_ml_train? Can you confirm whether we have a dedicated thread pool?
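For suggestion 1, one option is to normalize the path before evaluation: strip an optional source[N]. prefix, use N to pick the source file, and default to source 0 when the prefix is absent. A hedged sketch (the helper names are hypothetical, not the ml-commons code):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SourcePrefixSketch {
    // Hypothetical helper: split a field_map value like
    // "source[0].$.modelOutput.embedding" into a source-file index and a plain
    // JSONPath. Paths without the prefix default to source 0, so the prefix
    // stays optional even when there is only one source file.
    private static final Pattern PREFIX = Pattern.compile("^source\\[(\\d+)\\]\\.(.*)$");

    public static int sourceIndex(String path) {
        Matcher m = PREFIX.matcher(path);
        return m.matches() ? Integer.parseInt(m.group(1)) : 0;
    }

    public static String jsonPath(String path) {
        Matcher m = PREFIX.matcher(path);
        return m.matches() ? m.group(2) : path;
    }

    public static void main(String[] args) {
        System.out.println(sourceIndex("source[0].$.modelOutput.embedding")); // 0
        System.out.println(jsonPath("source[0].$.modelOutput.embedding"));   // $.modelOutput.embedding
        System.out.println(jsonPath("$.recordId"));                          // $.recordId (prefix optional)
    }
}
```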
ylwu-amzn commented 1 month ago

Another issue: ingest_fields doesn't work.

  "field_map": {
    "_id": "$.recordId",
    "embedding": "$.modelOutput.embedding"
  },
  "ingest_fields": ["$.modelInput.inputText"],

This way works:

  "field_map": {
    "_id": "$.recordId",
    "embedding": "$.modelOutput.embedding",
    "input": "$.modelInput.inputText"
  },
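One way ingest_fields could behave like the field_map workaround is to derive the target document field name from the last path segment. A purely illustrative sketch (the intended semantics may differ):

```java
public class IngestFieldSketch {
    // Hypothetical: "$.modelInput.inputText" -> "inputText", so an ingest_fields
    // entry acts like a field_map entry keyed by its last path segment.
    public static String targetFieldName(String jsonPath) {
        int lastDot = jsonPath.lastIndexOf('.');
        return lastDot >= 0 ? jsonPath.substring(lastDot + 1) : jsonPath;
    }

    public static void main(String[] args) {
        System.out.println(targetFieldName("$.modelInput.inputText")); // inputText
    }
}
```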
Zhangxunmt commented 1 month ago

Yes, it's still using the TRAIN thread pool. The initial code didn't use this dedicated train thread, so exceptions were caught in the main thread and ML tasks were updated to "Failed". After I added the "TRAIN" thread, exceptions are handled in the train thread and are no longer caught in the main thread; I forgot to move the exception handling from the main thread into the "TRAIN" thread. After the load tests, I will create a new thread pool dedicated to ingestion.

ylwu-amzn commented 1 month ago

A Bedrock batch inference job returns a jobArn like this:

{
  "jobArn": "arn:aws:bedrock:us-east-1:<account_id>:model-invocation-job/cszce2bsex07"
}

But the code currently only parses TransformJobArn and id. https://github.com/opensearch-project/ml-commons/blob/main/plugin/src/main/java/org/opensearch/ml/task/MLPredictTaskRunner.java#L367 Please enhance this part to make the parsing more general.

I suggest changing this line https://github.com/opensearch-project/ml-commons/blob/main/plugin/src/main/java/org/opensearch/ml/task/MLPredictTaskRunner.java#L367C44-L367C52

 if (dataAsMap != null
        && (dataAsMap.containsKey("TransformJobArn") || dataAsMap.containsKey("id"))) {

to

  Integer statusCode = tensorOutput.getMlModelOutputs().get(0).getStatusCode();
  if (dataAsMap != null
      && statusCode != null && statusCode >= 200 && statusCode < 300) {

?
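Extracted into a standalone predicate for clarity, the suggested condition looks like this (a sketch, not the actual MLPredictTaskRunner code):

```java
import java.util.Map;

public class BatchJobResponseCheck {
    // Sketch of the suggested condition: treat any 2xx response with a body as a
    // successfully created batch job, rather than matching provider-specific
    // keys like TransformJobArn or id.
    public static boolean isBatchJobCreated(Map<String, ?> dataAsMap, Integer statusCode) {
        return dataAsMap != null && statusCode != null && statusCode >= 200 && statusCode < 300;
    }

    public static void main(String[] args) {
        // Bedrock returns {"jobArn": "..."} with HTTP 200, so a generic status
        // check covers it without hard-coding the ARN field name.
        System.out.println(isBatchJobCreated(Map.of("jobArn", "arn:aws:bedrock:us-east-1:123:model-invocation-job/abc"), 200)); // true
    }
}
```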