workfloworchestrator / orchestrator-core

The workflow orchestrator core repository
Apache License 2.0
36 stars 14 forks source link

Investigate using sqlalchemy.ext.asyncio for graphql resolvers #628

Open Mark90 opened 2 months ago

Mark90 commented 2 months ago

We use fastapi and strawberry to utilise the power of asyncio. However, our asynchronous graphql resolvers perform synchronous database calls which block the event loop. This means that while the API process is waiting for a database query to return it will not handle new graphql requests, greatly reducing throughput.

Simply changing the graphql resolvers to be synchronous should make them run in worker threads when using uvicorn & fastapi, but this doesn't seem to be the case in my tests (added below). I don't know if this is something you have to explicitly configure, if so I could try that to make a fair comparison.

At any rate, using asyncio is in line with the libraries we have chosen, so it makes sense to investigate async database queries. We could have a look at SQL Alchemy's asyncio support and attempt to use it within a graphql resolver and a REST endpoint. The 3rd testcase shows the increase in throughput this would warrant.

Tests for reference

In each of these tests the graphql resolver performs a database call pg_sleep(0.5) to simulate a query taking 0.5 seconds. Using the locust I have then tested maximum throughput by performing 30 concurrent graphql queries for a short period of time.

  1. postgres_async_sync.py This is how we currently use async resolvers with sync DB calls. The maximum observed throughput is 2 rps.
# `strawberry server postgres_async_sync`
from datetime import datetime as dt
import psycopg
import strawberry

DSN='postgresql://localhost:5432'
# db = psycopg.connect(DSN)

@strawberry.type
class SleepResult:
    msg: str

async def pg_sleep():
    db = psycopg.connect(DSN)  # not needed, but to make the comparison with the other script fair
    cur = db.execute('SELECT pg_sleep(0.5), version()')
    result = cur.fetchone()
    print(f"{dt.now().strftime('%H:%M:%S')} Performed query {__name__}")
    return SleepResult(msg=str(result))

@strawberry.type
class Query:
    sleepy: SleepResult = strawberry.field(resolver=pg_sleep)

schema = strawberry.Schema(query=Query)
print(f"Initialized {__name__}")
  1. fastapi_postgres_async_sync.py Runs the previous example through uvicorn/fastapi. The maximum throughput is still 2 rps.
# `uvicorn fastapi_postgres_async_sync:app`
from strawberry.fastapi import GraphQLRouter
import fastapi

from postgres_async_sync import schema

graphql_app = GraphQLRouter(schema)
app = fastapi.FastAPI()
app.include_router(graphql_app, prefix="/graphql")
  1. postgres_async_async.py Changes the resolver to make asynchronous DB calls using psycopg. The maximum throughput on my machine was around 50 rps. Using sqlalchemy's asyncio extension should allow similar throughput.
# `strawberry server postgres_async_async`
from datetime import datetime as dt
import psycopg
import strawberry

DSN='postgresql://localhost:5432'

@strawberry.type
class SleepResult:
    msg: str

async def pg_sleep():
    db = await psycopg.AsyncConnection.connect(DSN)
    cur = await db.execute('SELECT pg_sleep(0.5), version()')
    result = await cur.fetchone()
    print(f"{dt.now().strftime('%H:%M:%S')} Performed query {__name__}")
    return SleepResult(msg=str(result))

@strawberry.type
class Query:
    sleepy: SleepResult = strawberry.field(resolver=pg_sleep)

schema = strawberry.Schema(query=Query)
print(f"Initialized {__name__}")

Libraries used:

fastapi==0.110.2
psycopg==3.1.18
psycopg-binary==3.1.18
psycopg-pool==3.2.1
strawberry-graphql==0.227.2
uvicorn==0.29.0
uvloop==0.19.0
Mark90 commented 2 months ago

If an async DB handler is too difficult, investigate alternative solution of making the top-level query resolvers run in a thread

Roughly something like this;

def make_async(fn) -> Coroutine: 
  # Return coroutine that runs fn in thread

def pg_sleep():
    db = psycopg.connect(DSN)  # not needed, but to make the comparison with the other script fair
    cur = db.execute('SELECT pg_sleep(0.5), version()')
    result = cur.fetchone()
    print(f"{dt.now().strftime('%H:%M:%S')} Performed query {__name__}")
    return SleepResult(msg=str(result))

@strawberry.type
class Query:
    sleepy: SleepResult = strawberry.field(resolver=make_async(pg_sleep))