iterative / datachain

AI-data warehouse to enrich, transform and analyze unstructured data
https://docs.datachain.ai
Apache License 2.0

Parallel execution: Can't pickle #20

Closed: dmpetrov closed this 4 months ago

dmpetrov commented 4 months ago

Description

The code below fails in parallel mode (it works with no parallelism).

We have hit this issue several times 🙁 - iterative/dvcx#1620

import os

from mistralai.client import MistralClient
from mistralai.models.chat_completion import ChatMessage, ChatCompletionResponse

from datachain.lib.dc import DataChain, Column
from datachain.lib.feature_utils import pydantic_to_feature

MistralResponse = pydantic_to_feature(ChatCompletionResponse)

PROMPT = "Was this bot dialog successful? Describe the 'result' as 'Yes' or 'No' in a short JSON"

model = "mistral-large-latest"
api_key = os.environ["MISTRAL_API_KEY"]

chain = (
    DataChain.from_storage("gs://datachain-demo/chatbot-KiT/")
    .limit(5)
    .settings(cache=True, parallel=4)
    .map(
        mistral_response=lambda file: MistralResponse(
            **MistralClient(api_key=api_key)
            .chat(
                model=model,
                response_format={"type": "json_object"},
                messages=[
                    ChatMessage(role="user", content=f"{PROMPT}: {file.get_value()}")
                ],
            )
            .dict()
        ),
        output=MistralResponse,
    )
    .save()
)

print(chain.select("mistral_response").collect())

Error:

...
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
    ^^^^^^^^^^^^
  File "/Users/dmitry/venv/venv-py311/lib/python3.11/site-packages/dill/_dill.py", line 1869, in save_type
    StockPickler.save_global(pickler, obj, name=obj_name)
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/pickle.py", line 1071, in save_global
    raise PicklingError(
_pickle.PicklingError: Can't pickle <class 'datachain.lib.feature_utils.FunctionCall'>: it's not found as datachain.lib.feature_utils.FunctionCall

Version Info

0.1.dev3+gb377a5b
Python 3.9.16

Python 3.11 also fails; the `main` branch behaves the same.
dmpetrov commented 4 months ago

CC @amritghimire

dmpetrov commented 4 months ago

note, this code uses dynamic schema
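
The "dynamic schema" point is the crux: pickle serializes classes by reference, recording only the defining module and qualified name and re-importing them at load time, so a class created at runtime is not importable under its recorded name and cannot be pickled. A minimal standalone demonstration of the same failure mode (plain `type()` here, standing in for the dynamically created Feature classes):

```python
import pickle

# pickle stores classes "by reference": only the defining module and the
# qualified name are written, and the class is looked up again on load.
# A class created at runtime is not importable under that name, so
# pickling it fails -- the same failure mode as in the traceback above.
DynamicFeature = type("Feature_abc123", (object,), {})

try:
    pickle.dumps(DynamicFeature)
except pickle.PicklingError as e:
    print("PicklingError:", e)
```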

amritghimire commented 4 months ago

Given all these pickling issues, we should probably plan to move away from pydantic if possible, in my opinion. I am not sure how to identify, during the parsing stage, whether a dynamic object is a feature class or not.

amritghimire commented 4 months ago

cc. @dtulga Were you able to figure out any alternatives to make these pickle friendly?

dmpetrov commented 4 months ago

If that helps - all dynamic Feature classes are registered in the cache in feature_utils.py:

feature_cache: dict[type[BaseModel], type[Feature]] = {}

dmpetrov commented 4 months ago

@dtulga Were you able to figure out any alternatives to make these pickle friendly?

It would be great if we could find a solution.

Also, I was thinking... we could try re-creating all dynamic Feature classes in each thread/process. I'll try this over the weekend.

dmpetrov commented 4 months ago

we could try re-creating all dynamic Feature classes in each thread/process. I'll try this over the weekend.

Well... re-creating the objects should not be a problem. However, it's not clear how to remove the objects before serialization 😅 Cleaning up the cache is not enough: the objects can also be created in user code.

Is there a way to specify for pickle/dill not to serialize particular type of objects (subclasses of Feature)?
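
For the record, stdlib pickle does offer a hook in this direction, though it works by overriding how a type is serialized rather than by skipping it. A hypothetical sketch (none of these names exist in datachain): if all dynamic classes shared a custom metaclass, a `copyreg` reducer keyed on that metaclass could serialize a recipe for each class instead of the class object itself, and re-create the class on unpickling - which lines up with the "re-create in each process" idea above. Whether dill follows the same `copyreg` dispatch path would need checking.

```python
import copyreg
import pickle

# Hypothetical metaclass shared by all dynamically created classes.
class FeatureMeta(type):
    pass

def make_feature(name, fields):
    # Re-create a dynamic class from its recipe; a real implementation
    # could consult feature_cache here instead of always building anew.
    return FeatureMeta(name, (object,), {"_fields": fields})

def _reduce_feature(cls):
    # Serialize the recipe (name + fields), not the class object.
    return make_feature, (cls.__name__, cls._fields)

# Register the reducer for every class whose type is FeatureMeta.
copyreg.pickle(FeatureMeta, _reduce_feature)

Dyn = make_feature("DynResponse", ("result",))
restored = pickle.loads(pickle.dumps(Dyn))
print(restored.__name__, restored._fields)  # DynResponse ('result',)
```

The key detail is that pickle consults the `copyreg` dispatch table keyed on `type(obj)`; a plain class has type `type` (handled internally), but a class with a custom metaclass hits the dispatch table, so the reducer takes over.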

amritghimire commented 4 months ago

Is there a way to specify for pickle/dill not to serialize particular type of objects (subclasses of Feature)?

I don't think it is possible implicitly, but we can detect such objects and make sure they are not pickled when forming the udf_info that is passed around in pickled form.
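
A minimal sketch of that idea (all names hypothetical, not datachain's actual udf_info shape): before pickling the payload, swap any dynamic feature class for a plain, picklable spec; after unpickling in the worker, rebuild the class from the spec.

```python
import pickle

def make_feature(name, fields):
    # Stand-in for the real dynamic class factory.
    return type(name, (object,), {"_fields": fields})

class FeatureSpec:
    # Plain, picklable description of a dynamic class.
    def __init__(self, name, fields):
        self.name = name
        self.fields = fields

def strip_features(udf_info):
    # Replace dynamic classes with specs before pickling.
    return {
        k: FeatureSpec(v.__name__, v._fields)
        if isinstance(v, type) and hasattr(v, "_fields")
        else v
        for k, v in udf_info.items()
    }

def restore_features(udf_info):
    # Rebuild dynamic classes from specs after unpickling.
    return {
        k: make_feature(v.name, v.fields) if isinstance(v, FeatureSpec) else v
        for k, v in udf_info.items()
    }

Out = make_feature("MistralOut", ("result",))
payload = pickle.dumps(strip_features({"output": Out, "batch": 4}))
info = restore_features(pickle.loads(payload))
print(info["output"].__name__, info["batch"])  # MistralOut 4
```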

dmpetrov commented 4 months ago

Closed by #45