Quansight / ragna

RAG orchestration framework ⛵️
https://ragna.chat
BSD 3-Clause "New" or "Revised" License

Synchronous endpoints on ragna.core.Chat #212

Open pmeier opened 1 year ago

pmeier commented 1 year ago

In #183 I suggested adding sync endpoints to ragna.core.Chat in addition to the async ones. This could help people who are not familiar with async programming and aren't looking for maximum performance, but rather just want something they can run easily.

I see two ways to go about this:

  1. Make Chat.prepare and Chat.answer the sync endpoints and move the current functionality to Chat.aprepare and Chat.aanswer. This would break existing code, but would align the naming scheme with the canonical one in the Python ecosystem. Breaking stuff is not terribly bad right now, since we are still on a 0.x release.
  2. Create Chat.sprepare and Chat.sanswer as sync endpoints and keep the async endpoints as is. This would make it more explicit that Ragna is async-first, but would go against canonical naming and thus could lead to confusion.

I assumed this would not be hard, given that we already have the async endpoints. However, I tried to implement this in #205 and couldn't get it working properly. The problem is that I don't want to re-implement the sync endpoints when we already have the async ones. That is, the sync endpoints should just be thin wrappers that run the async endpoints synchronously. To make matters worse, this also needs to work when an event loop is already running that the user doesn't want to use, e.g. when they are programming in a Jupyter notebook. So what we actually want is to run async code synchronously while potentially already being inside a running event loop.
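A minimal sketch of the thin-wrapper idea, assuming option 1 naming (answer wraps aanswer). The logic has two cases. If no event loop is running in the current thread, asyncio.run() is enough. If one is running, as in Jupyter, the coroutine is run with asyncio.run() on a fresh loop in a worker thread while the caller blocks. run_sync and the toy Chat class are hypothetical stand-ins for illustration, not Ragna's actual API.

```python
import asyncio
import concurrent.futures
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


def run_sync(coro: Coroutine[Any, Any, T]) -> T:
    """Run a coroutine to completion from synchronous code (hypothetical helper)."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread: asyncio.run() is enough.
        return asyncio.run(coro)
    # A loop is already running in this thread (e.g. Jupyter), so asyncio.run()
    # would raise. Run the coroutine on a fresh loop in a worker thread instead
    # and block until it is done. Note: this blocks the running loop meanwhile.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


class Chat:
    """Toy stand-in for ragna.core.Chat using option 1 naming; not the real class."""

    async def aanswer(self, prompt: str) -> str:
        # The async implementation is the single source of truth.
        await asyncio.sleep(0)  # stands in for real async I/O
        return f"answer to {prompt!r}"

    def answer(self, prompt: str) -> str:
        # Thin sync wrapper: no re-implementation, just run the async endpoint.
        return run_sync(self.aanswer(prompt))


if __name__ == "__main__":
    chat = Chat()
    print(chat.answer("?"))  # plain synchronous code

    async def main() -> str:
        # Sync call made while an event loop is running in this thread.
        return chat.answer("!")

    print(asyncio.run(main()))
```

This approach has some caveats. While the worker thread runs, the caller's loop is paused. Any async resources already bound to that loop (for example client sessions) cannot be used from the worker's fresh loop. So this only sketches the approach under those assumptions.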

Here is some test code that I had in #205 when going for option 1. This would need to run without any issues on all platforms for a PR to be accepted:

import asyncio
import inspect

import pytest

import ragna.core
from ragna import Rag
from ragna.assistants import RagnaDemoAssistant
from ragna.source_storages import RagnaDemoSourceStorage

async def e2e_async(*, documents, source_storage, assistant):
    async with Rag().chat(
        documents=documents,
        source_storage=source_storage,
        assistant=assistant,
    ) as chat:
        return await chat.aanswer("?")

async def e2e_sync_in_async(*, documents, source_storage, assistant):
    with Rag().chat(
        documents=documents,
        source_storage=source_storage,
        assistant=assistant,
    ) as chat:
        return chat("?")

def e2e_sync(*, documents, source_storage, assistant):
    with Rag().chat(
        documents=documents,
        source_storage=source_storage,
        assistant=assistant,
    ) as chat:
        return chat("?")

@pytest.mark.parametrize("e2e_fn", [e2e_async, e2e_sync_in_async, e2e_sync])
def test_e2e(tmp_local_root, e2e_fn):
    document_root = tmp_local_root / "documents"
    document_root.mkdir()
    document_path = document_root / "test.txt"
    with open(document_path, "w") as file:
        file.write("!\n")

    document = ragna.core.LocalDocument.from_path(document_path)

    answer = e2e_fn(
        documents=[document],
        source_storage=RagnaDemoSourceStorage,
        assistant=RagnaDemoAssistant,
    )
    if inspect.iscoroutine(answer):
        answer = asyncio.run(answer)

    assert isinstance(answer, ragna.core.Message)
    assert answer.role is ragna.core.MessageRole.ASSISTANT
    assert {source.document.name for source in answer.sources} == {document.name}

This is a superset of what we have in

https://github.com/Quansight/ragna/blob/2c1e5c407c4e8afd7f2976021bd5f6551ff57af0/tests/core/test_e2e.py#L9

which currently only tests the equivalent of e2e_async.

nenb commented 1 year ago

My impression is that your work in #205 was probably the recommended approach; at least that is what I got from reading this still active thread on the topic. It seems to me that doing what you have done and sticking with the async-only API is the most straightforward option for the moment.

I just spent a little time playing around with this in a Jupyter notebook, where an event loop is already running (couldn't resist!), and here is my fairly crude attempt:

from concurrent.futures import ThreadPoolExecutor
import asyncio
import threading

async def async_func(event):
    print("Sleeping...")
    await asyncio.sleep(2)
    print("Exiting async...")
    event.set()

async def other_async_func_in_main_thread():
    print("Enter other...")
    await asyncio.sleep(1)
    print("Exiting other...")

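def sync_wrapper(async_func):
    # Run a second event loop in a worker thread, separate from the loop
    # that Jupyter is already running in the main thread.
    new_loop = asyncio.new_event_loop()
    ThreadPoolExecutor().submit(new_loop.run_forever)
    # async_func sets this event once it has finished.
    event = threading.Event()
    # Schedule the coroutine on the worker loop from this (main) thread.
    asyncio.run_coroutine_threadsafe(async_func(event), new_loop)
    print("Waiting for async to finish...")
    # Schedule a task on the already running main loop. It cannot make
    # progress while event.wait() below blocks the main thread.
    asyncio.create_task(other_async_func_in_main_thread())
    # Block until the coroutine on the worker loop sets the event.
    event.wait()
    # Stop the worker loop in a thread-safe way.
    new_loop.call_soon_threadsafe(new_loop.stop)
    print("Async done!")

# Called from a Jupyter cell, i.e. with an event loop already running.
sync_wrapper(async_func)

pmeier commented 1 year ago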

Thanks a ton @nenb for the xref! That was a good read. Although there is no solution yet, I'm at least glad that this is a hard problem and I didn't miss something obvious.

Since I would expect most of the users of the Python API to either

  1. be in a Jupyter notebook and thus basically only need to sprinkle in an await from time to time, since they have a dormant event loop running, or
  2. build something on top of Ragna and thus should use the async API anyway

I think sync endpoints are lower priority for now. If we reconsider based on user feedback, we at least have your snippet as a starting point.
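Starting from that snippet, here is a minimal sketch of the same idea: a second event loop running forever in its own thread, fed via asyncio.run_coroutine_threadsafe. It differs from the snippet in three ways:

- It returns the coroutine's result, and re-raises its exceptions, instead of signalling through a threading.Event.
- It keeps the loop alive across calls.
- It shuts the loop down cleanly.

BackgroundLoop and slow_add are hypothetical names for illustration, not part of Ragna.

```python
import asyncio
import threading
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


class BackgroundLoop:
    """Hypothetical helper: one event loop running forever in a daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro: Coroutine[Any, Any, T]) -> T:
        # Schedule on the background loop. .result() blocks the calling thread
        # until the coroutine finishes and re-raises any exception it raised.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def close(self) -> None:
        # Ask the loop to stop from its own thread, wait for run_forever() to
        # return, then release the loop's resources.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def slow_add(a: int, b: int) -> int:
    await asyncio.sleep(0.1)  # stands in for real async work
    return a + b


if __name__ == "__main__":
    bg = BackgroundLoop()
    try:
        print(bg.run(slow_add(1, 2)))  # prints 3
    finally:
        bg.close()
```

Because the loop is reused, anything bound to it (such as client sessions) survives between calls, unlike creating a fresh loop per call. The calling thread still blocks while it waits, so a loop running in that thread (as in Jupyter) is paused for the duration. Calling run() from a coroutine that is itself executing on the background loop would deadlock.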