jina-ai / serve

☁️ Build multimodal AI applications with cloud-native stack
https://jina.ai/serve
Apache License 2.0
21.13k stars 2.22k forks source link

ASGI Application Error for Request when Executors are Starting #4616

Closed kiritowu closed 2 years ago

kiritowu commented 2 years ago

Describe the bug An error will arise when a new request is being sent while waiting for all executors to be ready.

MWU to reproduce (Slightly modified from the previous issue):

import multiprocessing
import time

from jina import Flow, Executor, requests, Client, Document, DocumentArray

class SlowInitExec(Executor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        time.sleep(10.0)

    @requests(on='/bla')
    def foo(self, docs, **kwargs):
        for doc in docs:
            doc.text = 'bla'

def query_flow():
    time.sleep(3.0)  # Update sleep to 3 seconds
    client = Client(protocol='http', port=12345)
    result = client.post('/bla', inputs=DocumentArray(Document()))
    print(result)

if __name__ == '__main__':
    p = multiprocessing.Process(target=query_flow)
    p.start()
    with Flow(protocol='http', port=12345).add(uses=SlowInitExec) as f:
        f.block()
    p.join()

Error Message: (Ignore the ContentTypeError but the real problem is the ERROR: Exception in ASGI application portion)

HTTPClient@17954[E]:Error while fetching response from HTTP server ContentTypeError(RequestInfo(url=URL('http://0.0.0.0:12345/post'), method='POST', headers=<CIMultiDictProxy('Host': '0.0.0.0:12345', 'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'User-Agent': 'Python/3.9 aiohttp/3.8.1', 'Content-Length': '507', 'Content-Type': 'application/json')>, real_url=URL('http://0.0.0.0:12345/post')), (), message='Attempt to decode JSON with unexpected mimetype: text/plain; charset=utf-8', headers=<CIMultiDictProxy('Date': 'Thu, 14 Apr 2022 10:19:57 GMT', 'Server': 'uvicorn', 'Content-Length': '21', 'Content-Type': 'text/plain; charset=utf-8')>)
Process Process-1:
Traceback (most recent call last):
  File "/home/wongz/miniconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/wongz/miniconda3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/wongz/code/av-intelligence/ai-model/test.py", line 20, in query_flow
    result = client.post('/bla', inputs=DocumentArray(Document()))
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/clients/mixin.py", line 156, in post
    return run_async(
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/helper.py", line 1336, in run_async
    return get_or_reuse_loop().run_until_complete(func(*args, **kwargs))
  File "uvloop/loop.pyx", line 1501, in uvloop.loop.Loop.run_until_complete
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/clients/mixin.py", line 147, in _get_results
    async for resp in c._get_results(*args, **kwargs):
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/clients/base/http.py", line 124, in _get_results
    raise e
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/clients/base/http.py", line 77, in _get_results
    r_str = await response.json()
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/aiohttp/client_reqrep.py", line 1103, in json
    raise ContentTypeError(
aiohttp.client_exceptions.ContentTypeError: 0, message='Attempt to decode JSON with unexpected mimetype: text/plain; charset=utf-8', url=URL('http://0.0.0.0:12345/post')
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/uvicorn/protocols/http/httptools_impl.py", line 372, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 75, in __call__
    return await self.app(scope, receive, send)
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/fastapi/applications.py", line 261, in __call__
    await super().__call__(scope, receive, send)
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/starlette/exceptions.py", line 82, in __call__
    raise exc
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/starlette/routing.py", line 656, in __call__
    await route.handle(scope, receive, send)
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/starlette/routing.py", line 259, in handle
    await self.app(scope, receive, send)
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/starlette/routing.py", line 61, in app
    response = await func(request)
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/fastapi/routing.py", line 227, in app
    raw_response = await run_endpoint_function(
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
    return await dependant.call(**values)
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/serve/runtimes/gateway/http/app.py", line 178, in post
    result = await _get_singleton_result(
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/serve/runtimes/gateway/http/app.py", line 334, in _get_singleton_result
    async for k in streamer.stream(request_iterator=request_iterator):
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/serve/stream/__init__.py", line 74, in stream
    async for response in async_iter:
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/serve/stream/__init__.py", line 135, in _stream_requests
    response = self._result_handler(future.result())
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/serve/runtimes/gateway/request_handling.py", line 75, in _process_results_at_end_gateway
    partial_responses = await asyncio.gather(*tasks)
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/serve/runtimes/gateway/graph/topology_graph.py", line 82, in _wait_previous_and_send
    resp, metadata = await connection_pool.send_requests_once(
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/serve/networking.py", line 603, in task_wrapper
    return await connection.send_requests(
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/serve/networking.py", line 196, in send_requests
    await self._init_stubs()
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/serve/networking.py", line 170, in _init_stubs
    available_services = await GrpcConnectionPool.get_available_services(
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/jina/serve/networking.py", line 906, in get_available_services
    async for res in response:
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/grpc/aio/_call.py", line 326, in _fetch_stream_responses
    await self._raise_for_status()
  File "/home/wongz/.cache/pypoetry/virtualenvs/avi-ai-5_3htS5T-py3.9/lib/python3.9/site-packages/grpc/aio/_call.py", line 236, in _raise_for_status
    raise _create_rpc_error(await self.initial_metadata(), await
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "failed to connect to all addresses"
        debug_error_string = "{"created":"@1649931597.633074074","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3150,"referenced_errors":[{"created":"@1649931597.633072771","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":165,"grpc_status":14}]}"

Environment

JoanFM commented 2 years ago

Hey, @kiritowu.

What is the expected behavior here? Of course, if the Executors are not yet started, sending a query when they are not ready will not fail? Would you expect a better error handling or what exactly?

kiritowu commented 2 years ago

@JoanFM In Jina 2.x, when the executors are starting, the behaviour which I experienced is that any request will be put on hold until all the executors are ready to process the request.

The reason why this bug is slightly inconvenient is that when I deployed this as a container on the cloud, the container would only be started when a new request is sent. However, since the container is just spinning up and all the executors are in the preparing phase, the error ended up being propagated to the HTTP client.

What I am looking for is a similar behaviour in Jina 2.x whereby the content of the request is sent to the executors only when all executors are ready. Hope that better clarifies.

JoanFM commented 2 years ago

Hey @kiritowu,

Due to the changes of 3.0 this behavior is by nature not there.

What you need to do is to make sure that the Executors are ready.

This can be done in 2 ways;