openvinotoolkit / openvino

OpenVINO™ is an open-source toolkit for optimizing and deploying AI inference
https://docs.openvino.ai
Apache License 2.0

Question: How to implement efficient mixture of experts? #17663

Closed catid closed 1 year ago

catid commented 1 year ago

I'm interested in designing an image upsampler based on a mixture of experts model, where an initial network extracts features and decides between 8 different models to use for each block of 8x8 pixels.

It would be very inefficient to run all 8 models across the whole image and then select on the CPU or GPU.

What would be a good way to implement this with OpenVINO?

wangleis commented 1 year ago

@catid Could you please share more details of the pipeline in your use case? For example, the following scenarios are different from an optimization perspective:

  1. The 8 models run inference in parallel, each model processing part of the blocks.
  2. The 8 models process the whole image successively, each model processing part of the blocks.
catid commented 1 year ago

Looking at the first case where each model processes a different subset of the blocks.

catid commented 1 year ago

Maybe the async API for OpenVINO can dispatch the work for each block separately somehow?

jiwaszki commented 1 year ago

Hi @catid, I would suggest taking a look at AsyncInferQueue if you are using OpenVINO's Python API.

Here is a simple draft to fill in, a sketch of how I imagine using it:

```python
import openvino.runtime as ov
import numpy as np

core = ov.Core()
# Read your specialized model.
model = core.read_model(...)
# Compile it. Note that you may add configuration here; I advise experimenting with it so the model runs in a more optimized way (see the example after this block).
compiled = core.compile_model(model, ...)

# In this scenario `jobs` is equal to the number of models you would like to run, so 8.
infer_queue = ov.AsyncInferQueue(compiled, jobs=8)

# Prepare space in memory to hold the results.
# Adjust it to your needs. Let's assume a single chunk is upscaled from 8x8 to 32x32.
# The order of chunks can be, for example, from top-left to bottom-right.
finished_chunks = [np.empty((32, 32), dtype=...) for _ in range(len(infer_queue))]  # it is logical to rely on the number of jobs in `AsyncInferQueue`

# Split the image into separate chunks - this is work on the host side, and it does not really matter when it happens.
# Generally the output should match your model's input requirements and the callback's ordering.
img_chunks = do_what_is_required_to_image(..., num_of_chunks=len(infer_queue))

# Create a callback that will save results in memory based on ordering.
def chunk_callback(request, userdata):
    # Copy the data back to host memory (`.copy()` detaches it from the request's tensor).
    finished_chunks[userdata] = request.get_output_tensor().data.copy()

# Set this callback on all jobs.
infer_queue.set_callback(chunk_callback)

# Run all jobs.
for i in range(len(img_chunks)):
    # Assume input is just a chunk, and looping follows the order.
    infer_queue.start_async({0: img_chunks[i]}, userdata=i)
# Wait for all requests to finish.
infer_queue.wait_all()

# Gather data. You can either go with callbacks or manually gather results in a loop.
# Here we go with callbacks, which is the easiest way to preserve data ordering.
# Please refer to the documentation for more details.
result_img = glue_chunks(finished_chunks)
```
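To illustrate the "add configuration here" comment in the sketch above, here is a minimal, hedged example of one possible configuration; the device name, model path, and the throughput hint are assumptions to experiment with, not a recommendation for this exact workload:

```python
import openvino.runtime as ov

core = ov.Core()
model = core.read_model("expert.xml")  # hypothetical model path

# A performance hint lets the runtime pick stream/thread settings for you;
# THROUGHPUT tends to suit many small parallel requests, LATENCY a single stream.
compiled = core.compile_model(model, "CPU", {"PERFORMANCE_HINT": "THROUGHPUT"})

# With no explicit `jobs`, the queue sizes itself to the compiled model's
# optimal number of infer requests.
infer_queue = ov.AsyncInferQueue(compiled)
```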

Can you share your model (+ test image)? I can't test the code without it; the model and weights would improve our collaboration 😃 Awaiting your response!

More on this feature:

jiwaszki commented 1 year ago

@catid Second part of the answer.

Since you mentioned "different models", you could try to apply ideas similar to those presented with AsyncInferQueue.

Let's simplify it a little and think of two expert models, each working on half of the image. As I understand it, the first model answers the question "which model should be used on a specific chunk". Assume the answer is just an output of that first model and the control flow is already designed (i.e. a switch-case/dict).

```python
# Compile models, first the "decision model".
decision_model = core.compile_model(...)
# Expert models. Create two requests for each, as they might overlap (an assumption from the description).
first_model = core.compile_model(...)
first_request_0 = first_model.create_infer_request()
first_request_1 = first_model.create_infer_request()

second_model = core.compile_model(...)
second_request_0 = second_model.create_infer_request()
second_request_1 = second_model.create_infer_request()

# The decision model can run synchronously, as it needs to finish before the next step.
first_decision, second_decision = decision_model(original_image, ...)  # direct call = synchronous inference

# `start_async` calls are not blocking, so they run together.
# Assume each decision is an integer value.
first_request_0_flag = False
first_request_1_flag = False
# and so on...

if first_decision == 1 and second_decision == 1:
    first_request_0.start_async({0: half_of_image_0})
    first_request_1.start_async({0: half_of_image_1})
    first_request_0_flag = True
    first_request_1_flag = True
elif first_decision == 0 and second_decision == 1:
    second_request_0.start_async({0: half_of_image_0})
    first_request_1.start_async({0: half_of_image_1})
    # ...
# so on and so on...

# Now synchronization step:
if first_request_0_flag and first_request_1_flag:
    # Wait for both requests to finish.
    first_request_0.wait()
    first_request_1.wait()
    # Postprocess result step.
    result = glue_chunks(first_request_0.get_output_tensor().data, first_request_1.get_output_tensor().data)
elif ...
# rest of the code...
```

This is a sketch of a potential pipeline. I tried to be as verbose as possible with the names to capture the idea.

As the pipeline grows in complexity (i.e. in the number of models), it is up to you to provide correct and efficient synchronization mechanisms. The above example can be modified to use callbacks as well (you can set them directly on specific requests; for example, a callback could trigger a different request), or to provide more sophisticated re-use of requests and start_async/wait calls (a small dict-dispatch sketch follows at the end of this comment). It all depends on the design now. Some notes:

Some more docs refs:

Generally, this concept should be possible to translate to the C++ API as well (excluding AsyncInferQueue). I am looking forward to your response!
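One way to keep the growing if/elif chain manageable is the switch-case/dict control flow mentioned above. A minimal sketch, assuming the requests, decisions, image halves, and `glue_chunks` from the example above; the `experts` dispatch table itself is a hypothetical name:

```python
# Hypothetical dispatch table: decision value -> that expert's pair of infer
# requests (one request per image half, so overlapping choices do not collide).
experts = {
    1: (first_request_0, first_request_1),
    0: (second_request_0, second_request_1),
}

# Dispatch: pick the chosen expert's request for each half and start both.
req_0 = experts[first_decision][0]
req_1 = experts[second_decision][1]
req_0.start_async({0: half_of_image_0})
req_1.start_async({0: half_of_image_1})

# Synchronize and post-process.
req_0.wait()
req_1.wait()
result = glue_chunks(req_0.get_output_tensor().data,
                     req_1.get_output_tensor().data)
```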

jiwaszki commented 1 year ago

@catid I have also discovered your repository with OpenVINO code: https://github.com/catid/upsampling/blob/master/openvino/test_openvino.py

It looks amazing, I really like that you use PrePostProcessor in it! Yet, here are some optimization tips:

* The first tip is to overlap host-side preprocessing with asynchronous inference:

```python
# Preprocess first frame:
frame = preprocess_image(frames[0])
for i in range(1, len(frames)):
    infer_request.start_async({0: frame})
    frame = preprocess_image(frames[i])
    infer_request.wait()
    # gather data...
...
```

This is the general idea of how the loop would work. As the `start_async` call releases the GIL, the current thread handling the loop can preprocess the next image while waiting for the results.
* An additional tip is to use `shared_memory` mode for inference (also available on current main):
```python
... = infer_request.infer(data, shared_memory=True)
... = infer_request.async_infer(data, shared_memory=True)
... = compiled_model(data, shared_memory=True)
```

It provides "zero-copy" of data where applicable. But be careful with this mode, especially while making many parallel calls, as the data is shared between the original array and OpenVINO; the safety of the data is on the pipeline side when using this mode. Additionally, the data must follow the C++ memory-layout rules. However, it might only be a matter of calling numpy.ascontiguousarray, which you can hide in the preprocessing step (OpenVINO will align the data itself if it is not already aligned, but then you do not benefit from "zero-copy").
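As a small illustration of that last point, here is a hypothetical preprocessing helper (the `preprocess_image` name and the HWC-to-NCHW layout are assumptions) that returns a C-contiguous array so the shared-memory path can actually avoid the extra copy:

```python
import numpy as np

def preprocess_image(img: np.ndarray) -> np.ndarray:
    # Hypothetical preprocessing: HWC uint8 -> NCHW float32 in [0, 1].
    x = img.astype(np.float32) / 255.0
    x = np.transpose(x, (2, 0, 1))[np.newaxis, ...]
    # Transposing creates a non-contiguous view; make it C-contiguous
    # so `shared_memory=True` can reuse the buffer without an internal copy.
    return np.ascontiguousarray(x)

# Quick check that the layout rule is satisfied.
assert preprocess_image(np.zeros((8, 8, 3), dtype=np.uint8)).flags["C_CONTIGUOUS"]
```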

You can test out the current main branch, which will soon be available as a package.

catid commented 1 year ago

Thanks! I'm going to look at this again, maybe starting next week. Trying to ship some LLM hype this week and get back to the real ML stuff next. Probably the first step would be to fold your feedback into a benchmark to see how it impacts performance versus a single model, and to understand the trade-offs before designing the full training pipeline. I'll incorporate your feedback, thanks for reviewing my code!

jiwaszki commented 1 year ago

@catid you're welcome! It was a pleasure to review it. Please ping me directly if anything else is needed.

avitial commented 1 year ago

Closing this; I hope the previous responses were sufficient to help you proceed. Feel free to reopen and ask additional questions related to this topic.