Eventual-Inc / Daft

Distributed data engine for Python/SQL designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

Cannot write fixed size list (and by extension, fixed size tensors and fixed size images) with null values to Parquet #2839

Open jaychia opened 1 month ago

jaychia commented 1 month ago

Describe the bug

To reproduce:

import daft
import numpy as np

# One of the rows in "x" is null; casting to a fixed-size list and writing to Parquet fails.
df = daft.from_pydict({"x": [np.array([1, 2, 3]), None, np.array([1, 2, 3])]})
df = df.with_column("y", df["x"].cast(daft.DataType.fixed_size_list(daft.DataType.int64(), 3)))
df.write_parquet("foo")

Related upstream Arrow issue: https://github.com/apache/arrow/issues/35697
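
The failure can be reproduced with pyarrow alone, which suggests the limitation lives in Arrow's Parquet writer rather than in Daft. A minimal sketch (the output file name foo.parquet is arbitrary):

import pyarrow as pa
import pyarrow.parquet as pq

# A fixed-size list array with a null entry; constructing the array succeeds.
arr = pa.array(
    [[1, 2, 3], None, [1, 2, 3]],
    type=pa.list_(pa.int64(), 3),  # fixed_size_list<int64>[3]
)

# Writing it to Parquet raises:
# ArrowNotImplementedError: Lists with non-zero length null components are not supported
pq.write_table(pa.table({"y": arr}), "foo.parquet")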

jaychia commented 1 month ago
---------------------------------------------------------------------------
ArrowNotImplementedError                  Traceback (most recent call last)
Cell In[3], line 6
      4 df = daft.from_pydict({"x": [np.array([1, 2, 3]), None, np.array([1, 2, 3])]})
      5 df = df.with_column("y", df["x"].cast(daft.DataType.fixed_size_list(daft.DataType.int64(), 3)))
----> 6 df.write_parquet("foo")

File ~/code/venv-demo/lib/python3.10/site-packages/daft/api_annotations.py:26, in DataframePublicAPI.<locals>._wrap(*args, **kwargs)
     24 type_check_function(func, *args, **kwargs)
     25 timed_method = time_df_method(func)
---> 26 return timed_method(*args, **kwargs)

File ~/code/venv-demo/lib/python3.10/site-packages/daft/analytics.py:202, in time_df_method.<locals>.tracked_method(*args, **kwargs)
    200 start = time.time()
    201 try:
--> 202     result = method(*args, **kwargs)
    203 except Exception as e:
    204     _ANALYTICS_CLIENT.track_df_method_call(
    205         method_name=method.__name__, duration_seconds=time.time() - start, error=str(type(e).__name__)
    206     )

File ~/code/venv-demo/lib/python3.10/site-packages/daft/dataframe/dataframe.py:541, in DataFrame.write_parquet(self, root_dir, compression, partition_cols, io_config)
    539 # Block and write, then retrieve data
    540 write_df = DataFrame(builder)
--> 541 write_df.collect()
    542 assert write_df._result is not None
    544 if len(write_df) > 0:
    545     # Populate and return a new disconnected DataFrame

File ~/code/venv-demo/lib/python3.10/site-packages/daft/api_annotations.py:26, in DataframePublicAPI.<locals>._wrap(*args, **kwargs)
     24 type_check_function(func, *args, **kwargs)
     25 timed_method = time_df_method(func)
---> 26 return timed_method(*args, **kwargs)

File ~/code/venv-demo/lib/python3.10/site-packages/daft/analytics.py:202, in time_df_method.<locals>.tracked_method(*args, **kwargs)
    200 start = time.time()
    201 try:
--> 202     result = method(*args, **kwargs)
    203 except Exception as e:
    204     _ANALYTICS_CLIENT.track_df_method_call(
    205         method_name=method.__name__, duration_seconds=time.time() - start, error=str(type(e).__name__)
    206     )

File ~/code/venv-demo/lib/python3.10/site-packages/daft/dataframe/dataframe.py:2346, in DataFrame.collect(self, num_preview_rows)
   2333 @DataframePublicAPI
   2334 def collect(self, num_preview_rows: Optional[int] = 8) -> "DataFrame":
   2335     """Executes the entire DataFrame and materializes the results
   2336 
   2337     .. NOTE::
   (...)
   2344         DataFrame: DataFrame with materialized results.
   2345     """
-> 2346     self._materialize_results()
   2348     assert self._result is not None
   2349     dataframe_len = len(self._result)

File ~/code/venv-demo/lib/python3.10/site-packages/daft/dataframe/dataframe.py:2328, in DataFrame._materialize_results(self)
   2326 context = get_context()
   2327 if self._result is None:
-> 2328     self._result_cache = context.runner().run(self._builder)
   2329     result = self._result
   2330     assert result is not None

File ~/code/venv-demo/lib/python3.10/site-packages/daft/runners/pyrunner.py:250, in PyRunner.run(self, builder)
    249 def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry:
--> 250     results = list(self.run_iter(builder))
    252     result_pset = LocalPartitionSet()
    253     for i, result in enumerate(results):

File ~/code/venv-demo/lib/python3.10/site-packages/daft/runners/pyrunner.py:314, in PyRunner.run_iter(self, builder, results_buffer_size)
    312 with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
    313     results_gen = self._physical_plan_to_partitions(tasks)
--> 314     yield from results_gen

File ~/code/venv-demo/lib/python3.10/site-packages/daft/runners/pyrunner.py:463, in PyRunner._physical_plan_to_partitions(self, plan)
    461 for done_future in done_set:
    462     done_task = local_futures_to_task.pop(done_future)
--> 463     materialized_results = done_future.result()
    465     pbar.mark_task_done(done_task)
    466     del self._inflight_futures[done_task.id()]

File ~/.pyenv/versions/3.10.8/lib/python3.10/concurrent/futures/_base.py:451, in Future.result(self, timeout)
    449     raise CancelledError()
    450 elif self._state == FINISHED:
--> 451     return self.__get_result()
    453 self._condition.wait(timeout)
    455 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/.pyenv/versions/3.10.8/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~/.pyenv/versions/3.10.8/lib/python3.10/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File ~/code/venv-demo/lib/python3.10/site-packages/daft/runners/pyrunner.py:528, in PyRunner.build_partitions(self, instruction_stack, partitions, final_metadata)
    521 def build_partitions(
    522     self,
    523     instruction_stack: list[Instruction],
    524     partitions: list[MicroPartition],
    525     final_metadata: list[PartialPartitionMetadata],
    526 ) -> list[MaterializedResult[MicroPartition]]:
    527     for instruction in instruction_stack:
--> 528         partitions = instruction.run(partitions)
    530     results: list[MaterializedResult[MicroPartition]] = [
    531         PyMaterializedResult(part, PartitionMetadata.from_table(part).merge_with_partial(partial))
    532         for part, partial in zip(partitions, final_metadata)
    533     ]
    534     return results

File ~/code/venv-demo/lib/python3.10/site-packages/daft/execution/execution_step.py:355, in WriteFile.run(self, inputs)
    354 def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
--> 355     return self._write_file(inputs)

File ~/code/venv-demo/lib/python3.10/site-packages/daft/execution/execution_step.py:359, in WriteFile._write_file(self, inputs)
    357 def _write_file(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
    358     [input] = inputs
--> 359     partition = self._handle_file_write(
    360         input=input,
    361     )
    362     return [partition]

File ~/code/venv-demo/lib/python3.10/site-packages/daft/execution/execution_step.py:374, in WriteFile._handle_file_write(self, input)
    373 def _handle_file_write(self, input: MicroPartition) -> MicroPartition:
--> 374     return table_io.write_tabular(
    375         input,
    376         path=self.root_dir,
    377         schema=self.schema,
    378         file_format=self.file_format,
    379         compression=self.compression,
    380         partition_cols=self.partition_cols,
    381         io_config=self.io_config,
    382     )

File ~/code/venv-demo/lib/python3.10/site-packages/daft/table/table_io.py:490, in write_tabular(table, file_format, path, schema, partition_cols, compression, io_config, partition_null_fallback)
    487         visited_paths.append(written_file.path)
    488         partition_idx.append(i)
--> 490     _write_tabular_arrow_table(
    491         arrow_table=arrow_table,
    492         schema=arrow_table.schema,
    493         full_path=full_path,
    494         format=format,
    495         opts=opts,
    496         fs=fs,
    497         rows_per_file=rows_per_file,
    498         rows_per_row_group=rows_per_row_group,
    499         create_dir=is_local_fs,
    500         file_visitor=file_visitor,
    501     )
    503 data_dict: dict[str, Any] = {
    504     schema.column_names()[0]: Series.from_pylist(visited_paths, name=schema.column_names()[0]).cast(
    505         DataType.string()
    506     )
    507 }
    509 if partition_values is not None:

File ~/code/venv-demo/lib/python3.10/site-packages/daft/table/table_io.py:886, in _write_tabular_arrow_table(arrow_table, schema, full_path, format, opts, fs, rows_per_file, rows_per_row_group, create_dir, file_visitor, version)
    883     ERROR_MSGS = ("InvalidPart", "curlCode: 28, Timeout was reached")
    884     return isinstance(e, OSError) and any(err_str in str(e) for err_str in ERROR_MSGS)
--> 886 _retry_with_backoff(
    887     write_dataset,
    888     full_path,
    889     retry_error=retry_error,
    890 )

File ~/code/venv-demo/lib/python3.10/site-packages/daft/table/table_io.py:819, in _retry_with_backoff(func, path, retry_error, num_tries, jitter_ms, max_backoff_ms)
    817 for attempt in range(num_tries):
    818     try:
--> 819         return func()
    820     except Exception as e:
    821         if retry_error(e):

File ~/code/venv-demo/lib/python3.10/site-packages/daft/table/table_io.py:867, in _write_tabular_arrow_table.<locals>.write_dataset()
    866 def write_dataset():
--> 867     pads.write_dataset(
    868         arrow_table,
    869         schema=schema,
    870         base_dir=full_path,
    871         basename_template=basename_template,
    872         format=format,
    873         partitioning=None,
    874         file_options=opts,
    875         file_visitor=file_visitor,
    876         use_threads=True,
    877         existing_data_behavior="overwrite_or_ignore",
    878         filesystem=fs,
    879         **kwargs,
    880     )

File ~/code/venv-demo/lib/python3.10/site-packages/pyarrow/dataset.py:1018, in write_dataset(data, base_dir, basename_template, format, partitioning, partitioning_flavor, schema, filesystem, file_options, use_threads, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group, file_visitor, existing_data_behavior, create_dir)
   1015         raise ValueError("Cannot specify a schema when writing a Scanner")
   1016     scanner = data
-> 1018 _filesystemdataset_write(
   1019     scanner, base_dir, basename_template, filesystem, partitioning,
   1020     file_options, max_partitions, file_visitor, existing_data_behavior,
   1021     max_open_files, max_rows_per_file,
   1022     min_rows_per_group, max_rows_per_group, create_dir
   1023 )

File ~/code/venv-demo/lib/python3.10/site-packages/pyarrow/_dataset.pyx:3928, in pyarrow._dataset._filesystemdataset_write()

File ~/code/venv-demo/lib/python3.10/site-packages/pyarrow/error.pxi:91, in pyarrow.lib.check_status()

ArrowNotImplementedError: Lists with non-zero length null components are not supported
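
Until the upstream Arrow limitation is resolved, one possible workaround is to cast back to a variable-size list just before writing, since variable-size lists with nulls round-trip through Parquet fine. A sketch, assuming Daft supports the fixed-size-to-variable-size cast (not an official recommendation):

import daft
import numpy as np

df = daft.from_pydict({"x": [np.array([1, 2, 3]), None, np.array([1, 2, 3])]})
df = df.with_column("y", df["x"].cast(daft.DataType.fixed_size_list(daft.DataType.int64(), 3)))

# Hypothetical workaround: drop the fixed-size constraint before writing.
df = df.with_column("y", df["y"].cast(daft.DataType.list(daft.DataType.int64())))
df.write_parquet("foo")

This loses the fixed-size type information in the written file, so readers that rely on the fixed_size_list (or tensor/image) logical type would need to cast on read.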