From 876897f5b7d39685f1dc925a8db04aa604dd8de2 Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Sun, 11 Mar 2018 16:25:00 -0500 Subject: [PATCH 1/5] ENH: Basic implementation of SGD BUG: dataframe size info not exact and indexing needed squash Getting rid of ASGD --- dask_glm/algorithms.py | 66 +++++++++++++++++++++++++++++++++++++++++- dask_glm/estimators.py | 7 +++-- 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/dask_glm/algorithms.py b/dask_glm/algorithms.py index fe92cbc..7cf270d 100644 --- a/dask_glm/algorithms.py +++ b/dask_glm/algorithms.py @@ -6,7 +6,9 @@ from dask import delayed, persist, compute, set_options import functools import numpy as np +import numpy.linalg as LA import dask.array as da +import dask.dataframe as dd from scipy.optimize import fmin_l_bfgs_b @@ -138,6 +140,67 @@ def gradient_descent(X, y, max_iter=100, tol=1e-14, family=Logistic, **kwargs): return beta +def _choose_step_sgd(initial, k): + return initial / (k + 1) + + +@normalize +def sgd(X, y, max_iter=1e3, tol=1e-2, family=Logistic, batch_size=64, + initial_step=1.0, **kwargs): + """Stochastic Gradient Descent. + + Parameters + ---------- + X : array-like, shape (n_samples, n_features) + y : array-like, shape (n_samples,) + max_iter : int, float + maximum number of iterations to attempt before declaring + failure to converge + tol : float + Maximum allowed change from prior iteration required to + declare convergence + batch_size : int + The batch size used to approximate the gradient. Larger batch sizes + will approximate the gradient better. + initial_step : float + Initial step size used in the optimization. The step size decays like + initial_step/(1 + iter_count). + family : Family + + Returns + ------- + beta : array-like, shape (n_features,) + """ + gradient = family.gradient + n, p = X.shape + if np.isnan(n): + raise ValueError('SGD needs shape information to allow indexing. ' + 'Possible by passing a computed array in (`X.compute()` ' + 'or `X.values.compute()`), then doing using ' + '`dask.array.from_array ') + + beta = np.zeros(p) + + iter_count = 0 + converged = False + + while not converged: + beta_old = beta.copy() + iter_count += 1 + + i = np.random.choice(n, size=(batch_size,)) + Xbeta = dot(X[i], beta) + + grad = gradient(Xbeta, X[i], y[i]).compute() + + beta -= _choose_step_sgd(initial_step, iter_count) * grad / batch_size + + rel_error = LA.norm(beta_old - beta) / LA.norm(beta) + converged = (rel_error < tol) or (iter_count > max_iter) + + return beta + + @normalize def newton(X, y, max_iter=50, tol=1e-8, family=Logistic, **kwargs): """Newtons Method for Logistic Regression. @@ -430,5 +493,6 @@ def proximal_grad(X, y, regularizer='l1', lamduh=0.1, family=Logistic, 'gradient_descent': gradient_descent, 'newton': newton, 'lbfgs': lbfgs, - 'proximal_grad': proximal_grad + 'proximal_grad': proximal_grad, + 'sgd': sgd } diff --git a/dask_glm/estimators.py b/dask_glm/estimators.py index aa05425..4833b05 100644 --- a/dask_glm/estimators.py +++ b/dask_glm/estimators.py @@ -32,7 +32,7 @@ def family(self): def __init__(self, fit_intercept=True, solver='admm', regularizer='l2', max_iter=100, tol=1e-4, lamduh=1.0, rho=1, - over_relax=1, abstol=1e-4, reltol=1e-2): + over_relax=1, abstol=1e-4, reltol=1e-2, **kwargs): self.fit_intercept = fit_intercept self.solver = solver self.regularizer = regularizer @@ -61,9 +61,10 @@ def __init__(self, fit_intercept=True, solver='admm', regularizer='l2', self._fit_kwargs = {k: getattr(self, k) for k in fit_kwargs} - def fit(self, X, y=None): + def fit(self, X, y=None, **kwargs): X_ = self._maybe_add_intercept(X) - self._coef = algorithms._solvers[self.solver](X_, y, **self._fit_kwargs) + self._coef = algorithms._solvers[self.solver](X_, y, **self._fit_kwargs, + **kwargs) if self.fit_intercept: self.coef_ = self._coef[:-1] From b6963af6e7a01115b93b30f0a2dc659dc5ef0e00 Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Thu, 22 Mar 2018 09:52:53 -0500 Subject: [PATCH 2/5] ENH: clean up SGD implementation --- dask_glm/algorithms.py | 74 +++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 29 deletions(-) diff --git a/dask_glm/algorithms.py b/dask_glm/algorithms.py index 7cf270d..4a583b4 100644 --- a/dask_glm/algorithms.py +++ b/dask_glm/algorithms.py @@ -140,22 +140,17 @@ def gradient_descent(X, y, max_iter=100, tol=1e-14, family=Logistic, **kwargs): return beta -def _choose_step_sgd(initial, k): - return initial / (k + 1) - - @normalize -def sgd(X, y, max_iter=1e3, tol=1e-2, family=Logistic, batch_size=64, - initial_step=1.0, **kwargs): +def sgd(X, y, epochs=100, tol=1e-3, family=Logistic, batch_size=64, + initial_step=1e-4, callback=None, average=True): """Stochastic Gradient Descent. Parameters ---------- X : array-like, shape (n_samples, n_features) y : array-like, shape (n_samples,) - max_iter : int, float - maximum number of iterations to attempt before declaring - failure to converge + epochs : int, float + maximum number of passes through the dataset tol : float Maximum allowed change from prior iteration required to declare convergence @@ -163,13 +158,19 @@ def sgd(X, y, max_iter=1e3, tol=1e-2, family=Logistic, batch_size=64, The batch size used to approximate the gradient. Larger batch sizes will approximate the gradient better. initial_step : float - Initial step size used in the optimization. The step size decays like - initial_step/(1 + iter_count). + The initial step size. The step size is decays like 1/k. + callback : callable + A callback to call every iteration that accepts keyword arguments + `X`, `y`, `beta`, `grad`, `nit` (number of iterations) and `family` + average : bool + To average the parameters found or not. See [1]_. family : Family Returns ------- beta : array-like, shape (n_features,) + + .. _1: https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Averaging """ gradient = family.gradient n, p = X.shape @@ -180,24 +181,39 @@ def sgd(X, y, max_iter=1e3, tol=1e-2, family=Logistic, batch_size=64, '`dask.array.from_array ') beta = np.zeros(p) - - iter_count = 0 - converged = False - - while not converged: - beta_old = beta.copy() - iter_count += 1 - - i = np.random.choice(n, size=(batch_size,)) - Xbeta = dot(X[i], beta) - - grad = gradient(Xbeta, X[i], y[i]).compute() - - beta -= _choose_step_sgd(initial_step, iter_count) * grad / batch_size - - rel_error = LA.norm(beta_old - beta) / LA.norm(beta) - converged = (rel_error < tol) or (iter_count > max_iter) - + if average: + beta_sum = np.zeros(p) + + nit = 0 + for epoch in range(epochs): + j = np.random.permutation(n) + X = X[j] + y = y[j] + for k in range(n // batch_size): + beta_old = beta.copy() + nit += 1 + + i = slice(batch_size * k, batch_size * (k + 1)) + Xbeta = dot(X[i], beta) + grad = gradient(Xbeta, X[i], y[i]).compute() + + # step_size = O(1/sqrt(k)) from "Non-asymptotic analysis of + # stochastic approximation algorithms for machine learning" by + # Moulines, Eric and Bach, Francis Rsgd + step_size = initial_step / np.sqrt(nit + 1) + beta -= step_size * (n / batch_size) * grad + if average: + beta_sum += beta + if callback: + callback(X=X[i], y=y[i], grad=grad, nit=nit, family=family, + beta=beta if not average else beta_sum / nit) + + rel_error = LA.norm(beta_old - beta) / LA.norm(beta) + converged = (rel_error < tol) or (nit / n > epochs) + if converged: + break + if average: + return beta_sum / nit return beta From b8757350a20eef48d0333f044e97a3bc8fb46e8f Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Mon, 26 Mar 2018 16:03:08 -0500 Subject: [PATCH 3/5] MAINT: allow user to pass n in kwargs and warn --- dask_glm/algorithms.py | 84 ++++++++++++++++++++++++++++-------------- 1 file changed, 56 insertions(+), 28 deletions(-) diff --git a/dask_glm/algorithms.py b/dask_glm/algorithms.py index 4a583b4..52a4e41 100644 --- a/dask_glm/algorithms.py +++ b/dask_glm/algorithms.py @@ -2,6 +2,8 @@ """ from __future__ import absolute_import, division, print_function +import time +from warnings import warn from dask import delayed, persist, compute, set_options import functools @@ -140,55 +142,85 @@ def gradient_descent(X, y, max_iter=100, tol=1e-14, family=Logistic, **kwargs): return beta +def _get_n(n, kwargs): + if np.isnan(n): + if 'n' in kwargs: + return kwargs.get('n') + raise ValueError('Could not get the number of examples `n`. Pass the ' + 'number of examples in as a keyword argument: ' + '`sgd(..., n=n)`. If using a distributed dataframe, ' + '`sgd(..., n=len(ddf))` works') + return n + + +def _shuffle_blocks(x, seed=42): + rng = np.random.RandomState(seed) + i = rng.permutation(x.shape[0]).astype(int) + y = x[i] + return y + + @normalize def sgd(X, y, epochs=100, tol=1e-3, family=Logistic, batch_size=64, - initial_step=1e-4, callback=None, average=True): - """Stochastic Gradient Descent. + initial_step=1e-4, callback=None, average=True, maxiter=np.inf, **kwargs): + r"""Stochastic Gradient Descent. Parameters ---------- - X : array-like, shape (n_samples, n_features) - y : array-like, shape (n_samples,) - epochs : int, float + X: array - like, shape(n_samples, n_features) + y: array - like, shape(n_samples,) + epochs: int, float maximum number of passes through the dataset - tol : float + tol: float Maximum allowed change from prior iteration required to declare convergence - batch_size : int + batch_size: int The batch size used to approximate the gradient. Larger batch sizes will approximate the gradient better. - initial_step : float - The initial step size. The step size is decays like 1/k. - callback : callable + initial_step: float + The initial step size. The step size is decays like 1 / k. + callback: callable A callback to call every iteration that accepts keyword arguments `X`, `y`, `beta`, `grad`, `nit` (number of iterations) and `family` - average : bool - To average the parameters found or not. See [1]_. - family : Family + average: bool + To average the parameters found or not. See[1]_. + family: Family Returns ------- beta : array-like, shape (n_features,) + Notes + ----- + + The current implementation assumes that the dataset is "well shuffled", or + each block is a representative example of the gradient. More formally, we + assume the gradient approximation is an unbiased approximation regardless + of which block is sampled. + .. _1: https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Averaging """ gradient = family.gradient n, p = X.shape - if np.isnan(n): - raise ValueError('SGD needs shape information to allow indexing. ' - 'Possible by passing a computed array in (`X.compute()` ' - 'or `X.values.compute()`), then doing using ' - '`dask.array.from_array ') + n = _get_n(n, kwargs) beta = np.zeros(p) if average: beta_sum = np.zeros(p) nit = 0 - for epoch in range(epochs): - j = np.random.permutation(n) - X = X[j] - y = y[j] + + # step_size = O(1/sqrt(k)) from "Non-asymptotic analysis of + # stochastic approximation algorithms for machine learning" by + # Moulines, Eric and Bach, Francis Rsgd + # but, this may require many iterations. Using + # step_size = lambda init, nit, decay: init * decay**(nit//n) + # is used in practice but not testing now + step_size = lambda init, nit: init / np.sqrt(nit + 1) + while True: + seed = int(time.time() * 1000) % 2**32 # millisecond timings + X = X.map_blocks(_shuffle_blocks, seed=seed, dtype=X.dtype) + y = y.map_blocks(_shuffle_blocks, seed=seed, dtype=y.dtype) for k in range(n // batch_size): beta_old = beta.copy() nit += 1 @@ -197,11 +229,7 @@ def sgd(X, y, epochs=100, tol=1e-3, family=Logistic, batch_size=64, Xbeta = dot(X[i], beta) grad = gradient(Xbeta, X[i], y[i]).compute() - # step_size = O(1/sqrt(k)) from "Non-asymptotic analysis of - # stochastic approximation algorithms for machine learning" by - # Moulines, Eric and Bach, Francis Rsgd - step_size = initial_step / np.sqrt(nit + 1) - beta -= step_size * (n / batch_size) * grad + beta -= step_size(initial_step, nit) * (n / batch_size) * grad if average: beta_sum += beta if callback: @@ -209,7 +237,7 @@ def sgd(X, y, epochs=100, tol=1e-3, family=Logistic, batch_size=64, beta=beta if not average else beta_sum / nit) rel_error = LA.norm(beta_old - beta) / LA.norm(beta) - converged = (rel_error < tol) or (nit / n > epochs) + converged = (rel_error < tol) or (nit / n > epochs) or (nit > maxiter) if converged: break if average: From d90f70e436825f3671dc71fbdf15496377c70dde Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Tue, 27 Mar 2018 12:58:03 -0500 Subject: [PATCH 4/5] MAINT: choose random batch. These are some strong assumptions on shuffling... --- dask_glm/algorithms.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dask_glm/algorithms.py b/dask_glm/algorithms.py index 52a4e41..0fe1314 100644 --- a/dask_glm/algorithms.py +++ b/dask_glm/algorithms.py @@ -225,7 +225,8 @@ def sgd(X, y, epochs=100, tol=1e-3, family=Logistic, batch_size=64, beta_old = beta.copy() nit += 1 - i = slice(batch_size * k, batch_size * (k + 1)) + start = np.random.choice(n - batch_size) + i = slice(start, start + batch_size) Xbeta = dot(X[i], beta) grad = gradient(Xbeta, X[i], y[i]).compute() From 2ba82a67e5159399b5eaab00730e3e1923d8b3d3 Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Thu, 26 Jul 2018 22:45:07 -0500 Subject: [PATCH 5/5] MAINT: rework --- dask_glm/algorithms.py | 85 +++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 39 deletions(-) diff --git a/dask_glm/algorithms.py b/dask_glm/algorithms.py index 0fe1314..3f63116 100644 --- a/dask_glm/algorithms.py +++ b/dask_glm/algorithms.py @@ -2,16 +2,14 @@ """ from __future__ import absolute_import, division, print_function -import time -from warnings import warn from dask import delayed, persist, compute, set_options import functools import numpy as np import numpy.linalg as LA import dask.array as da -import dask.dataframe as dd from scipy.optimize import fmin_l_bfgs_b +import sklearn.utils.random from dask_glm.utils import dot, normalize @@ -153,16 +151,27 @@ def _get_n(n, kwargs): return n -def _shuffle_blocks(x, seed=42): - rng = np.random.RandomState(seed) +def _shuffle_blocks(x, random_state=None): + rng = sklearn.utils.random.check_random_state(random_state) i = rng.permutation(x.shape[0]).astype(int) y = x[i] return y +def _index_full_to_chunk(index, chunks): + index.sort() + boundaries = np.cumsum(chunks) + out = [] + for lower, upper in zip(boundaries, boundaries[1:]): + i = (lower <= index) & (index < upper) + out += [index[i]] + return out + + @normalize -def sgd(X, y, epochs=100, tol=1e-3, family=Logistic, batch_size=64, - initial_step=1e-4, callback=None, average=True, maxiter=np.inf, **kwargs): +def sgd(X, y, epochs=100, tol=1e-3, family=Logistic, batch_size=1000, + initial_step=1e-4, callback=None, average=True, maxiter=np.inf, + **kwargs): r"""Stochastic Gradient Descent. Parameters @@ -189,26 +198,17 @@ def sgd(X, y, epochs=100, tol=1e-3, family=Logistic, batch_size=64, Returns ------- beta : array-like, shape (n_features,) - - Notes - ----- - - The current implementation assumes that the dataset is "well shuffled", or - each block is a representative example of the gradient. More formally, we - assume the gradient approximation is an unbiased approximation regardless - of which block is sampled. - - .. _1: https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Averaging """ + # import ipdb; ipdb.set_trace() gradient = family.gradient n, p = X.shape n = _get_n(n, kwargs) beta = np.zeros(p) - if average: - beta_sum = np.zeros(p) - - nit = 0 + beta_sum = np.zeros_like(beta) + beta = da.from_array(beta, chunks=p) + beta_sum = da.from_array(beta_sum, chunks=p) + iters = 0 # step_size = O(1/sqrt(k)) from "Non-asymptotic analysis of # stochastic approximation algorithms for machine learning" by @@ -217,33 +217,40 @@ def sgd(X, y, epochs=100, tol=1e-3, family=Logistic, batch_size=64, # step_size = lambda init, nit, decay: init * decay**(nit//n) # is used in practice but not testing now step_size = lambda init, nit: init / np.sqrt(nit + 1) - while True: - seed = int(time.time() * 1000) % 2**32 # millisecond timings - X = X.map_blocks(_shuffle_blocks, seed=seed, dtype=X.dtype) - y = y.map_blocks(_shuffle_blocks, seed=seed, dtype=y.dtype) + rng = np.random.RandomState(42) + finished = False + while not finished: for k in range(n // batch_size): beta_old = beta.copy() - nit += 1 + iters += 1 - start = np.random.choice(n - batch_size) - i = slice(start, start + batch_size) - Xbeta = dot(X[i], beta) - grad = gradient(Xbeta, X[i], y[i]).compute() + i = rng.choice(n, size=batch_size).astype(int) + i.sort() + i = da.from_array(i, chunks=len(i)) + X_batch = X[i] + y_batch = y[i] - beta -= step_size(initial_step, nit) * (n / batch_size) * grad + Xbeta = dot(X_batch, beta) + grad = gradient(Xbeta, X_batch, y_batch) + beta -= step_size(initial_step, iters) * (n / batch_size) * grad if average: beta_sum += beta - if callback: - callback(X=X[i], y=y[i], grad=grad, nit=nit, family=family, - beta=beta if not average else beta_sum / nit) - rel_error = LA.norm(beta_old - beta) / LA.norm(beta) - converged = (rel_error < tol) or (nit / n > epochs) or (nit > maxiter) - if converged: + too_long = (iters / n > epochs) or (iters > maxiter) + if too_long: + finished = True break + if iters % 1000 == 0: + rel_error = da.linalg.norm(beta_old - beta) + rel_error /= da.linalg.norm(beta) + converged = rel_error < tol + if converged.compute(): + finished = True + break + finished = True if average: - return beta_sum / nit - return beta + return beta_sum.compute() / iters + return beta.compute() @normalize