microsoft / promptflow

Build high-quality LLM apps - from prototyping, testing to production deployment and monitoring.
https://microsoft.github.io/promptflow/
MIT License

[Feature Request] stream mode generator can't have final llm output as input to other node #3101

Open vhan2kpmg opened 2 weeks ago

vhan2kpmg commented 2 weeks ago

Is your feature request related to a problem? Please describe. We have a use case like [llm_node] -> [save_complete_answer_in_external_history_node]. When stream mode is turned on for [llm_node], we can't save the history inside the DAG; instead, we have to process the final output of llm_node outside the DAG.

Describe the solution you'd like Can we have an output parameter in the llm node that exposes the final output? Take this as an example:

dag.yaml

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
inputs:
  chat_history:
    type: list
    default: []
  question:
    type: string
    is_chat_input: true
    default: What is ChatGPT?
outputs:
  answer:
    type: string
    reference: ${chat.output.answer}
    is_chat_output: true
  final_answer:
    type: string
    reference: ${chat.output.final_answer}
    is_chat_output: false
nodes:
- inputs:
    # This is to easily switch between openai and azure openai.
    # deployment_name is required by azure openai, model is required by openai.
    deployment_name: gpt-35-turbo
    model: gpt-3.5-turbo
    max_tokens: "256"
    temperature: "0.7"
    chat_history: ${inputs.chat_history}
    question: ${inputs.question}
  ##### can we have this  #####
  output:
    answer: answer_generator
    final_answer: final_answer_string
  #### can we have this finished  #####
  name: chat
  type: llm
  source:
    type: code
    path: chat.jinja2
  api: chat
  connection: open_ai_connection
 ##### below are consumer of final answer example  #####
- name: save_history
  type: python
  source:
    type: code
    path: save_history.py
  inputs:
    final_answer: ${chat.output.final_answer}
  ##### consumer of final answer example ends #####
node_variants: {}
environment:
    python_requirements_txt: requirements.txt

And

from promptflow import load_flow

f = load_flow(source="../../examples/flows/chat/chat-basic/")
f.context.streaming = True
result = f(
    chat_history=[
        {
            "inputs": {"chat_input": "Hi"},
            "outputs": {"chat_output": "Hello! How can I assist you today?"},
        }
    ],
    question="How are you?",
)

answer = ""
# the result will be a generator, iterate it to get the result
for r in result["answer"]:
    answer += r

# result["final_answer"] should be the same as "answer" once the generator is exhausted

Describe alternatives you've considered Not sure.

Additional context Not sure.

0mza987 commented 1 week ago

Hi @vhan2kpmg ,

Just use ${chat.output} as the input of the save_history node:

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
inputs:
  chat_history:
    type: list
    default: []
  question:
    type: string
    is_chat_input: true
    default: What is ChatGPT?
outputs:
  answer:
    type: string
    reference: ${chat.output}
    is_chat_output: true
nodes:
- inputs:
    # This is to easily switch between openai and azure openai.
    # deployment_name is required by azure openai, model is required by openai.
    deployment_name: gpt-35-turbo
    model: gpt-3.5-turbo
    max_tokens: "256"
    temperature: "0.7"
    chat_history: ${inputs.chat_history}
    question: ${inputs.question}
  name: chat
  type: llm
  source:
    type: code
    path: chat.jinja2
  api: chat
  connection: open_ai_connection
- name: save_history
  type: python
  source:
    type: code
    path: save_history.py
  inputs:
    record: ${chat.output}

node_variants: {}
environment:
    python_requirements_txt: requirements.txt

Then run the flow as a function, with streaming mode enabled:

from promptflow import load_flow

f = load_flow(source=r"E:\programs\msft-promptflow\examples\flows\chat\chat-basic-streaming")
f.context.streaming = True

result = f(
    chat_history=[
        {
            "inputs": {"chat_input": "Hi"},
            "outputs": {"chat_output": "Hello! How can I assist you today?"},
        }
    ],
    question="How are you?",
)

answer = ""
# the result will be a generator, iterate it to get the result
for r in result["answer"]:
    answer += r

print(answer)

Inside the save_history node I save the record to a local txt file:

from promptflow.core import tool

@tool
def save(record: str):
    # append the record to the history file
    with open("history.txt", "a") as f:
        f.write(record + "\n")
    print(f"Recorded: {record}")

Every time I run this flow, the record is appended to the txt file. Could you please provide more details about this statement:

when we have [llm_node] stream mode turned on, we can't save history in DAG

What's the error message, and do you have a sample to repro it?

vhan2kpmg commented 5 days ago

Hi, thanks for your reply. Sorry, I may not have explained this clearly at first: we can save history, but we lose the benefit of stream mode when we do. From what I tested,

That makes sense in a way, because DAG outputs are only ready when all nodes have finished? But the purpose of stream mode is to get the answer chunk by chunk before the final result is complete. If there is a node after the llm node, does the DAG wait until all nodes finish? In our example, the DAG waits until the response is saved, which only happens after all llm response chunks have arrived.
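The blocking effect described above can be reproduced in plain Python, without promptflow. This is only a simulation, assuming the downstream node needs the complete string as its input; `fake_llm_stream` and `save_history` here are hypothetical stand-ins, not promptflow APIs, and the real executor may behave differently.

```python
from typing import Iterator, List

# Ordered log of what happened, so the sequencing is visible.
events: List[str] = []


def fake_llm_stream() -> Iterator[str]:
    """Hypothetical stand-in for a streaming LLM node: yields chunks lazily."""
    for chunk in ["I ", "am ", "fine."]:
        events.append(f"produced {chunk!r}")
        yield chunk


def save_history(record: str) -> str:
    """Hypothetical stand-in for a node that needs the complete answer."""
    events.append(f"saved {record!r}")
    return record


# A node that takes a full string has to drain the generator first,
# so the caller sees nothing until every chunk has been produced and saved.
full_answer = save_history("".join(fake_llm_stream()))
events.append("caller receives answer")

for event in events:
    print(event)
```

In this simulation the caller only receives the answer after all chunks are produced and saved, which matches the loss of streaming reported here.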

Is there any way we can output the generator immediately while leaving save history as a background task?
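One possible shape for this, sketched in plain Python and not relying on any promptflow feature: wrap the chunk generator so that each chunk is passed through to the caller as soon as it arrives, and the save callback runs once the stream is exhausted. The names `stream_then_save`, `fake_llm_stream`, and `on_complete` are hypothetical, and whether such a wrapper can be placed inside a DAG node is an open assumption.

```python
from typing import Callable, Iterator, List


def stream_then_save(
    chunks: Iterator[str], on_complete: Callable[[str], None]
) -> Iterator[str]:
    """Yield each chunk immediately, then hand the full text to on_complete."""
    collected: List[str] = []
    for chunk in chunks:
        collected.append(chunk)
        yield chunk  # the caller sees this chunk before anything is saved
    # Reached only when the caller has consumed the whole stream.
    on_complete("".join(collected))


def fake_llm_stream() -> Iterator[str]:
    """Hypothetical stand-in for the streaming LLM output."""
    yield from ["I ", "am ", "fine."]


saved: List[str] = []          # stands in for the external history store
saved_while_streaming: List[int] = []
seen: List[str] = []

for chunk in stream_then_save(fake_llm_stream(), saved.append):
    seen.append(chunk)
    # Record how many saves had happened when this chunk was received.
    saved_while_streaming.append(len(saved))

print("".join(seen))
print(saved)
```

The save step in this sketch only fires if the consumer iterates to the end; a caller that stops early never triggers it. Moving the callback into a `finally` block would cover that case, at the cost of saving partial answers.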