fugue-project / fugue

A unified interface for distributed computing. Fugue executes SQL, Python, Pandas, and Polars code on Spark, Dask and Ray without any rewrites.
https://fugue-tutorials.readthedocs.io/
Apache License 2.0

[QUESTION] Unable to run Ray example from documentation #514

Closed: juanitorduz closed this 9 months ago

juanitorduz commented 9 months ago

Hi! I am trying to reproduce https://fugue-tutorials.readthedocs.io/tutorials/beginner/execution_engine.html locally with Ray:

import ray
import pandas as pd
from fugue import transform

ray.init(ignore_reinit_error=True)

df = pd.DataFrame({"col1": [1,2,3,4], "col2": [1,2,3,4]})

# schema: *, col3:int
def add_cols(df:pd.DataFrame) -> pd.DataFrame:
    return df.assign(col3 = df['col1'] + df['col2'])

ray_df = transform(df, add_cols, engine="ray")
ray_df.show(5)

And I am getting

2023-09-18 10:11:36,718 INFO worker.py:1476 -- Calling ray.init() again after it has already been called.
2023-09-18 10:11:36,737 INFO bulk_executor.py:42 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Repartition]
ERROR:root:_1 _State.RUNNING -> _State.FAILED  argument of type 'NoneType' is not iterable
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[22], line 14
     11 def add_cols(df:pd.DataFrame) -> pd.DataFrame:
     12     return df.assign(col3 = df['col1'] + df['col2'])
---> 14 ray_df = transform(df, add_cols, engine="ray")
     15 ray_df.show(5)

File /opt/conda/lib/python3.10/site-packages/fugue/workflow/api.py:174, in transform(df, using, schema, params, partition, callback, ignore_errors, persist, as_local, save_path, checkpoint, engine, engine_conf, as_fugue)
    171     else:
    172         tdf.save(save_path, fmt="parquet")
--> 174 dag.run(make_execution_engine(engine, conf=engine_conf, infer_by=[df]))
    175 if checkpoint:
    176     result = dag.yields["result"].result  # type:ignore

File /opt/conda/lib/python3.10/site-packages/fugue/workflow/workflow.py:1604, in FugueWorkflow.run(self, engine, conf, **kwargs)
   1602             if ctb is None:  # pragma: no cover
   1603                 raise
-> 1604             raise ex.with_traceback(ctb)
   1605         self._computed = True
   1606 return FugueWorkflowResult(self.yields)

File /opt/conda/lib/python3.10/site-packages/fugue_ray/execution_engine.py:51, in RayMapEngine.map_dataframe(self, df, map_func, output_schema, partition_spec, on_init, map_func_format_hint)
     41 def map_dataframe(
     42     self,
     43     df: DataFrame,
   (...)
     48     map_func_format_hint: Optional[str] = None,
     49 ) -> DataFrame:
     50     if len(partition_spec.partition_by) == 0:
---> 51         return self._map(
     52             df=df,
     53             map_func=map_func,
     54             output_schema=output_schema,
     55             partition_spec=partition_spec,
     56             on_init=on_init,
     57         )
     58     else:
     59         return self._group_map(
     60             df=df,
     61             map_func=map_func,
   (...)
     64             on_init=on_init,
     65         )

File /opt/conda/lib/python3.10/site-packages/fugue_ray/execution_engine.py:186, in RayMapEngine._map(self, df, map_func, output_schema, partition_spec, on_init)
    181     n = get_default_partitions(self.execution_engine)
    182     if n > 0 and n != rdf.num_partitions:
    183         # if n==0 or same as the current dataframe partitions
    184         # then no repartition will be done by fugue
    185         # otherwise, repartition the dataset
--> 186         rdf = self.execution_engine.repartition(  # type: ignore
    187             rdf, PartitionSpec(num=n)
    188         )
    189 mb_args: Dict[str, Any] = {}
    190 if FUGUE_RAY_DEFAULT_BATCH_SIZE in self.conf:

File /opt/conda/lib/python3.10/site-packages/fugue_ray/execution_engine.py:263, in RayExecutionEngine.repartition(self, df, partition_spec)
    261     else:  # pragma: no cover
    262         raise NotImplementedError(partition_spec.algo + " is not supported")
--> 263 return RayDataFrame(pdf, schema=rdf.schema, internal_schema=True)

File /opt/conda/lib/python3.10/site-packages/fugue_ray/dataframe.py:55, in RayDataFrame.__init__(self, df, schema, internal_schema)
     53     return
     54 if isinstance(df, rd.Dataset):
---> 55     fmt, df = get_dataset_format(df)
     56     if fmt is None:  # empty:
     57         schema = _input_schema(schema).assert_not_empty()

File /opt/conda/lib/python3.10/site-packages/fugue_ray/_utils/dataframe.py:33, in get_dataset_format(df)
     32 def get_dataset_format(df: rd.Dataset) -> Tuple[Optional[str], rd.Dataset]:
---> 33     df = materialize(df)
     34     if df.count() == 0:
     35         return None, df

File /opt/conda/lib/python3.10/site-packages/fugue_ray/_utils/dataframe.py:26, in materialize(df)
     24 if not is_materialized(df):
     25     if hasattr(df, "materialize"):
---> 26         df = df.materialize()
     27     else:  # pragma: no cover
     28         df = df.fully_executed()

File /opt/conda/lib/python3.10/site-packages/ray/data/dataset.py:4713, in Dataset.materialize(self)
   4694 """Execute and materialize this dataset into object store memory.
   4695 
   4696 This can be used to read all blocks into memory. By default, Dataset
   (...)
   4710     A MaterializedDataset holding the materialized data blocks.
   4711 """
   4712 copy = Dataset.copy(self, _deep_copy=True, _as=MaterializedDataset)
-> 4713 copy._plan.execute(force_read=True)
   4715 blocks = copy._plan._snapshot_blocks
   4716 blocks_with_metadata = blocks.get_blocks_with_metadata() if blocks else []

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/plan.py:591, in ExecutionPlan.execute(self, allow_clear_input_blocks, force_read, preserve_order)
    589 else:
    590     executor = BulkExecutor(copy.deepcopy(context.execution_options))
--> 591 blocks = execute_to_legacy_block_list(
    592     executor,
    593     self,
    594     allow_clear_input_blocks=allow_clear_input_blocks,
    595     dataset_uuid=self._dataset_uuid,
    596     preserve_order=preserve_order,
    597 )
    598 # TODO(ekl) we shouldn't need to set this in the future once we move
    599 # to a fully lazy execution model, unless .materialize() is used. Th
    600 # reason we need it right now is since the user may iterate over a
    601 # Dataset multiple times after fully executing it once.
    602 if not self._run_by_consumer:

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py:118, in execute_to_legacy_block_list(executor, plan, allow_clear_input_blocks, dataset_uuid, preserve_order)
    100 """Execute a plan with the new executor and translate it into a legacy block list.
    101 
    102 Args:
   (...)
    110     The output as a legacy block list.
    111 """
    112 dag, stats = _get_execution_dag(
    113     executor,
    114     plan,
    115     allow_clear_input_blocks,
    116     preserve_order,
    117 )
--> 118 bundles = executor.execute(dag, initial_stats=stats)
    119 block_list = _bundles_to_block_list(bundles)
    120 # Set the stats UUID after execution finishes.

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/bulk_executor.py:85, in BulkExecutor.execute(self, dag, initial_stats)
     80     logger.get_logger(log_to_stdout=context.enable_auto_log_stats).info(
     81         stats_summary_string,
     82     )
     83     return output
---> 85 return OutputIterator(execute_recursive(dag))

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/bulk_executor.py:65, in BulkExecutor.execute.<locals>.execute_recursive(op)
     63         for r in ref_bundles:
     64             op.add_input(r, input_index=i)
---> 65     op.all_inputs_done()
     66     output = _naive_run_until_complete(op)
     67 finally:

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/base_physical_operator.py:88, in AllToAllOperator.all_inputs_done(self)
     83 def all_inputs_done(self) -> None:
     84     ctx = TaskContext(
     85         task_idx=self._next_task_index,
     86         sub_progress_bar_dict=self._sub_progress_bar_dict,
     87     )
---> 88     self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
     89     self._next_task_index += 1
     90     self._input_buffer.clear()

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/repartition.py:61, in generate_repartition_fn.<locals>.split_repartition_fn(refs, ctx)
     59 shuffle_spec = ShuffleTaskSpec(random_shuffle=False)
     60 scheduler = SplitRepartitionTaskScheduler(shuffle_spec)
---> 61 return scheduler.execute(refs, num_outputs, ctx)

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py:78, in SplitRepartitionTaskScheduler.execute(self, refs, output_num_blocks, ctx, map_ray_remote_args, reduce_ray_remote_args)
     76 sub_progress_bar_dict = ctx.sub_progress_bar_dict
     77 bar_name = ShuffleTaskSpec.SPLIT_REPARTITION_SUB_PROGRESS_BAR_NAME
---> 78 assert bar_name in sub_progress_bar_dict, sub_progress_bar_dict
     79 reduce_bar = sub_progress_bar_dict[bar_name]
     81 reduce_task = cached_remote_fn(self._exchange_spec.reduce)

TypeError: argument of type 'NoneType' is not iterable

Simple Ray operations are working:

import time

@ray.remote
def square(x):
    time.sleep(20)  # Simulate a long running task
    return x * x

# Launch ten parallel square tasks.
futures = [square.remote(i) for i in range(10)]
print(ray.get(futures))

What am I missing? Thanks!

kvnkho commented 9 months ago

I see you closed the issue, are you good? This might be a Ray version issue, right? @goodwanghan

goodwanghan commented 9 months ago

@juanitorduz I guess that is related to the Ray version. Ray changes behavior in every release, so if you are using an old Fugue version with a very new Ray version, it could break.

Could you try

pip install "fugue[ray]==0.8.6.dev4"

and try again? It has support for the latest Ray. We will release 0.8.7 soon.
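
To double-check which versions your notebook environment actually resolves (a minimal sketch using importlib.metadata; adapt it to however you manage environments):

from importlib.metadata import version

# Versions seen by the driver; the cluster workers should report the same,
# since behavior differences between Ray releases are the suspected cause.
print("fugue:", version("fugue"))
print("ray:", version("ray"))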

juanitorduz commented 9 months ago

Yeah! It seems to be something related to the versions. Our cluster runs Ray 2.6.3, so I made sure that I now have the same version in JupyterHub (the latest Ray version was released just yesterday).

I also made sure to pass fugue[ray] to the runtime_env argument of ray.init. I got this example working by passing as_local=True.
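
Roughly what that setup looks like end to end (a sketch under those assumptions; the runtime_env pip spec and the explicit schema argument are my own choices, not copied from the docs):

import ray
import pandas as pd
from fugue import transform

# Ship the fugue[ray] dependency to the Ray workers via runtime_env.
ray.init(
    ignore_reinit_error=True,
    runtime_env={"pip": ["fugue[ray]"]},
)

df = pd.DataFrame({"col1": [1, 2, 3, 4], "col2": [1, 2, 3, 4]})

def add_cols(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(col3=df["col1"] + df["col2"])

# Pass the output schema explicitly and request a local result, so the
# transformed data comes back to the driver instead of staying as a Ray Dataset.
result = transform(
    df, add_cols, schema="*, col3:int", engine="ray", as_local=True
)
print(result)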

I'm now realizing that there is still some distance between the examples in the docs and running this on a cluster. I'll keep you posted!

Thank you very much! 💪

juanitorduz commented 9 months ago

> I see you closed the issue, are you good? This might be a Ray version issue, right? @goodwanghan

Indeed, it seems it was an issue with the Ray version and the need to pass the packages (fugue[ray]) through the runtime_env argument of ray.init.

I'm a Ray and Fugue newbie, so I closed it because it seemed like something I could fix on my end.

FYI: my end goal is to use statsforecast with Ray in JupyterHub.

juanitorduz commented 9 months ago

I think I solved it now with the right versions. Thanks! 🙏