openfga / sdk-generator

OpenFGA Client SDK Generator
Apache License 2.0
14 stars 30 forks source link

A way to access the asyncio loop, or a way to set it early #384

Open Torxed opened 4 days ago

Torxed commented 4 days ago

Checklist

Description

In a wrapper class, in short I have:

self.configuration = ClientConfiguration(
    api_url=f"{config.openfga.protocol}://{config.openfga.host}:{config.openfga.port}"
)
self.session = OpenFgaClient(self.configuration)

And when the wrapper's .read() is called from different co-routine as an example, this happens quite frequently:

File "/app/openfga_wrapper/__init__.py", line 82, in read
    return await self.session.read(body, self.options)
...
RuntimeError: Timeout context manager should be used inside a task

And attempting to create a task specifically yields the same result:

    return await asyncio.create_task(self.session.read(body, self.options))

It still complains that the Timeout context is not executed from within a task. I'm not entirely comfortable with asyncio in general, but from what I can tell it's mainly because of how OpenFGA attempts to find/use the currently running event loop, instead of consistently using one defined loop.

And attempting to create a secondary long lasting asyncio loop with threads will produce:

attached to a different loop

Obviously if you have the option to wrap everything in one asyncio.run() call, that's the ideal solution. However if using certain libraries or threads this becomes troublesome.

Expectation

A way to set a target event loop in either ClientConfiguration or OpenFgaClient

Reproduction

Below is a sample code of an API that will generate the issue. Note that fixing the below code to work, by creating a async def _main() and avoid using separate threads would fix this particular code - but it's intended to show the threaded issue.

Run curl http://127.0.0.1:8888 against:

import asyncio
import base64
import json
import random
import string
import time
import threading
import logging
from fastapi import FastAPI
from hypercorn.config import Config
from hypercorn.asyncio import serve
from openfga_sdk.exceptions import ApiException
from openfga_sdk import ClientConfiguration, OpenFgaClient, ReadRequestTupleKey

class OpenFGAWrapper(threading.Thread):
    def __init__(self):
        self.configuration = ClientConfiguration(
            api_url=f"http://openfga:8080"
        )
        self.session = None
        self.options = None
        self.store_id = None
        self.loop = None

        threading.Thread.__init__(self)
        self.start()

    def run(self):
        asyncio.run(self._reconnect())

    def close(self):
        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            self.loop = asyncio.new_event_loop()

        return self.loop.run_until_complete(openfga.session.close())

    async def _reconnect(self):
        if self.session is None:
            try:
                self.loop = asyncio.get_running_loop()
            except RuntimeError:
                self.loop = asyncio.new_event_loop()

            self.session = OpenFgaClient(self.configuration)

            pageination_options = {
                "page_size": 25,
                "continuation_token": base64.urlsafe_b64encode(json.dumps({
                    "pk":"LATEST_NSCONFIG_auth0store",
                    "sk":''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(12))
                }).encode()).decode()
            }

            # After connecting, use set_store_id() to set a default store for
            # this session.
            try:
                response = await self.session.list_stores(pageination_options)
            except ApiException as error:
                print(error.parsed_exception)
                raise error

            for store in response.stores:
                self.session.set_store_id(store.id)
                self.store_id = store.id

                # We set default options for later use, that use the last
                # available authorization model.
                try:
                    response = await self.session.read_authorization_models()
                except ApiException as error:
                    print(error.parsed_exception)
                    raise error

                for model in response.authorization_models:
                    self.options = {
                        "authorization_model_id": model.id
                    }

        while True:
            time.sleep(0.25)

    async def read(self, body):
        task = await self.loop.create_task(self.session.read(body, self.options))
        return task.result()

if __name__ == '__main__':
    logging.getLogger("hypercorn.error").setLevel(logging.DEBUG)
    logging.getLogger("hypercorn.access").setLevel(logging.DEBUG)

    corn_conf = Config()
    corn_conf.bind = f"127.0.0.1:8888"
    corn_conf.loglevel = "DEBUG"
    corn_conf.accesslog = '-'
    corn_conf.errorlog = '-'

    openfga = OpenFGAWrapper()
    app = FastAPI(
        title="TestAPI"
    )

    @app.get("/")
    async def entrypoint():
        body = ReadRequestTupleKey(
            user=f"user:testuser",
            relation="owner",
            object="document:",
        )

        try:
            response = await openfga.read(body)
        except ApiException as error:
            print(error.parsed_exception)
            raise error

    asyncio.run(serve(app, corn_conf))
    openfga.close()

SDK Checklist

OpenFGA SDK version

0.6.0

OpenFGA version

v1.4.3

SDK Configuration

See example code

Logs

No response

References

No response