diff --git a/dask_glm/algorithms.py b/dask_glm/algorithms.py
index 981bdae..0302204 100644
--- a/dask_glm/algorithms.py
+++ b/dask_glm/algorithms.py
@@ -15,12 +15,16 @@ from __future__ import absolute_import, division, print_function
 
-from dask import delayed, persist, compute
 import functools
-import numpy as np
+import operator
+
+import dask
+from dask import delayed, persist, compute, sharedict
+from dask.utils import ensure_dict
 import dask.array as da
+import numpy as np
 from scipy.optimize import fmin_l_bfgs_b
-
+import toolz
 
 from dask_glm.utils import dot, exp, log1p
 from dask_glm.families import Logistic
@@ -152,7 +156,6 @@ def newton(X, y, max_iter=50, tol=1e-8, family=Logistic):
 
 def admm(X, y, regularizer=L1, lamduh=0.1, rho=1, over_relax=1,
          max_iter=250, abstol=1e-4, reltol=1e-2, family=Logistic):
-
     pointwise_loss = family.pointwise_loss
     pointwise_gradient = family.pointwise_gradient
     regularizer = _regularizers.get(regularizer, regularizer)  # string
@@ -173,15 +176,18 @@ def wrapped(beta, X, y, z, u, rho):
     f = create_local_f(pointwise_loss)
     fprime = create_local_gradient(pointwise_gradient)
 
-    nchunks = getattr(X, 'npartitions', 1)
-    # nchunks = X.npartitions
+    try:
+        nchunks = len(X.chunks[0])
+    except AttributeError:  # NumPy array input
+        nchunks = 1
+
     (n, p) = X.shape
-    # XD = X.to_delayed().flatten().tolist()
-    # yD = y.to_delayed().flatten().tolist()
+
     if isinstance(X, da.Array):
-        XD = X.rechunk((None, X.shape[-1])).to_delayed().flatten().tolist()
+        XD = X.to_delayed().tolist()
     else:
         XD = [X]
+
     if isinstance(y, da.Array):
         yD = y.rechunk((None, y.shape[-1])).to_delayed().flatten().tolist()
     else:
@@ -192,7 +198,6 @@ def wrapped(beta, X, y, z, u, rho):
     betas = np.array([np.ones(p) for i in range(nchunks)])
 
     for k in range(max_iter):
-        # x-update step
         new_betas = [delayed(local_update)(xx, yy, bb, z, uu, rho, f=f,
                                            fprime=fprime) for
@@ -219,21 +224,47 @@ def wrapped(beta, X, y, z, u, rho):
             reltol * np.linalg.norm(rho * u)
 
         if primal_res < eps_pri and dual_res < eps_dual:
-            print("Converged!", k)
             break
 
     return z
 
 
 def local_update(X, y, beta, z, u, rho, f, fprime, solver=fmin_l_bfgs_b):
-
     beta = beta.ravel()
     u = u.ravel()
     z = z.ravel()
-    solver_args = (X, y, z, u, rho)
-    beta, f, d = solver(f, beta, fprime=fprime, args=solver_args,
-                        maxiter=200,
-                        maxfun=250)
+
+    if len(X) > 1:
+        # Construct dask graph for computation
+        X = [da.from_array(x, chunks=x.shape, name=False,
+                           getitem=operator.getitem) for x in X]
+        X = da.concatenate(X, axis=1).persist(get=dask.get)
+        beta = da.from_array(beta, chunks=beta.shape, name=False,
+                             getitem=operator.getitem).persist(get=dask.get)
+        ff = f(beta, X, y, z, u, rho)
+        ffprime = fprime(beta, X, y, z, u, rho)
+        ff, ffprime = dask.delayed(ff), dask.delayed(ffprime)
+        dsk = ensure_dict(sharedict.merge(ff.dask, ffprime.dask))
+
+        beta_key = beta._keys()[0]
+
+        def f2(beta):
+            """ Reuse existing graph, just swap in new beta and compute """
+            dsk[beta_key] = beta
+            result, gradient = dask.get(dsk, [ff.key, ffprime.key])
+            print(result, gradient)
+            return result, gradient
+
+        solver_args = ()
+    else:
+        X = X[0]
+        solver_args = (X, y, z, u, rho)
+
+        def f2(beta, *args):
+            result, gradient = f(beta, *args), fprime(beta, *args)
+            print(result, gradient)
+            return result, gradient
+
+    beta, _, _ = solver(f2, beta, args=solver_args, maxiter=200, maxfun=250)
 
     return beta
@@ -371,14 +402,16 @@ def proximal_grad(X, y, regularizer=L1, lamduh=0.1, family=Logistic,
                 break
             stepSize *= backtrackMult
         if stepSize == 0:
-            print('No more progress')
+            if verbose:
+                print('No more progress')
             break
         df /= max(func, lf)
         db = 0
         if verbose:
             print('%2d %.6e %9.2e %.2e %.1e' % (k + 1, func, df, db, stepSize))
         if df < tol:
-            print('Converged')
+            if verbose:
+                print('Converged')
             break
         stepSize *= stepGrowth
         backtrackMult = nextBacktrackMult
diff --git a/dask_glm/tests/test_algos_families.py b/dask_glm/tests/test_algos_families.py
index 5c75f83..9f56fb3 100644
--- a/dask_glm/tests/test_algos_families.py
+++ b/dask_glm/tests/test_algos_families.py
@@ -87,18 +87,20 @@ def test_basic_unreg_descent(func, kwargs, N, nchunks, family):
 ])
 @pytest.mark.parametrize('N', [1000])
 @pytest.mark.parametrize('nchunks', [1, 10])
+@pytest.mark.parametrize('mchunks', [1, 2])
 @pytest.mark.parametrize('family', [Logistic, Normal])
 @pytest.mark.parametrize('lam', [0.01, 1.2, 4.05])
 @pytest.mark.parametrize('reg', [L1, L2])
-def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg):
+def test_basic_reg_descent(func, kwargs, N, nchunks, mchunks, family, lam, reg):
     beta = np.random.normal(size=2)
     M = len(beta)
-    X = da.random.random((N, M), chunks=(N // nchunks, M))
+    X = da.random.random((N, M), chunks=(N // nchunks, M // mchunks))
     y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,))
     X, y = persist(X, y)
 
-    result = func(X, y, family=family, lamduh=lam, regularizer=reg, **kwargs)
+    with dask.set_options(get=dask.get):  # for easy debugging when errors occur
+        result = func(X, y, family=family, lamduh=lam, regularizer=reg, **kwargs)
 
     test_vec = np.random.normal(size=2)
     f = reg.add_reg_f(family.pointwise_loss, lam)
diff --git a/dask_glm/utils.py b/dask_glm/utils.py
index e3a9919..687a4ed 100644
--- a/dask_glm/utils.py
+++ b/dask_glm/utils.py
@@ -6,6 +6,7 @@
 import dask.array as da
 import numpy as np
 from multipledispatch import dispatch
+import toolz
 
 
 def sigmoid(x):
@@ -81,7 +82,7 @@ def log1p(A):
 
 
 @dispatch(object, object)
 def dot(A, B):
     x = max([A, B], key=lambda x: getattr(x, '__array_priority__', 0))
-    module = package_of(x)
+    module = package_of(type(x))
     return module.dot(A, B)
@@ -155,13 +156,14 @@ def exp(x):
         return np.exp(x.todense())
 
 
-def package_of(obj):
+@toolz.memoize
+def package_of(typ):
     """ Return package containing object's definition
 
     Or return None if not found
     """
     # http://stackoverflow.com/questions/43462701/get-package-of-python-object/43462865#43462865
-    mod = inspect.getmodule(obj)
+    mod = inspect.getmodule(typ)
     if not mod:
         return
     base, _sep, _stem = mod.__name__.partition('.')
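
The local_update rewrite above avoids rebuilding the dask graph for the loss and gradient on every L-BFGS iteration: it constructs the graph once, records the key of the beta placeholder, and before each scheduler call swaps a fresh beta into the graph dict. Below is a minimal standalone sketch of that trick under simplified assumptions: a plain least-squares objective stands in for the ADMM-augmented one, and make_objective, loss_key, and grad_key are illustrative names, not dask or dask-glm API.

import dask
import numpy as np
from scipy.optimize import fmin_l_bfgs_b


def make_objective(X, y):
    """Return an f2(beta) for fmin_l_bfgs_b that reuses one task graph."""
    beta_key, loss_key, grad_key = 'beta', 'loss', 'grad'

    def loss(beta, X, y):
        r = X.dot(beta) - y
        return 0.5 * (r ** 2).sum()   # least-squares loss, for illustration

    def grad(beta, X, y):
        return X.T.dot(X.dot(beta) - y)

    # A hand-written dask graph: each task is a (function, *args) tuple, and
    # any argument that is itself a key in the dict is substituted at run time.
    dsk = {beta_key: None,
           loss_key: (loss, beta_key, X, y),
           grad_key: (grad, beta_key, X, y)}

    def f2(beta):
        dsk[beta_key] = beta          # swap in the new iterate; graph unchanged
        result, gradient = dask.get(dsk, [loss_key, grad_key])
        return result, gradient

    return f2


X = np.random.random((100, 5))
y = X.dot(np.ones(5))
f2 = make_objective(X, y)
beta, _, _ = fmin_l_bfgs_b(f2, np.zeros(5))   # f2 returns (value, gradient)

Calling dask.get directly, as the diff does, bypasses per-call graph construction and optimization, which matters when the solver evaluates the objective hundreds of times on small in-memory chunks.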
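The utils.py change is in the same spirit: dot now calls package_of on the type rather than the instance and memoizes it, so the inspect-based module lookup runs once per type instead of once per dot call (types are hashable and few, while array instances are not). A small self-contained illustration of that design choice follows; it mirrors the names in utils.py but uses importlib.import_module for the final lookup, so treat it as a sketch rather than dask-glm's exact code.

import importlib
import inspect

import numpy as np
import toolz


@toolz.memoize
def package_of(typ):
    """Return the top-level package defining `typ`, or None if unknown."""
    mod = inspect.getmodule(typ)   # works on types; a type is a valid
    if not mod:                    # memoization key, an ndarray is not
        return None
    base, _sep, _stem = mod.__name__.partition('.')
    return importlib.import_module(base)


x = np.ones(3)
assert package_of(type(x)) is np   # numpy.ndarray -> the numpy package
# Repeated calls with any ndarray now hit the memoization cache immediately.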