Open · codinalot opened 3 years ago
Thanks for the report. Let us know if you can provide a reproducible example.
Hi @Jreyno40,
This one would be quite tricky to pin down without a reproducible example.
If your data transformation pipeline is non-confidential, perhaps you could try to provide a full, reproducible example using some sort of artificial or pseudo-random data, e.g. via dask_ml.datasets.make_blobs?
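For reference, a minimal sketch of generating synthetic data that way (the sample count, feature count, and chunking are arbitrary values chosen only for illustration):

from dask_ml.datasets import make_blobs

# Generate a synthetic dask-backed dataset; n_samples, n_features,
# and chunks are placeholder values for illustration.
X, y = make_blobs(n_samples=10000, n_features=20, chunks=1000)
print(X)  # a dask array split into 1000-row chunks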
Hi @Jreyno40,
I had a similar error with a very basic example: https://examples.dask.org/dataframes/01-data-access.html. Specifically, at these two lines:
df = dd.read_csv('data/2000-*-*.csv')
df.head()
I end up with a stack trace similar to yours, but the cause is a bit different: "AttributeError: 'tuple' object has no attribute 'head'"
Anyway, I solved it by specifying a scheduler option to the compute method and placing the call between the two lines above. Note that compute() turns the dask DataFrame into a pandas one, so the subsequent head() call is plain pandas:
df = dd.read_csv('data/2000-*-*.csv')
df = df.compute(scheduler='threads')
df.head()
I am only starting to learn how to use Dask, but I suspect this problem might be similar to yours: the culprit could be the scheduler backend (threads vs. processes) and how data serialization works under the hood.
If you try the same example from the link above, but remove the client option altogether or change processes=False to processes=True, then you don't need the df = df.compute(scheduler='threads') line at all and the example works just fine.
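For example, a sketch of the process-based variant (assuming the local setup from that page; the CSV path is the one used in the example):

from dask.distributed import Client
import dask.dataframe as dd

# With a process-based local cluster (processes=True, the default),
# no intermediate compute() call was needed in my case.
client = Client(processes=True)

df = dd.read_csv('data/2000-*-*.csv')
df.head()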
Again, maybe I made a naive mistake as a new Dask user, but I hope this helps you fix the error you encountered.
I am getting the exact same error right now using logistic regression. I don't think I'll be able to provide a reproducible example, but in case it is helpful I will describe where I am encountering this issue.
I am using dask version 2021.11.2. I am testing my code by pulling my data in batches of 1000 rows. When I pull 2-4 batches of 1000 rows and load them into separate partitions of a dask dataframe (using from_delayed), I am able to fit a model successfully (spread across 5 workers). When I expand this to 5 batches of 1000 rows (still spread across 5 workers), I start getting this error. It's only when I start to distribute the data to more workers that I receive this error, not when my partitions increase in size, which is very strange.
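Roughly, the setup looks like the following sketch, where fetch_batch is a hypothetical stand-in for my actual batch-pulling function and the column names are placeholders:

import dask
import dask.dataframe as dd
from dask_ml.linear_model import LogisticRegression

# fetch_batch is a hypothetical placeholder that returns a pandas
# DataFrame of 1000 rows for a given batch index.
batches = [dask.delayed(fetch_batch)(i) for i in range(5)]
df = dd.from_delayed(batches)  # one partition per batch

# feature_cols and 'label' are placeholders for the real schema.
X = df[feature_cols].to_dask_array(lengths=True)
y = df['label'].to_dask_array(lengths=True)

lr = LogisticRegression()
lr.fit(X, y)  # fails once the data spans 5 partitions/workers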
I have also tried increasing the number of workers to 6, still using 5 batches of 1000 rows, and then 6 batches of 1000 rows. Both failed with this same error.
I know this description might not be helpful without a reproducible example, but I figured it was worth posting in case it points you in the right direction somehow.
Thank you for your help!
As an update:
I tried upgrading to dask==2022.2.0 and dask-ml==2022.5.27 on the client and workers and re-running my code. I no longer receive the attribute error mentioned above, but now I receive a different error when fitting the logistic regression: "ValueError: shapes (0,) and (739,) not aligned: 0 (dim 0) != 739 (dim 0)". I have printed the shapes of both arrays before fitting; they do align, and neither has a zero dimension.
It seems like there might be some sort of communication issue with the dask workers, or something else that I do not understand. This code also works perfectly fine with fewer than 5 partitions of data.
I will try to open a new issue for this error, since it sounds different but could still be related.
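One diagnostic that may be worth trying (a sketch, not my original code; it assumes X_train_arr and y_train_arr are dask arrays, whose chunk sizes can be unknown when they come from a DataFrame):

# Materialize chunk sizes, which are otherwise unknown (NaN) for
# arrays derived from a dask DataFrame.
X_train_arr = X_train_arr.compute_chunk_sizes()
y_train_arr = y_train_arr.compute_chunk_sizes()

print(X_train_arr.shape, y_train_arr.shape)  # should align, no zero dims
print(X_train_arr.chunks, y_train_arr.chunks)  # per-partition sizes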
Here is the stack trace for this error if it is helpful:
File "xy_querying_data.py", line 346, in run
lr.fit(X_train_arr, y_train_arr)
File "/opt/conda/lib/python3.7/site-packages/dask_ml/linear_model/glm.py", line 188, in fit
self._coef = algorithms._solvers[self.solver](X, y, **solver_kwargs)
File "/opt/conda/lib/python3.7/site-packages/dask_glm/utils.py", line 26, in normalize_inputs
out = algo(Xn, y, *args, **kwargs).copy()
File "/opt/conda/lib/python3.7/site-packages/dask_glm/algorithms.py", line 265, in admm
new_betas = np.array(da.compute(*new_betas))
File "/opt/conda/lib/python3.7/site-packages/dask/base.py", line 573, in compute
results = schedule(dsk, keys, **kwargs)
File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 2994, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 2152, in gather
asynchronous=asynchronous,
File "/opt/conda/lib/python3.7/site-packages/distributed/utils.py", line 310, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "/opt/conda/lib/python3.7/site-packages/distributed/utils.py", line 376, in sync
raise exc.with_traceback(tb)
File "/opt/conda/lib/python3.7/site-packages/distributed/utils.py", line 349, in f
result = yield future
File "/opt/conda/lib/python3.7/site-packages/tornado/gen.py", line 769, in run
value = future.result()
File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 2009, in _gather
raise exception.with_traceback(traceback)
File "/opt/conda/lib/python3.7/site-packages/dask/utils.py", line 39, in apply
return func(*args, **kwargs)
File "/opt/conda/lib/python3.7/site-packages/dask_glm/algorithms.py", line 300, in local_update
maxfun=250)
File "/opt/conda/lib/python3.7/site-packages/scipy/optimize/lbfgsb.py", line 198, in fmin_l_bfgs_b
**opts)
File "/opt/conda/lib/python3.7/site-packages/scipy/optimize/lbfgsb.py", line 308, in _minimize_lbfgsb
finite_diff_rel_step=finite_diff_rel_step)
File "/opt/conda/lib/python3.7/site-packages/scipy/optimize/optimize.py", line 262, in _prepare_scalar_function
finite_diff_rel_step, bounds, epsilon=epsilon)
File "/opt/conda/lib/python3.7/site-packages/scipy/optimize/_differentiable_functions.py", line 140, in __init__
self._update_fun()
File "/opt/conda/lib/python3.7/site-packages/scipy/optimize/_differentiable_functions.py", line 233, in _update_fun
self._update_fun_impl()
File "/opt/conda/lib/python3.7/site-packages/scipy/optimize/_differentiable_functions.py", line 137, in update_fun
self.f = fun_wrapped(self.x)
File "/opt/conda/lib/python3.7/site-packages/scipy/optimize/_differentiable_functions.py", line 134, in fun_wrapped
return fun(np.copy(x), *args)
File "/opt/conda/lib/python3.7/site-packages/dask_glm/algorithms.py", line 234, in wrapped
return func(beta, X, y) + (rho / 2) * np.dot(beta - z + u,
File "/opt/conda/lib/python3.7/site-packages/dask_glm/families.py", line 31, in pointwise_loss
return Logistic.loglike(Xbeta, y)
File "/opt/conda/lib/python3.7/site-packages/dask_glm/families.py", line 24, in loglike
return (Xbeta + log1p(enXbeta)).sum() - dot(y, Xbeta)
File "/opt/conda/lib/python3.7/site-packages/multipledispatch/dispatcher.py", line 278, in __call__
return func(*args, **kwargs)
File "/opt/conda/lib/python3.7/site-packages/dask_glm/utils.py", line 126, in dot
return np.dot(A, B)
File "<__array_function__ internals>", line 6, in dot
ValueError: shapes (0,) and (739,) not aligned: 0 (dim 0) != 739 (dim 0)
What happened: In order to train dask-ml models on a dask DataFrame, the DataFrame must first be converted to a dask array.
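A minimal sketch of that conversion (the target column name and feature selection are placeholders for the real schema):

from dask_ml.linear_model import LogisticRegression

# 'target' is a placeholder label column; lengths=True computes the
# partition lengths so the resulting arrays have known chunk sizes.
X = df.drop(columns=['target']).to_dask_array(lengths=True)
y = df['target'].to_dask_array(lengths=True)

LogisticRegression().fit(X, y)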
When calling .fit on LinearRegression or LogisticRegression, I'm receiving the following output from the dask cluster:
Followed by this exception being thrown:
What you expected to happen:
Minimal Complete Verifiable Example: Unfortunately, I'm unable to come up with a minimal example that replicates the behavior. The DataFrame above goes through several steps before being converted to an array: it is published in a cluster, has its index reset, and potentially undergoes several transformations via task graphs and delayed(func)-to-.compute() calls. I'm including a basic example of what's happening above to demonstrate, but when I do this in a dask LocalCluster, separate from the other environment, I don't see the same issue.
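For context, the overall shape of the pipeline is roughly this (a heavily simplified sketch; the scheduler address, dataset name, and some_transform are all placeholders):

from dask import delayed
from dask.distributed import Client

client = Client('tcp://scheduler:8786')  # placeholder address

# The real DataFrame is published in the cluster and retrieved later.
client.publish_dataset(training_data=df)
df = client.get_dataset('training_data')

# Index reset plus several delayed transformations, then compute.
df = df.reset_index(drop=True)
result = delayed(some_transform)(df).compute()  # some_transform is hypothetical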
Environment: