We would like to hear more voices from the neural search audience, since some options may require contributions to OpenSearch core. If the first option is chosen, we will move the RFC to the opensearch repo and continue the discussion there.
The batch ingestion feature has been completed. Parallel ingestion is out of scope for now.
Problem Statements
When users utilize the `bulk` API to ingest multiple documents in a single request, the OpenSearch ingest pipeline only handles one document at a time, in sequential order (ref). An ingest pipeline is constituted by a collection of processors, and a processor is the computing unit of a pipeline. Most processors, such as append, uppercase, and lowercase, are lightweight, and processing multiple documents one after another versus in parallel makes no observable difference. But for time-consuming processors such as the neural search processors, which by their nature require more compute time, being able to run them in parallel could save users valuable time. Apart from ingestion time, processors like the neural search processors can also benefit from processing documents together in batches, since batch APIs reduce the number of requests to remote ML services and help avoid hitting rate limit restrictions. (Feature request: https://github.com/opensearch-project/ml-commons/issues/1840, rate limit example from OpenAI: https://platform.openai.com/docs/guides/rate-limits)

Due to the lack of parallel ingestion and batch ingestion capabilities in the data ingestion flow, in this doc we propose two solutions to address them.
Solutions
Option 1: A Batchable Ingest Pipeline (Preferred)
Use Case Walk Through
User creates a batch pipeline with a request like the example shown below:
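A possible shape of the create-pipeline request, assuming the `settings.batch_enabled` and `settings.maximum_batch_size` fields proposed later in this doc (the processor configurations and values are illustrative only):

```json
PUT /_ingest/pipeline/nlp-batch-pipeline
{
  "description": "A batch-enabled pipeline with set and text_embedding processors",
  "settings": {
    "batch_enabled": true,
    "maximum_batch_size": 16
  },
  "processors": [
    {
      "set": {
        "field": "ingested_at",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "text_embedding": {
        "model_id": "<model_id>",
        "field_map": {
          "passage_text": "passage_embedding"
        }
      }
    }
  ]
}
```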
Later, the user uses the `bulk` API to ingest multiple documents through the batch pipeline. Since the `text_embedding` processor has enabled the batch capability, it can send the documents from the bulk request to the ML node in a single request. The `set` processor doesn't have a customized batch operation, so documents are processed one by one in that processor as the default behavior. An example bulk request is shown below.
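A sketch of such a bulk request (the index name, pipeline name, and documents are illustrative; the pipeline could also be configured as the index's default pipeline):

```json
POST /_bulk?pipeline=nlp-batch-pipeline
{ "index": { "_index": "my-nlp-index", "_id": "1" } }
{ "passage_text": "A bottle of water" }
{ "index": { "_index": "my-nlp-index", "_id": "2" } }
{ "passage_text": "A glass of milk" }
```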
Proposal
An ingest pipeline is constructed from a list of processors, and a single document flows through each processor one by one before it can be stored in an index. Currently, both the pipeline and the processor can only handle one document at a time, and even with the bulk API, documents are iterated and handled in sequential order. As shown in figure 1, to ingest doc1, it would first flow through ingest pipeline 1, then through pipeline 2. Only then would the next document go through both pipelines.
To support batch processing of documents, we'll create a batchable pipeline in which multiple documents, instead of a single document, can flow through the pipeline and its processors. The batch pipeline option should be configurable, and users need to explicitly enable it. We will provide a default implementation in the `Processor` interface that iteratively processes documents, so most processors don't need to change; only when there is a need to process multiple documents in parallel (e.g. the text embedding processor) do processors provide their own implementation. Otherwise, even when receiving documents all together, they default to processing them one by one.

To batch process documents, users need to use the `bulk` API to pass multiple documents. If the document count exceeds the `maximum_batch_size` value, the documents will be split into subsets so that each subset satisfies the maximum batch size requirement. If the document count is less than `maximum_batch_size`, all of the documents will be batched and processed together.

Work that needs to be done:
Support new settings in pipeline API
Add new pipeline fields `settings.batch_enabled` and `settings.maximum_batch_size` in the pipeline API. Support batch settings in neural search processors.

New batch function in both `Pipeline` and `Processor`
In the `Pipeline` class, add `batchExecute` to consume a collection of `IngestDocument`s. In the `Processor` class, add `batchExecute` to consume a collection of `IngestDocument`s; its default behavior is to iterate over the documents and execute them one by one. Neural search processors need to implement `batchExecute` to combine the text from multiple documents and send it to ml-commons for inferencing in a single request. A rough sketch of the default method is shown below.
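A minimal sketch of what the default `batchExecute` could look like on the `Processor` interface. The signature is simplified for illustration; the actual OpenSearch `Processor#execute` is asynchronous and handler-based, so the real implementation would follow that style:

```java
import java.util.ArrayList;
import java.util.List;

public interface Processor {

    // Existing single-document entry point (simplified for this sketch).
    IngestDocument execute(IngestDocument ingestDocument) throws Exception;

    // Proposed batch entry point. The default implementation iterates the
    // documents and processes them one by one, so existing processors keep
    // their current behavior. Processors such as text_embedding can override
    // this to send all documents to ml-commons in a single request.
    default List<IngestDocument> batchExecute(List<IngestDocument> ingestDocuments) throws Exception {
        List<IngestDocument> results = new ArrayList<>(ingestDocuments.size());
        for (IngestDocument document : ingestDocuments) {
            results.add(execute(document));
        }
        return results;
    }
}
```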
Ingest flow logic change
The current logic of the document ingestion flow can be shown with the pseudo code below:
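Roughly, in simplified Java-like pseudo code (illustrative, not the exact OpenSearch implementation):

```java
// Current behavior: documents are handled one at a time, in order.
for (IngestDocument document : bulkRequestDocuments) {
    for (Pipeline pipeline : pipelines) {
        for (Processor processor : pipeline.getProcessors()) {
            processor.execute(document);
        }
    }
}
```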
We'll change the flow to the logic shown below if the pipeline has the batch option enabled.
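Again in illustrative pseudo code; `isBatchEnabled`, `getMaximumBatchSize`, and `splitIntoBatches` are hypothetical helpers standing in for the new settings and the splitting logic:

```java
// Batch behavior: split the bulk documents into sub-batches of at most
// maximum_batch_size and let each sub-batch flow through the pipeline.
for (Pipeline pipeline : pipelines) {
    if (pipeline.isBatchEnabled()) {
        for (List<IngestDocument> batch : splitIntoBatches(bulkRequestDocuments, pipeline.getMaximumBatchSize())) {
            pipeline.batchExecute(batch);   // each processor's batchExecute is invoked in turn
        }
    } else {
        for (IngestDocument document : bulkRequestDocuments) {
            pipeline.execute(document);
        }
    }
}
```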
Pros and Cons
Pros

Documents can be batched simply by sending them through the `bulk` API.

Cons

Batching only applies to documents sent within a single `bulk` API request.

Option 2: Caching Documents in Neural Search Processors
Use Case Walk Through
User creates a pipeline with the `text_embedding` processor, enabling document caching with the parameters shown below:
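A possible shape of such a processor configuration. The parameter names (`batch_enabled`, `batch_size`, `batch_wait_time_ms`) are hypothetical placeholders for the batch settings this option would introduce:

```json
PUT /_ingest/pipeline/nlp-caching-pipeline
{
  "description": "text_embedding processor with document caching enabled",
  "processors": [
    {
      "text_embedding": {
        "model_id": "<model_id>",
        "field_map": {
          "passage_text": "passage_embedding"
        },
        "batch_enabled": true,
        "batch_size": 3,
        "batch_wait_time_ms": 500
      }
    }
  ]
}
```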
After the pipeline is created, the user can start to ingest documents using the single-document ingest API or the `bulk` API. Let's say the user ingests three documents in separate requests, and all of these requests are handled by the same OpenSearch node, the same pipeline, and the same model. If they are sent within 500ms, then after the third request is sent, the user receives responses for all three.

Proposal
To limit the impact of the change and to focus only on the inference processors, which can benefit the most from batching, this proposal tries to solve the problem from within the neural search processors.
A processor can process one document at a time. To batch documents without changing interfaces, we need to cache them. We'll create a `BatchManager` which caches the documents in a cache queue until the flush conditions are met (e.g. the configured batch size is reached or the maximum wait time elapses). A cache queue will be created for each unique “pipeline_id + model_id”, and documents are batched in neural search processors per model and per pipeline. Note that, although it is a rare case, if a user configures multiple neural search processors in the same pipeline pointing to the same model but with different batch sizes, they might see a mix of batch sizes in the requests to ML nodes.
Cache in Neural Search vs in ml-commons
In Neural Search
In this option, the document caching logic is maintained in the neural search package and is triggered by the neural search ingest processors.
Pros:
Cons:
In ml-commons
As ml-commons can be invoked not only by the ingestion flow but also by OpenSearch core for other flows, caching data here is not purely a batch ingestion solution but more of an ML client-side caching solution. It can ensure that data is always batched before being sent to ML nodes for inference.
Pros:
Cons:
Caching Solution
Local Cache
In the local cache solution, each OpenSearch node caches the data in memory, and only the documents that reach the same host can be batched together.
Distributed Cache
In this solution, we need a dedicated cache platform hosted separately in a cache fleet. All OpenSearch nodes reach out to this cache fleet to store and fetch data. This solution can ensure that even if documents are routed to different OpenSearch nodes for processing, they can be stored in the same cache entry and batched globally. It can also decouple the OpenSearch nodes from the ML nodes. The two most popular cache solutions are Memcached and Redis, and Redis is more favourable as it features advanced data structures and provides durability through its cluster.
However, a distributed cache also brings significant complexity to our logic. Firstly, nodes would now compete to process cached documents, and we might need to add a distributed lock to ensure documents are only processed once (similar to SQS). Secondly, the inference results would also have to be stored in the cache, and nodes would need to keep polling for results so that they can continue the ingest process.
Which Caching Solution to Use
Due to the complexity of the distributed cache and the additional infrastructure setup it requires, we can start with the local cache solution and reconsider the distributed cache option later.
Work that needs to be done:
Support new settings in neural search processors
Users can set batch parameters for inference processors when they create a pipeline:
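For example, reusing the hypothetical parameter names from the walk-through above (these names are placeholders, not finalized settings):

```json
"text_embedding": {
  "model_id": "<model_id>",
  "field_map": { "passage_text": "passage_embedding" },
  "batch_enabled": true,
  "batch_size": 3,
  "batch_wait_time_ms": 500
}
```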
The new settings need to be passed to processors to consume.
Create BatchManager
Create a new `BatchManager` class which can cache incoming documents per pipeline and model, dispatch a batched inference request when the flush conditions (batch size reached or wait time elapsed) are met, and hand the results back to the pending ingest flows. A rough sketch is shown below.
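A minimal, illustrative sketch of such a `BatchManager` (the class shape, method names, and flush trigger are assumptions, not the final design):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Illustrative sketch: caches document text per "pipelineId + modelId" and
 * flushes a batch once the configured batch size is reached. A real
 * implementation would also flush on a timer (maximum wait time) and return
 * results asynchronously to the pending ingest requests.
 */
public class BatchManager {

    private final Map<String, List<String>> queues = new ConcurrentHashMap<>();
    private final int batchSize;

    public BatchManager(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Cache one document's text; flush when the batch is full. */
    public synchronized void add(String pipelineId, String modelId, String text) {
        String key = pipelineId + ":" + modelId;        // one queue per pipeline + model
        List<String> queue = queues.computeIfAbsent(key, k -> new ArrayList<>());
        queue.add(text);
        if (queue.size() >= batchSize) {
            dispatch(modelId, new ArrayList<>(queue));  // send the whole batch at once
            queue.clear();
        }
    }

    /** Placeholder for the batched inference call to ml-commons. */
    private void dispatch(String modelId, List<String> batch) {
        // e.g. mlClient.inference(modelId, batch, listener);
    }
}
```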
Processors update
Instead of calling ml-commons to run inference, call `BatchManager` to cache the document. Provide the batch document inference logic and dispatch logic.
Pros and Cons
Pros
Cons