lmcinnes / umap

Uniform Manifold Approximation and Projection
BSD 3-Clause "New" or "Revised" License

Fitting UMAP in a scikit-learn pipeline with 50k training samples fails with `PicklingError: Could not pickle the task to send it to the workers` #563

Open nbeuchat opened 3 years ago

nbeuchat commented 3 years ago

I have a scikit-learn pipeline that uses UMAP for dimensionality reduction. It works fine for a small dataset (I tried 2k samples with 512 dimensions, 10 UMAP components) but when I use a larger dataset (50k samples, 512 dimensions, 10 UMAP components), it always fails when fitting with the following error: PicklingError: Could not pickle the task to send it to the workers (full error below)

The relevant part of the pipeline (which is fed into a larger, surrounding pipeline) is simply:

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from umap import UMAP

n_umap_components = 10

umap_reduction = Pipeline([
    ("sent_vect_umap", UMAP(n_components=n_umap_components)),
    ("umap_scaler", StandardScaler())
])

dimensionality_reduction = ColumnTransformer([
    ("umap_selector", umap_reduction, slice(-512, None)),
], remainder="passthrough")

If I do: dimensionality_reduction.fit_transform(X,y), it fails with the error mentioned above. But if I do: umap_reduction.fit_transform(X,y), it works fine.

I believe this is due to the parallelization in ColumnTransformer (it would be the same with FeatureUnion), but in my case n_jobs is None by default, so it should only use a single process.

I have the following package versions:

umap-learn==0.5.0
numba==0.52.0
numpy==1.18.5
scipy==1.4.1
pynndescent==0.5.1
scikit-learn==0.24.1

I am running this in Jupyter Lab. Note that when I trained other UMAP models outside of a scikit-learn pipeline, it worked fine even with much larger amounts of data.

Given that UMAP is by far the dimensionality reduction method that gave me the best results on my dataset, I'd really like to be able to include it in our pipeline! Any help would be very much appreciated.

Thanks a lot for your help!! Nicolas

Full error log:

---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/externals/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/numba/core/serialize.py", line 217, in _pickle__CustomPickled
    serialized = dumps((cp.ctor, cp.states))
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/numba/core/serialize.py", line 168, in dumps
    p.dump(obj)
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/numba/core/serialize.py", line 362, in reducer_override
    return self._custom_reduce_func(obj)
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/numba/core/serialize.py", line 373, in _custom_reduce_func
    gls = _get_function_globals_for_reduction(func)
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/numba/core/serialize.py", line 66, in _get_function_globals_for_reduction
    func_id = bytecode.FunctionIdentity.from_function(func)
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/numba/core/bytecode.py", line 308, in from_function
    pysig = utils.pysignature(func)
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/inspect.py", line 3093, in signature
    return Signature.from_callable(obj, follow_wrapped=follow_wrapped)
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/inspect.py", line 2842, in from_callable
    return _signature_from_callable(obj, sigcls=cls,
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/inspect.py", line 2292, in _signature_from_callable
    return _signature_from_function(sigcls, obj,
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/inspect.py", line 2156, in _signature_from_function
    parameters.append(Parameter(name, annotation=annotation,
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/inspect.py", line 2479, in __init__
    self._kind = _ParameterKind(kind)
RecursionError: maximum recursion depth exceeded

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/externals/loky/backend/queues.py", line 153, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/externals/loky/backend/reduction.py", line 271, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/externals/loky/backend/reduction.py", line 264, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "/home/nicolas/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/externals/cloudpickle/cloudpickle_fast.py", line 570, in dump
    raise pickle.PicklingError(msg) from e
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
"""

The above exception was the direct cause of the following exception:

PicklingError                             Traceback (most recent call last)
<ipython-input-263-b1951e9afe78> in <module>
      1 print("Start preprocessing fit")
      2 start_time = time.time()
----> 3 pipe["preprocessing"].fit(df_vocab_train[features_columns], df_vocab_train[target_column])
      4 print(f"{time.time() - start_time:.1f} seconds")

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/sklearn/compose/_column_transformer.py in fit(self, X, y)
    469         # we use fit_transform to make sure to set sparse_output_ (for which we
    470         # need the transformed data) to have consistent output type in predict
--> 471         self.fit_transform(X, y=y)
    472         return self
    473 

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/sklearn/compose/_column_transformer.py in fit_transform(self, X, y)
    506         self._validate_remainder(X)
    507 
--> 508         result = self._fit_transform(X, y, _fit_transform_one)
    509 
    510         if not result:

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/sklearn/compose/_column_transformer.py in _fit_transform(self, X, y, func, fitted)
    433             self._iter(fitted=fitted, replace_strings=True))
    434         try:
--> 435             return Parallel(n_jobs=self.n_jobs)(
    436                 delayed(func)(
    437                     transformer=clone(trans) if not fitted else trans,

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/parallel.py in __call__(self, iterable)
   1042                 self._iterating = self._original_iterator is not None
   1043 
-> 1044             while self.dispatch_one_batch(iterator):
   1045                 pass
   1046 

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/parallel.py in dispatch_one_batch(self, iterator)
    857                 return False
    858             else:
--> 859                 self._dispatch(tasks)
    860                 return True
    861 

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/parallel.py in _dispatch(self, batch)
    775         with self._lock:
    776             job_idx = len(self._jobs)
--> 777             job = self._backend.apply_async(batch, callback=cb)
    778             # A job can complete so quickly than its callback is
    779             # called before we get here, causing self._jobs to

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/_parallel_backends.py in apply_async(self, func, callback)
    206     def apply_async(self, func, callback=None):
    207         """Schedule a func to be run"""
--> 208         result = ImmediateResult(func)
    209         if callback:
    210             callback(result)

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/_parallel_backends.py in __init__(self, batch)
    570         # Don't delay the application, to avoid keeping the input
    571         # arguments in memory
--> 572         self.results = batch()
    573 
    574     def get(self):

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/parallel.py in __call__(self)
    260         # change the default number of processes to -1
    261         with parallel_backend(self._backend, n_jobs=self._n_jobs):
--> 262             return [func(*args, **kwargs)
    263                     for func, args, kwargs in self.items]
    264 

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/parallel.py in <listcomp>(.0)
    260         # change the default number of processes to -1
    261         with parallel_backend(self._backend, n_jobs=self._n_jobs):
--> 262             return [func(*args, **kwargs)
    263                     for func, args, kwargs in self.items]
    264 

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/sklearn/utils/fixes.py in __call__(self, *args, **kwargs)
    220     def __call__(self, *args, **kwargs):
    221         with config_context(**self.config):
--> 222             return self.function(*args, **kwargs)

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/sklearn/pipeline.py in _fit_transform_one(transformer, X, y, weight, message_clsname, message, **fit_params)
    752     with _print_elapsed_time(message_clsname, message):
    753         if hasattr(transformer, 'fit_transform'):
--> 754             res = transformer.fit_transform(X, y, **fit_params)
    755         else:
    756             res = transformer.fit(X, y, **fit_params).transform(X)

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/sklearn/pipeline.py in fit_transform(self, X, y, **fit_params)
    376         """
    377         fit_params_steps = self._check_fit_params(**fit_params)
--> 378         Xt = self._fit(X, y, **fit_params_steps)
    379 
    380         last_step = self._final_estimator

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/sklearn/pipeline.py in _fit(self, X, y, **fit_params_steps)
    301                 cloned_transformer = clone(transformer)
    302             # Fit or load from cache the current transformer
--> 303             X, fitted_transformer = fit_transform_one_cached(
    304                 cloned_transformer, X, y, None,
    305                 message_clsname='Pipeline',

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/memory.py in __call__(self, *args, **kwargs)
    350 
    351     def __call__(self, *args, **kwargs):
--> 352         return self.func(*args, **kwargs)
    353 
    354     def call_and_shelve(self, *args, **kwargs):

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/sklearn/pipeline.py in _fit_transform_one(transformer, X, y, weight, message_clsname, message, **fit_params)
    752     with _print_elapsed_time(message_clsname, message):
    753         if hasattr(transformer, 'fit_transform'):
--> 754             res = transformer.fit_transform(X, y, **fit_params)
    755         else:
    756             res = transformer.fit(X, y, **fit_params).transform(X)

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/sklearn/compose/_column_transformer.py in fit_transform(self, X, y)
    506         self._validate_remainder(X)
    507 
--> 508         result = self._fit_transform(X, y, _fit_transform_one)
    509 
    510         if not result:

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/sklearn/compose/_column_transformer.py in _fit_transform(self, X, y, func, fitted)
    433             self._iter(fitted=fitted, replace_strings=True))
    434         try:
--> 435             return Parallel(n_jobs=self.n_jobs)(
    436                 delayed(func)(
    437                     transformer=clone(trans) if not fitted else trans,

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/parallel.py in __call__(self, iterable)
   1039             # remaining jobs.
   1040             self._iterating = False
-> 1041             if self.dispatch_one_batch(iterator):
   1042                 self._iterating = self._original_iterator is not None
   1043 

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/parallel.py in dispatch_one_batch(self, iterator)
    857                 return False
    858             else:
--> 859                 self._dispatch(tasks)
    860                 return True
    861 

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/parallel.py in _dispatch(self, batch)
    775         with self._lock:
    776             job_idx = len(self._jobs)
--> 777             job = self._backend.apply_async(batch, callback=cb)
    778             # A job can complete so quickly than its callback is
    779             # called before we get here, causing self._jobs to

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/_parallel_backends.py in apply_async(self, func, callback)
    206     def apply_async(self, func, callback=None):
    207         """Schedule a func to be run"""
--> 208         result = ImmediateResult(func)
    209         if callback:
    210             callback(result)

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/_parallel_backends.py in __init__(self, batch)
    570         # Don't delay the application, to avoid keeping the input
    571         # arguments in memory
--> 572         self.results = batch()
    573 
    574     def get(self):

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/parallel.py in __call__(self)
    260         # change the default number of processes to -1
    261         with parallel_backend(self._backend, n_jobs=self._n_jobs):
--> 262             return [func(*args, **kwargs)
    263                     for func, args, kwargs in self.items]
    264 

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/parallel.py in <listcomp>(.0)
    260         # change the default number of processes to -1
    261         with parallel_backend(self._backend, n_jobs=self._n_jobs):
--> 262             return [func(*args, **kwargs)
    263                     for func, args, kwargs in self.items]
    264 

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/sklearn/utils/fixes.py in __call__(self, *args, **kwargs)
    220     def __call__(self, *args, **kwargs):
    221         with config_context(**self.config):
--> 222             return self.function(*args, **kwargs)

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/sklearn/pipeline.py in _fit_transform_one(transformer, X, y, weight, message_clsname, message, **fit_params)
    752     with _print_elapsed_time(message_clsname, message):
    753         if hasattr(transformer, 'fit_transform'):
--> 754             res = transformer.fit_transform(X, y, **fit_params)
    755         else:
    756             res = transformer.fit(X, y, **fit_params).transform(X)

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/umap/umap_.py in fit_transform(self, X, y)
   2633             Local radii of data points in the embedding (log-transformed).
   2634         """
-> 2635         self.fit(X, y)
   2636         if self.transform_mode == "embedding":
   2637             if self.output_dens:

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/umap/umap_.py in fit(self, X, y)
   2377                 self._knn_dists,
   2378                 self._knn_search_index,
-> 2379             ) = nearest_neighbors(
   2380                 X[index],
   2381                 self._n_neighbors,

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/umap/umap_.py in nearest_neighbors(X, n_neighbors, metric, metric_kwds, angular, random_state, low_memory, use_pynndescent, n_jobs, verbose)
    326         n_iters = max(5, int(round(np.log2(X.shape[0]))))
    327 
--> 328         knn_search_index = NNDescent(
    329             X,
    330             n_neighbors=n_neighbors,

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/pynndescent/pynndescent_.py in __init__(self, data, metric, metric_kwds, n_neighbors, n_trees, leaf_size, pruning_degree_multiplier, diversify_prob, n_search_trees, tree_init, init_graph, random_state, low_memory, max_candidates, n_iters, delta, n_jobs, compressed, verbose)
    781             if verbose:
    782                 print(ts(), "Building RP forest with", str(n_trees), "trees")
--> 783             self._rp_forest = make_forest(
    784                 data,
    785                 n_neighbors,

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/pynndescent/rp_trees.py in make_forest(data, n_neighbors, n_trees, leaf_size, rng_state, random_state, n_jobs, angular)
    997             )
    998         else:
--> 999             result = joblib.Parallel(n_jobs=n_jobs, prefer="threads")(
   1000                 joblib.delayed(make_dense_tree)(data, rng_states[i], leaf_size, angular)
   1001                 for i in range(n_trees)

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/parallel.py in __call__(self, iterable)
   1052 
   1053             with self._backend.retrieval_context():
-> 1054                 self.retrieve()
   1055             # Make sure that we get a last message telling us we are done
   1056             elapsed_time = time.time() - self._start_time

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/parallel.py in retrieve(self)
    931             try:
    932                 if getattr(self._backend, 'supports_timeout', False):
--> 933                     self._output.extend(job.get(timeout=self.timeout))
    934                 else:
    935                     self._output.extend(job.get())

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/site-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
    540         AsyncResults.get from multiprocessing."""
    541         try:
--> 542             return future.result(timeout=timeout)
    543         except CfTimeoutError as e:
    544             raise TimeoutError from e

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    437                 raise CancelledError()
    438             elif self._state == FINISHED:
--> 439                 return self.__get_result()
    440             else:
    441                 raise TimeoutError()

~/anaconda3/envs/nlp_fb_posts_topics_model/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

PicklingError: Could not pickle the task to send it to the workers.
nbeuchat commented 3 years ago

If anybody has the same problem, you can use the following workaround:

This requires breaking the pipeline into separate parts that can be trained independently, but it does the trick.

# First, fit the column transformer on a small subset of the data
dimensionality_reduction.fit(X[:1000, :], y[:1000])

# Then refit the UMAP selector on the whole dataset
dimensionality_reduction.named_transformers_["umap_selector"].fit(X, y)
lmcinnes commented 3 years ago

I'm not really quite sure what is going astray here because, as you note, it works fine outside of the pipeline. My best guess, just looking at the code used and the error produced, is that there is some catch with the slice(-512, None) resulting in something being passed to UMAP that isn't a simple numpy array (lazy evaluation perhaps?). One option might be to make an explicit list of columns (using np.arange or list(range(...))).

I'll see if I can reproduce this and dig a little deeper into exactly what aspect of this is unpickleable.
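
For concreteness, something along these lines is what I have in mind for the explicit column list (untested sketch, assuming the 512 embedding columns sit at the end of X):

import numpy as np

# Untested sketch: pass an explicit array of column indices instead of slice(-512, None),
# assuming the 512 embedding columns are the last columns of X
embedding_columns = np.arange(X.shape[1] - 512, X.shape[1])

dimensionality_reduction = ColumnTransformer([
    ("umap_selector", umap_reduction, embedding_columns),
], remainder="passthrough")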

nbeuchat commented 3 years ago

Thanks for your answer! The unexpected thing is that it works for smaller datasets when I do dimensionality_reduction.fit(X[:1000, :], y[:1000]).

Let me know if you want me to test anything!

lmcinnes commented 3 years ago

That's not actually unexpected. For small enough dataset sizes (the somewhat arbitrary threshold is 4096 samples) the fancy NNDescent nearest neighbor search is overkill, and it is cheaper to just compute all-pairs distances. That avoids going through the routine that is currently failing, so anything below 4096 samples should work.
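
Roughly, the dispatch looks like this (illustrative sketch only, not UMAP's actual code):

# Illustrative sketch of the size-based dispatch described above (not UMAP's actual code)
if X.shape[0] < 4096:
    # small data: just compute all-pairs distances and take the n_neighbors closest
    knn_indices, knn_dists = brute_force_knn(X, n_neighbors)  # hypothetical helper
else:
    # large data: build an approximate NNDescent index -- the code path failing here
    knn_search_index = NNDescent(X, n_neighbors=n_neighbors)
    knn_indices, knn_dists = knn_search_index.neighbor_graph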

nbeuchat commented 3 years ago

Got it, interesting, thanks for the info!

lmcinnes commented 3 years ago

In good news, I can reproduce the error. In bad news, I am even more puzzled as to what is going on -- the objects that UMAP is getting should be fine.

lmcinnes commented 3 years ago

As far as I can tell it is related to numba/numba#3370, but specifically to the double nesting of joblib Parallel calls. I'll see if I can get any advice on this from the numba team.

lmcinnes commented 3 years ago

It looks like the issue is that ColumnTransformer wraps everything in a joblib.Parallel call and, even with n_jobs=1, that changes the parallel context and thus induces pickling, which doesn't play well with numba recursive functions. Normally this is all fine because the internal joblib.Parallel call inside pynndescent has an explicit prefer="threads", which uses the threading backend and avoids the serialization issues. I think (but still have to verify) that the outer Parallel call is messing that up. As a workaround for now I believe you should be able to do:

from joblib import parallel_backend

n_umap_components = 10

umap_reduction = Pipeline([
    ("sent_vect_umap", UMAP(n_components = n_umap_components)),
    ("umap_scaler", StandardScaler())
])

dimensionality_reduction = ColumnTransformer([
    ("umap_selector", umap_reduction, slice(-512, None)),
], remainder="passthrough")

with parallel_backend('threading'):
    dimensionality_reduction.fit_transform(X, y)

which should enforce the threading backend and make everything play nicely. Of course this is bad if you want actual parallelism in your ColumnTransformer, since parallelisation there may well run afoul of the GIL, but as long as you only need it to run serially (i.e. n_jobs=1) it should be fine.

nbeuchat commented 3 years ago

That's awesome, thanks so much for investigating this so quickly! I'll try your proposed approach and write back how it goes.

lmcinnes commented 3 years ago

You may want to assign the result of the fit_transform to something -- otherwise, I realised, it will get lost inside the with context. I think I now have a fix for this to go into pynndescent that will make everything good again: rather than prefer="threads" I need to use require="sharedmem", so that the no-serialization requirement can't be overridden from outside.
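
Concretely, something like this (the same snippet as above, just keeping the result):

with parallel_backend('threading'):
    # keep the transformed output so it isn't lost once the context exits
    X_reduced = dimensionality_reduction.fit_transform(X, y)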

Thanks for reporting this -- it has certainly been a fun (if occasionally maddening) plunge into a rabbit hole of complex interactions between different dependencies (joblib, numba).

nbeuchat commented 3 years ago

Your trick worked like a charm!

FWIW, it's only needed for fit/fit_transform, which is great for using the component in production.

> Thanks for reporting this -- it has certainly been a fun (if occasionally maddening) plunge into a rabbit hole of complex interactions between different dependencies (joblib, numba).

I can only imagine the ramifications ;-)

adilosa commented 3 years ago

Serialization support for _rp_forest would also probably fix this – discussion for that is #273 and lmcinnes/pynndescent#93.

(Assuming one really wanted to serialize the whole UMAP object and send it to every process. More important for deserializing from storage for production use.)