jina-ai / jina

☁️ Build multimodal AI applications with cloud-native stack
https://docs.jina.ai
Apache License 2.0
20.99k stars 2.22k forks

What is the order of multi-replicas flow's result? #5399

Closed wqh17101 closed 1 year ago

wqh17101 commented 1 year ago

Describe the bug
Hi there, I have a Flow like clip-as-service, as below:

import torch
from jina import Document, DocumentArray, Executor, Flow, requests

# preproc_text, model, pool and the self.per_* attributes are defined elsewhere

class Inference(Executor):

    def _preproc_texts(self, docs: 'DocumentArray'):
        with self.monitor(
            name='preprocess_texts_seconds',
            documentation='texts preprocess time in seconds',
        ):
            return preproc_text(docs, tokenizer=self.per_tokenizer, device=self.per_device, args=self.per_args)

    @requests
    async def inference(self, docs: DocumentArray, parameters, **kwargs):
        with torch.no_grad():
            for minibatch, batch_data in docs.map_batch(self._preproc_texts, batch_size=200, pool=pool):
                minibatch.tensors = model(**batch_data).cpu().numpy()
        return docs

if __name__ == "__main__":
    torch.multiprocessing.set_start_method("spawn")
    f = Flow(port=8888).add(uses=Inference, replicas=2)
    with f:
        f.block()

Here I have a DocumentArray with 180 samples, and I found that one replica was given 100 samples and the other one 80 samples.

There are two questions (or bugs):

  1. What is your load-balancing strategy? The batch_size I set is 200 and the input does not reach that upper bound, so why does it use both replicas?
  2. After I send the same request many times, I find there are two possible results for the same DocumentArray's tensors:
    • [80 samples' results, 100 samples' results]
    • [100 samples' results, 80 samples' results]

So it seems that you divide the DocumentArray into two partitions, which are inferenced at the same time on the two replicas. The final result is a concatenation of the results of each partition, ordered by whichever partition completes faster.

So is there any way to keep the order of the result the same as the input DocumentArray? Or do I need to reorder the result by matching the texts after querying?

Environment

JoanFM commented 1 year ago

Hey @wqh17101 ,

Let me try to answer your questions:

  1. The load balance strategy is to divide the load between replicas. If u see, the batch_size of 200 that you are setting in the Executor applies to the docs already reached the Executor, which has nothing to do with the size of the requests pass to an Executor at the same time. For this you may want to check request_size parameter https://docs.jina.ai/fundamentals/clean-code/#set-request-size. I believe the default request_size is 100 so this explains why one replica gets 100 and then the other receives 80 (1 request to each other)
  2. This question I do not understand. What is the problem? Can you give us an easy reproducible code to show the problem?
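
The request splitting described in point 1 can be sketched in plain Python (no Jina dependency; `split_into_requests` is a hypothetical helper mimicking the default request_size of 100, not a Jina API):

```python
# Plain-Python sketch of how a DocumentArray of 180 docs is split into
# requests under a request_size of 100.

def split_into_requests(docs, request_size=100):
    """Yield consecutive slices of at most `request_size` items."""
    for start in range(0, len(docs), request_size):
        yield docs[start:start + request_size]

docs = list(range(180))  # stand-in for 180 Documents
requests = list(split_into_requests(docs))

# one request of 100 docs and one of 80, each routed to a replica
print([len(r) for r in requests])  # → [100, 80]
```
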
JoanFM commented 1 year ago

I think I understood the question. The order in which the results are returned is the order in which the requests are sent, so you do not need to reorder the results.

wqh17101 commented 1 year ago

@JoanFM Wow , so fast!

> The load-balancing strategy is to divide the load between replicas. Note that the batch_size of 200 you set inside the Executor applies to docs that have already reached the Executor; it has nothing to do with the size of the requests passed to an Executor. For that you want to check the request_size parameter: https://docs.jina.ai/fundamentals/clean-code/#set-request-size. I believe the default request_size is 100, which explains why one replica gets 100 docs and the other receives 80 (one request to each replica).

This one really explains the first question.

> I think I understood the question. The order in which the results are returned is the order in which the requests are sent, so you do not need to reorder the results.

That is what I think too. However, it really returns two different results for the same input.

JoanFM commented 1 year ago

Do you have an example for us to reproduce the issue? I do not understand what you mean.

wqh17101 commented 1 year ago

For example:
result of the 100 samples: [a1, a2, ..., a100]
result of the remaining 80 samples: [b1, b2, ..., b80]

After I queried many times, I got two different results for the same input:
result 1: [a1, a2, ..., a100, b1, b2, ..., b80]
result 2: [b1, b2, ..., b80, a1, a2, ..., a100]

JoanFM commented 1 year ago

Can I have minimal reproducible code so that I can reproduce the issue? How are you querying, for instance?

wqh17101 commented 1 year ago

It is hard to provide the detailed code.

The client code:

from jina import Client, DocumentArray, Document

class JinaClient:

    client_1 = Client(port=123)
    client_2 = Client(port=456)

    @classmethod
    def predict_1(cls, task_name, data_list):
        """Send data_list to the first Flow and return the tensors.

        :param task_name: task name passed through `parameters`
        :param data_list: list of raw texts to predict on
        :return: list of prediction tensors
        """
        r = cls.client_1.post('/', DocumentArray([Document(text=text) for text in data_list]),
                              parameters={'task_name': task_name})
        return r.tensors.tolist()

    @classmethod
    def predict_2(cls, task_name, data_list):
        """Send data_list to the second Flow and return the tensors.

        :param task_name: task name passed through `parameters`
        :param data_list: list of raw texts to predict on
        :return: list of prediction tensors
        """
        r = cls.client_2.post('/', DocumentArray([Document(text=text) for text in data_list]),
                              parameters={'task_name': task_name})
        return r.tensors.tolist()

    @classmethod
    def predict_1_api(cls, sentence_pairs):
        return cls.predict_1("predict1", sentence_pairs)

    @classmethod
    def predict_2_api(cls, sentence_pairs):
        return cls.predict_2("predict2", sentence_pairs)

So, as you can see above, it might be better to set request_size to len(data_list). But I don't think that is the key reason for question 2.

JoanFM commented 1 year ago

I am not sure if you do something on top of this, but the order of results is guaranteed for one given client; each client receives its own requests.

Also, it seems that you are using two clients pointing to two different Flows (different ports).

JoanFM commented 1 year ago

But I would need to understand how you interact with the Flows and with this JinaClient of yours.

wqh17101 commented 1 year ago

Yeah, I have another class that wraps the client as an API:

from concurrent.futures import ThreadPoolExecutor, Future, as_completed

from pred_client import JinaClient

class AsyncJinaClient:
    """
    AsyncJinaClient
    """

    thread_pool = ThreadPoolExecutor(1)

    @classmethod
    def predict_1(cls, sentence_pairs):
        return cls.thread_pool.submit(JinaClient.predict_1_api, sentence_pairs)

    @classmethod
    def predict_2(cls, data):
        return cls.thread_pool.submit(JinaClient.predict_2_api, data)

    @classmethod
    def result(cls, future: Future, timeout=2):
        """Wait for a submitted prediction and return its result.

        :param future: the Future returned by predict_1/predict_2
        :param timeout: seconds to wait for completion
        :return: the prediction result, or None
        """
        if as_completed([future], timeout):
            return future.result()
        else:
            return None

predict_future = AsyncJinaClient.predict_1(sentence_pairs)
result = AsyncJinaClient.result(predict_future)

JoanFM commented 1 year ago

Then it seems clear to me that you are getting the results as they are completed, so in that case you should handle the ordering on your end.
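
A minimal sketch of handling the order on your end, assuming the chunks are fanned out over a thread pool: tag each chunk with its original position and sort the completed results by that tag (`process` is a hypothetical stand-in for a call to the Flow):

```python
# Collect futures in completion order, then restore the original order
# by the index each chunk was submitted with.
from concurrent.futures import ThreadPoolExecutor, as_completed

def process(chunk):
    # hypothetical stand-in for client.post(...)
    return [x * 2 for x in chunk]

chunks = [[1, 2], [3, 4], [5, 6]]
results = {}
with ThreadPoolExecutor(max_workers=3) as pool:
    # map each Future back to the index of the chunk it was built from
    futures = {pool.submit(process, c): i for i, c in enumerate(chunks)}
    for fut in as_completed(futures):
        results[futures[fut]] = fut.result()

# flatten in the original chunk order, regardless of completion order
ordered = [item for i in sorted(results) for item in results[i]]
print(ordered)  # → [2, 4, 6, 8, 10, 12]
```
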

wqh17101 commented 1 year ago

@JoanFM Considering performance:

  1. What is the right way to use this async Jina client instead of as_completed? How should the code be written?
  2. Is it better to set request_size to len(data_list), as above, than to use something like map to get all the async results in the order I want?

JoanFM commented 1 year ago

  1. You can use asyncio.gather(*list_of_tasks), which gathers the results in order. I am not sure how it works with Futures; this is a general async-Python problem with Futures, not something specific to Jina.

  2. The optimal request_size depends on the amount of data your Executor can process at once; it should not be linked to the order expected in the Client. The problem you have is to be solved by handling as_completed in a smarter way. Maybe this post helps: https://stackoverflow.com/questions/67189283/how-to-keep-the-original-order-of-input-when-using-threadpoolexecutor
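
The approach in the linked post can be sketched as follows: `Executor.map`, unlike `as_completed`, yields results in submission order even when a later task finishes first:

```python
# Demonstrate that Executor.map preserves submission order.
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    time.sleep(0.05 * (3 - x))  # later inputs finish earlier
    return x * x

with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(slow_square, [1, 2, 3]))

print(results)  # → [1, 4, 9]
```
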

wqh17101 commented 1 year ago

> The problem you have is to be solved by handling `as_completed` in a smarter way.

However, you can see that I just submit one task to the thread pool, so I only get one Future. Jina splits it into two subrequests, but `as_completed` is applied to my parent task, so I am not sure it determines the order of Jina's subrequests.
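
As a side note on the `result` helper shown earlier (a general Python observation, not Jina behaviour): `as_completed` is a generator function, so calling it does not wait, and the generator object it returns is always truthy. The `if as_completed([future], timeout)` check therefore passes immediately, and the timeout is only enforced if the generator is actually iterated:

```python
# as_completed returns a generator; generators are always truthy,
# so the if-branch is taken without waiting for the task.
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(time.sleep, 0.2)
    gen = as_completed([future], timeout=0.01)
    print(bool(gen))  # → True, even though the task is still running
```
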

wqh17101 commented 1 year ago

I have tried these two solutions:

  1. Set request_size to len(data_list): all the data is processed on one replica, so the order is the same as the input.
  2. I removed as_completed, and it does not work; the order is still not fixed.

    @classmethod
    def result(cls, future: Future, timeout=2):
        return future.result()


> However, you can see that I just submit one task to the thread pool, so I only get one Future. Jina splits it into two subrequests, but `as_completed` is applied to my parent task, so I am not sure it determines the order of Jina's subrequests.

I think this is why it does not work.

JoanFM commented 1 year ago

Okay, I am reproducing the issue; I will take a deeper look.

wqh17101 commented 1 year ago

@JoanFM Great, thank you.

JoanFM commented 1 year ago

We will work on adding an option to return the requests in order.