jina-ai / jina

☁️ Build multimodal AI applications with cloud-native stack
https://docs.jina.ai
Apache License 2.0
20.87k stars · 2.22k forks

Other threads are currently calling into gRPC, skipping fork() handlers #6022

Closed Song-Gy closed 1 year ago

Song-Gy commented 1 year ago

Describe your proposal/problem

After an Executor endpoint receives a request, I start a new process, run the model-inference task asynchronously in that process, wrap the inference result into a Doc that matches the business requirements, and send it on to other nodes for further processing. In practice, however, calling the Client's `post` method from the child process blocks. The logic is similar to the following code:

```python
import logging
import multiprocessing as mp

from docarray import BaseDoc
from jina import Client, Executor, Flow, requests

logger = logging.getLogger(__name__)


class TDoc(BaseDoc):
    taskId: str
    dataPath: str


def func_call():
    count = 0
    while True:
        bls_client = Client(host="103.234.22.70", port=11768)
        frame_meta = TDoc(
            taskId="139",
            dataPath="xxxxxxxxxx----" + str(count),
        )
        logger.info("start post to bls with count %s", count)
        bls_client.post(on='/submit', inputs=[frame_meta])
        logger.info("end post to bls with count %s", count)
        if count > 10:
            break
        count += 1


class SbuMpxec(Executor):
    @requests(on="/mock")
    async def ping(self, **kwargs):
        # start the posting loop in a forked child process
        func_runner = mp.Process(target=func_call)
        func_runner.start()


f = Flow().config_gateway(protocol="http", port=11787).add(uses=SbuMpxec)
with f:
    f.block()
```

Environment

jina==3.19.0

JoanFM commented 1 year ago

Hello @Song-Gy,

Where do you see the blocking behavior? Can you share some log examples?

I wonder why you need another process for this. Wouldn't an async endpoint using the Client with `asyncio=True` lead to similar behavior?

Thanks for raising this issue.
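The `asyncio=True` suggestion amounts to awaiting the client inside the async endpoint instead of spawning a process. A minimal stdlib sketch of the pattern, where `post_async` is a hypothetical stand-in for `await Client(..., asyncio=True).post(...)` (no Jina involved):

```python
import asyncio


async def post_async(doc):
    # hypothetical stand-in for `await Client(..., asyncio=True).post(...)`
    await asyncio.sleep(0)  # yield to the event loop, as a real network call would
    return f"sent:{doc}"


async def ping_endpoint(docs):
    # an async Executor endpoint can await the client directly,
    # with no extra process and no fork() involved
    return [await post_async(d) for d in docs]


if __name__ == "__main__":
    print(asyncio.run(ping_endpoint(["a", "b"])))  # -> ['sent:a', 'sent:b']
```

Because the coroutine yields while the request is in flight, the endpoint does not block the event loop the way a synchronous `post` would.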

Song-Gy commented 1 year ago

> Hello @Song-Gy,
>
> Where do you see the blocking behavior? Can you share some log examples?
>
> I wonder why you need another process for this. Wouldn't an async endpoint using the Client with `asyncio=True` lead to similar behavior?
>
> Thanks for raising this issue.

The child process blocks right after `logger.info("start post to bls with count %s", count)` and never logs "end post ...". I also tried `Client(host="0.0.0.0", port=11768, asyncio=True)`; with that there is no blocking, but the receiving end never gets the message (it does arrive when the Client is called directly, outside the child process). As for why I run the inference task in a subprocess: the original business logic was designed this way, so that if the underlying C++-wrapped (customized) inference framework fails, it only kills the inference process instead of crashing the whole service.

Song-Gy commented 1 year ago

The following code matches the problem description better. In my real use case, the subprocess calls the Client to forward the current node's result to a downstream node. When the code below executes `post` in the subprocess, it blocks; you can run it to reproduce the behavior.

```python
import logging
import multiprocessing as mp

from docarray import BaseDoc, DocList
from jina import Client, Executor, Flow, requests

logger = logging.getLogger(__name__)


class TDoc(BaseDoc):
    taskId: str
    dataPath: str


def func_call():
    count = 0
    while True:
        bls_client = Client(host="0.0.0.0", port=11788)
        frame_meta = TDoc(
            taskId="139",
            dataPath="xxxxxxxxxx----" + str(count),
        )
        logger.info("start post to bls with count %s", count)
        bls_client.post(on='/mock', inputs=[frame_meta])
        logger.info("end post to bls with count %s", count)
        if count > 10:
            break
        count += 1


class MainExec(Executor):
    @requests(on="/mock")
    async def ping(self, **kwargs):
        # posting from the main process works ...
        bls_client = Client(host="0.0.0.0", port=11788)
        frame_meta = TDoc(
            taskId="139",
            dataPath="main process mock message",
        )
        bls_client.post(on='/mock', inputs=[frame_meta])
        # ... but the same post blocks inside this forked child process
        func_runner = mp.Process(target=func_call)
        func_runner.start()


class SubExec(Executor):
    @requests(on="/mock")
    def ping(self, docs: DocList[TDoc], **kwargs):
        for doc in docs:
            doc.summary()
        return docs


f = Flow().config_gateway(protocol="http", port=11787).add(uses=MainExec)
f.start()

rec_f = Flow().config_gateway(protocol="grpc", port=11788).add(uses=SubExec)
rec_f.start()
```

JoanFM commented 1 year ago

Have you tried starting the two Flows in different terminals? We have seen issues with gRPC when starting many Flows in some testing scenarios.

We will still look into it, but that may be a way to unblock you.

Also, wouldn't this be equivalent to having two Executors chained in the same Flow, MainExec followed by SubExec?

Song-Gy commented 1 year ago

> Have you tried starting the two Flows in different terminals? We have seen issues with gRPC when starting many Flows in some testing scenarios.
>
> We will still look into it, but that may be a way to unblock you.
>
> Also, wouldn't this be equivalent to having two Executors chained in the same Flow, MainExec followed by SubExec?

In a real application scenario, MainExec and SubExec are located on different servers; the code above is just arranged this way for convenience of testing.

JoanFM commented 1 year ago

Then could you try starting them in different terminals? It may unblock you for the moment.

Song-Gy commented 1 year ago

> Then could you try starting them in different terminals? It may unblock you for the moment.

I have just tried starting the two Flows from different processes, but the blocking persists, and SubExec still does not receive the messages that MainExec distributes from its subprocess.

JoanFM commented 1 year ago

Also, you may find it useful to look at how to add external Executors to a Flow: https://docs.jina.ai/concepts/orchestration/add-executors/#use-external-executors

Song-Gy commented 1 year ago

> Also, you may find it useful to look at how to add external Executors to a Flow: https://docs.jina.ai/concepts/orchestration/add-executors/#use-external-executors

I saw that example a while ago, but in my business scenario all nodes form a distributed microservice and the nodes need to use the Jina Client for data exchange, so the example you pointed out does not fit my needs.

JoanFM commented 1 year ago

> > Also, you may find it useful to look at how to add external Executors to a Flow: https://docs.jina.ai/concepts/orchestration/add-executors/#use-external-executors
>
> I saw that example a while ago, but in my business scenario all nodes form a distributed microservice and the nodes need to use the Jina Client for data exchange, so the example you pointed out does not fit my needs.

I would like to understand why you want to do this yourself instead of having Jina send the messages from the Gateway. Understanding different use cases can help improve the framework.

Song-Gy commented 1 year ago

> > > Also, you may find it useful to look at how to add external Executors to a Flow: https://docs.jina.ai/concepts/orchestration/add-executors/#use-external-executors
> >
> > I saw that example a while ago, but in my business scenario all nodes form a distributed microservice and the nodes need to use the Jina Client for data exchange, so the example you pointed out does not fit my needs.
>
> I would like to understand why you want to do this yourself instead of having Jina send the messages from the Gateway. Understanding different use cases can help improve the framework.

After MainExec receives an inference request, it creates the underlying inference process. That process generally does not finish quickly; the inference task may involve decoding video, extracting frames, running detection, and so on, so the Gateway interface cannot wait for it to complete before distributing results to other nodes. The distribution therefore has to be done asynchronously by the child process itself, and it must be guaranteed that if that process crashes, it does not take down the inter-process communication service.
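The isolation requirement itself is easy to demonstrate with the stdlib alone: a child process that dies, even abruptly, leaves the parent untouched. A minimal sketch (no Jina involved; `crashy_inference` is a hypothetical stand-in for the customized C++ inference framework failing, and `fork` is safe here because this toy parent holds no gRPC state):

```python
import multiprocessing as mp


def crashy_inference():
    # hypothetical stand-in for the C++-wrapped inference framework crashing
    raise SystemExit(1)


def run_isolated():
    # fork a child and let it die; the parent keeps running
    ctx = mp.get_context("fork")
    proc = ctx.Process(target=crashy_inference)
    proc.start()
    proc.join()
    return proc.exitcode  # non-zero: the child crashed, the parent did not


if __name__ == "__main__":
    print(run_isolated())  # -> 1
```

The parent can inspect `exitcode` after `join()` and restart the inference process if needed, which is exactly the fault-containment property described above.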

Song-Gy commented 1 year ago

Distribution works fine with multiple threads, so I am curious why the multi-process approach does not. The modified code is as follows:

```python
import threading

from jina import Client, Executor, requests


class MainExec(Executor):
    @requests(on="/mock")
    async def ping(self, **kwargs):
        bls_client = Client(host="0.0.0.0", port=11789)
        frame_meta = TDoc(
            taskId="139",
            dataPath="main process mock message",
        )
        bls_client.post(on='/mock', inputs=[frame_meta])
        # a thread works here, where mp.Process(target=func_call) blocked
        func_runner = threading.Thread(target=func_call)
        func_runner.start()
```

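The thread-vs-process difference is consistent with gRPC's known fork limitations: a `fork()`ed child inherits the parent's gRPC threads and channel state, which is what the "Other threads are currently calling into gRPC, skipping fork() handlers" warning points at. One common workaround (an assumption here, not verified against this exact Jina version) is the `spawn` start method, so the child begins with a fresh interpreter and builds its own Client. A stdlib-only sketch:

```python
import multiprocessing as mp


def child_job(queue):
    # in the real code, the jina Client would be constructed *here*, inside
    # the freshly spawned child, and .post() called; a spawned interpreter
    # inherits no gRPC channels or background threads from the parent
    queue.put("posted")


def distribute_with_spawn():
    # 'spawn' starts a brand-new Python interpreter instead of fork()ing
    # the gRPC-laden parent process
    ctx = mp.get_context("spawn")
    queue = ctx.Queue()
    proc = ctx.Process(target=child_job, args=(queue,))
    proc.start()
    result = queue.get(timeout=30)
    proc.join()
    return result


if __name__ == "__main__":
    print(distribute_with_spawn())
```

Note that `spawn` re-imports the main module in the child, so the `if __name__ == "__main__"` guard is mandatory, and anything passed to the child must be picklable.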
Song-Gy commented 1 year ago

I just tried switching the communication between nodes to websocket, which solves the subprocess-distribution problem. Thank you very much for your answers. But I still hope the team can help track down why gRPC fails to distribute in the multi-process case. The modified code is as follows:

```python
# sender side
import logging
import multiprocessing as mp

from docarray import BaseDoc
from jina import Client, Executor, Flow, requests

logger = logging.getLogger(__name__)


class TDoc(BaseDoc):
    taskId: str
    dataPath: str


def func_call():
    count = 0
    while True:
        bls_client = Client(host="websocket://0.0.0.0:11789")
        frame_meta = TDoc(
            taskId="139",
            dataPath="xxxxxxxxxx----" + str(count),
        )
        logger.info("start post to bls with count %s", count)
        bls_client.post(on='/mock', inputs=[frame_meta])
        logger.info("end post to bls with count %s", count)
        if count > 10:
            break
        count += 1


class MainExec(Executor):
    @requests(on="/mock")
    async def ping(self, **kwargs):
        bls_client = Client(host="websocket://0.0.0.0:11789")
        frame_meta = TDoc(
            taskId="139",
            dataPath="main process mock message",
        )
        bls_client.post(on='/mock', inputs=[frame_meta])
        func_runner = mp.Process(target=func_call)
        func_runner.start()


f = Flow().config_gateway(protocol="http", port=11787).add(uses=MainExec)
f.start()
```

```python
# receiver side
from docarray import BaseDoc, DocList
from jina import Executor, Flow, requests


class TDoc(BaseDoc):
    taskId: str
    dataPath: str


class SubExec(Executor):
    @requests(on="/mock")
    def ping(self, docs: DocList[TDoc], **kwargs):
        for doc in docs:
            doc.summary()
        return docs


rec_f = Flow().config_gateway(protocol="websocket", port=11789).add(uses=SubExec)
rec_f.start()
```

JoanFM commented 1 year ago

Sure, thanks for opening the issue, we'll look into it