microsoft / promptflow

Build high-quality LLM apps - from prototyping, testing to production deployment and monitoring.
https://microsoft.github.io/promptflow/
MIT License
9.58k stars 878 forks

[BUG] eval API errors out with UnexpectedError: Unexpected error occurred while executing the batch run. Error: (TypeError) cannot pickle '_thread.RLock' object. #3575

Closed: yanggaome closed this issue 2 months ago

yanggaome commented 4 months ago

**Describe the bug**

Calling the evaluate API with a target object errors out with `TypeError: cannot pickle '_thread.RLock' object`:

    myTargetCallObj = MyTargetCallClass()
    credential = DefaultAzureCredential()
    content_safety_chat_evaluator = ContentSafetyChatEvaluator(project_scope=project_scope, credential=credential)
    results = evaluate(
        evaluation_name="test",
        data="absolute path to jsonl",
        target=myTargetCallObj,
        evaluators={"content_safety_chat" : content_safety_chat_evaluator},
        azure_ai_project=project_scope
    )
Traceback (most recent call last):
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 257, in run
    return async_run_allowing_running_loop(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_utils/async_utils.py", line 96, in async_run_allowing_running_loop
    return asyncio.run(_invoke_async_with_sigint_handler(async_func, *args, **kwargs))
  File "/anaconda/envs/azureml_py38/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/anaconda/envs/azureml_py38/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_utils/async_utils.py", line 65, in _invoke_async_with_sigint_handler
    return await async_func(*args, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 417, in _exec_in_task
    return task.result()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 476, in _exec
    results, is_timeout = await self._executor_proxy._exec_batch(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_proxy/_python_executor_proxy.py", line 113, in _exec_batch
    with LineExecutionProcessPool(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_line_execution_process_pool.py", line 144, in __enter__
    self.start()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_line_execution_process_pool.py", line 200, in start
    self._processes_manager.start_processes()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_process_manager.py", line 302, in start_processes
    process.start()
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
...
    results = evaluate(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_telemetry/__init__.py", line 111, in wrapper
    result = func(*args, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 365, in evaluate
    raise e
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 340, in evaluate
    return _evaluate(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 401, in _evaluate
    input_data_df, target_generated_columns, target_run = _apply_target_to_data(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 183, in _apply_target_to_data
    run = pf_client.run(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_pf_client.py", line 301, in run
    return self._run(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_pf_client.py", line 226, in _run
    return self.runs.create_or_update(run=run, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_telemetry/activity.py", line 265, in wrapper
    return f(self, *args, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/operations/_run_operations.py", line 135, in create_or_update
    created_run = RunSubmitter(client=self._client).submit(run=run, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 52, in submit
    task_results = [task.result() for task in tasks]
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 52, in <listcomp>
    task_results = [task.result() for task in tasks]
  File "/anaconda/envs/azureml_py38/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/anaconda/envs/azureml_py38/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/anaconda/envs/azureml_py38/lib/python3.9/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 134, in _run_bulk
    self._submit_bulk_run(flow=flow, run=run, local_storage=local_storage)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 221, in _submit_bulk_run
    raise e
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 187, in _submit_bulk_run
    batch_result = batch_engine.run(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 280, in run
    raise unexpected_error from e
promptflow._core._errors.UnexpectedError: Unexpected error occurred while executing the batch run. Error: (TypeError) cannot pickle '_thread.RLock' object.
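The failure at the bottom of this chain can be reproduced in isolation, outside promptflow. A minimal sketch (the `ClientWithLock` class is a hypothetical stand-in, not part of any SDK): any object that stores a `_thread.RLock` on itself fails to pickle with exactly this error.

```python
import pickle
import threading

class ClientWithLock:
    """Stand-in for an SDK client that creates a lock in its constructor."""
    def __init__(self):
        self._lock = threading.RLock()

try:
    pickle.dumps(ClientWithLock())
except TypeError as e:
    print(e)  # cannot pickle '_thread.RLock' object
```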

**How To Reproduce the bug**

In the implementation of MyTargetCallClass, I have `__init__` and `__call__` methods defined.

In the `__init__` method, it creates objects from other classes:

    class MyTargetCallClass:
        def __init__(self):
            self._clientA = ClientA()
            self._clientB = ClientB()

        def __call__(self, query):
            self._clientB.get_output(self._clientA)
            return {xxx}

In ClientB's `__init__` method, it creates another object from ClientC:

    class ClientB:
        def __init__(self):
            self._clientC = ClientC()

        def doSomething(self):
            self._clientC.someMethod()

        def get_output(self, client_a):
            xxx

Observations:

  1. First of all, if I don't call the evaluate API and just create my target class object and pass in a query, it works perfectly.
  2. The above code errors out with the evaluate API in the multi-process batch run.
  3. What I noticed is: if I create clientC inside the method doSomething (not in the `__init__` constructor), it works:

    class ClientB:
        def __init__(self):
            pass

        def doSomething(self):
            local_clientC = ClientC()
            local_clientC.someMethod()

        def get_output(self, client_a):
            xxx
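Observation 3 matches how pickling works: only attributes stored on the instance are serialized, so an unpicklable object created locally inside a method never reaches the pickler. A minimal self-contained sketch, using `threading.RLock` as a stand-in for ClientC's internal unpicklable state (class names are illustrative):

```python
import pickle
import threading

class EagerClient:
    def __init__(self):
        # Unpicklable state stored on the instance -> pickling fails.
        self._lock = threading.RLock()

class LazyClient:
    def __init__(self):
        # No unpicklable attributes at construction time.
        pass

    def do_something(self):
        # Created locally and never stored -> the instance stays picklable.
        local_lock = threading.RLock()
        with local_lock:
            return "done"

pickle.dumps(LazyClient())  # works
try:
    pickle.dumps(EagerClient())
except TypeError:
    print("EagerClient is not picklable")
```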




**Running Information:**
{
  "promptflow": "1.13.0",
  "promptflow-azure": "1.13.0",
  "promptflow-core": "1.13.0",
  "promptflow-devkit": "1.13.0",
  "promptflow-evals": "0.3.1",
  "promptflow-tracing": "1.13.0"
}

Executable '/anaconda/envs/azureml_py38/bin/python'
Python (Linux) 3.9.19 | packaged by conda-forge | (main, Mar 20 2024, 12:50:21) 
[GCC 12.3.0]

0mza987 commented 4 months ago

Looks like a similar issue to https://github.com/microsoft/promptflow/issues/3413. @guming-learning Could you please take a look at this issue?

guming-learning commented 4 months ago

Hi @yanggaome , please try setting the environment variable "PF_BATCH_METHOD" to "spawn". On Linux, promptflow by default uses fork to create the process that executes each line of a batch run (to save memory), which seems not to work when you have a generator in the constructor.
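If exporting the variable in the shell does not propagate (for example when the script is launched from a notebook or IDE), it can also be set from Python. A minimal sketch; this assumes promptflow reads the variable when the batch run starts, so it must be assigned before evaluate() is called:

```python
import os

# Set before calling the evaluate API so the batch engine picks it up.
os.environ["PF_BATCH_METHOD"] = "spawn"
print(os.environ["PF_BATCH_METHOD"])  # spawn
```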

yanggaome commented 4 months ago

Hi @guming-learning , I tried this

export PF_BATCH_METHOD='spawn'
echo $PF_BATCH_METHOD
spawn

but still get the same error

promptflow._core._errors.UnexpectedError: Unexpected error occurred while executing the batch run. Error: (TypeError) cannot pickle '_thread.RLock' object.
guming-learning commented 4 months ago

Hi @yanggaome , is it still the same error stack? Specifically, does this line still occur in it?

    File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_process_manager.py", line 302, in start_processes
        process.start()

yanggaome commented 4 months ago

Hi @guming-learning , it looks like the stack trace is a bit different: it still fails in the process manager, but at a different line number.

Traceback (most recent call last):
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 257, in run
    return async_run_allowing_running_loop(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_utils/async_utils.py", line 96, in async_run_allowing_running_loop
    return asyncio.run(_invoke_async_with_sigint_handler(async_func, *args, **kwargs))
  File "/anaconda/envs/azureml_py38/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/anaconda/envs/azureml_py38/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_utils/async_utils.py", line 65, in _invoke_async_with_sigint_handler
    return await async_func(*args, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 417, in _exec_in_task
    return task.result()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 476, in _exec
    results, is_timeout = await self._executor_proxy._exec_batch(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_proxy/_python_executor_proxy.py", line 113, in _exec_batch
    with LineExecutionProcessPool(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_line_execution_process_pool.py", line 144, in __enter__
    self.start()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_line_execution_process_pool.py", line 200, in start
    self._processes_manager.start_processes()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_process_manager.py", line 209, in start_processes
    self.new_process(i)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_process_manager.py", line 233, in new_process
    process.start()
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
...
    results = evaluate(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_telemetry/__init__.py", line 111, in wrapper
    result = func(*args, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 365, in evaluate
    raise e
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 340, in evaluate
    return _evaluate(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 401, in _evaluate
    input_data_df, target_generated_columns, target_run = _apply_target_to_data(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 183, in _apply_target_to_data
    run = pf_client.run(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_pf_client.py", line 301, in run
    return self._run(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_pf_client.py", line 226, in _run
    return self.runs.create_or_update(run=run, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_telemetry/activity.py", line 265, in wrapper
    return f(self, *args, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/operations/_run_operations.py", line 135, in create_or_update
    created_run = RunSubmitter(client=self._client).submit(run=run, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 52, in submit
    task_results = [task.result() for task in tasks]
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 52, in <listcomp>
    task_results = [task.result() for task in tasks]
  File "/anaconda/envs/azureml_py38/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/anaconda/envs/azureml_py38/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/anaconda/envs/azureml_py38/lib/python3.9/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 134, in _run_bulk
    self._submit_bulk_run(flow=flow, run=run, local_storage=local_storage)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 221, in _submit_bulk_run
    raise e
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 187, in _submit_bulk_run
    batch_result = batch_engine.run(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 280, in run
    raise unexpected_error from e
promptflow._core._errors.UnexpectedError: Unexpected error occurred while executing the batch run. Error: (TypeError) cannot pickle '_thread.RLock' object.
Hhhilulu commented 4 months ago

Hi, @yanggaome , Can you provide the minimum repro code?

yanggaome commented 4 months ago

Hi @Hhhilulu ,

This is the user target call file and class definition. The endpoint/token provider/api version can be left empty as below; it won't block the repro.

from openai import AzureOpenAI

class GPT:
    def __init__(self):
        self._client = AzureOpenAI(
            azure_endpoint="",
            azure_ad_token_provider="",
            api_version=""
        )

class ClientA:
    def __init__(self):
        """Init."""
        self._gpt_client = GPT() # this will cause error in pickle

class UserTargetCall:
    def __init__(self):
        """User implement logic."""

        self._clientA = ClientA()

    def __call__(self):
        """Call."""

        output = {"conversation": [
            {"role": "user", "content": "What is the value of 2 + 2?"}, {"role": "assistant", "content": "2 + 2 = 4"},
            {"role": "user", "content": "What is the value of 3 + 3?"}, {"role": "assistant", "content": "3 + 3 = 6"}
            ]
        }
        return output

This is the caller code:

from UserTargetCall import UserTargetCall
from promptflow.evals.evaluate import evaluate
from promptflow.evals.evaluators import ContentSafetyChatEvaluator
from azure.identity import DefaultAzureCredential

if __name__ == '__main__':
    project_scope = {
        "subscription_id": "xxx",
        "resource_group_name": "xxx",
        "project_name": "xxx"
    }

    user_call = UserTargetCall()
    credential = DefaultAzureCredential()
    content_safety_chat_evaluator = ContentSafetyChatEvaluator(project_scope=project_scope, credential=credential)
    results = evaluate(
        evaluation_name="test",
        data="test.jsonl",
        target=user_call,
        evaluators={"content_safety_chat" : content_safety_chat_evaluator},
        azure_ai_project=project_scope
    )

and the test.jsonl only has one line:

{"query": "this is user query"}
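For completeness, the one-line test.jsonl can be generated like this (a small helper sketch, not part of the original repro):

```python
import json

# Write the single-record JSONL input used by the repro above.
with open("test.jsonl", "w") as f:
    f.write(json.dumps({"query": "this is user query"}) + "\n")

with open("test.jsonl") as f:
    print(f.read().strip())  # {"query": "this is user query"}
```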
Hhhilulu commented 4 months ago

Hi, @yanggaome, Your target object cannot be pickled. We use multi-process mode to execute batch run, which requires that all parameters of the multi-process target function can be serialized, so this case is not supported. We recommend not initializing the client in the init function.

Currently, we don't support this usage.
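Besides moving client creation out of `__init__`, another general Python pattern that can keep a target compatible with spawn-based workers is defining `__getstate__`/`__setstate__` so the unpicklable client is dropped when pickling and rebuilt after unpickling. This is a sketch with a hypothetical `SomeClient` stand-in, not a pattern confirmed by the promptflow maintainers:

```python
import pickle
import threading

class SomeClient:
    """Stand-in for an SDK client that holds an RLock internally."""
    def __init__(self):
        self._lock = threading.RLock()

class UserTargetCall:
    def __init__(self):
        self._client = SomeClient()

    def __getstate__(self):
        # Drop the unpicklable client before serialization.
        state = self.__dict__.copy()
        state["_client"] = None
        return state

    def __setstate__(self, state):
        # Rebuild the client in the worker process after deserialization.
        self.__dict__.update(state)
        self._client = SomeClient()

    def __call__(self, query):
        return {"query": query}

# Round-trip through pickle, as the spawn start method would do.
clone = pickle.loads(pickle.dumps(UserTargetCall()))
print(clone("hi"))  # {'query': 'hi'}
```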

github-actions[bot] commented 2 months ago

Hi, we're sending this friendly reminder because we haven't heard back from you in 30 days. We need more information about this issue to help address it. Please be sure to give us your input. If we don't hear back from you within 7 days of this comment, the issue will be automatically closed. Thank you!