opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0
9.46k stars 1.74k forks source link

[RFC] Parallel & Batch Ingestion #12457

Closed chishui closed 3 months ago

chishui commented 6 months ago

Is your feature request related to a problem? Please describe

Problem Statements

Today, users can utilize bulk API to ingest multiple documents in a single request. All documents from this request are handled by one ingest node and on this node, if there's any ingest pipeline configured, documents are processed by pipeline one at a time in a sequential order (ref). The ingest pipeline is constituted by a collection of processors and processor is the computing unit of a pipeline. Most of the processors are pretty light weighted such as append, uppercase, lowercase, and to process multiple documents one after another or to process them in parallel would make no observable difference. But for time-consuming processors such as neural search processors, which by their nature, require more time to compute, being able to run them in parallel could save user some valuable ingest time. Apart from ingestion time, processors like neural search, can benefit from processing batch documents together as it can reduce the requests to remote ML services via batch APIs to maximally avoid hitting rate limit restriction. (Feature request: https://github.com/opensearch-project/ml-commons/issues/1840, rate limit example from OpenAI: https://platform.openai.com/docs/guides/rate-limits)

Due to the lack of parallel ingestion and batch ingestion capabilities in ingest flow, we propose below solution to address them.

Describe the solution you'd like

Proposed Features

1. Batch Ingestion

An ingest pipeline is constructed by a list of processors and a single document could flow through each processor one by one before it can be stored into index. Currently, both pipeline and processor can only handle one document each time and even if with bulk API, documents are iterated and handled in sequential order. As shown in figure 1, to ingest doc1, it would firstly flow through ingest pipeline 1, then through pipeline 2. Then, the next document would go through both pipeline.

ingest-Page-1

To support batch processing of documents, we'll add a batchExecute API in ingest pipeline and processors which take multiple documents as input parameters. We will provide a default implementation in Processor interface to iteratively call existingexecute API to process document one by one so that most of the processors don't need to make change and only if there's necessity for them to batch process documents (e.g. text embedding processor), they can have their own implementation, otherwise, even receiving documents altogether, they default to process them one by one.

To batch process documents, user need to use bulk API. We'll add two optional parameters for bulk API for user to enable batch feature and set batch size. Based on maximum_batch_size value, documents are split into batches.

Since in bulk API, different documents could be ingested to different indexes, indexes could use the same pipelines but in different order, e.g. index “movies” uses pipeline P1 as default pipeline, P2 as final pipeline; index “musics” uses P2 as default pipeline and P1 as final pipeline. To avoid over-complexity of handling cross indexes batching (topology sorting), we would batch documents in index level.

2. Parallel Ingestion

Apart from batch ingestion, we also propose to have parallel ingestion to accompany with batch ingestion to boost the ingestion performance. When user enables parallel ingestion, based on batch size, documents from bulk API will be split into batches, then, batches are processed in parallel with threads managed by thread pool. Although limiting the maximum concurrency of parallel ingestion, thread pool can help us protect host resources to not be exhausted by batch ingestion threads.

ingest-Page-2

Ingest flow logic change

Current logic of the ingestion flow of documents can be shown from the pseudo code below:

for (document in documents) {  
    for (pipeline in pipelines) {  
        for (processor in pipeline.processors) {  
            document = processor.execute(document)  
        }  
    }  
}

We'll change the flow to logic shown below if the pipeline has enable the batch option.

if (enabledBatch) {
    batches = calculateBatches(documents);
    for (batch in batches) {
        for (pipeline in pipelines) {  
            for (processor in pipeline.processors) {  
                documents = processor.batchExecute(documents)  
            }  
        }
    }
} else if (enabledParallelBatch) {
    batches = calculateBatches(documents);
    for (batch in batches) {
        threadpool.execute(()-> {
            for (pipeline in pipelines) {  
                for (processor in pipeline.processors) {  
                    documents = processor.batchExecute(documents)  
                }  
            }
        });
    }
} else {
    // fallback to exsiting ingestion logic
}

Update to Bulk API

We propose new parameters to bulk API, all of them are optional.

Parameter Type Description
batch_ingestion_option String Configure whether to enable batch ingestion. It has three options: none, enable and parallel. By default, it's none. When set it to enable, batch ingestion is enabled, and batches are processed in sequential order. When set it to parallel, batch ingestion is enabled and batches are processes in parallel.
maximum_batch_size Integer The batched document size. Only work when batch ingestion option is set to enable or parallel. It's 1 by default.

3. Split and Redistribute Bulk API

Users tend to use bulk API to ingest many documents which can be very time consuming sometimes. In order to achieve lower ingestion time, they have to use multiple clients to make multiple bulk requests with smaller document size so that the requests can be distributed to different ingest nodes. To offload the burden from user side, we can support the split and redistribute work from server side and help distribute the ingest load more evenly. Note: although brought up here, we think it's better to discuss this topic in a separate RFC doc which will be published later.

Related component

Indexing:Performance

Describe alternatives you've considered

No response

Additional context

No response

peternied commented 6 months ago

[Triage - attendees 1 2 3 4 5] @chishui Thanks for creating this RFC, it looks like this could be related to [1] [2]

chishui commented 6 months ago

@peternied Yes, it looks like the proposed feature 3 in this RFC has very similar idea with the streaming API especially the coordinator part to load balancing the ingest load. For feature 3, it just tries to reuse the bulk API.

Feature 1 and 2 are different from streaming API as they focus on parallel and batch ingestion on a single node which would happen post streaming API or feature 3.

msfroh commented 6 months ago

@dbwiddis, @joshpalis -- you may be interested in this, as you've been thinking about parallel execution for search pipelines. For ingest pipelines, the use-case is a little bit more "natural", because we already do parallel execution of _bulk request (at least across shards).

@chishui, can you confirm where exactly the parallel/batch execution would run? A bulk request is received on one node (that serves as coordinator for the request), then the underlying DocWriteRequests get fanned out to the shards. Does this logic run on the coordinator or on the individual shards? I can never remember where IngestService acts.

chishui commented 6 months ago

@msfroh, the "parallel/batch execution" would be run on the ingest pipeline side. The DocWriteRequests are first processed by ingest pipeline and its processors on a single ingest node, then the processed documents are fanned out to shards to be indexed. To answer your question, the logic would be run on the coordinator.

chishui commented 6 months ago

Additional information about parallel ingestion:

Performance:

Light-weighted processors - no improvement

We benchmarked the performance on some light weighted processors (lowercase + append) with current solution and parallelized batch solution, we don't see improvement on either latency or throughput which is aligned with our expectation that they are already very fast and parallelization wouldn't help and could bring some additional overhead.

ML processors - already in async

ML processors are the processors doing heavy lifting work, but they actually put the predict logic in a thread (code) which brings the ingestion of that document to async.

Reasons to have parallel ingestion

  1. A general solution: The parallel ingestion proposed here does the parallelization on document level, any time-consuming processors either existing today or introduced later can benefit from the parallelization directly without needing to make any changes.
  2. Maximum concurrency: Today, if processors makes their logic async, then only itself and the following processors will be run in a separate thread, all previous processors are still run in a same thread synchronously. Parallel ingestion can make the whole ingestion flow of a document in parallel to achieve maximum concurrency.
  3. Give user controls: It provides users flexibility to control concurrency level through batch size or user can even disable parallel ingestion through request parameter.
  4. Less dev efforts and resource usage if other processors want to achieve concurrency: Today, if some processor wants to achieve concurrency, they have to implement their own concurrency logic and they may also need to create their own thread-pool. It's not necessary as for a single document, processor has to be run one by one and causes wasting of resources and leads to overhead when thread switching.

Reasons not to have parallel ingestion

  1. There is no urgent need or immediate gain.
model-collapse commented 6 months ago

Scenario for batch processor in neural search document ingestion: Since OpenSearch 2.7, ml-commons released its remote connector, allowing opensearch to connect with remote inference endpoint. However, ml-commons can take a list of strings as input but only supports to invoke the inference API on each input text one by one. The pipeline is like follows: pipeline1 Intuitively, to enable the batch API of many 3rdparty LLM inference provider such as openAI and cohere, we can let ml-commons pass thru the list of strings as "a batch" to the API. Like this: pipeline2 However, this kind of approach cannot fully leverage the GPU computation power because of two reasons: 1) The batch size is sticked with how many fields are being picked by the processor, but in fact each API have their suggested batch size such as 32 or 128. 2) In deep learning for NLP, text in a batch should have similar lengths in order to obtain highest GPU efficiency, but intuitively we will regard the text from different fields will have diverse length. The best option is to implement a "batched" processor and recompose the "batches" by collecting texts from the same field. See following: folding

Alternative Approach There is one approach called "Dynamic Batching" which holds flushable queues in ml-commons. Each queues will gather the text input from the requests to ml-commons with the similar lengths. When timeout or the queue is full, the queue is flushed and the batch API of inference service is invoked. The con of this approach is that ml-commons will have quite big memory consumption to hold the queues, and timeout queue's implementation is more risky (dead locks, blocking calls) than batched processors.

Why we need the batch API? The computation model of GPU is using block-wise SIMD (single instruction with multiple data). In AI, inferencing model by stacking input tensors together (as a batch) will effectively increase the GPU utilization. This approach is a more economic choice than using single request API.

gaobinlong commented 6 months ago

@reta , could you also help to take a look at this RFC, thanks!

reta commented 6 months ago

Thanks @gaobinlong

@reta , could you also help to take a look at this RFC, thanks!

@msfroh @model-collapse @chishui I think the idea of enhancing ingest processors with batch processing is sound in general but it may have unintended consequences, due to complexity of bulk APIs in particular:

Making the ingestion API streaming based (apologies again for bribing for https://github.com/opensearch-project/OpenSearch/issues/3000) is fundamentally a different approach to ingestion - we would be able to vary the ingestion based on how fast the documents could be ingested at this moment of time, without introducing the complexity of batch / parallelism management.

@nknize I think you mind be eager to chime in here :)

model-collapse commented 6 months ago

Thanks @gaobinlong

@reta , could you also help to take a look at this RFC, thanks!

@msfroh @model-collapse @chishui I think the idea of enhancing ingest processors with batch processing is sound in general but it may have unintended consequences, due to complexity of bulk APIs in particular:

  • for example, bulk support scripts and upserts, and combination of those ... changing the ingestion sequence could lead to very surprising results (by and large, bulk API has to provide some guarantees on document processing)
  • also, picking up the parallelism and batching becomes a nightmare (in my opinion), just today picking the right batch for bulk is very difficult, but adding yet more parallelization / internal batching would make it much harder

Making the ingestion API streaming based (apologies again for bribing for #3000) is fundamentally a different approach to ingestion - we would be able to vary the ingestion based on how fast the documents could be ingested at this moment of time, without introducing the complexity of batch / parallelism management.

@nknize I think you mind be eager to chime in here :)

Thanks for the comment. For machine learning inference, making use of batched inference API will significantly increase the GPU utilization and reduce the ingestion time. Thus batch is very important thing. You pointed out that "picking the right batch for bulk is very difficult, but adding yet more parallelization / internal batching would make it much harder". Can you elaborate more on that and give your suggestions on how to make ingestion faster?

chishui commented 6 months ago

@reta thanks for the feedbacks

bulk support scripts and upserts, and combination of those ... changing the ingestion sequence could lead to very surprising results

The proposal only targets the ingest pipeline & its processor part, it won't touch the indexing part. Even documents are processed in a batch manner, these things are still ensured:

  1. for a single document, it'll be processed by processors sequentially in the same order as the processor order defined in pipeline.
  2. Only when all documents in a bulk request have been processed by ingest pipeline, they are dispatched to be indexed on shards which is the same with current logic.

Either the action is index or update, upsert or script, they would be processed by ingest pipeline in the same way. I don't see the proposal will cause "changing the ingestion sequence", please let me know if I miss a piece of the puzzle.

chishui commented 6 months ago

Due to the aforementioned reasons about "parallel ingestion", we won't have immediate gain from delivering the feature, we have decided to deprioritize the “parallel ingestion” part of this RFC and mainly focus on the "batch ingestion".

reta commented 6 months ago

I don't see the proposal will cause "changing the ingestion sequence", please let me know if I miss a piece of the puzzle.

@chishui The parallelization (which is mentioned in this proposal) naturally changes the order which documents are being ingested, does it make sense? I think your last comment is the reflection of that, thank you.

Can you elaborate more on that and give your suggestions on how to make ingestion faster?

@model-collapse the problem with batching (at least how it is implemented currently in OS and what we've seen so far with bulk API) is that choosing the right batch size is difficult, taking into account that there are circuit breakers in place that try to estimate the heap usage etc. (as of the moment of ingestion) and may reject the request sporadically.

chishui commented 6 months ago

@reta in ingest flow when documents are processed by ingest pipeline, could one document depend on another? Even for today, text_embedding and sparse_encoding processors have their inference logic run in a thread which makes the document ingestion run in parallel, right? https://github.com/opensearch-project/ml-commons/blob/020207ecd6322fed424d5d54c897be74623db103/plugin/src/main/java/org/opensearch/ml/task/MLPredictTaskRunner.java#L194

reta commented 6 months ago

@reta in ingest flow when documents are processed by ingest pipeline, could one document depend on another?

@chishui yes, in general documents could depend on each other (just think about an example of the documents that are ingested out of any CDC or message broker, where the documents are being constructed as a sequence of changes).

Even for today, text_embedding and sparse_encoding processors have their inference logic run in a thread which makes the document ingestion run in parallel, right? https://github.com/opensearch-project/ml-commons/blob/020207ecd6322fed424d5d54c897be74623db103/plugin/src/main/java/org/opensearch/ml/task/MLPredictTaskRunner.java#L194

This is purely plugin specific logic

gaobinlong commented 6 months ago

@chishui yes, in general documents could depend on each other (just think about an example of the documents that are ingested out of any CDC or message broker, where the documents are being constructed as a sequence of changes).

In my understanding, in terms of the execution of pipeline, each document in a bulk runs independently, no ingest processor can access other in-flight documents in the same bulk request, so in the process of executing pipelines, maybe a document cannot depend on another? And subsequently, for the processing of indexing(call lucene api to write), we have the write thread_pool, each document is processed in parallel, so the indexing order in a bulk cannot be guaranteed, the client side needs to ensure the indexing order. @reta, correct me if something is wrong, thank you!

gaobinlong commented 6 months ago

I think executing pipelines run before the indexing process, firstly, we use a single transport thread to execute pipelines for all the documents in a bulk request, and then use the write thread_pool to process the new generated documents in parallel, so it seems that when executing pipelines for the documents, the execution order doesn't matter.

reta commented 6 months ago

Thanks @gaobinlong

In my understanding, in terms of the execution of pipeline, each document in a bulk runs independently, no ingest processor can access other in-flight documents in the same bulk request, so in the process of executing pipelines, maybe a document cannot depend on another?

The documents could logically depend on each other (I am not referring to any sharing that may happen in ingest processor). Since we are talking about bulk ingestion, where document could be indexed / updated / deleted, we certainly don't want to the deletes to be "visible" before documents are indexed.

I think executing pipelines run before the indexing process, firstly, we use a single transport thread to execute pipelines for all the documents in a bulk request, and then use the write thread_pool to process the new generated documents in parallel, so it seems that when executing pipelines for the documents, the execution order doesn't matter.

This part is not clear to me: AFAIK we offload processing of bulk requests (batches) to thread pool, not individual documents. Could you please point out where we parallelize the ingestion of the individual documents in the batch? Thank you

gaobinlong commented 6 months ago

The documents could logically depend on each other (I am not referring to any sharing that may happen in ingest processor). Since we are talking about bulk ingestion, where document could be indexed / updated / deleted, we certainly don't want to the deletes to be "visible" before documents are indexed.

Yeah, you're correct, but for this RFC, it only focuses on the execution of ingest pipeline which only performs on the coordinate node, just the pre-processing part, not the indexing part, the indexing operations will not happen before the execution of ingest pipeline completes for all the documents in a bulk request.

This part is not clear to me: AFAIK we offload processing of bulk requests (batches) to thread pool, not individual documents. Could you please point out where we parallelize the ingestion of the individual documents in the batch? Thank you

After the execution of ingest pipeline for all documents in a bulk, the coordinate code groups these documents by shard and send them to different shards, each shard processes its documents in parallel, so at least in shard level, we process the documents in a bulk request in parallel. But I think this RFC will not touch the processing logic in each shard which processes the create/update/delete operations for the same document in order, so it's not harmful.

model-collapse commented 6 months ago

@reta What is your estimation where the circuit breaking will happen? If you mean it will happen in side the batch processor's own process, that could be, because it is impossible to estimate how much memory will be consumed by its code. Therefore, we need to let the users to configure the batch_size in the bulk_api.

reta commented 6 months ago

@reta What is your estimation where the circuit breaking will happen?

@model-collapse there are no estimates the one could make upfront, this is purely operational issue (basically depends on what is going on at the moment)

Therefore, we need to let the users to configure the batch_size in the bulk_api.

Due to previous comment, users have difficulties with that: same batch_size may work now and may not 10m from now (if cluster is under duress). The issue referred there has all the details.

chishui commented 5 months ago

Benchmark Results on Batch ingestion with Neural Search Processors

We implemented the PoC of batch ingestion locally and enabled the capability of sending batch documents to remote ML servers. We used "opensearch-benchmark" to benchmark both batch enabled and disabled situation on different ML servers (SageMaker, Cohere, OpenAI) and here are the benchmark results

Benchmark Results

Environment Setup

Metrics no batch batch (batch size=10)
Min Throughput (docs/s) 65.51 260.1
Mean Throughput (docs/s) 93.96 406.12
Median Throughput (docs/s) 93.86 408.92
Max Throughput (docs/s) 99.76 443.08
Latency P50 (ms) 1102.16 249.544
Latency P90 (ms) 1207.51 279.467
Latency P99 (ms) 1297.8 318.965
Total Benchmark Time (s) 3095 770
Error Rate (%) 17.10%1 0

Cohere

Environment Setup

Metrics no batch batch (batch size=10)
Min Throughput (docs/s) 72.06 74.87
Mean Throughput (docs/s) 80.71 103.7
Median Throughput (docs/s) 80.5 103.25
Max Throughput (docs/s) 83.08 107.19
Latency P50 (ms) 1193.86 963.476
Latency P90 (ms) 1318.48 1193.37
Latency P99 (ms) 1926.17 1485.22
Total Benchmark Time (s) 3756 2975
Error Rate (%) 0.47 0.03

OpenAI

Environment Setup

Metrics no batch batch (batch size=10)
Min Throughput (docs/s) 49.25 48.62
Mean Throughput (docs/s) 56.71 92.2
Median Throughput (docs/s) 57.53 92.84
Max Throughput (docs/s) 60.22 95.32
Latency P50 (ms) 1491.42 945.633
Latency P90 (ms) 2114.53 1388.97
Latency P99 (ms) 4269.29 2845.97
Total Benchmark Time (s) 5150 3275
Error Rate (%) 0.17 0

Results

  1. Batch ingestion has significant higher throughput and low latency.
  2. Batch ingestion has much lower error rate comparing to non-batch result..

[1]: The errors are coming from SageMaker 4xx response which was also reported in ml-commons issue https://github.com/opensearch-project/ml-commons/issues/2249

gaobinlong commented 5 months ago

@andrross @sohami could you experts also help to take a look at this RFC, any comments will be appreciated, thank you!

navneet1v commented 5 months ago

@chishui in your benchmarks you just allowed the ingest pipeline to run parallel on multiple documents no other piece code was changed, is this a correct understanding?

OpenSearch JVM: Xms:48g, Xmx: 48g

Why we are using 48GB of heap? Should we do this with 32GB of heap?

Can you also provide some details around how many shards were used?

Another thing which is pretty interesting to note here is:

Light-weighted processors - no improvement We benchmarked the performance on some light weighted processors (lowercase + append) with current solution and parallelized batch solution, we don't see improvement on either latency or throughput which is aligned with our expectation that they are already very fast and parallelization wouldn't help and could bring some additional overhead.

On the lightweight benchmarks we didn't see any improvements. Which creates more doubts in my mind whether the call made by the Neural Search processor and its downstream ML commons are really async in nature or there is some bottleneck there in the call made to those Models. Because if parallelizing the document processing helps it should help all the processors.

chishui commented 5 months ago

@chishui in your benchmarks you just allowed the ingest pipeline to run parallel on multiple documents no other piece code was changed, is this a correct understanding?

@navneet1v the benchmark was on batch ingestion, as mentioned above, we decided to deprioritize parallel ingestion for now. For batch ingestion, if bulk API has 100 documents and batch size is 10, then 100 documents are grouped into 10 batches, and each batch is sent to ingest pipeline and its processors to process. For neural search processors benchmarked here, they receive 10 documents at a time so they can be combined and sent to remote ML servers for inference.

Why we are using 48GB of heap? Should we do this with 32GB of heap?

The EC2 host I used is a r6a.4xlarge instance which has 128G memory, I wanted to set it slightly higher to make heap memory not a bottleneck for the benchmark (not saying 32GB is not enough) as the purpose is to compare batch performance with non-batched one.

Can you also provide some details around how many shards were used?

Thanks for raising this question, I forgot to mention in the description, it's only one shard.

On the lightweight benchmarks we didn't see any improvements. Which creates more doubts in my mind whether the call made by the Neural Search processor and its downstream ML commons are really async in nature or there is some bottleneck there in the call made to those Models. Because if parallelizing the document processing helps it should help all the processors.

Even parallelization can help the performance of lightweight processors, let's say, we improve the latency from 2ms to 1ms, however, indexing part is the bottleneck of the whole process and if it takes 100ms, we can barely observe the 1ms improvement from benchmark. But for ingest process with ML processors, ML inferencing is the latency bottleneck and the latency reduction of it can significantly improve overall latency.

navneet1v commented 5 months ago

For neural search processors benchmarked here, they receive 10 documents at a time so they can be combined and sent to remote ML servers for inference.

So to be clear in 1 ML predict API request we were sending more than 1 string for converting text to embeddings right?

Thanks for raising this question, I forgot to mention in the description, it's only one shard.

This is great. Lets add this.

But for ingest process with ML processors, ML inferencing is the latency bottleneck and the latency reduction of it can significantly improve overall latency.

So this is where I don't agree completely and I think I would think we should 1 more benchmark. Here is what I think is happening:

  1. In your benchmark when your processor is receiving 10 documents at a time and sending all 10 of them at once to ML commons, you are using 1 thread of predict API thread pool instead of 10, if processors were working serially. This keeps me bugging that are we using the Predict API thread pool properly in ML commons and not letting threads free once they have send out request to external model. Or the bottleneck is on the model hosting side that it works better if more strings are provided at once which I believe should not be for models like OpenAI etc.
  2. Another thing that bugs me here is right now we would be converting 1 text field to embeddings, as the text embedding processor is generic and it can convert more than 1 field to embeddings batching text for predict api we can end up sending too many batches to predict api which can overall reduce the performance.
  3. All the benchmarks are with single Indexing client, which is not a general use-case, so we should test with more than 1 client too, may be a list of clients like [1, 2, 5, 10 ..] to really understand if batching is helping.

Suggestions:

  1. What would be helpful for the benchmarks if you can also add few more metrics like CPU utilization of data and ML nodes. Number of threads(_predict api threads) in use for ML Commons. Basically trying to see if the threads are waiting for HTTP response from remote model.
  2. Another thing I would do completely separate of these benchmarks is benchmark the predict API with a remote model alone and see where is the bottleneck happening by sending only 1 text per request to predict api. Because if you are using the remote model then the ML node should just keep on taking more and more requests as it should be just acting as a pass through layer and nothing else.
chishui commented 5 months ago

So to be clear in 1 ML predict API request we were sending more than 1 string for converting text to embeddings right?

Yes

This is great. Lets add this.

Already updated the description.

Or the bottleneck is on the model hosting side that it works better if more strings are provided at once which I believe should not be for models like OpenAI.

We also tested against remote ML server directly and more batches do lead to more throughput and less per text latency.

Another thing that bugs me here is right now we would be converting 1 text field to embeddings, as the text embedding processor is generic and it can convert more than 1 field to embeddings batching text for predict api we can end up sending too many batches to predict api which can overall reduce the performance.

If your concern is that we may have too many threads and occupying CPU resources, currently, each predict API is run in a thread of a fixed size threadpool, and for the majority of the time, these threads are only doing IO work waiting for the response that they don't occupying CPU resources.

If your concern is that too many batches to remote server which can degrade the GPU performance of the remote ML server, that's possible. And to achieve the optimal performance, we are considering to provide a tool to help user to identify the optimal batch size automatically.

All the benchmarks are with single Indexing client, which is not a general use-case, so we should test with more than 1 client too, may be a list of clients like [1, 2, 5, 10 ..] to really understand if batching is helping.

I think even with one client, we already have the benchmark results which can demonstrate that the batch ingestion can help improve the ingestion performance with ML processors. And the improvement is mainly from the remote ML server that we aggregate multiple texts from document batch and send them to remote ML server in a single request. Benchmark with more clients won't change that conclusion, right?

What would be helpful for the benchmarks if you can also add few more metrics like CPU utilization ...

Thanks for the suggestions, the CPU utilization was below 10% during all benchmarks (it's quite powerful), while threading are waiting for IO, they'll be put to waiting status without consuming CPU resources.

Another thing I would do completely separate of these benchmarks is benchmark the predict API ...

As mentioned above, we did test remote ML server API directly with different batch size and with batch size increase, we saw higher throughput and lower per text latency.

navneet1v commented 5 months ago

We also tested against remote ML server directly and more batches do lead to more throughput and less per text latency.

can we test by sending 1 string per request but increase the concurrency to see how much throughput changes? and can you paste the results here for the benchmarks you did directly in the ML server.

If your concern is that we may have too many threads and occupying CPU resources, currently, each predict API is run in a thread of a fixed size threadpool, and for the majority of the time, these threads are only doing IO work waiting for the response that they don't occupying CPU resources.

Predict API runs in fixed threadpool I understand. But does it need to wait for response from ML server? that thread can be freed up and can take more request. In this fashion more request can be sent to the remote model with same machine.

And the improvement is mainly from the remote ML server that we aggregate multiple texts from document batch and send them to remote ML server in a single request. Benchmark with more clients won't change that conclusion, right?

See if the improvement is from ML server we should do more deep-dive why ML server works better if the strings are sent in bulk.

Thanks for the suggestions, the CPU utilization was below 10% during all benchmarks (it's quite powerful), while threading are waiting for IO, they'll be put to waiting status without consuming CPU resources.

This is was my worry. As the ML node is sitting idle. It means it can take more requests. But we are not providing it more as the threads are in wait state. Can we make the call to remote ML server async so that threads on ML node can take more requests and sent it to remote model.

As mentioned above, we did test remote ML server API directly with different batch size and with batch size increase, we saw higher throughput and lower per text latency.

Please add results of those tests. :)

chishui commented 5 months ago

But does it need to wait for response from ML server? that thread can be freed up and can take more request. In this fashion more request can be sent to the remote model with same machine.

Can we make the call to remote ML server async so that threads on ML node can take more requests and sent it to remote model.

We do have a pull request in ml-commons https://github.com/opensearch-project/ml-commons/pull/1958 which changes the http client from sync to async and it's supposed to be released in 2.14.

Please add results of those tests. :)

Here are the test results that we made batched text requests directly to ML server. From results, we could see that with batch increase, we can achieve higher throughput and lower latency.

Cohere embedding:

Batch Time/s Throughput (docs/s) Average time on single doc
1 0.96 1.04167 0.96
2 1.128 1.77305 0.564
4 1.321 3.02801 0.33025
8 1.717 4.65929 0.21463
16 1.937 8.2602 0.12106
32 2.293 13.95552 0.07166
64 2.845 22.49561 0.04445
96 3.87 24.8062 0.04031

OpenAI embedding

Batch Time/s Throuput (docs/s) Time per single doc (s)
1 1.139 0.87796 1.139
2 1.143 1.74978 0.5715
4 1.568 2.55102 0.392
8 1.568 5.10204 0.196
16 1.883 8.49708 0.11769
32 2.105 15.2019 0.06578
64 2.103 30.43272 0.03286
128 2.574 49.72805 0.02011
256 3.368 76.0095 0.01316
navneet1v commented 5 months ago

We do have a pull request in ml-commons https://github.com/opensearch-project/ml-commons/pull/1958 which changes the http client from sync to async and it's supposed to be released in 2.14.

Have we benchmarked the performance of this change? how much is the throughput increasing after this change?

chishui commented 5 months ago

the problem with batching (at least how it is implemented currently in OS and what we've seen so far with bulk API) is that choosing the right batch size is difficult

@reta to address your concern we plan to provide an automation tool to help user run a series of benchmarks against their OS with different batch size and recommend the optimal batch size. Here is the feature link: https://github.com/opensearch-project/OpenSearch/issues/13009

Could you please take a look and see if your concerns are addressed, we really want to push this forward to benefit users.

chishui commented 5 months ago

Have we benchmarked the performance of this change? how much is the throughput increasing after this change?

The async http client benchmark results are attached here https://github.com/opensearch-project/ml-commons/issues/1839

chishui commented 5 months ago

@reta since we only pursue batch ingestion in this RFC, and to address your concern that user will have difficulty tuning batch size, we also proposed to have a automation tool to make it easier for user https://github.com/opensearch-project/opensearch-benchmark/issues/508. Is there any other things that you believe we should address before moving forward?

reta commented 5 months ago

@chishui I honestly don't know at what extent tool could help, you may need to provide the guide for the users to explain how it is supposed to be used. At least it may give some confidence probably.

AFAIK OpenSearch benchmarks does targeted measurements for specific operations (this is what it was designed for), but does not measure the different interleaving operational workloads (and shouldn't I think): fe running search while ingesting new documents, etc ...

chishui commented 5 months ago

@reta IMO, to have this batch ingestion feature, is from 0 to 1, that user can start to use it to accelerate their ingestion process and have fewer chances to get throttled by remote ML server (benefits are shown from the benchmark results above). Maybe it's not easy for them to find the optimal batch size initially, but they have an option and can benefit immediately once they use batch feature. Then, to have a tool to help them find an optimal batch size automatically, is from 1 to 10, that we make this feature easy to use for everyone.

you may need to provide the guide for the users to explain how it is supposed to be used

Yes, we definitely need a document on OpenSearch website when we introduce this feature explaining how the feature should be used, how it can benefit, how the tool can help.

but does not measure the different interleaving operational workloads (and shouldn't I think): fe running search while ingesting

That's what I understand as well.

Zhangxunmt commented 5 months ago

All documents from this request are handled by one ingest node - Is this a correct statement? For multi nodes cluster, the documents in the _bulk will be distributed to each node for ingestion?

chishui commented 5 months ago

@Zhangxunmt thanks for the comment. The RFC is only about preprocessing, all documents are handled by a single ingest node which remains the same as current behavior. After preprocessing is the indexing process, as you said, documents are distributed to different node which also remains the same as we don't touch this part of logic.

Zhangxunmt commented 4 months ago

What is the preprocessing? Does it mean processors in the pipeline? Recently we noticed that in neural search, the text-embedding processor in the ingest pipeline sends remote inference traffics that are proportional to the number of data nodes in the cluster. That means, the _bulk requests takes N documents, and N documents are evenly distributed among all nodes for the text-embedding processor to run remote inference for vectorization. So this means all the docs are divided into smaller batches and preprocessed in every nodes? @chishui

chishui commented 4 months ago

What is the preprocessing? Does it mean processors in the pipeline?

Yes

That means, the _bulk requests takes N documents, and N documents are evenly distributed among all nodes for the text-embedding processor to run remote inference for vectorization.

The text-embedding processor is actually run on the node which accepts "_bulk" API. When it needs to send out text for inferencing, it could route the requests to "ml" nodes depending on the plugins.ml_commons.only_run_on_ml_node setting, right?

So this means all the docs are divided into smaller batches and preprocessed in every nodes?

Basically, since we won't change inferencing API, if texts are already dispatched to every node for inferencing, with this batch enabled, batched texts are dispatched to every nodes.

Zhangxunmt commented 4 months ago

Got it. I think it makes sense. In most cases the ml_node setting is false. AOS doesn't have ml nodes so far so the inferencing would happen in data nodes.

Based on the prior discussion here, the pre-processing runs on the node which accepts the _bulk, but it will call "Predict" API in Ml-Commons that routes the inference traffics to all data nodes. So essentially it's still the whole cluster handling the batch of documents. So in the cases of a single text-embedding processor in a ingest pipeline, does the proposed parallel ingestion still help the performance since the processor itself is already handling docs in parallel mode?

Zhangxunmt commented 4 months ago

@chishui One takeaway from this issue is that we'd better to use a bigger cluster (>10 nodes) for the performance benchmarking because nodes number is direct proportional to the concurrency TPS we send to the model service. Smaller OS cluster easily reaches its hard limit in the concurrency requests and may not represent the real customer scenarios.

chishui commented 4 months ago

does the proposed parallel ingestion still help the performance since the processor itself is already handling docs in parallel mode?

@Zhangxunmt I explained the benefits of having parallel ingestion in this comment https://github.com/opensearch-project/OpenSearch/issues/12457#issuecomment-1978992911. In the scenario you described, it won't help the performance.

One takeaway from this https://github.com/opensearch-project/ml-commons/issues/2249 is that we'd better to use a bigger cluster

I think it's actually the opposite. Even with only one data node in the cluster, inferencing is done in thread pool and the thread pool size controls the maximum concurrent TPS. And based on our benchmark result, without batch, each document is inferenced in a single thread and can easily run into 4xx from sagemaker. But with batch, each batch is in a single thread and can less likely run into 4xx from sagemaker. So with bigger cluster, you would have higher concurrency and would get 4xx more likely and batch can definitely help with this.

dblock commented 4 months ago

I am late to this RFC, but wanted to highlight https://github.com/opensearch-project/OpenSearch/pull/13306#discussion_r1583352382 for those who commented here - if you can please take a quick look? I think the API proposed should have been discussed a little more, starting with the inconsistent use of maximum_ vs. max_, but more imnportantly whether we need batch_ingestion_option at all.

andrross commented 3 months ago

I know I'm super late here as this has already been implemented and released in 2.14, but I'm questioning the new batch_size parameter in the bulk request. Why did we push this all the way up for the end user to supply in every single request? Is this something that is expected to vary from one request to the next, and do we think the end user is in the best position to configure this value?

I don't know all the details, but to me it would seem better for the optimal batch size to be chosen by each ingest processor implementation. It could be a setting that a system admin provides to each processor, or each processor could make an automatic decision based on a variety of factors. Regardless, it seems to me that both the system itself or a system administrator is in a better position to choose this batch size than the end user.

Also, the "batch_size" name is quite confusing. The user is already responsible for choosing a batch size, i.e. the number of documents they provide in each bulk request. Now there is a new parameter named "batch_size" that only applies to ingest pipelines, and only actually results in a different behavior if the ingest pipeline they are using happens to support batching.

chishui commented 3 months ago

Why did we push this all the way up for the end user to supply in every single request? Is this something that is expected to vary from one request to the next, and do we think the end user is in the best position to configure this value?

To most users they don't need to use this parameter at all. Only users who use remote ML servers to generate embeddings and who are sensitive to ingestion latency and want to optimize the ingestion performance will use it. They may want to tune it to achieve a better performance.

to me it would seem better for the optimal batch size to be chosen by each ingest processor implementation

Agreed, that's how we implement in neural search processors. batch_size controls how many documents are fed into ingest processors, processors determine the actual batch size for their logic.

I think as a request parameter, it provides per-request flexibility to give user fine-grain control. I'm not against a system setting as a default value without request parameter. I think they can live together like pipeline setting which is both a bulk request parameter and also a default setting of the index. If user wants to make it a system setting, we can support then.

andrross commented 3 months ago

batch_size controls how many documents are fed into ingest processors, processors determine the actual batch size for their logic.

So there are potentially three levels of batching? 1) the user determines how many documents to put into the _bulk request, 2) the user decides how those batches are sub-batched and fed into the ingest processors, and 3) the ingest processors decide how to sub-batch the sub-batches that were fed to them

On the surface it seems this second level of batching might be overly complex and not necessary.

I think as a request parameter, it provides per-request flexibility to give user fine-grain control.

Do you have a concrete use case where this level of flexibility is needed?

chishui commented 3 months ago

My previous statement might be inaccurate. The ingest-processors level batch is not always required. Only if batch size matters to their logic they can have it. For example, for text_embedding and sparse_encoding processors, they may connect to a remote model hosted in other services e.g. OpenAI. These services might have a maximum batch size limitation, to not exceeding the limit, text_embedding and sparse_encoding processors could set a maximum batch size (through connector setting) and if it receives docs more than that it can cut them into sub batches.

So from user's perspective, they don't need to consider all three. They could:

  1. reuse the bulk size they are happy with
  2. set maximum batch size in remote model connector to ensure not exceeding the remote server's batch limitation. (one-time thing. And if bulk size is known to be always within the limit, this step can be skipped)
  3. tune batch_size parameter to get good ingest performance, once it's determined, they can reuse.

Do you have a concrete use case where this level of flexibility is needed?

  1. User may want to run benchmark with different batch_size to get the one which leads to the optimal ingestion performance.
  2. Different data may need to be ingested through different model, and these models could have different optimal performance with different batch_size.
andrross commented 3 months ago
  1. User may want to run benchmark with different batch_size to get the one which leads to the optimal ingestion performance.

If the goal is to find a single optimal value, then a server-side setting is better because you can set it once and you don't have to modify all your ingestion tools to specify a new parameter. You can still benchmark with different values all you want with a server side setting.

  1. Different data may need to be ingested through different model, and these models could have different optimal performance with different batch_size.

If the optimal size depends on the model, then it seems like it should be configured on the server side when you provide whatever configuration is needed to wire up that model? Again, this avoids the need to modify all your ingestion tooling to provide the optimal parameter.

chishui commented 3 months ago

A request parameter or a system setting, they don't conflict. If users want to have a system setting, I don't see a reason why we shouldn't.

andrross commented 3 months ago

A request parameter or a system setting, they don't conflict.

What I don't understand is the need to do the sub-batching that happens prior to documents being fed into the ingest processors (this is what the request parameter controls). Why is this needed or even desirable? It adds complexity and makes for a bad user experience. Why not just pass the whole batch that came in the _bulk request to each ingest processor, and then let each ingest processor make the decision on how to use those batches? If an ingest processor makes a call to OpenAI and needs to break the batch into sub-batches it can do so (and it must do so even with the current design because there is nothing preventing a user from setting a batch_size value larger than the OpenAI limit).

To be clear, I'm advocating for deprecating and removing the batch_size parameter, simplifying the generic ingest service logic to just pass the entire batch that came in the original bulk request, and then implementing the batching inside the ingest processors (this can be a configuration setting or dynamic logic or anything else as appropriate for each ingest processor).

chishui commented 3 months ago

batch_size also acts as a switch that user has to explicitly set it to turn on the feature. User can be even unaware of this parameter, then they get the experience they are used to. We don't want to make it a default behavior once user upgrades to new version, they see something different but don't know why.

We could have a cluster setting or a pipeline setting or a processor setting but that also means the more fine grained control we provide to users, more settings they need to make. And if user wants to modify the settings, we don't even have a pipeline or processor update API.

It adds complexity and makes for a bad user experience

I don't think it's a bad user experience. Different choices have their pros and cons. Most users won't even need to be aware of this parameter, it's optional. For ingest latency sensitive users who utilize remote ML models for inferencing, they may want to seek out ways to improve their latency. And they would also want to experiment with different values and maybe different values for different types of documents. These are my assumption, but I know the parameter could give their flexibility to experiment. A system setting may also work, but whenever they want to try with a different batch_size, they'd need to make an additional request to update setting, that't even more work comparing to adding a parameter.

To be clear, I'm advocating for deprecating and removing the batch_size parameter

Still, users have all kinds of requirements, the parameter and the setting don't conflict, one can be a good supplement to the other, it's not one way door.