I am trying to fit a linear regression model (dask-ml's LinearRegression) on data held in a dask dataframe, because my data will not fit into local memory.

import sklearn.datasets as sk_datasets
import dask.dataframe as dd
X, y = sk_datasets.make_classification(n_samples=10000, n_informative=12, 
                                       n_redundant=18, n_features=30)
df_x = dd.from_array(X)
df_y = dd.from_array(y)
dask_x = df_x.values
dask_y = df_y.values
lr = LinearRegression(fit_intercept=True)
lr.fit(dask_x, dask_y)

This throws:

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask_glm/utils.py in add_intercept(X)
    145 def add_intercept(X):
    146     if np.isnan(np.sum(X.shape)):
--> 147         raise NotImplementedError("Can not add intercept to array with "
    148                                   "unknown chunk shape")
    149     j, k = X.chunks

NotImplementedError: Can not add intercept to array with unknown chunk shape
lr = LinearRegression(fit_intercept=False)
lr.fit(dask_x, dask_y)

This throws:

AttributeError                            Traceback (most recent call last)
<ipython-input-35-fedc331fe3fb> in <module>()
      1 lr = LinearRegression(fit_intercept=False)
----> 3 lr.fit(X_values, y_values)

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask_ml/linear_model/glm.py in fit(self, X, y)
    155         solver_kwargs = self._get_solver_kwargs()
--> 157         self._coef = algorithms._solvers[self.solver](X, y, **solver_kwargs)
    158         if self.fit_intercept:
    159             self.coef_ = self._coef[:-1]

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask_glm/utils.py in normalize_inputs(X, y, *args, **kwargs)
     24             mean = mean if len(intercept_idx[0]) else np.zeros(mean.shape)
     25             Xn = (X - mean) / std
---> 26             out = algo(Xn, y, *args, **kwargs).copy()
     27             i_adj = np.sum(out * mean / std)
     28             out[intercept_idx] -= i_adj

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask_glm/algorithms.py in admm(X, y, regularizer, lamduh, rho, over_relax, max_iter, abstol, reltol, family, **kwargs)
    262                                            fprime=fprime) for
    263                      xx, yy, bb, uu in zip(XD, yD, betas, u)]
--> 264         new_betas = np.array(da.compute(*new_betas))
    266         beta_hat = over_relax * new_betas + (1 - over_relax) * z

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    400     keys = [x.__dask_keys__() for x in collections]
    401     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 402     results = schedule(dsk, keys, **kwargs)
    403     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     73     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     74                         cache=cache, get_id=_thread_get_id,
---> 75                         pack_exception=pack_exception, **kwargs)
     77     # Cleanup pools associated to dead threads

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    503                         _execute_task(task, data)  # Re-execute locally
    504                     else:
--> 505                         raise_exception(exc, tb)
    506                 res, worker_id = loads(res_info)
    507                 state['cache'][key] = res

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
     67         if exc.__traceback__ is not tb:
     68             raise exc.with_traceback(tb)
---> 69         raise exc
     71 else:

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    272     try:
    273         task, data = loads(task_info)
--> 274         result = _execute_task(task, data)
    275         id = get_id()
    276         result = dumps((result, id))

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk)
    253         func, args = arg[0], arg[1:]
    254         args2 = [_execute_task(a, cache) for a in args]
--> 255         return func(*args2)
    256     elif not ishashable(arg):
    257         return arg

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask/compatibility.py in apply(func, args, kwargs)
     48     def apply(func, args, kwargs=None):
     49         if kwargs:
---> 50             return func(*args, **kwargs)
     51         else:
     52             return func(*args)

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask_glm/algorithms.py in local_update(X, y, beta, z, u, rho, f, fprime, solver)
    297     beta, f, d = solver(f, beta, fprime=fprime, args=solver_args,
    298                         maxiter=200,
--> 299                         maxfun=250)
    301     return beta

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/scipy/optimize/lbfgsb.py in fmin_l_bfgs_b(func, x0, fprime, args, approx_grad, bounds, m, factr, pgtol, epsilon, iprint, maxfun, maxiter, disp, callback, maxls)
    198     res = _minimize_lbfgsb(fun, x0, args=args, jac=jac, bounds=bounds,
--> 199                            **opts)
    200     d = {'grad': res['jac'],
    201          'task': res['message'],

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/scipy/optimize/lbfgsb.py in _minimize_lbfgsb(fun, x0, args, jac, bounds, disp, maxcor, ftol, gtol, eps, maxfun, maxiter, iprint, callback, maxls, **unknown_options)
    333             # until the completion of the current minimization iteration.
    334             # Overwrite f and g:
--> 335             f, g = func_and_grad(x)
    336         elif task_str.startswith(b'NEW_X'):
    337             # new iteration

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/scipy/optimize/lbfgsb.py in func_and_grad(x)
    283     else:
    284         def func_and_grad(x):
--> 285             f = fun(x, *args)
    286             g = jac(x, *args)
    287             return f, g

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/scipy/optimize/optimize.py in function_wrapper(*wrapper_args)
    291     def function_wrapper(*wrapper_args):
    292         ncalls[0] += 1
--> 293         return function(*(wrapper_args + args))
    295     return ncalls, function_wrapper

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask_glm/algorithms.py in wrapped(beta, X, y, z, u, rho)
    231         @functools.wraps(func)
    232         def wrapped(beta, X, y, z, u, rho):
--> 233             return func(beta, X, y) + (rho / 2) * np.dot(beta - z + u,
    234                                                          beta - z + u)
    235         return wrapped

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask_glm/families.py in pointwise_loss(beta, X, y)
     62     @staticmethod
     63     def pointwise_loss(beta, X, y):
---> 64         beta, y = beta.ravel(), y.ravel()
     65         Xbeta = X.dot(beta)
     66         return Normal.loglike(Xbeta, y)

AttributeError: 'tuple' object has no attribute 'ravel'
js3711 commented 6 years ago
TomAugspurger commented 6 years ago

Just using df_x and df_y works, correct?

Some context: a dask dataframe doesn't know its own length, so doing df_x.values results in a dask array with unknown length. We can't concatenate an array of ones to X in that case, since we don't know how long to make the array of ones.

I plan to implement something like https://github.com/dask/dask/issues/3090 later today. There's the related https://github.com/dask/dask/issues/3293 issue.

mrocklin commented 6 years ago

Would it be possible for dask-ml to add a simple intercept term without having to fully compute things? This seems like the sort of thing that should be possible with map_blocks and a custom function. This seems common enough that forcing computation might be considered a usability bug.

On Mon, Jul 30, 2018 at 9:47 AM, Tom Augspurger notifications@github.com wrote:

Just using df_x and df_y works, correct?

Some context: a dask dataframe doesn't know its own length, so doing df_x.values results in a dask array with unknown length. We can't concatenate an array of ones to X in that case, since we don't know how long to make the array of ones.

I plan to implement something like dask/dask#3090 (https://github.com/dask/dask/issues/3090) later today. There's also the related issue dask/dask#3293 (https://github.com/dask/dask/issues/3293).

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/dask/dask-ml/issues/325#issuecomment-408931936, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszCPSyrINhlHmxUrZZiBKOcwbUVCVks5uLzixgaJpZM4Vmwy_ .

TomAugspurger commented 6 years ago

Yeah, using map_blocks should be sufficient here. I can take a look at that now.

js3711 commented 6 years ago

I can perform operations on df_x and df_y, but passing the dataframes directly to fit raises a TypeError:

lr = LinearRegression(fit_intercept=True)
lr.fit(df_x, df_y)

TypeError                                 Traceback (most recent call last)
<ipython-input-66-7f17fbaf27c3> in <module>()
      1 lr = LinearRegression(fit_intercept=True)
----> 3 lr.fit(df_x, df_y)

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask_ml/linear_model/glm.py in fit(self, X, y)
    151         self : objectj
    152         """
--> 153         X = self._check_array(X)
    155         solver_kwargs = self._get_solver_kwargs()

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask_ml/linear_model/glm.py in _check_array(self, X)
    167             X = add_intercept(X)
--> 169         return check_array(X, accept_unknown_chunks=True)

~/anaconda3/envs/correlation_exploration/lib/python3.6/site-packages/dask_ml/utils.py in check_array(array, *args, **kwargs)
    139     elif isinstance(array, dd.DataFrame):
    140         if not accept_dask_dataframe:
--> 141             raise TypeError
    143         # TODO: sample?

TomAugspurger commented 6 years ago

Fixed on master @js3711. Thanks for the report.