Eventual-Inc / Daft

Distributed data engine for Python/SQL designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

the new csv reader is unstable #3307

Closed: djouallah closed this issue 5 hours ago

djouallah commented 6 days ago

Describe the bug

A pipeline using Daft has stopped working; I have no idea what changed.

To Reproduce

I don't know if this is useful, but here is the traceback:

OSError                                   Traceback (most recent call last)
Cell In[65], line 5
      3 list_files=[os.path.basename(x) for x in glob.glob(Destination+'*.CSV')]
      4 files_to_upload_full_Path = [Destination + i for i in list_files][:total_files]
----> 5 run_test(engine,max_chunk,files_to_upload_full_Path)

Cell In[62], line 8, in run_test(engine, chunk_len, files_to_upload_full_Path)
      6 chunk = files_to_upload_full_Path[i:i + chunk_len]
      7 start = time.time()
----> 8 eval(f"{engine}_clean_csv(chunk)")
      9 print(f'{engine} :' + str(time.time()-start))
     10 appended_data.append(pd.DataFrame([[start_time,engine,i,total_files,time.time()-start]], columns=results.columns))

File <string>:1

Cell In[44], line 31, in daft_clean_csv(files_to_upload_full_Path)
     29 df = df.with_column('DATE', col('SETTLEMENTDATE').cast(DataType.date()))
     30 df = df.with_column('year', col('SETTLEMENTDATE').dt.year())
---> 31 df.write_deltalake(f"/lakehouse/default/Tables/T{total_files}/daft", mode="overwrite" ,  partition_cols=["year"],allow_unsafe_rename=True)
     32 return "done"

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/api_annotations.py:26, in DataframePublicAPI.<locals>._wrap(*args, **kwargs)
     24 type_check_function(func, *args, **kwargs)
     25 timed_method = time_df_method(func)
---> 26 return timed_method(*args, **kwargs)

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/analytics.py:199, in time_df_method.<locals>.tracked_method(*args, **kwargs)
    197 start = time.time()
    198 try:
--> 199     result = method(*args, **kwargs)
    200 except Exception as e:
    201     _ANALYTICS_CLIENT.track_df_method_call(
    202         method_name=method.__name__, duration_seconds=time.time() - start, error=str(type(e).__name__)
    203     )

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/dataframe/dataframe.py:929, in DataFrame.write_deltalake(self, table, partition_cols, mode, schema_mode, name, description, configuration, custom_metadata, dynamo_table_name, allow_unsafe_rename, io_config)
    920 builder = self._builder.write_deltalake(
    921     table_uri,
    922     mode,
   (...)
    926     partition_cols=partition_cols,
    927 )
    928 write_df = DataFrame(builder)
--> 929 write_df.collect()
    931 write_result = write_df.to_pydict()
    932 assert "add_action" in write_result

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/api_annotations.py:26, in DataframePublicAPI.<locals>._wrap(*args, **kwargs)
     24 type_check_function(func, *args, **kwargs)
     25 timed_method = time_df_method(func)
---> 26 return timed_method(*args, **kwargs)

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/analytics.py:199, in time_df_method.<locals>.tracked_method(*args, **kwargs)
    197 start = time.time()
    198 try:
--> 199     result = method(*args, **kwargs)
    200 except Exception as e:
    201     _ANALYTICS_CLIENT.track_df_method_call(
    202         method_name=method.__name__, duration_seconds=time.time() - start, error=str(type(e).__name__)
    203     )

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/dataframe/dataframe.py:2535, in DataFrame.collect(self, num_preview_rows)
   2522 @DataframePublicAPI
   2523 def collect(self, num_preview_rows: Optional[int] = 8) -> "DataFrame":
   2524     """Executes the entire DataFrame and materializes the results
   2525 
   2526     .. NOTE::
   (...)
   2533         DataFrame: DataFrame with materialized results.
   2534     """
-> 2535     self._materialize_results()
   2537     assert self._result is not None
   2538     dataframe_len = len(self._result)

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/dataframe/dataframe.py:2517, in DataFrame._materialize_results(self)
   2515 context = get_context()
   2516 if self._result is None:
-> 2517     self._result_cache = context.get_or_create_runner().run(self._builder)
   2518     result = self._result
   2519     assert result is not None

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/runners/pyrunner.py:329, in PyRunner.run(self, builder)
    328 def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry:
--> 329     results = list(self.run_iter(builder))
    331     result_pset = LocalPartitionSet()
    332     for i, result in enumerate(results):

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/runners/pyrunner.py:396, in PyRunner.run_iter(self, builder, results_buffer_size)
    394 with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
    395     results_gen = self._physical_plan_to_partitions(execution_id, tasks)
--> 396     yield from results_gen

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/runners/pyrunner.py:570, in PyRunner._physical_plan_to_partitions(self, execution_id, plan)
    568 for done_future in done_set:
    569     done_task = local_futures_to_task.pop(done_future)
--> 570     materialized_results = done_future.result()
    572     pbar.mark_task_done(done_task)
    573     del self._inflight_futures[(execution_id, done_task.id())]

File ~/cluster-env/trident_env/lib/python3.11/concurrent/futures/_base.py:449, in Future.result(self, timeout)
    447     raise CancelledError()
    448 elif self._state == FINISHED:
--> 449     return self.__get_result()
    451 self._condition.wait(timeout)
    453 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/cluster-env/trident_env/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

File ~/cluster-env/trident_env/lib/python3.11/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/runners/pyrunner.py:609, in PyRunner.build_partitions(self, instruction_stack, partitions, final_metadata)
    602 def build_partitions(
    603     self,
    604     instruction_stack: list[Instruction],
    605     partitions: list[MicroPartition],
    606     final_metadata: list[PartialPartitionMetadata],
    607 ) -> list[MaterializedResult[MicroPartition]]:
    608     for instruction in instruction_stack:
--> 609         partitions = instruction.run(partitions)
    611     results: list[MaterializedResult[MicroPartition]] = [
    612         LocalMaterializedResult(part, PartitionMetadata.from_table(part).merge_with_partial(partial))
    613         for part, partial in zip(partitions, final_metadata)
    614     ]
    615     return results

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/execution/execution_step.py:443, in WriteDeltaLake.run(self, inputs)
    442 def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
--> 443     return self._write_deltalake(inputs)

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/execution/execution_step.py:447, in WriteDeltaLake._write_deltalake(self, inputs)
    445 def _write_deltalake(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
    446     [input] = inputs
--> 447     partition = self._handle_file_write(
    448         input=input,
    449     )
    450     return [partition]

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/execution/execution_step.py:462, in WriteDeltaLake._handle_file_write(self, input)
    461 def _handle_file_write(self, input: MicroPartition) -> MicroPartition:
--> 462     return table_io.write_deltalake(
    463         input,
    464         large_dtypes=self.large_dtypes,
    465         base_path=self.base_path,
    466         version=self.version,
    467         partition_cols=self.partition_cols,
    468         io_config=self.io_config,
    469     )

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/table/table_io.py:641, in write_deltalake(table, large_dtypes, base_path, version, partition_cols, io_config)
    638     target_row_groups = max(math.ceil(size_bytes / target_row_group_size / inflation_factor), 1)
    639     rows_per_row_group = max(min(math.ceil(num_rows / target_row_groups), rows_per_file), 1)
--> 641     _write_tabular_arrow_table(
    642         arrow_table=part_table,
    643         schema=None,
    644         full_path=part_path,
    645         format=format,
    646         opts=opts,
    647         fs=fs,
    648         rows_per_file=rows_per_file,
    649         rows_per_row_group=rows_per_row_group,
    650         create_dir=is_local_fs,
    651         file_visitor=visitors.visitor(part_values),
    652         version=version,
    653     )
    655 return visitors.to_metadata()

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/table/table_io.py:765, in _write_tabular_arrow_table(arrow_table, schema, full_path, format, opts, fs, rows_per_file, rows_per_row_group, create_dir, file_visitor, version)
    762     ERROR_MSGS = ("InvalidPart", "curlCode: 28, Timeout was reached")
    763     return isinstance(e, OSError) and any(err_str in str(e) for err_str in ERROR_MSGS)
--> 765 _retry_with_backoff(
    766     write_dataset,
    767     full_path,
    768     retry_error=retry_error,
    769 )

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/table/table_io.py:696, in _retry_with_backoff(func, path, retry_error, num_tries, jitter_ms, max_backoff_ms)
    694 for attempt in range(num_tries):
    695     try:
--> 696         return func()
    697     except Exception as e:
    698         if retry_error(e):

File ~/cluster-env/trident_env/lib/python3.11/site-packages/daft/table/table_io.py:746, in _write_tabular_arrow_table.<locals>.write_dataset()
    745 def write_dataset():
--> 746     pads.write_dataset(
    747         arrow_table,
    748         schema=schema,
    749         base_dir=full_path,
    750         basename_template=basename_template,
    751         format=format,
    752         partitioning=None,
    753         file_options=opts,
    754         file_visitor=file_visitor,
    755         use_threads=True,
    756         existing_data_behavior="overwrite_or_ignore",
    757         filesystem=fs,
    758         **kwargs,
    759     )

File ~/cluster-env/trident_env/lib/python3.11/site-packages/pyarrow/dataset.py:1018, in write_dataset(data, base_dir, basename_template, format, partitioning, partitioning_flavor, schema, filesystem, file_options, use_threads, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group, file_visitor, existing_data_behavior, create_dir)
   1015         raise ValueError("Cannot specify a schema when writing a Scanner")
   1016     scanner = data
-> 1018 _filesystemdataset_write(
   1019     scanner, base_dir, basename_template, filesystem, partitioning,
   1020     file_options, max_partitions, file_visitor, existing_data_behavior,
   1021     max_open_files, max_rows_per_file,
   1022     min_rows_per_group, max_rows_per_group, create_dir
   1023 )

File ~/cluster-env/trident_env/lib/python3.11/site-packages/pyarrow/_dataset.pyx:3919, in pyarrow._dataset._filesystemdataset_write()

File ~/cluster-env/trident_env/lib/python3.11/site-packages/pyarrow/types.pxi:88, in pyarrow.lib._datatype_to_pep3118()

OSError: Input/output error (os error 5)
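
For context, here is roughly what the failing step looks like, reconstructed from the traceback above (a hedged sketch, not the exact notebook cell: the with_column and write_deltalake calls mirror the frames in the trace, while the read_csv call and the file-list argument are assumptions):

```python
# Hedged reconstruction of the failing step, pieced together from the traceback.
# The read_csv call and the file-list argument are assumptions; the rest mirrors
# the frames shown above.
import daft
from daft import col, DataType

def daft_clean_csv(files_to_upload_full_Path, total_files):
    df = daft.read_csv(files_to_upload_full_Path)  # assumed: read the current chunk of CSVs
    df = df.with_column('DATE', col('SETTLEMENTDATE').cast(DataType.date()))
    df = df.with_column('year', col('SETTLEMENTDATE').dt.year())
    df.write_deltalake(
        f"/lakehouse/default/Tables/T{total_files}/daft",  # path from the traceback
        mode="overwrite",
        partition_cols=["year"],
        allow_unsafe_rename=True,
    )
    return "done"
```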

Expected behavior

No response

Component(s)

Python Runner

Additional context

No response

jaychia commented 6 days ago

Hi @djouallah!

Were there any changes here that could have triggered this regression in behavior of your data pipeline? I'm curious specifically about:

  1. Version of getdaft
  2. Version of pyarrow
  3. Any environmental changes in your storage, credentials etc
djouallah commented 6 days ago

> Hi @djouallah!
>
> Were there any changes here that could have triggered this regression in behavior of your data pipeline? I'm curious specifically about:
>
>   1. Version of getdaft
>   2. Version of pyarrow
>   3. Any environmental changes in your storage, credentials etc

I tried multiple versions of getdaft and hit the same issue. It seems the runtime was upgraded to Python 3.11 and pyarrow to version 14. I use deltalake 0.17.4 because the Rust writer doesn't support writing in batches; it loads everything into memory.
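
For reference, a quick check like the following (a minimal sketch; it relies on the standard `__version__` attribute that daft, pyarrow and deltalake each expose) confirms what is actually installed in the environment:

```python
# Minimal environment check for the versions discussed above.
import sys
import daft
import pyarrow
import deltalake

print("python    :", sys.version.split()[0])   # runtime was upgraded to 3.11
print("getdaft   :", daft.__version__)
print("pyarrow   :", pyarrow.__version__)      # 14.x after the runtime upgrade
print("deltalake :", deltalake.__version__)    # 0.17.4 here
```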

jaychia commented 6 days ago

Gotcha. I'm guessing this isn't actually a regression in Daft, given that switching Daft versions doesn't solve it.

2 suggestions:

djouallah commented 6 days ago

Actually, you are right. It seems the issue is pyarrow 14; updating it fixed the problem.

jaychia commented 5 days ago

Thanks for confirming @djouallah !

djouallah commented 2 days ago

After further testing, I think I found the issue: the new parallel CSV reader seems a bit unstable when reading and writing to different partitions. The workaround for me is to pin the Daft version to 0.3.9.

desmondcheongzx commented 7 hours ago

Hi @djouallah, sorry to hear about the issues with the new CSV reader. It does max out concurrency, which may be hammering IO too hard and leading to the issues you're seeing.

We're going to add rate limiting, similar to what we're doing with the parallel Parquet reader here, which might alleviate this.
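
Roughly, the idea is to bound how many reads are in flight at once. A conceptual sketch in Python (not the actual implementation, and the limit below is illustrative):

```python
# Conceptual sketch only: cap how many file reads run concurrently so parallel
# IO doesn't overwhelm the filesystem.
import asyncio

MAX_CONCURRENT_READS = 8  # illustrative limit

def _read_bytes(path: str) -> bytes:
    with open(path, "rb") as f:
        return f.read()

async def read_one(path: str, sem: asyncio.Semaphore) -> bytes:
    async with sem:  # at most MAX_CONCURRENT_READS reads in flight
        return await asyncio.to_thread(_read_bytes, path)

async def read_all(paths: list[str]) -> list[bytes]:
    sem = asyncio.Semaphore(MAX_CONCURRENT_READS)
    return await asyncio.gather(*(read_one(p, sem) for p in paths))
```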

In the meantime I would like to make a repro to see if we can help you sooner. Could you describe the workload and data sizes that are leading to the errors?

djouallah commented 6 hours ago

Same as this, but my data is around 2,300 files, not 60:

https://colab.research.google.com/drive/1HRbkztwjAhHR6bAQQIsAlLPaZNqs9eVG#scrollTo=PccFouvE6N9w

djouallah commented 5 hours ago

Maybe it is not a Daft issue after all. I tried writing directly to abfss and it works great; maybe the issue only appears when using a mounted storage.
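
For reference, the direct-to-abfss path looks roughly like this (a hedged sketch with placeholder account, container, and key values; the IOConfig/AzureConfig parameter names should be checked against the current Daft docs):

```python
# Hedged sketch of writing straight to abfss instead of a mounted path.
# Placeholder values throughout; verify IOConfig/AzureConfig parameters
# against the current Daft documentation.
import daft
from daft.io import IOConfig, AzureConfig

io_config = IOConfig(
    azure=AzureConfig(
        storage_account="<storage-account>",  # placeholder
        access_key="<access-key>",            # placeholder
    )
)

df = daft.read_csv(
    "abfss://<container>@<storage-account>.dfs.core.windows.net/raw/*.CSV",
    io_config=io_config,
)
df.write_deltalake(
    "abfss://<container>@<storage-account>.dfs.core.windows.net/Tables/daft",
    mode="overwrite",
    io_config=io_config,
)
```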