From a928a2622080e9dd3a596696e18a72b19ebd23f4 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 29 Apr 2017 09:21:46 -0400 Subject: [PATCH 1/4] Use dask.arrays inside admm, supporting mixed sparse-dense inputs Previously admm would rechunk the columns to be in a single chunk, and then pass delayed numpy arrays to the local_update function. If the chunks along columns were of different types, like a numpy array and a sparse array, then these would be inefficiently coerced to a single type. Now we pass a list of numpy arrays to the local_update function. If this list has more than one element then we construct a local dask.array so that operations like dot do the right thing and call two different local dot functions, one for each type. --- dask_glm/algorithms.py | 44 +++++++++++++++++++-------- dask_glm/tests/test_algos_families.py | 8 +++-- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/dask_glm/algorithms.py b/dask_glm/algorithms.py index 981bdae..8635751 100644 --- a/dask_glm/algorithms.py +++ b/dask_glm/algorithms.py @@ -15,12 +15,15 @@ from __future__ import absolute_import, division, print_function -from dask import delayed, persist, compute import functools -import numpy as np +import operator + +import dask +from dask import delayed, persist, compute import dask.array as da +import numpy as np from scipy.optimize import fmin_l_bfgs_b - +import toolz from dask_glm.utils import dot, exp, log1p from dask_glm.families import Logistic @@ -173,15 +176,20 @@ def wrapped(beta, X, y, z, u, rho): f = create_local_f(pointwise_loss) fprime = create_local_gradient(pointwise_gradient) - nchunks = getattr(X, 'npartitions', 1) - # nchunks = X.npartitions + try: + nchunks = len(X.chunks[0]) + except AttributeError: # NumPy array input + nchunks = 1 + (n, p) = X.shape - # XD = X.to_delayed().flatten().tolist() - # yD = y.to_delayed().flatten().tolist() + if isinstance(X, da.Array): - XD = X.rechunk((None, X.shape[-1])).to_delayed().flatten().tolist() + XD = X.to_delayed().tolist() + if len(XD[0]) == 1: + XD = [x[0] for x in XD] else: XD = [X] + if isinstance(y, da.Array): yD = y.rechunk((None, y.shape[-1])).to_delayed().flatten().tolist() else: @@ -219,19 +227,29 @@ def wrapped(beta, X, y, z, u, rho): reltol * np.linalg.norm(rho * u) if primal_res < eps_pri and dual_res < eps_dual: - print("Converged!", k) break return z +def maybe_compute(x): + return dask.compute(x)[0] + + def local_update(X, y, beta, z, u, rho, f, fprime, solver=fmin_l_bfgs_b): + if isinstance(X, list): + X = [da.from_array(x, chunks=x.shape, name=False, + getitem=operator.getitem) for x in X] + X = da.concatenate(X, axis=1) + + f2 = toolz.compose(maybe_compute, f) + fprime2 = toolz.compose(maybe_compute, fprime) beta = beta.ravel() u = u.ravel() z = z.ravel() solver_args = (X, y, z, u, rho) - beta, f, d = solver(f, beta, fprime=fprime, args=solver_args, + beta, f, d = solver(f2, beta, fprime=fprime2, args=solver_args, maxiter=200, maxfun=250) @@ -371,14 +389,16 @@ def proximal_grad(X, y, regularizer=L1, lamduh=0.1, family=Logistic, break stepSize *= backtrackMult if stepSize == 0: - print('No more progress') + if verbose: + print('No more progress') break df /= max(func, lf) db = 0 if verbose: print('%2d %.6e %9.2e %.2e %.1e' % (k + 1, func, df, db, stepSize)) if df < tol: - print('Converged') + if verbose: + print('Converged') break stepSize *= stepGrowth backtrackMult = nextBacktrackMult diff --git a/dask_glm/tests/test_algos_families.py b/dask_glm/tests/test_algos_families.py index 5c75f83..9f56fb3 100644 --- a/dask_glm/tests/test_algos_families.py +++ b/dask_glm/tests/test_algos_families.py @@ -87,18 +87,20 @@ def test_basic_unreg_descent(func, kwargs, N, nchunks, family): ]) @pytest.mark.parametrize('N', [1000]) @pytest.mark.parametrize('nchunks', [1, 10]) +@pytest.mark.parametrize('mchunks', [1, 2]) @pytest.mark.parametrize('family', [Logistic, Normal]) @pytest.mark.parametrize('lam', [0.01, 1.2, 4.05]) @pytest.mark.parametrize('reg', [L1, L2]) -def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg): +def test_basic_reg_descent(func, kwargs, N, nchunks, mchunks, family, lam, reg): beta = np.random.normal(size=2) M = len(beta) - X = da.random.random((N, M), chunks=(N // nchunks, M)) + X = da.random.random((N, M), chunks=(N // nchunks, M // mchunks)) y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,)) X, y = persist(X, y) - result = func(X, y, family=family, lamduh=lam, regularizer=reg, **kwargs) + with dask.set_options(get=dask.get): # for easy debugging when errors occur + result = func(X, y, family=family, lamduh=lam, regularizer=reg, **kwargs) test_vec = np.random.normal(size=2) f = reg.add_reg_f(family.pointwise_loss, lam) From 49c2f561389078e172389096686b07cf76050021 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 29 Apr 2017 09:30:51 -0400 Subject: [PATCH 2/4] simplify local_update array handling --- dask_glm/algorithms.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_glm/algorithms.py b/dask_glm/algorithms.py index 8635751..db32e8d 100644 --- a/dask_glm/algorithms.py +++ b/dask_glm/algorithms.py @@ -185,8 +185,6 @@ def wrapped(beta, X, y, z, u, rho): if isinstance(X, da.Array): XD = X.to_delayed().tolist() - if len(XD[0]) == 1: - XD = [x[0] for x in XD] else: XD = [X] @@ -237,10 +235,12 @@ def maybe_compute(x): def local_update(X, y, beta, z, u, rho, f, fprime, solver=fmin_l_bfgs_b): - if isinstance(X, list): + if len(X) > 1: X = [da.from_array(x, chunks=x.shape, name=False, getitem=operator.getitem) for x in X] X = da.concatenate(X, axis=1) + else: + X = X[0] f2 = toolz.compose(maybe_compute, f) fprime2 = toolz.compose(maybe_compute, fprime) From 3137811763f9e9ce8f4549ca2adc6e3c02e77c8c Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Mon, 1 May 2017 09:58:41 -0400 Subject: [PATCH 3/4] unite f and fprime computation --- dask_glm/algorithms.py | 15 ++++++--------- dask_glm/utils.py | 8 +++++--- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/dask_glm/algorithms.py b/dask_glm/algorithms.py index db32e8d..ff1cbb1 100644 --- a/dask_glm/algorithms.py +++ b/dask_glm/algorithms.py @@ -230,10 +230,6 @@ def wrapped(beta, X, y, z, u, rho): return z -def maybe_compute(x): - return dask.compute(x)[0] - - def local_update(X, y, beta, z, u, rho, f, fprime, solver=fmin_l_bfgs_b): if len(X) > 1: X = [da.from_array(x, chunks=x.shape, name=False, @@ -242,16 +238,17 @@ def local_update(X, y, beta, z, u, rho, f, fprime, solver=fmin_l_bfgs_b): else: X = X[0] - f2 = toolz.compose(maybe_compute, f) - fprime2 = toolz.compose(maybe_compute, fprime) + def f2(x, *args): + f_eval = f(x, *args) + fprime_eval = fprime(x, *args) + f_eval, fprime_eval = dask.compute(f_eval, fprime_eval) + return f_eval, fprime_eval beta = beta.ravel() u = u.ravel() z = z.ravel() solver_args = (X, y, z, u, rho) - beta, f, d = solver(f2, beta, fprime=fprime2, args=solver_args, - maxiter=200, - maxfun=250) + beta, f, d = solver(f2, beta, args=solver_args, maxiter=200, maxfun=250) return beta diff --git a/dask_glm/utils.py b/dask_glm/utils.py index e3a9919..687a4ed 100644 --- a/dask_glm/utils.py +++ b/dask_glm/utils.py @@ -6,6 +6,7 @@ import dask.array as da import numpy as np from multipledispatch import dispatch +import toolz def sigmoid(x): @@ -81,7 +82,7 @@ def log1p(A): @dispatch(object, object) def dot(A, B): x = max([A, B], key=lambda x: getattr(x, '__array_priority__', 0)) - module = package_of(x) + module = package_of(type(x)) return module.dot(A, B) @@ -155,13 +156,14 @@ def exp(x): return np.exp(x.todense()) -def package_of(obj): +@toolz.memoize +def package_of(typ): """ Return package containing object's definition Or return None if not found """ # http://stackoverflow.com/questions/43462701/get-package-of-python-object/43462865#43462865 - mod = inspect.getmodule(obj) + mod = inspect.getmodule(typ) if not mod: return base, _sep, _stem = mod.__name__.partition('.') From 0164b3f47afb688f502eddd93c5b4e7ceb3546e1 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Tue, 2 May 2017 12:02:17 -0400 Subject: [PATCH 4/4] Construct graph once, evaluate many times This create a dask graph for the local_update computation once and then hands the solver a function that just puts in a new beta and evaluates with the single threaded scheduler. Currently this fails to converge. --- dask_glm/algorithms.py | 44 ++++++++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/dask_glm/algorithms.py b/dask_glm/algorithms.py index ff1cbb1..0302204 100644 --- a/dask_glm/algorithms.py +++ b/dask_glm/algorithms.py @@ -19,7 +19,8 @@ import operator import dask -from dask import delayed, persist, compute +from dask import delayed, persist, compute, sharedict +from dask.utils import ensure_dict import dask.array as da import numpy as np from scipy.optimize import fmin_l_bfgs_b @@ -155,7 +156,6 @@ def newton(X, y, max_iter=50, tol=1e-8, family=Logistic): def admm(X, y, regularizer=L1, lamduh=0.1, rho=1, over_relax=1, max_iter=250, abstol=1e-4, reltol=1e-2, family=Logistic): - pointwise_loss = family.pointwise_loss pointwise_gradient = family.pointwise_gradient regularizer = _regularizers.get(regularizer, regularizer) # string @@ -198,7 +198,6 @@ def wrapped(beta, X, y, z, u, rho): betas = np.array([np.ones(p) for i in range(nchunks)]) for k in range(max_iter): - # x-update step new_betas = [delayed(local_update)(xx, yy, bb, z, uu, rho, f=f, fprime=fprime) for @@ -231,24 +230,41 @@ def wrapped(beta, X, y, z, u, rho): def local_update(X, y, beta, z, u, rho, f, fprime, solver=fmin_l_bfgs_b): + beta = beta.ravel() + u = u.ravel() + z = z.ravel() + if len(X) > 1: + # Construct dask graph for computation X = [da.from_array(x, chunks=x.shape, name=False, getitem=operator.getitem) for x in X] - X = da.concatenate(X, axis=1) + X = da.concatenate(X, axis=1).persist(get=dask.get) + beta = da.from_array(beta, chunks=beta.shape, name=False, + getitem=operator.getitem).persist(get=dask.get) + ff = f(beta, X, y, z, u, rho) + ffprime = fprime(beta, X, y, z, u, rho) + ff, ffprime = dask.delayed(ff), dask.delayed(ffprime) + dsk = ensure_dict(sharedict.merge(ff.dask, ffprime.dask)) + + beta_key = beta._keys()[0] + def f2(beta): + """ Reuse existing graph, just swap in new beta and compute """ + dsk[beta_key] = beta + result, gradient = dask.get(dsk, [ff.key, ffprime.key]) + print(result, gradient) + return result, gradient + + solver_args = () else: X = X[0] + solver_args = (X, y, z, u, rho) - def f2(x, *args): - f_eval = f(x, *args) - fprime_eval = fprime(x, *args) - f_eval, fprime_eval = dask.compute(f_eval, fprime_eval) - return f_eval, fprime_eval + def f2(beta, *args): + result, gradient = f(beta, *args), fprime(beta, *args) + print(result, gradient) + return result, gradient - beta = beta.ravel() - u = u.ravel() - z = z.ravel() - solver_args = (X, y, z, u, rho) - beta, f, d = solver(f2, beta, args=solver_args, maxiter=200, maxfun=250) + beta, _, _ = solver(f2, beta, args=solver_args, maxiter=200, maxfun=250) return beta