iterative / datachain

AI-data warehouse to enrich, transform and analyze unstructured data
https://docs.datachain.ai
Apache License 2.0

WDS example fails #44

Closed dmpetrov closed 4 months ago

dmpetrov commented 4 months ago

Description

It runs for about an hour and then fails:

$ python examples/wds.py
...
...

Generated: 327271 rows [12:19, 686.43 rows/s]
Generated: 327425 rows [12:19, 724.09 rows/s]
Processed: 1 rows [20:35, 1235.48s/ rows]
Generated: 505671 rows [20:30, 410.98 rows/s]
============== Error in user code: 'Mapper' ==============
Traceback (most recent call last):
  File "/Users/dmitry/src/datachain/src/datachain/lib/udf.py", line 39, in process
    return self._func(*args, **kwargs)
TypeError: <lambda>() missing 1 required positional argument: 'file'
==========================================================
Traceback (most recent call last):
  File "/Users/dmitry/src/datachain/examples/wds.py", line 34, in <module>
    df = res.limit(10).to_pandas()
  File "/Users/dmitry/src/datachain/src/datachain/query/dataset.py", line 1375, in to_pandas
    records = self.to_records()
  File "/Users/dmitry/src/datachain/src/datachain/query/dataset.py", line 1370, in to_records
    with self.as_iterable() as result:
  File "/usr/local/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py", line 119, in __enter__
    return next(self.gen)
  File "/Users/dmitry/src/datachain/src/datachain/query/dataset.py", line 1313, in as_iterable
    query = self.apply_steps().select()
  File "/Users/dmitry/src/datachain/src/datachain/query/dataset.py", line 1256, in apply_steps
    result = step.apply(
  File "/Users/dmitry/src/datachain/src/datachain/query/dataset.py", line 1021, in apply
    q2 = self.query2.apply_steps().select().subquery(self.query2.table.name)
  File "/Users/dmitry/src/datachain/src/datachain/query/dataset.py", line 1256, in apply_steps
    result = step.apply(
  File "/Users/dmitry/src/datachain/src/datachain/query/dataset.py", line 1019, in apply
    q1 = self.query1.apply_steps().select().subquery(self.query1.table.name)
  File "/Users/dmitry/src/datachain/src/datachain/query/dataset.py", line 1256, in apply_steps
    result = step.apply(
  File "/Users/dmitry/src/datachain/src/datachain/query/dataset.py", line 708, in apply
    self.populate_udf_table(udf_table, query)
  File "/Users/dmitry/src/datachain/src/datachain/query/dataset.py", line 572, in populate_udf_table
    process_udf_outputs(
  File "/Users/dmitry/src/datachain/src/datachain/query/dataset.py", line 398, in process_udf_outputs
    for udf_output in udf_results:
  File "/Users/dmitry/src/datachain/src/datachain/query/dataset.py", line 441, in run_udf
    output = udf(catalog, batch, is_generator, cache, cb=download_cb)
  File "/Users/dmitry/src/datachain/src/datachain/query/udf.py", line 131, in __call__
    udf_outputs = self.func(*udf_inputs)
  File "/Users/dmitry/src/datachain/src/datachain/lib/udf.py", line 110, in __call__
    result_objs = self.process_safe(objs)
  File "/Users/dmitry/src/datachain/src/datachain/lib/udf.py", line 192, in process_safe
    raise DataChainError(
datachain.lib.utils.DataChainError: Error in user code in class 'Mapper': <lambda>() missing 1 required positional argument: 'file'

Version Info

0.1.dev3+gb377a5b
Python 3.9.16
dberenbaum commented 4 months ago

The fix for now is this:

diff --git a/examples/wds.py b/examples/wds.py
index 7bc2c7a..df3cc81 100644
--- a/examples/wds.py
+++ b/examples/wds.py
@@ -15,14 +15,14 @@ meta_emd = (
     DataChain.from_storage("gs://dvcx-datacomp-small/metadata")
     .filter(C.name.glob("0020f*.npz"))
     .gen(emd=process_laion_meta)
-    .map(stem=lambda file: file.get_file_stem(), params=["emd.file"], output=str)
+    .map(stem=lambda emd: emd.file.get_file_stem(), params=["emd"], output=str)
 )

 meta_pq = (
     DataChain.from_storage("gs://dvcx-datacomp-small/metadata")
     .filter(C.name.glob("0020f*.parquet"))
     .parse_parquet()
-    .map(stem=lambda file: file.get_file_stem(), params=["source.file"], output=str)
+    .map(stem=lambda source: source.file.get_file_stem(), params=["source"], output=str)
 )

 meta = meta_emd.merge(

The problem is that SignalSchema does not handle nested signals like emd.file or source.file. It assumes a flat dict and looks up only top-level keys such as emd or source.
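
As a toy illustration of the described behavior (not the actual SignalSchema code): if the schema is treated as a flat dict keyed by top-level signal names, a dotted name like "emd.file" is never resolved, so the UDF argument goes missing.

# Toy sketch only; SignalSchema's real implementation differs.
signals = {"emd": object(), "source": object()}

print("emd" in signals)       # True  -- top-level key resolves
print("emd.file" in signals)  # False -- dotted name finds nothing,
                              #          so the lambda never receives `file`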

dberenbaum commented 4 months ago

Simple reproducer:

from datachain.lib.dc import DataChain

source = "gs://dvcx-datalakes/dogs-and-cats/"
DataChain.from_storage(source).map(lambda x: x, params=["file.name"], output={"name": str}).show()
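
For comparison, a variant that passes the top-level file signal and reads the nested attribute inside the callback should run, mirroring the workaround in the diff above (a minimal sketch, assuming the same map() signature as the reproducer and that the bucket is readable):

from datachain.lib.dc import DataChain

source = "gs://dvcx-datalakes/dogs-and-cats/"
# Pass the top-level "file" signal; access the nested attribute in the lambda.
DataChain.from_storage(source).map(
    lambda file: file.name, params=["file"], output={"name": str}
).show()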