NVIDIA / NeMo-Guardrails

NeMo Guardrails is an open-source toolkit for easily adding programmable guardrails to LLM-based conversational systems.

Streaming does not work with BedrockChat interface #459

Open scchengaiah opened 4 months ago

scchengaiah commented 4 months ago

Hello,

When using BedrockChat through LangChain, the streaming functionality does not work. Claude 3 models require the BedrockChat interface. When I switch to the Bedrock interface with older Claude models, streaming works fine.

Here is an example to replicate the behavior:


from dotenv import load_dotenv
import asyncio

load_dotenv(dotenv_path="../.env")

from langchain_community.chat_models.bedrock import BedrockChat
from langchain_community.llms.bedrock import Bedrock
import os

model_id="anthropic.claude-3-haiku-20240307-v1:0"
model_id="anthropic.claude-instant-v1"

async def execute_with_guardrails():

    colang_content = """
    define user express greeting
        "hello"
        "hi"

    define bot express greeting
        "Hello there!! How can I assist you ?"

    define flow hello
        user express greeting
        bot express greeting

    define flow
        user ...
        $answer = execute call_llm(user_query=$user_message)
        bot $answer
    """

    yaml_content = """
    models:
        -   type: main
            engine: amazon_bedrock        
    """

    llm = BedrockChat(
        region_name=os.getenv("AWS_REGION"),
        model_id=model_id,
        streaming=True,
    )
    from nemoguardrails.actions import action
    from typing import Optional
    from langchain_core.language_models import BaseLLM
    from langchain_core.runnables import RunnableConfig
    from nemoguardrails.context import streaming_handler_var

    @action(is_system_action=True)
    async def call_llm(user_query: str, llm: Optional[BaseLLM]) -> str:
        call_config = RunnableConfig(callbacks=[streaming_handler_var.get()])
        response = await llm.ainvoke(user_query, config=call_config)
        return response.content

    from nemoguardrails import LLMRails, RailsConfig

    config = RailsConfig.from_content(
        yaml_content=yaml_content,
        colang_content=colang_content
    )
    # Pass the BedrockChat instance directly to LLMRails.
    rails = LLMRails(config, llm=llm)

    # Register custom action
    rails.register_action(call_llm)

    async def process_tokens(streaming_handler):
        async for chunk in streaming_handler:
            print(chunk, end="", flush=True)
            # Or do something else with the token

    from nemoguardrails.streaming import StreamingHandler
    streaming_handler = StreamingHandler()
    streaming_handler_var.set(streaming_handler)

    streaming_task = asyncio.create_task(process_tokens(streaming_handler))

    messages = [{"role": "user", "content": "Tell me a joke in 100 words"}]

    result = await rails.generate_async(
        messages=messages, streaming_handler=streaming_handler
    )
    await streaming_task

if __name__ == "__main__":
    asyncio.run(execute_with_guardrails())

In the above code, streaming works when replacing BedrockChat with Bedrock. However, for the latest Claude 3 models, we have to use BedrockChat.
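
For reference, the working variant only changes how the LLM is constructed (a minimal sketch, assuming the same environment setup as above; the model id is illustrative):

# Sketch: swapping in the older Bedrock interface, with which streaming works.
from langchain_community.llms.bedrock import Bedrock

llm = Bedrock(
    region_name=os.getenv("AWS_REGION"),
    model_id="anthropic.claude-instant-v1",  # older Claude model
    streaming=True,
)

# Note: the plain Bedrock LLM returns a string from ainvoke, so the custom
# action would return `response` instead of `response.content`.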

Can someone help with a workaround or an approach to tackle the above scenario?

Thank you.

prabuvnc commented 3 months ago

Same issue as above. Is there a fix or workaround for this? We need to use Claude 3 for our use case and can't find any solution that works with NeMo Guardrails.

chengaiahsc commented 3 months ago

We have applied a workaround by attaching a callback handler to the llm instance and streaming the response through it.

# Workaround: custom callback handler class to attach to the llm instance.
from typing import Optional

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.language_models import BaseLLM
from nemoguardrails.actions import action


class BotStreamingCallbackHandler(BaseCallbackHandler):
    def __init__(self):
        # Initialize objects required for streaming, e.g. a chat interface object.
        pass

    async def on_llm_new_token(self, token, **kwargs) -> None:
        # Stream tokens to the chat interface.
        pass

    def __call__(self, *args, **kwargs):
        pass


@action(is_system_action=True)
async def call_llm(user_query: str, llm: Optional[BaseLLM]) -> str:
    # Attach the streaming callback handler to the llm instance passed in by the rails.
    if llm.callbacks is None:
        llm.callbacks = [BotStreamingCallbackHandler()]
    else:
        llm.callbacks.append(BotStreamingCallbackHandler())

    response = await llm.ainvoke(user_query)
    return response.content

Please note that if you do not want to stream the responses generated by the guardrails invocation itself, use a dedicated llm instance inside the call_llm method and attach the callback handler to that instance.
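
For example, a dedicated instance could look roughly like this (a minimal sketch of that suggestion; the model id and region lookup are illustrative):

import os

from langchain_community.chat_models.bedrock import BedrockChat

@action(is_system_action=True)
async def call_llm(user_query: str) -> str:
    # Dedicated llm instance so the rails' own LLM calls are not streamed;
    # only this call gets the streaming callback attached.
    dedicated_llm = BedrockChat(
        region_name=os.getenv("AWS_REGION"),
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        streaming=True,
        callbacks=[BotStreamingCallbackHandler()],
    )
    response = await dedicated_llm.ainvoke(user_query)
    return response.content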

Hope this can help 🙂