fugue-project / fugue

A unified interface for distributed computing. Fugue executes SQL, Python, Pandas, and Polars code on Spark, Dask and Ray without any rewrites.
https://fugue-tutorials.readthedocs.io/
Apache License 2.0

[QUESTION] Statsforecast + Ray + JupyterHub #515

Closed juanitorduz closed 9 months ago

juanitorduz commented 9 months ago

Hi! I am trying to run a simple example using Statsforecast + Ray + JupyterHub. It works locally but not on the cluster. Do you have any tips?

Details:

# !pip install polars
# !pip install "ray[air]==2.6.3"
# !pip install "fugue[polars,ray]==0.8.7.dev4"
# !pip install ipywidgets
# !pip install statsforecast

import ray
import pandas as pd
from fugue import transform 

from statsforecast.core import StatsForecast
from statsforecast.models import (
    AutoARIMA,
    AutoETS,
)
from statsforecast.utils import generate_series

ray.init(address="ray://XXX", runtime_env = {"pip": ["fugue[ray]==0.8.7.dev4", "statsforecast"]})

n_series = 2
horizon = 15

series = generate_series(n_series, engine="pandas")
series = series.reset_index()
series["unique_id"] = series["unique_id"].astype(str)

def forecast_series(df: pd.DataFrame, models: list, h: int) -> pd.DataFrame:
    tdf = df.set_index("unique_id")
    model = StatsForecast(df=tdf, models=models, freq="D")
    return model.forecast(h).reset_index()

transform(
    series,
    forecast_series,
    params=dict(models=[AutoETS(season_length=7), AutoARIMA(season_length=7)], h=horizon),
    schema="unique_id:str, ds:date, AutoETS:float ,AutoARIMA:float",
    partition={"by": "unique_id"},
    engine="ray",
)

This simple example from the documentation works locally, but when running on the cluster I get

---------------------------------------------------------------------------
RaySystemError                            Traceback (most recent call last)
Cell In[12], line 6
      3     model = StatsForecast(df=tdf, models=models, freq="D")
      4     return model.forecast(h).reset_index()
----> 6 transform(
      7     series,
      8     forecast_series,
      9     params=dict(models=[AutoETS(season_length=7), AutoARIMA(season_length=7)], h=horizon),
     10     schema="unique_id:str, ds:date, AutoETS:float ,AutoARIMA:float",
     11     partition={"by": "unique_id"},
     12     engine="ray",
     13     #as_local=True,
     14 )

File /opt/conda/lib/python3.10/site-packages/fugue/workflow/api.py:174, in transform(df, using, schema, params, partition, callback, ignore_errors, persist, as_local, save_path, checkpoint, engine, engine_conf, as_fugue)
    171     else:
    172         tdf.save(save_path, fmt="parquet")
--> 174 dag.run(make_execution_engine(engine, conf=engine_conf, infer_by=[df]))
    175 if checkpoint:
    176     result = dag.yields["result"].result  # type:ignore

File /opt/conda/lib/python3.10/site-packages/fugue/workflow/workflow.py:1604, in FugueWorkflow.run(self, engine, conf, **kwargs)
   1602             if ctb is None:  # pragma: no cover
   1603                 raise
-> 1604             raise ex.with_traceback(ctb)
   1605         self._computed = True
   1606 return FugueWorkflowResult(self.yields)

File /opt/conda/lib/python3.10/site-packages/fugue_ray/execution_engine.py:59, in RayMapEngine.map_dataframe(self, df, map_func, output_schema, partition_spec, on_init, map_func_format_hint)
     51     return self._map(
     52         df=df,
     53         map_func=map_func,
   (...)
     56         on_init=on_init,
     57     )
     58 else:
---> 59     return self._group_map(
     60         df=df,
     61         map_func=map_func,
     62         output_schema=output_schema,
     63         partition_spec=partition_spec,
     64         on_init=on_init,
     65     )

File /opt/conda/lib/python3.10/site-packages/fugue_ray/execution_engine.py:144, in RayMapEngine._group_map(self, df, map_func, output_schema, partition_spec, on_init)
    138 gdf = rdf.groupby(_RAY_PARTITION_KEY)
    139 sdf = gdf.map_groups(
    140     _udf,
    141     batch_format="pyarrow",
    142     **self.execution_engine._get_remote_args(),  # type: ignore
    143 )
--> 144 return RayDataFrame(sdf, schema=output_schema, internal_schema=True)

File /opt/conda/lib/python3.10/site-packages/fugue_ray/dataframe.py:56, in RayDataFrame.__init__(self, df, schema, internal_schema)
     54     return
     55 if isinstance(df, rd.Dataset):
---> 56     fmt, df = get_dataset_format(df)
     57     if fmt is None:  # empty:
     58         schema = _input_schema(schema).assert_not_empty()

File /opt/conda/lib/python3.10/site-packages/fugue_ray/_utils/dataframe.py:31, in get_dataset_format(df)
     30 def get_dataset_format(df: rd.Dataset) -> Tuple[Optional[str], rd.Dataset]:
---> 31     df = materialize(df)
     32     if df.count() == 0:
     33         return None, df

File /opt/conda/lib/python3.10/site-packages/fugue_ray/_utils/dataframe.py:24, in materialize(df)
     22 if not is_materialized(df):
     23     if hasattr(df, "materialize"):
---> 24         df = df.materialize()
     25     else:  # pragma: no cover
     26         df = df.fully_executed()

File /opt/conda/lib/python3.10/site-packages/ray/data/dataset.py:4149, in Dataset.materialize(self)
   4130 """Execute and materialize this dataset into object store memory.
   4131 
   4132 This can be used to read all blocks into memory. By default, Dataset
   (...)
   4146     A MaterializedDataset holding the materialized data blocks.
   4147 """
   4148 copy = Dataset.copy(self, _deep_copy=True, _as=MaterializedDataset)
-> 4149 copy._plan.execute(force_read=True)
   4151 blocks = copy._plan._snapshot_blocks
   4152 blocks_with_metadata = blocks.get_blocks_with_metadata() if blocks else []

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/plan.py:591, in ExecutionPlan.execute(self, allow_clear_input_blocks, force_read, preserve_order)
    589 else:
    590     executor = BulkExecutor(copy.deepcopy(context.execution_options))
--> 591 blocks = execute_to_legacy_block_list(
    592     executor,
    593     self,
    594     allow_clear_input_blocks=allow_clear_input_blocks,
    595     dataset_uuid=self._dataset_uuid,
    596     preserve_order=preserve_order,
    597 )
    598 # TODO(ekl) we shouldn't need to set this in the future once we move
    599 # to a fully lazy execution model, unless .materialize() is used. Th
    600 # reason we need it right now is since the user may iterate over a
    601 # Dataset multiple times after fully executing it once.
    602 if not self._run_by_consumer:

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py:116, in execute_to_legacy_block_list(executor, plan, allow_clear_input_blocks, dataset_uuid, preserve_order)
    109 dag, stats = _get_execution_dag(
    110     executor,
    111     plan,
    112     allow_clear_input_blocks,
    113     preserve_order,
    114 )
    115 bundles = executor.execute(dag, initial_stats=stats)
--> 116 block_list = _bundles_to_block_list(bundles)
    117 # Set the stats UUID after execution finishes.
    118 _set_stats_uuid_recursive(executor.get_stats(), dataset_uuid)

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py:349, in _bundles_to_block_list(bundles)
    347 blocks, metadata = [], []
    348 owns_blocks = True
--> 349 for ref_bundle in bundles:
    350     if not ref_bundle.owns_blocks:
    351         owns_blocks = False

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces.py:548, in OutputIterator.__next__(self)
    547 def __next__(self) -> RefBundle:
--> 548     return self.get_next()

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py:129, in StreamingExecutor.execute.<locals>.StreamIterator.get_next(self, output_split_idx)
    127         raise StopIteration
    128 elif isinstance(item, Exception):
--> 129     raise item
    130 else:
    131     # Otherwise return a concrete RefBundle.
    132     if self._outer._global_info:

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py:187, in StreamingExecutor.run(self)
    181 """Run the control loop in a helper thread.
    182 
    183 Results are returned via the output node's outqueue.
    184 """
    185 try:
    186     # Run scheduling loop until complete.
--> 187     while self._scheduling_loop_step(self._topology) and not self._shutdown:
    188         pass
    189 except Exception as e:
    190     # Propagate it to the result iterator.

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py:235, in StreamingExecutor._scheduling_loop_step(self, topology)
    230     logger.get_logger().info("Scheduling loop step...")
    232 # Note: calling process_completed_tasks() is expensive since it incurs
    233 # ray.wait() overhead, so make sure to allow multiple dispatch per call for
    234 # greater parallelism.
--> 235 process_completed_tasks(topology)
    237 # Dispatch as many operators as we can for completed tasks.
    238 limits = self._get_or_refresh_resource_limits()

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py:333, in process_completed_tasks(topology)
    331     for ref in completed:
    332         op = active_tasks.pop(ref)
--> 333         op.notify_work_completed(ref)
    335 # Pull any operator outputs into the streaming op state.
    336 for op, op_state in topology.items():

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/task_pool_map_operator.py:66, in TaskPoolMapOperator.notify_work_completed(self, ref)
     64 task: _TaskState = self._tasks.pop(ref)
     65 task.output = self._map_ref_to_ref_bundle(ref)
---> 66 self._handle_task_done(task)

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py:274, in MapOperator._handle_task_done(self, task)
    272 # Notify output queue that this task is complete.
    273 self._output_queue.notify_task_completed(task)
--> 274 task.inputs.destroy_if_owned()
    275 # Update object store metrics.
    276 allocated = task.output.size_bytes()

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces.py:91, in RefBundle.destroy_if_owned(self)
     89 should_free = self.owns_blocks and DataContext.get_current().eager_free
     90 for b in self.blocks:
---> 91     trace_deallocation(b[0], "RefBundle.destroy_if_owned", free=should_free)
     92 return self.size_bytes() if should_free else 0

File /opt/conda/lib/python3.10/site-packages/ray/data/_internal/memory_tracing.py:48, in trace_deallocation(ref, loc, free)
     39 """Record that an object has been deleted (and delete if free=True).
     40 
     41 Args:
   (...)
     45         reference counting to kick in.
     46 """
     47 if free:
---> 48     ray._private.internal_api.free(ref, local_only=False)
     49 ctx = DataContext.get_current()
     50 if ctx.trace_allocations:

File /opt/conda/lib/python3.10/site-packages/ray/_private/internal_api.py:220, in free(object_refs, local_only)
    214     if not isinstance(object_ref, ray.ObjectRef):
    215         raise TypeError(
    216             "Attempting to call `free` on the value {}, "
    217             "which is not an ray.ObjectRef.".format(object_ref)
    218         )
--> 220 worker.check_connected()
    221 with profiling.profile("ray.free"):
    222     if len(object_refs) == 0:

File /opt/conda/lib/python3.10/site-packages/ray/_private/worker.py:622, in Worker.check_connected(self)
    616 """Check if the worker is connected.
    617 
    618 Raises:
    619   Exception: An exception is raised if the worker is not connected.
    620 """
    621 if not self.connected:
--> 622     raise RaySystemError(
    623         "Ray has not been started yet. You can start Ray with 'ray.init()'."
    624     )

RaySystemError: System error: Ray has not been started yet. You can start Ray with 'ray.init()'.
kvnkho commented 9 months ago

By the way, sf.forecast(ray_df, ...) will work directly now (as soon as Ray works here). You don't need to write all this extra code unless you have some specific logic you are trying to inject, like experiment tracking or partitioning.
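
For reference, the direct call can look roughly like this (just a sketch reusing the toy data from the snippet above; the full working version is further down this thread):

import ray
from statsforecast.core import StatsForecast
from statsforecast.models import AutoARIMA, AutoETS
from statsforecast.utils import generate_series

# Same toy data as in the original snippet.
series = generate_series(2, engine="pandas").reset_index()
series["unique_id"] = series["unique_id"].astype(str)
horizon = 15

# StatsForecast distributes the work itself when given a Ray Dataset,
# so no explicit transform/partition/schema is needed.
sf = StatsForecast(models=[AutoETS(season_length=7), AutoARIMA(season_length=7)], freq="D")
forecast_df = sf.forecast(h=horizon, df=ray.data.from_pandas(series))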

Hard to say here. I was reading this; it doesn't have a clear solution, but did you try vanilla Ray code on the cluster (without Fugue and StatsForecast)? And did you try the environment variable mentioned here? I am not optimistic it will work, but it's worth a shot.
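
For the vanilla Ray check, something as small as this should be enough (a sketch that assumes the same ray://XXX client address as in your snippet):

import ray

# Connect through the Ray client exactly as in the original snippet,
# but with no Fugue or StatsForecast involved.
ray.init(address="ray://XXX")

@ray.remote
def ping() -> str:
    return "pong"

# If this round trip fails, the problem is in the Ray/JupyterHub setup
# rather than in Fugue or StatsForecast.
print(ray.get(ping.remote()))
ray.shutdown()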

Then also check whether you can access the head node, using the netstat check from the comment after the one I linked.
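
If netstat isn't handy in the notebook environment, a plain socket check from the JupyterHub pod answers the same reachability question (a sketch; HEAD_NODE_IP is a placeholder and 10001 is the default Ray client port, so adjust both to your setup):

import socket

# Placeholder host/port: replace with your head node address and Ray client port.
head_node = ("HEAD_NODE_IP", 10001)

try:
    with socket.create_connection(head_node, timeout=5):
        print("head node is reachable on the Ray client port")
except OSError as exc:
    print(f"cannot reach the head node: {exc}")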

goodwanghan commented 9 months ago

Also, try to use a context manager:

with ray.init(...):
    sf.forecast(ray.data.from_pandas(pd_df), ...)

You don't need to use transform directly, and you don't need to specify the engine, partition, or schema; just pass a Ray df to StatsForecast.

juanitorduz commented 9 months ago

Thanks for the feedback 💪! I am getting the same error from the vanilla sf.forecast approach and that is the reason I'm using transform directly (to see if there is something that I can modify).

I'll keep working on it following your tips (e.g. Context manager).

Thanks 😃

goodwanghan commented 9 months ago

I tried in a clean environment with:

statsforecast==1.6.0
fugue[ray]==0.8.7.dev4
ray[data]==2.7.0

It works without any problem. Here is the code:

import ray
import pandas as pd
from fugue import transform 
import fugue.api as fa

from statsforecast.core import StatsForecast
from statsforecast.models import (
    AutoARIMA,
    AutoETS,
)
from statsforecast.utils import generate_series

n_series = 2
horizon = 15

series = generate_series(n_series, engine="pandas")
series = series.reset_index()
series["unique_id"] = series["unique_id"].astype(str)

with ray.init():
    models=[AutoETS(season_length=7), AutoARIMA(season_length=7)]
    st = StatsForecast(models=models, freq="D")
    res = fa.as_pandas(st.forecast(horizon, ray.data.from_pandas(series)))
    print(res)

juanitorduz commented 9 months ago

Thank you! It indeed works locally :) However, when we run it on our Ray cluster on K8s, we still get the error from before ... Do you maybe have any tips?

(This has been already quite helpful!)

goodwanghan commented 9 months ago

with ray.init():
    df = ray.data.from_pandas(series)
    df.to_pandas()

Can you run this on your k8s cluster?
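
On the k8s cluster, the same round trip would presumably go through the Ray client first (a sketch reusing the ray://XXX placeholder address from the original post):

import ray
from statsforecast.utils import generate_series

series = generate_series(2, engine="pandas").reset_index()
series["unique_id"] = series["unique_id"].astype(str)

# Connect to the remote cluster instead of starting a local Ray instance.
with ray.init(address="ray://XXX"):
    df = ray.data.from_pandas(series)
    # If this plain Ray Data round trip fails, the issue is independent of
    # Fugue and StatsForecast.
    print(df.to_pandas())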

juanitorduz commented 9 months ago

Hey! I think there is something we need to fix in our JupyterHub integration. Let me close this issue while we figure things out! I will come back with the learnings to share with the community! Thank you for being so supportive, and keep up the great work 🚀