iterative / datachain

AI-data warehouse to enrich, transform and analyze unstructured data
https://docs.datachain.ai
Apache License 2.0

If UDF execution fails, SQLite database closes #160

Closed volkfox closed 3 months ago

volkfox commented 3 months ago

Description

This is a copy of iterative/dvcx#1663 from dvcx. Raising the priority because it occurs frequently.

To simulate a UDF error, assume MISTRAL_API_KEY holds an invalid API key, then execute the following code:

import os
import pandas as pd
from datachain.lib.feature import Feature
from mistralai.client import MistralClient
from mistralai.models.chat_completion import ChatMessage
from datachain.lib.dc import Column, DataChain

source = "gs://datachain-demo/chatbot-KiT/"
PROMPT = "Was this dialog successful? Describe the 'result' as 'Yes' or 'No' in a short JSON"

model = "mistral-large-latest"
api_key = os.environ["MISTRAL_API_KEY"]

# TODO: parallelize w/same client? cannot pickle 'SSLContext'

chain = (
    DataChain.from_storage(source)
    .filter(Column("file.name").glob("*.txt"))
    .limit(5)
    .settings(cache=True, parallel=5)
    .map(
        mistral_response=lambda file: (
            MistralClient(api_key=api_key)
            .chat(
                model=model,
                response_format={"type": "json_object"},
                messages=[
                    ChatMessage(role="user", content=f"{PROMPT}: {file.get_value()}")
                ],
            )
            .choices[0]
            .message.content
        ),
    )
)

try:
    print(chain.select("mistral_response").results())
except Exception as e:
    print(f"do you have the right Mistral API key? {e}")

As expected, the UDFs will fail:

....
datachain.lib.utils.DataChainError: Error in user code in class 'Mapper': Status: 401. Message: {
  "message":"Unauthorized",
  "request_id":"8dd5091032fd6d4e706aee4e9fdc614b"
}
do you have the right Mistral API key? UDF Execution Failed!

However, after this runtime failure the entire chain becomes unusable, because any further call hits a closed database:

>>> chain.select("mistral_response").results()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/dkh/dvcx/src/datachain/query/dataset.py", line 1287, in results
    with self.as_iterable(**kwargs) as result:
  File "/usr/local/Cellar/python@3.9/3.9.4/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py", line 117, in __enter__
    return next(self.gen)
  File "/Users/dkh/dvcx/src/datachain/query/dataset.py", line 1296, in as_iterable
    query = self.apply_steps().select()
  File "/Users/dkh/dvcx/src/datachain/query/dataset.py", line 1239, in apply_steps
    result = step.apply(
  File "/Users/dkh/dvcx/src/datachain/query/dataset.py", line 687, in apply
    query, tables = self.process_input_query(query)
  File "/Users/dkh/dvcx/src/datachain/query/dataset.py", line 737, in process_input_query
    table = self.create_pre_udf_table(query)
  File "/Users/dkh/dvcx/src/datachain/query/dataset.py", line 717, in create_pre_udf_table
    table = self.catalog.warehouse.create_udf_table(self.udf_table_name(), columns)
  File "/Users/dkh/dvcx/src/datachain/data_storage/warehouse.py", line 890, in create_udf_table
    self.db.create_table(tbl, if_not_exists=True)
  File "/Users/dkh/dvcx/src/datachain/data_storage/sqlite.py", line 208, in create_table
    self.execute(CreateTable(table, if_not_exists=if_not_exists))
  File "/Users/dkh/dvcx/src/datachain/data_storage/sqlite.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/Users/dkh/dvcx/src/datachain/data_storage/sqlite.py", line 149, in execute
    result = self.db.execute(*self.compile_to_args(query))
sqlite3.ProgrammingError: Cannot operate on a closed database.
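
The last frame is standard `sqlite3` behaviour rather than anything specific to datachain: once a connection has been closed, every later call on it raises `sqlite3.ProgrammingError`. A minimal stdlib-only sketch of that behaviour follows; the in-memory database and the table names are illustrative and are not taken from datachain.

```python
import sqlite3

# Open a throwaway in-memory database and use it once.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS udf_demo (id INTEGER)")

# Simulate the connection being closed after a failed run.
conn.close()

# Any further statement on the closed connection raises ProgrammingError.
error_message = None
try:
    conn.execute("CREATE TABLE IF NOT EXISTS udf_demo_2 (id INTEGER)")
except sqlite3.ProgrammingError as exc:
    error_message = str(exc)
    print(error_message)
```

This only shows why the second call fails; it does not show where datachain closes the connection.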

The workaround is to define the chain again.
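
One way to make that workaround less manual is to keep the chain definition in a factory function and build a fresh chain for every attempt. The sketch below does not use the real `DataChain` class: `FakeChain`, `run_with_rebuild` and `flaky_udf` are hypothetical stand-ins, and the code assumes (as the traceback suggests, but this issue does not confirm) that a failed UDF run leaves the chain's SQLite connection closed.

```python
import sqlite3


class FakeChain:
    """Hypothetical stand-in for a chain that owns a SQLite connection."""

    def __init__(self, udf):
        self.conn = sqlite3.connect(":memory:")
        self.udf = udf

    def results(self):
        # Raises sqlite3.ProgrammingError if the connection was closed earlier.
        self.conn.execute("CREATE TABLE IF NOT EXISTS udf_out (value TEXT)")
        try:
            value = self.udf()
        except Exception:
            self.conn.close()  # assumed failure mode: the error closes the database
            raise
        self.conn.execute("INSERT INTO udf_out VALUES (?)", (value,))
        return [row[0] for row in self.conn.execute("SELECT value FROM udf_out")]


def run_with_rebuild(build_chain, attempts=2):
    """Call build_chain() afresh for each attempt instead of reusing a chain."""
    last_error = None
    for _ in range(attempts):
        chain = build_chain()  # new object, new connection
        try:
            return chain.results()
        except Exception as exc:
            last_error = exc
    raise last_error


calls = {"n": 0}


def flaky_udf():
    # Fails on the first call (standing in for a bad API key), then succeeds.
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("Status: 401")
    return "ok"


print(run_with_rebuild(lambda: FakeChain(flaky_udf)))
```

In the real scenario the underlying error (the wrong API key) still has to be fixed before a rebuilt chain can succeed; the factory only avoids reusing an object whose database is already closed.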

Version Info

(clean-datachain) DN0a1e26a5:dvcx dkh$ datachain -V; python -V
0.1.14.dev8+gd94054a
Python 3.9.4

Version Info

0.2.6.dev4+gef2347f

dmpetrov commented 3 months ago

@dtulga please take a look. It seems related to the sessions issue you are working on.