vmagamedov / grpclib

Pure-Python gRPC implementation for asyncio
http://grpclib.readthedocs.io
BSD 3-Clause "New" or "Revised" License
936 stars 92 forks source link

Error "RuntimeError: There is no current event loop in thread 'AnyIO worker thread'." when FastApi server is running. #177

Closed stanmb closed 1 year ago

stanmb commented 1 year ago

Hello! I have a FastApi server running and try to create Channel object from client.py. When I do so i face the error RuntimeError: There is no current event loop in thread 'AnyIO worker thread'

I've found a way how to fix it: Add this method to the client.py

def get_or_create_eventloop(self):
    try:
        return asyncio.get_event_loop()
    except RuntimeError as ex:
        if "There is no current event loop in thread" in str(ex):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return asyncio.get_event_loop()

and use it in the init() method of Channel class like this: self._loop = loop or self.get_or_create_eventloop()

Another way is create loop and pass it as a parameter to the Channel constructer like this:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
channel = Channel(host='host', port=82, loop=loop)

but loop parameter is deprecated.

Is there any better way to resolve the issue?

vmagamedov commented 1 year ago

Can you provide an example of how you setup your server and when/where exactly you create a Channel?

You definitely don't have to create another event loop. The same is true for grpclib itself.

stanmb commented 1 year ago

I use to create a Channel inside static function:

def fetch_stores(pos_ids):
   channel = Channel(host='host', port=82)
   grpc_stub = StoreGRPC(channel)
   response = grpc_stub.store.get_stores(GetStoresRequest(pos_ids=[pos_ids]))
   channel.close()
   return response

But after I moved it to a class, the error doesn't appear:

class Store:
   def fetch_stores(pos_ids):
      channel = Channel(host='host', port=82)
      grpc_stub = StoreGRPC(channel)
      response = grpc_stub.store.get_stores(GetStoresRequest(pos_ids=[pos_ids]))
      channel.close()
      return response

Was that the reason?

stanmb commented 1 year ago

The whole picture is: main.py

from fastapi import FastAPI

from route import router

def get_application() -> FastAPI:
    application = FastAPI()
    application.include_router(router)
    return application

app = get_application()

route.py

@router.get('/getSomething')
def get_something() -> SomethingOutput:
    return SomethingCreator().get_something()

something_creator.py

class SomethingCreator:
       def get_something(self):
          something = Something().get_something(som_id)

something.py

class Something:
   def get_something(som_ids):
      channel = Channel(host='host', port=82)
      grpc_stub = StoreGRPC(channel)
      response = grpc_stub.store.get_stores(GetStoresRequest(som_ids=[som_ids]))
      channel.close()
      return response

Then I run a server by command 'uvicorn main:app' This code works locally but when I run it in docker container in kubernetes, the error There is no current event loop in thread appears again.

vmagamedov commented 1 year ago

The reason loop argument is deprecated is that the Channel should be created inside async function.

If you want to use async calls inside your handler then your handler should be also async:

@router.get('/getSomething')
async def get_something()

In gRPC it is not very efficient to create a channel for every single request. It is better to setup channel when your application starts and use it when you handle request.

In fastapi I would use Lifespan Events -> Async Context Manager to setup Channel and use it inside request handlers.

stanmb commented 1 year ago

@vmagamedov Thank you very much for helping!