
[BUG] eval API errors out with UnexpectedError: Unexpected error occurred while executing the batch run. Error: (TypeError) cannot pickle '_thread.RLock' object. #3575

Open
yanggaome opened this issue Jul 23, 2024 · 8 comments
@yanggaome

Describe the bug
Calling the evaluate API with a class instance as target errors out during the batch run with TypeError: cannot pickle '_thread.RLock' object:

    myTargetCallObj = MyTargetCallClass()
    credential = DefaultAzureCredential()
    content_safety_chat_evaluator = ContentSafetyChatEvaluator(project_scope=project_scope, credential=credential)
    results = evaluate(
        evaluation_name="test",
        data="absolute path to jsonl",
        target=myTargetCallObj,
        evaluators={"content_safety_chat" : content_safety_chat_evaluator},
        azure_ai_project=project_scope
    )
Traceback (most recent call last):
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 257, in run
    return async_run_allowing_running_loop(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_utils/async_utils.py", line 96, in async_run_allowing_running_loop
    return asyncio.run(_invoke_async_with_sigint_handler(async_func, *args, **kwargs))
  File "/anaconda/envs/azureml_py38/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/anaconda/envs/azureml_py38/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_utils/async_utils.py", line 65, in _invoke_async_with_sigint_handler
    return await async_func(*args, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 417, in _exec_in_task
    return task.result()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 476, in _exec
    results, is_timeout = await self._executor_proxy._exec_batch(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_proxy/_python_executor_proxy.py", line 113, in _exec_batch
    with LineExecutionProcessPool(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_line_execution_process_pool.py", line 144, in __enter__
    self.start()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_line_execution_process_pool.py", line 200, in start
    self._processes_manager.start_processes()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_process_manager.py", line 302, in start_processes
    process.start()
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
...
    results = evaluate(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_telemetry/__init__.py", line 111, in wrapper
    result = func(*args, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 365, in evaluate
    raise e
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 340, in evaluate
    return _evaluate(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 401, in _evaluate
    input_data_df, target_generated_columns, target_run = _apply_target_to_data(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 183, in _apply_target_to_data
    run = pf_client.run(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_pf_client.py", line 301, in run
    return self._run(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_pf_client.py", line 226, in _run
    return self.runs.create_or_update(run=run, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_telemetry/activity.py", line 265, in wrapper
    return f(self, *args, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/operations/_run_operations.py", line 135, in create_or_update
    created_run = RunSubmitter(client=self._client).submit(run=run, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 52, in submit
    task_results = [task.result() for task in tasks]
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 52, in <listcomp>
    task_results = [task.result() for task in tasks]
  File "/anaconda/envs/azureml_py38/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/anaconda/envs/azureml_py38/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/anaconda/envs/azureml_py38/lib/python3.9/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 134, in _run_bulk
    self._submit_bulk_run(flow=flow, run=run, local_storage=local_storage)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 221, in _submit_bulk_run
    raise e
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 187, in _submit_bulk_run
    batch_result = batch_engine.run(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 280, in run
    raise unexpected_error from e
promptflow._core._errors.UnexpectedError: Unexpected error occurred while executing the batch run. Error: (TypeError) cannot pickle '_thread.RLock' object.

How To Reproduce the bug
In the implementation of MyTargetCallClass, both __init__ and __call__ methods are defined.

In the __init__ method, it creates objects from other classes:

class MyTargetCallClass:
    def __init__(self):
        self._clientA = ClientA()
        self._clientB = ClientB()

    def __call__(self, query):
        self._clientB.get_output(self._clientA)
        return {xxx}

In the ClientB class's __init__ method, it creates another object from ClientC:

class ClientB:
    def __init__(self):
        self._clientC = ClientC()

    def doSomething(self):
        self._clientC.someMethod()

    def get_output(self, client_a):
        xxx

Observations:

  1. First of all, if I don't call the evaluate API and just create my target class object and pass in a query, it works perfectly.
  2. The same code run through the evaluate API errors out in the batch-run multiprocessing.
  3. If I create ClientC inside the method doSomething (not in the __init__ constructor), it works:
class ClientB:
    def __init__(self):
        pass

    def doSomething(self):
        local_clientC = ClientC()
        local_clientC.someMethod()

    def get_output(self, client_a):
        xxx
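Observation 3 is consistent with what the traceback shows: the batch engine pickles the target object to send it to worker processes, and any attribute that reaches a client holding a _thread.RLock breaks that. A minimal, self-contained sketch (dummy classes standing in for the SDK client, not promptflow code) that reproduces the difference:

```python
import pickle
import threading

class FakeSDKClient:
    """Stand-in for an SDK client (e.g. AzureOpenAI) that holds a lock."""
    def __init__(self):
        self._lock = threading.RLock()  # RLock instances cannot be pickled

class EagerTarget:
    """Client created in __init__: the instance is not picklable."""
    def __init__(self):
        self._client = FakeSDKClient()

class LazyTarget:
    """No client stored on the instance: pickling succeeds."""
    def __init__(self):
        pass

    def doSomething(self):
        # Client is created locally per call and never stored on self,
        # so it is never part of the pickled state.
        local_client = FakeSDKClient()
        return local_client

try:
    pickle.dumps(EagerTarget())
    eager_picklable = True
except TypeError:
    eager_picklable = False

lazy_picklable = isinstance(pickle.dumps(LazyTarget()), bytes)
print(eager_picklable, lazy_picklable)  # False True
```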

Running Information:

  • Promptflow package versions (pf -v):
    {
    "promptflow": "1.13.0",
    "promptflow-azure": "1.13.0",
    "promptflow-core": "1.13.0",
    "promptflow-devkit": "1.13.0",
    "promptflow-evals": "0.3.1",
    "promptflow-tracing": "1.13.0"
    }

Executable '/anaconda/envs/azureml_py38/bin/python'
Python (Linux) 3.9.19 | packaged by conda-forge | (main, Mar 20 2024, 12:50:21)
[GCC 12.3.0]


@yanggaome yanggaome added the bug Something isn't working label Jul 23, 2024
@0mza987 (Contributor) commented Jul 24, 2024

This looks like a similar issue to #3413.
@guming-learning Could you please take a look at this issue?

@guming-learning (Contributor)

Hi @yanggaome , please try setting the environment variable "PF_BATCH_METHOD" to "spawn".
On Linux, promptflow by default uses fork to create a new process for each line of the batch run to save memory, which does not seem to work when you have a generator in the constructor.
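For reference, the variable can be set either in the shell (export PF_BATCH_METHOD=spawn) or in-process before evaluate is called; a sketch of the in-process variant (assuming promptflow reads the variable at batch-run time):

```python
import os

# Must be set before the batch run starts so the executor picks it up.
os.environ["PF_BATCH_METHOD"] = "spawn"
```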

@yanggaome (Author)

Hi @guming-learning , I tried this

export PF_BATCH_METHOD='spawn'
echo $PF_BATCH_METHOD
spawn

but still get the same error

promptflow._core._errors.UnexpectedError: Unexpected error occurred while executing the batch run. Error: (TypeError) cannot pickle '_thread.RLock' object.

@guming-learning (Contributor)

Hi @yanggaome , is it still the same error stack? Specifically, does this line still occur in the error stack?
"""
File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_process_manager.py", line 302, in start_processes
process.start()
"""

@yanggaome (Author)

Hi @guming-learning , the stack trace is a bit different now: it still fails in the process manager, but at a different line number.

Traceback (most recent call last):
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 257, in run
    return async_run_allowing_running_loop(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_utils/async_utils.py", line 96, in async_run_allowing_running_loop
    return asyncio.run(_invoke_async_with_sigint_handler(async_func, *args, **kwargs))
  File "/anaconda/envs/azureml_py38/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/anaconda/envs/azureml_py38/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_utils/async_utils.py", line 65, in _invoke_async_with_sigint_handler
    return await async_func(*args, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 417, in _exec_in_task
    return task.result()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 476, in _exec
    results, is_timeout = await self._executor_proxy._exec_batch(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_proxy/_python_executor_proxy.py", line 113, in _exec_batch
    with LineExecutionProcessPool(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_line_execution_process_pool.py", line 144, in __enter__
    self.start()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_line_execution_process_pool.py", line 200, in start
    self._processes_manager.start_processes()
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_process_manager.py", line 209, in start_processes
    self.new_process(i)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/executor/_process_manager.py", line 233, in new_process
    process.start()
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/anaconda/envs/azureml_py38/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
...
    results = evaluate(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_telemetry/__init__.py", line 111, in wrapper
    result = func(*args, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 365, in evaluate
    raise e
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 340, in evaluate
    return _evaluate(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 401, in _evaluate
    input_data_df, target_generated_columns, target_run = _apply_target_to_data(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/evals/evaluate/_evaluate.py", line 183, in _apply_target_to_data
    run = pf_client.run(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_pf_client.py", line 301, in run
    return self._run(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_pf_client.py", line 226, in _run
    return self.runs.create_or_update(run=run, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_telemetry/activity.py", line 265, in wrapper
    return f(self, *args, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/operations/_run_operations.py", line 135, in create_or_update
    created_run = RunSubmitter(client=self._client).submit(run=run, **kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 52, in submit
    task_results = [task.result() for task in tasks]
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 52, in <listcomp>
    task_results = [task.result() for task in tasks]
  File "/anaconda/envs/azureml_py38/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/anaconda/envs/azureml_py38/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/anaconda/envs/azureml_py38/lib/python3.9/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 134, in _run_bulk
    self._submit_bulk_run(flow=flow, run=run, local_storage=local_storage)
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 221, in _submit_bulk_run
    raise e
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/_sdk/_orchestrator/run_submitter.py", line 187, in _submit_bulk_run
    batch_result = batch_engine.run(
  File "/anaconda/envs/azureml_py38/lib/python3.9/site-packages/promptflow/batch/_batch_engine.py", line 280, in run
    raise unexpected_error from e
promptflow._core._errors.UnexpectedError: Unexpected error occurred while executing the batch run. Error: (TypeError) cannot pickle '_thread.RLock' object.

@Hhhilulu (Contributor)

Hi, @yanggaome ,
Can you provide the minimum repro code?

@yanggaome (Author)

Hi @Hhhilulu ,

This is the user target call file and class definition. The endpoint, token provider, and API version can be left empty as below; that won't block the repro.

from openai import AzureOpenAI

class GPT:
    def __init__(self):
        self._client = AzureOpenAI(
            azure_endpoint="",
            azure_ad_token_provider="",
            api_version=""
        )

class ClientA:
    def __init__(self):
        """Init."""
        self._gpt_client = GPT() # this will cause error in pickle

class UserTargetCall:
    def __init__(self):
        """User implement logic."""

        self._clientA = ClientA()

    def __call__(self):
        """Call."""

        output = {"conversation": [
            {"role": "user", "content": "What is the value of 2 + 2?"}, {"role": "assistant", "content": "2 + 2 = 4"},
            {"role": "user", "content": "What is the value of 3 + 3?"}, {"role": "assistant", "content": "3 + 3 = 6"}
            ]
        }
        return output

This is the caller code:

from UserTargetCall import UserTargetCall
from promptflow.evals.evaluate import evaluate
from promptflow.evals.evaluators import ContentSafetyChatEvaluator
from azure.identity import DefaultAzureCredential


if __name__ == '__main__':
    project_scope = {
        "subscription_id": "xxx",
        "resource_group_name": "xxx",
        "project_name": "xxx"
    }

    user_call = UserTargetCall()
    credential = DefaultAzureCredential()
    content_safety_chat_evaluator = ContentSafetyChatEvaluator(project_scope=project_scope, credential=credential)
    results = evaluate(
        evaluation_name="test",
        data="test.jsonl",
        target=user_call,
        evaluators={"content_safety_chat" : content_safety_chat_evaluator},
        azure_ai_project=project_scope
    )

and the test.jsonl only has one line:

{"query": "this is user query"}

@Hhhilulu (Contributor) commented Jul 26, 2024

Hi, @yanggaome,
Your target object cannot be pickled. We use multiprocessing to execute the batch run, which requires that all parameters of the multiprocess target function can be serialized, so this case is not supported. We recommend not initializing the client in the __init__ function.

Currently, we don't support this usage.
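One common way to follow that recommendation while keeping a per-instance client is lazy initialization: store nothing unpicklable in __init__ and build the client on first use, which happens inside the worker process after pickling. A sketch (the make_client factory and gpt_client property are illustrative, not promptflow or OpenAI API):

```python
import pickle
import threading

def make_client():
    """Illustrative stand-in factory; the repro would build AzureOpenAI here."""
    class _Client:
        def __init__(self):
            self._lock = threading.RLock()  # unpicklable, like a real SDK client
    return _Client()

class ClientA:
    def __init__(self):
        # Keep only picklable state at construction time.
        self._gpt_client = None

    @property
    def gpt_client(self):
        # Built on first access, which happens inside the worker process,
        # after the instance has already been pickled and shipped over.
        if self._gpt_client is None:
            self._gpt_client = make_client()
        return self._gpt_client
```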
