NVIDIA / NeMo-Guardrails

NeMo Guardrails is an open-source toolkit for easily adding programmable guardrails to LLM-based conversational systems.

Streams intermediate LLM calls in custom action. #646

Open mikeolubode opened 4 months ago

mikeolubode commented 4 months ago

I am using a custom action which is made up of several chains from LangChain. I am using only the self check input rail in my configuration, and I am streaming the output.

rag_chain = (
  restructure_question_chain
  | llm_chain
)
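
For illustration, a simplified sketch of how the two sub-chains might be built with LCEL; the prompts, the ChatOpenAI settings, and the prompt variable names below are placeholders, not the exact ones from my app:

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0.1, streaming=True)

# Rewrites the raw user question into a standalone question.
restructure_question_chain = (
    ChatPromptTemplate.from_template("Rephrase this as a standalone question: {question}")
    | llm
    | StrOutputParser()
)

# Answers the rephrased question; the dict step maps the incoming string
# onto the prompt variable.
llm_chain = (
    {"question": RunnablePassthrough()}
    | ChatPromptTemplate.from_template("Answer the question: {question}")
    | llm
    | StrOutputParser()
)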

When I run llm_rails.generate_async(), I only get output from the restructure_question_chain, and the streaming (at the front end) stops. The output of the second chain, which is the output I want, only streams to my backend console. What am I doing wrong? Please help.

Here is my config and my code.

models:
 - type: main
   engine: openai
   model: 
   parameters:
    api_key: 
    base_url: 
    temperature: 0.1

 - type: self_check_input
   engine: openai
   model: 
   parameters:
    api_key: 
    base_url: 
    temperature: 0.1

instructions:
  - type: general
    content: |
      Below is a conversation between a bot and a user. 

sample_conversation: |
  user "Hello there!"
    express greeting
  bot express greeting
    "Hello! How can I assist you today?"

rails:
  input:
    flows:
      - self check input
  dialog:
    user_messages:
      embeddings_only: True
streaming: True

# rails.co file content below
define flow self check input
  $allowed = execute self_check_input

  if $allowed
    $result = execute custom_action(question=$user_message)
    bot $result
    stop
  else
    bot refuse to respond
    stop    
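
For reference, the YAML config and the rails.co file above are loaded roughly like this (the "./config" path, the FastAPI app object, and the BaseInput model are reconstructed from the endpoint code below, so treat them as placeholders):

from fastapi import FastAPI
from pydantic import BaseModel
from nemoguardrails import LLMRails, RailsConfig

# Load the YAML config plus the rails.co file shown above.
config = RailsConfig.from_path("./config")
llm_rails = LLMRails(config)

# FastAPI app and request model used by the /rails endpoint below.
app = FastAPI()

class BaseInput(BaseModel):
    question: str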

from langchain_core.runnables import RunnableConfig
from nemoguardrails.actions import action
from nemoguardrails.context import streaming_handler_var

@action(is_system_action=True, execute_async=True)
async def custom_action(question):
    # Forward tokens from the chain's LLM calls to the current streaming handler.
    config = RunnableConfig(callbacks=[streaming_handler_var.get()])
    response = await rag_chain.ainvoke(
        input={"question": question},
        config=config,
    )
    return response
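
A possible variation, just a sketch and not a confirmed fix: scope the streaming callback to the final llm_chain only, using LangChain's with_config, so tokens from the intermediate restructuring call never reach the streaming handler (the custom_action_scoped name is only for illustration):

@action(is_system_action=True, execute_async=True)
async def custom_action_scoped(question):
    # Attach the streaming callback only to the final sub-chain, so the
    # intermediate restructuring LLM call never writes to the handler.
    streaming_handler = streaming_handler_var.get()
    scoped_chain = (
        restructure_question_chain
        | llm_chain.with_config(callbacks=[streaming_handler])
    )
    return await scoped_chain.ainvoke({"question": question})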

import asyncio

from fastapi.responses import StreamingResponse
from nemoguardrails.streaming import StreamingHandler

@app.post("/rails")
async def rails_app(body: BaseInput):
    if llm_rails.config.streaming_supported and llm_rails.main_llm_supports_streaming:

        llm_rails.register_action(custom_action, name="custom_action")

        streaming_handler = StreamingHandler(enable_print=True)
        streaming_handler_var.set(streaming_handler)

        asyncio.create_task(
            llm_rails.generate_async(
                messages=[{"role": "user", "content": body.question}],
                streaming_handler=streaming_handler,
            )
        )
        return StreamingResponse(streaming_handler)
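
For completeness, the branch where streaming is not supported could fall back to awaiting the full result, along these lines (a sketch that reuses the same llm_rails instance; it would sit in an else branch of the endpoint above):

llm_rails.register_action(custom_action, name="custom_action")
result = await llm_rails.generate_async(
    messages=[{"role": "user", "content": body.question}]
)
return result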

import httpx

# using this to call the /rails route
async with httpx.AsyncClient() as client:
    async with client.stream("POST", "http://www.api.com/rails", json=data) as response:
        async for chunk in response.aiter_text():
            await msg.stream_token(chunk)
niels-garve commented 2 months ago

Hi @mikeolubode ,

Thanks for bringing that up!

I ran into the same problems and opened a pull request: #735

I would suggest reading through the pull request, as I added documentation of the problem and the fix there. Please don't hesitate to ask questions about that fix.

Cheers! Niels