run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: Bedrock LLMs async #10714

Open austinmw opened 4 months ago

austinmw commented 4 months ago


Question

Hi, I've constructed a query engine and am testing the following query pipeline with p.run vs p.arun:

[screenshot of the query pipeline setup]

Regular:

response = p.run(input="What did the author do during his time in YC?")

Async:

response = await p.arun(input="What did the author do during his time in YC?")

When I use the default OpenAI LLM, the async operation is significantly faster. However, when I switch to a Bedrock LLM, both operations take the same amount of time. Do Bedrock LLMs not yet support async?

dosubot[bot] commented 4 months ago

🤖

Hey there @austinmw! Great to see you back. 😊 I remember our last interaction. I'm diving into your query about the async operations with Bedrock LLMs now and will get back to you with an answer shortly!

austinmw commented 4 months ago

Nvm, I see the following in the Bedrock class:

    async def achat(
        self, messages: Sequence[ChatMessage], **kwargs: Any
    ) -> ChatResponse:
        """Chat asynchronously."""
        # TODO: do synchronous chat for now
        return self.chat(messages, **kwargs)

    async def acomplete(
        self, prompt: str, formatted: bool = False, **kwargs: Any
    ) -> CompletionResponse:
        raise NotImplementedError

Curious if it's possible to support async yet?

dosubot[bot] commented 4 months ago

🤖

Yes, it is indeed possible for Bedrock LLMs in LlamaIndex to support asynchronous operations. The current implementation of the achat method in the Bedrock class is meant to chat asynchronously, but it currently just calls the synchronous chat method. This is indicated by the comment # TODO: do synchronous chat for now.

As for the acomplete method, it is supposed to complete a prompt asynchronously. However, it is not implemented yet and raises a NotImplementedError when called.

LlamaIndex provides several utility functions that facilitate asynchronous operations using Python's asyncio library. For instance, the run_async_tasks function accepts a list of coroutine tasks and runs them asynchronously. If the show_progress parameter is set to True, it uses the tqdm library to display a progress bar.

Here is an example of how you might use these functions to run asynchronous tasks:

tasks = [some_async_function(arg) for arg in args_list]
results = run_async_tasks(tasks)

In this example, some_async_function would be an asynchronous function that you want to run for each argument in args_list, and run_async_tasks would run these tasks asynchronously and return a list of results.
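As a more self-contained sketch (assuming run_async_tasks is importable from llama_index.core.async_utils, as in recent releases, and with a dummy coroutine standing in for a real LLM call):

import asyncio
from llama_index.core.async_utils import run_async_tasks

async def some_async_function(arg: int) -> str:
    # Placeholder for a real async call, e.g. an LLM request
    await asyncio.sleep(0.1)
    return f"done: {arg}"

args_list = range(5)
tasks = [some_async_function(arg) for arg in args_list]
results = run_async_tasks(tasks, show_progress=True)  # runs all five tasks concurrently
print(results)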

The difference between the run and arun methods in the LlamaIndex codebase lies in how they execute. The _run_component method is synchronous: it calls the query method of the query_engine object and blocks until that call completes. The _arun_component method is asynchronous: it awaits the aquery method of the query_engine object, so while that call is pending the event loop is free to execute other work, and the component resumes once aquery has completed.

In summary, the main difference between the run and arun methods is that run is synchronous (blocking) and arun is asynchronous (non-blocking).

You can find more details in the LlamaIndex repository.
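To make the blocking vs. non-blocking distinction concrete, here is a hedged sketch (assuming p is the QueryPipeline from the screenshot above): firing several queries through arun lets them overlap, whereas run would execute them one after another. Note that the overlap only materializes if the underlying LLM's async methods are genuinely asynchronous, which is the subject of this issue.

import asyncio

async def run_many(pipeline, questions):
    # arun is a coroutine, so gather can execute the queries concurrently
    return await asyncio.gather(*(pipeline.arun(input=q) for q in questions))

questions = [
    "What did the author do during his time in YC?",
    "What did the author work on before college?",
]
responses = asyncio.run(run_many(p, questions))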

logan-markewich commented 4 months ago

Looks like it could be implemented, but just hasn't yet

Would need to use an async boto3 session: https://pypi.org/project/aioboto3/

austinmw commented 4 months ago

Ah okay. It looks like I can use aioboto3 with bedrock-runtime like this:

# # For notebooks
# import nest_asyncio
# nest_asyncio.apply()

import aioboto3
import json
from botocore.config import Config
import asyncio

session = aioboto3.Session()

async def fetch_completion(
    session: aioboto3.session.Session,
    body: str,
    modelId: str = "anthropic.claude-instant-v1",
    accept: str = "application/json",
    contentType: str = "application/json",
):

    retry_config = Config(retries={'max_attempts': 10, 'mode': 'adaptive'})

    async with session.client('bedrock-runtime', config=retry_config) as brt:

        # Making the invoke_model call asynchronously
        response = await brt.invoke_model(
            body=body,
            modelId=modelId,
            accept=accept,
            contentType=contentType,
        )

        # Asynchronously reading the response body

        # Check if the 'body' key exists or inspect the response structure
        if 'body' in response:
            response_body_str = await response['body'].read()
        else:
            print("Response structure:", response)
            return  # Exit the function if the structure is not as expected

        response_body = json.loads(response_body_str)
        return response_body.get('completion')

body = json.dumps({
    "prompt": "\n\nHuman: concisely explain black holes to 8th graders\n\nAssistant:",
    "max_tokens_to_sample": 50,
    "temperature": 0.1,
    "top_p": 0.9,
})

# Running once
result = asyncio.run(fetch_completion(session, body))
print(result)

# Running multiple times in parallel
# (top-level await works in a notebook; in a plain script, wrap the gather
# call in an async function and run it with asyncio.run)
payloads = [body] * 10
results = await asyncio.gather(*(fetch_completion(session, payload) for payload in payloads))
assert len(results) == 10
print(results)
austinmw commented 4 months ago

This seems to work for me and successfully improves my response time; not sure if there are any issues with it that I might have missed:

import asyncio
import aioboto3
from botocore.config import Config
import json
from typing import Any, Sequence

from llama_index.llms.bedrock import Bedrock
from llama_index.llms.bedrock.utils import completion_with_retry
from llama_index.core.base.llms.types import (
    ChatMessage,
    ChatResponse,
    CompletionResponse
)
from llama_index.core.llms.generic_utils import (
    completion_response_to_chat_response,
)

from llama_index.core.llms.callbacks import (
    llm_chat_callback,
    llm_completion_callback,
)
from llama_index.core.bridge.pydantic import Field, PrivateAttr

class AsyncBedrock(Bedrock):

    _session: Any = PrivateAttr()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Initialize the aioboto3 session here for use in async methods
        self._session = aioboto3.Session(
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            aws_session_token=self.aws_session_token,
            region_name=self.region_name,
            profile_name=self.profile_name,
        )

    @llm_completion_callback()
    async def acomplete(
        self, prompt: str, formatted: bool = False, **kwargs: Any
    ) -> CompletionResponse:
        # Ensure prompt formatting
        if not formatted:
            prompt = self.completion_to_prompt(prompt)
        all_kwargs = self._get_all_kwargs(**kwargs)
        request_body = self._provider.get_request_body(prompt, all_kwargs)
        request_body_str = json.dumps(request_body)

        retry_config = Config(retries={'max_attempts': self.max_retries, 'mode': 'adaptive'},
                              connect_timeout=self.timeout,
                              read_timeout=self.timeout)

        # Use the aioboto3 client within an async with block
        async with self._session.client('bedrock-runtime', config=retry_config) as client:
            response = await client.invoke_model(
                body=request_body_str,
                modelId=self.model,
                accept='application/json',
                contentType='application/json',
            )

            if 'body' in response:
                response_body_str = await response['body'].read()
                response_body = json.loads(response_body_str)
                return CompletionResponse(
                    text=self._provider.get_text_from_response(response_body), raw=response_body
                )
            else:
                raise ValueError("Unexpected response format")

    @llm_chat_callback()
    async def achat(
        self, messages: Sequence[ChatMessage], **kwargs: Any
    ) -> ChatResponse:
        prompt = self.messages_to_prompt(messages)
        completion_response = await self.acomplete(prompt, formatted=True, **kwargs)
        return completion_response_to_chat_response(completion_response)

region = "us-east-1"

llm = AsyncBedrock(
    model="anthropic.claude-instant-v1",
    model_kwargs={
        "temperature": 0.2
    },
    region_name=region,
)
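For what it's worth, a minimal usage sketch of the subclass above (the prompts and the asyncio wrapper are just placeholders; acomplete returns a CompletionResponse whose text field holds the model output):

import asyncio

async def main():
    prompts = [
        "Concisely explain black holes to 8th graders.",
        "Concisely explain neutron stars to 8th graders.",
    ]
    # With acomplete backed by aioboto3, these requests can overlap instead of running serially.
    responses = await asyncio.gather(*(llm.acomplete(p) for p in prompts))
    for r in responses:
        print(r.text)

asyncio.run(main())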
billmetangmo commented 3 months ago

@austinmw I am thinking about opening a pull request based on your work. OK with you?

brycecf commented 2 months ago

@billmetangmo Any updates on this? Also interested, since the synchronous path can be painfully slow.

@logan-markewich LangChain has added this, so it would be good to have it here as well.

austinmw commented 2 months ago

@billmetangmo go for it!

brycecf commented 1 month ago

@billmetangmo Have you started this? If not, I'll take a stab.

billmetangmo commented 1 month ago

Did not have the time yet. You can go for it @brycecf

mdciri commented 1 week ago

@austinmw, I am using the Bedrock provider too. How should I change the code to run the LLM evaluation as in https://docs.llamaindex.ai/en/stable/examples/retrievers/auto_merging_retriever/#evaluation?