microsoft / promptflow

Build high-quality LLM apps - from prototyping, testing to production deployment and monitoring.
https://microsoft.github.io/promptflow/
MIT License

[BUG] PFClient.run using flow with AsyncIterator raises `TypeError: cannot pickle '_thread.lock' object` #3413

Open bwilliams2 opened 3 months ago

bwilliams2 commented 3 months ago

Describe the bug

Batch runs cannot complete successfully if the flow call produces an AsyncIterator that ends up wrapped by promptflow.tracing.TracedAsyncIterator. Each run from the data files fails with TypeError: cannot pickle '_thread.lock' object

How To Reproduce the bug

The code below produces the error consistently:

import tempfile

from openai import AzureOpenAI
from promptflow.tracing import trace
from promptflow.core import AzureOpenAIModelConfiguration, Prompty
from promptflow.client import PFClient

class ChatFlow:
    def __init__(
        self, model_config: AzureOpenAIModelConfiguration
    ):
        self.model_config = model_config

    @trace
    async def __call__(
        self,
        topic: str,
    ) -> str:
        """Flow entry function."""

        client = AzureOpenAI(
            azure_endpoint=self.model_config.azure_endpoint,
            api_key=self.model_config.api_key,
            api_version=self.model_config.api_version,
        )

        response = client.chat.completions.create(
            model=self.model_config.azure_deployment,
            messages = [
                {"role": "system", "content": "Create a story about the topic provided by the user"},
                {"role": "user", "content": f"Tell me a story about {topic}"},
            ],
            max_tokens=150,
        )

        for chunk in response:
            if len(chunk.choices) > 0 and (message := chunk.choices[0].message):
                content = message.content
                yield content + "\n"

def main():
    f = tempfile.NamedTemporaryFile(suffix=".csv", mode="w+t")
    try:
        f.write("topic\nlittle league\n")
        f.seek(0)
        config = AzureOpenAIModelConfiguration(
            connection="aoai_connection", azure_deployment="gpt-35-turbo"
        )
        chat_flow = ChatFlow(model_config=config)
        result = PFClient().run(chat_flow, data=f.name)
    finally:
        f.close()  # closing a NamedTemporaryFile removes it; delete is a flag, not a method

if __name__ == "__main__":
    main()

The bug can be traced to the run_info object submitted to the queue at

https://github.com/microsoft/promptflow/blob/745704a5b7f868c61c71f7a12eb13ef695ab4333/src/promptflow-core/promptflow/storage/_queue_run_storage.py#L24

For the example code, the results property of run_info holds an instance of promptflow.tracing.TracedAsyncIterator. That instance cannot be pickled when run_info is submitted to the multiprocessing queue, which raises the error above.
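
To illustrate the failure mode in isolation: a multiprocessing queue serializes its items with pickle, and pickle rejects any object that holds a thread lock. The sketch below uses a hypothetical stand-in class, LockHoldingIterator, not the real TracedAsyncIterator. That the real wrapper reaches a lock through one of its attributes is an assumption inferred from the error message, not something established in this thread.

```python
import pickle
import threading


class LockHoldingIterator:
    """Hypothetical stand-in for an iterator wrapper that holds a thread lock."""

    def __init__(self):
        # Assumption: some attribute of the real wrapper references a lock.
        self._lock = threading.Lock()


def try_pickle(obj):
    """Return the error message if obj cannot be pickled, else None."""
    try:
        pickle.dumps(obj)
        return None
    except TypeError as exc:
        return str(exc)


error_message = try_pickle(LockHoldingIterator())
print(error_message)
```

On the CPython versions I am aware of, the printed message has the same shape as the one reported in this issue, although the exact wording may differ between versions.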

The error file from the batch run is attached: error.json

Expected behavior

Successful execution of the batch run.

Running Information(please complete the following information):

guming-learning commented 1 month ago

Hi @bwilliams2, it seems that you forgot to set stream=True in the completion API call, and the way you iterate over the result is not right. Please try changing your code to the following:

        response = client.chat.completions.create(
            model="gpt-35-turbo",
            messages=[
                {"role": "system", "content": "Create a story about the topic provided by the user"},
                {"role": "user", "content": f"Tell me a story about {topic}"},
            ],
            max_tokens=150,
            stream=True
        )

        for chunk in response:
            print(f"chunk: {chunk}")
            if len(chunk.choices) > 0 and (message := chunk.choices[0].delta.content):
                yield str(message)

bwilliams2 commented 1 month ago

@guming-learning

I applied these changes and get the exact same error. I don't believe the function ever gets executed, because of the pickling error described in the original issue. It seems that promptflow batch runs cannot be used with streaming flows.
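
One possible workaround, untested against promptflow and not confirmed by the maintainers, is to consume the stream inside the flow and return a plain string, so that the value placed in run_info is picklable. The sketch below shows only that idea. The names stream_story and collect are hypothetical, and the generator yields fixed strings in place of a real model call.

```python
import asyncio
import pickle
from typing import AsyncIterator


async def stream_story(topic: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for a streaming flow; yields fixed chunks."""
    for piece in ("Once ", "upon ", f"a {topic}."):
        yield piece


async def collect(chunks: AsyncIterator[str]) -> str:
    """Drain the async iterator and join the chunks into one string."""
    return "".join([chunk async for chunk in chunks])


story = asyncio.run(collect(stream_story("little league")))

# A plain string pickles without error, unlike the iterator itself.
payload = pickle.dumps(story)
print(story)
```

This gives up streaming for batch runs, so it only fits cases where the batch output is needed as a whole rather than chunk by chunk.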