huggingface / datasets

🤗 The largest hub of ready-to-use datasets for ML models with fast, easy-to-use and efficient data manipulation tools
https://huggingface.co/docs/datasets
Apache License 2.0

Support parallelized downloading and processing in load_dataset with Spark #5798

Open es94129 opened 1 year ago

es94129 commented 1 year ago

Feature request

When calling load_dataset for datasets that have multiple files, support using Spark to distribute the downloading and processing job to worker nodes when cache_dir is a cloud file system shared among nodes.

load_dataset(..., use_spark=True)

Motivation

Further speed up dl_manager.download and _prepare_split by distributing the workloads to worker nodes.

Your contribution

I can submit a PR to support this.

lhoestq commented 1 year ago

Hi ! We're using process pools for parallelism right now. I was wondering if there's a package that implements the same API as a process pool but runs with Spark under the hood ? That or something similar would be cool because users could use whatever distributed framework they want this way.

Feel free to ping us when you'd like to open PRs for this kind of thing, so that we can discuss this before you start working on it ^^

es94129 commented 1 year ago

Hi, thanks for taking a look and providing your input! I don't know of such a package, and even if one exists, I don't think the process pool API would let Spark run properly as a backend; otherwise I agree a unified API would be preferable.

The process pool API requires splitting the workload into a fixed number of parts for multiprocessing; meanwhile a distributed framework such as Spark has a sophisticated scheduler that distributes the workload to processes on multiple machines in a cluster, so splitting things the way multiprocessing.pool does would not be as suitable or flexible as calling the sparkContext.parallelize API directly.
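For illustration, here is a minimal sketch of the two approaches (process_file and paths are made-up names, and the SparkSession setup is assumed here, it is not part of datasets):

from multiprocessing import Pool
from pyspark.sql import SparkSession

def process_file(path):
    return path.upper()  # stand-in for downloading / processing one file

paths = [f"file-{i}" for i in range(100)]

# multiprocessing: the caller fixes the number of worker processes and the
# iterable is chunked on the driver side
with Pool(processes=16) as pool:
    results = pool.map(process_file, paths)

# Spark: the driver only declares the collection; the Spark scheduler decides
# how to split and schedule it across the executors of the cluster
spark = SparkSession.builder.getOrCreate()
results = spark.sparkContext.parallelize(paths).map(process_file).collect()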

I think this could be a good addition to scale the datasets implementation to distributed workers, and from my benchmark results so far it looks promising compared with multiprocessing.

lhoestq commented 1 year ago

I see ! I think we only need an equivalent of pool.map. We use it to run download and conversion of data files on disk. That would require fewer changes in the internal code - and therefore fewer tests to write ;)

We also use pool.apply_async in some places with a Queue to get progress updates of the running jobs. I'm mentioning this in case there's a way to get a python generator from a running spark job ? This is less important though
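For context, a rough sketch of that apply_async + Queue pattern (the worker function and shard layout are made up, this is not the exact datasets code):

import multiprocessing

def process_shard(shard, queue):
    results = []
    for item in shard:
        results.append(item * 2)  # stand-in for the real per-item work
        queue.put(1)              # report one unit of progress
    return results

if __name__ == "__main__":
    shards = [list(range(10)) for _ in range(4)]
    manager = multiprocessing.Manager()
    queue = manager.Queue()
    with multiprocessing.Pool(4) as pool:
        async_results = [pool.apply_async(process_shard, (shard, queue)) for shard in shards]
        # the main process consumes the queue to update a progress bar
        for _ in range(sum(len(s) for s in shards)):
            queue.get()
        results = [r.get() for r in async_results]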

es94129 commented 1 year ago

For Spark, rdd.map (where rdd can be created by sparkContext.parallelize) is the most similar to pool.map, but it requires first creating a Spark RDD that is used to distribute the iterable, and the actual parallelization is managed by the Spark framework; pool.map takes splits of the iterable that are divided into num_proc parts by the Python code. You can also check my PR #5807 in the src/datasets/utils/py_utils.py file to compare the two APIs; it might make more sense than the description above.

Given the different inputs and mechanisms of calling the map functions, this is why I think it's not that feasible to reuse most of the multiprocessing code.

Progress bar updating might be challenging with Spark, I'll consider it as a followup work.

lhoestq commented 1 year ago

Indeed I think the current use of multiprocessing.Pool in map_nested can be rewritten to work like sparkContext.parallelize - without splitting the iterable.

Maybe from the user's perspective it's ok to let multiprocessing.Pool or spark distribute the load on their own, as long as it takes a list and runs jobs in parallel in the end :)

es94129 commented 1 year ago

From your feedback, it seems to me there are two paths to consider for supporting Spark's map function in map_nested:

  1. Keep the current pool.map implementation, and add an if statement for Spark's map code (which is what I did in my current PR) -- the code change is just a few lines in the map_nested function, and it has been tested by unit tests + manual testing on real Spark clusters; if you have other concerns I'd also be happy to address them.
  2. Rewrite the current pool.map implementation so that it no longer splits the iterable; we would still need to add an if statement to use either
    with Pool(...) as pool:
    mapped = pool.map(_single_map_nested, iterable)

    or

    rdd = spark.sparkContext.parallelize(iterable)
    mapped = rdd.map(lambda obj: _single_map_nested((function, obj, types, None, True, None))).collect()

    because there is no unified API that supports both pool.map and rdd.map. This can be more unified and flexible in the long run, but might require more work, and it will change the existing multiprocessing behavior, which is why I'm not leaning towards this option.

Am I understanding correctly?

lhoestq commented 1 year ago

Yup correct ! I think it's a nice path because it would be possible for users to define whatever parallel processing backend they want. I think we still need to discuss how that would look in the datasets API: how to specify that it has to use the "spark" parallel backend? And how to specify the Spark session parameters (number of executors, etc.)? Maybe there is something more practical than use_spark=True

I'll check with the team internally if they have some ideas, but feel free to share your thoughts here !

es94129 commented 1 year ago

Sure, please let me know if you have more updates regarding the API and implementation from the team.

As for parameters, we don't need to worry about setting them for Spark, because Spark will figure out the environment / number of worker nodes by itself, so it's preferable to just provide a parameter such as use_spark to opt into the RDD map function.

es94129 commented 1 year ago

Hi! I wanted to check in to see if there is any update from the team.

A potential API change I can think of is changing the argument to distributed_backend=..., which accepts a str, e.g. load_dataset(..., distributed_backend="spark").

Implementation-wise, we can add a class / function to abstract away the details of using multiprocessing vs. spark vs. other parallel processing frameworks in map_nested and _prepare_split.
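For example, a possible shape of that abstraction (the helper name and signature are hypothetical, just to illustrate the idea):

from multiprocessing import Pool

def distributed_map(function, iterable, num_proc, distributed_backend=None):
    # hypothetical helper: run `function` over `iterable` with the chosen backend
    if distributed_backend == "spark":
        from pyspark.sql import SparkSession
        spark = SparkSession.builder.getOrCreate()
        return spark.sparkContext.parallelize(iterable).map(function).collect()
    # default: plain multiprocessing, as map_nested does today
    with Pool(num_proc) as pool:
        return pool.map(function, iterable)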

lhoestq commented 1 year ago

I found this quite interesting: https://github.com/joblib/joblib-spark with this syntax:

with parallel_backend('spark', n_jobs=3):
    ...

cc @lu-wang-dl who might know better

lu-wang-dl commented 1 year ago

Joblib-spark provides a Spark backend for joblib. We can implement a general parallel backend like

with parallel_backend("<parallel-backend>", n_jobs=..):

It can support multiprocessing, spark, ray, etc. https://joblib.readthedocs.io/en/latest/parallel.html#joblib.parallel_backend
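For illustration, the backend-agnostic joblib pattern looks roughly like this (the square function is just an example):

import joblib

def square(x):
    return x * x

# "loky" is joblib's default local backend; "spark" or "ray" can be selected
# here once their joblib backends are registered, without changing the mapping code
with joblib.parallel_backend("loky", n_jobs=4):
    results = joblib.Parallel()(joblib.delayed(square)(i) for i in range(100))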

es94129 commented 1 year ago

Thank you @lhoestq for finding this repo. I validated that it can distribute downloading jobs with Spark to arbitrary cluster worker nodes evenly with n_jobs=-1.

For the API, I think it makes sense to define it as

load_dataset(..., parallel_backend=<str>)

where parallel_backend can be spark, multiprocessing, and potentially other supported joblib backends including ray and dask.

Implementation-wise, do you think it is better to just use joblib for the spark backend in map_nested, or also migrate the multiprocessing.Pool code to use joblib?

es94129 commented 1 year ago

Hello @lhoestq, I wanted to follow up on my previous comment with some prototyping code that demonstrates what map_nested would look like if we unified multiprocessing and spark with joblib. The snippet hasn't hashed out details such as dealing with tqdm yet.

In terms of API, the way of using multiprocessing stays the same; for Spark, the user sets parallel_backend='spark' and can reuse the num_proc argument to pass in the number of executors, or preferably just set num_proc=-1 so that joblib decides it (I've validated this by running it on a Spark cluster).

def map_nested(
  # ... same args
  parallel_backend: Optional[str] = None, # proposed new argument
):

  # ... same code

  # allow user to specify num_proc=-1, so that joblib will optimize it
  if (num_proc <= 1 and num_proc != -1) or len(iterable) < parallel_min_length:
    # same code
    mapped = [
      _single_map_nested((function, obj, types, None, True, None))
      for obj in logging.tqdm(iterable, disable=disable_tqdm, desc=desc)
    ]
  else:
    import joblib  # joblib (and joblib-spark for the 'spark' backend) is needed here
    if not parallel_backend:
      parallel_backend = 'loky' # 'loky' is joblib's own implementation of robust multiprocessing

    n_jobs = min(num_proc, len(iterable))

    if parallel_backend == 'spark':
      n_jobs = -1 # let joblib-spark derive the number of jobs from the Spark cluster
      from joblibspark import register_spark
      register_spark()

    # parallelized with the same API
    with joblib.parallel_backend(parallel_backend, n_jobs=n_jobs):
      mapped = joblib.Parallel()(
        joblib.delayed(_single_map_nested)((function, obj, types, None, True, None))
        for obj in logging.tqdm(iterable, disable=disable_tqdm, desc=desc)
      )

  # ... same code

We can always use joblib for Spark and other distributed backends such as Ray if people want to support them later. It's worth noting that some distributed backends do not currently have joblib implementations.

I would appreciate your thoughts on this proposed new API. We can also discuss the pros and cons of migrating the multiprocessing code to joblib later.

lhoestq commented 1 year ago

Nice ! It should be quite easy to make the change then :)

I think adding spark support can actually be less than 20 lines of code and would roughly require one line of code to change in map_nested.

Maybe we can define a new datasets.parallel submodule that has the parallel_backend() context manager and a parallel_map() function that uses Pool.map by default and joblib otherwise.

joblib would be an optional dependency, and joblib-spark as well.
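A rough sketch of what that submodule could look like (the internals below are hypothetical, only meant to show the shape of the idea):

# hypothetical datasets/parallel.py
from contextlib import contextmanager
from multiprocessing import Pool

_backend_name = None  # None means: use Pool.map as today

@contextmanager
def parallel_backend(name):
    # select a joblib backend (e.g. "spark") for the enclosed code
    global _backend_name
    import joblib  # optional dependency
    if name == "spark":
        from joblibspark import register_spark  # optional dependency
        register_spark()
    _backend_name = name
    try:
        with joblib.parallel_backend(name):
            yield
    finally:
        _backend_name = None

def parallel_map(function, iterable, num_proc):
    # Pool.map by default, joblib when a backend was selected
    if _backend_name is None:
        with Pool(num_proc) as pool:
            return pool.map(function, iterable)
    import joblib
    return joblib.Parallel(n_jobs=num_proc)(joblib.delayed(function)(x) for x in iterable)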

Then whenever someone wants to use Spark, they can do something like this (similar to scikit-learn parallel_backend):

from datasets.parallel import parallel_backend

with parallel_backend("spark"):
    ds = load_dataset(...)

What do you think ?

lhoestq commented 1 year ago

Although, until we've switched all the steps in load_dataset to use datasets.parallel, I would require the user to explicitly say which steps should use Spark. Maybe something like this, but I'm not sure yet:

from datasets.parallel import parallel_backend

with parallel_backend("spark", steps=["download"]):
    ds = load_dataset(...)

For now, some steps can raise NotImplementedError:

from datasets.parallel import parallel_backend

with parallel_backend("spark", steps=["download", "prepare"]):
# NotImplementedError: the "prepare" step that converts the raw data files to Arrow is not compatible with the "spark" backend yet

This way we can progressively roll out Spark support for the other data loading/processing steps without breaking changes between datasets versions
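As a sketch, the step check could be as simple as this (the step names and the table of supported steps are placeholders):

SUPPORTED_STEPS = {"spark": {"download"}}  # steps implemented so far, per backend

def _check_steps(backend, steps):
    for step in steps:
        if step not in SUPPORTED_STEPS.get(backend, set()):
            raise NotImplementedError(
                f'the "{step}" step is not compatible with the "{backend}" backend yet'
            )

# _check_steps("spark", ["download", "prepare"])  ->  raises NotImplementedError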

es94129 commented 1 year ago

Sounds good! I like the partial rollout idea. So for example map_nested would call parallel_map under the hood if num_proc != 1 or parallel_backend is specified, right? I would be happy to start a PR next week to explore this path.

lhoestq commented 1 year ago

Awesome ! I think map_nested can call parallel_map() if num_proc > 1, and parallel_map can be responsible for using Pool.map by default or joblib otherwise.
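A simplified sketch of that dispatch (argument handling and the nested-structure logic of map_nested are omitted):

def map_nested(function, iterable, num_proc=1, parallel_min_length=2):
    if num_proc <= 1 or len(iterable) < parallel_min_length:
        # sequential path, unchanged
        return [function(obj) for obj in iterable]
    # parallel path: parallel_map (see the datasets.parallel sketch above)
    # chooses between Pool.map and the joblib backend selected by the user
    return parallel_map(function, iterable, num_proc)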