diff --git a/dask_glm/algorithms.py b/dask_glm/algorithms.py
index fe92cbc..3f63116 100644
--- a/dask_glm/algorithms.py
+++ b/dask_glm/algorithms.py
@@ -6,8 +6,10 @@
 from dask import delayed, persist, compute, set_options
 import functools
 import numpy as np
+import numpy.linalg as LA
 import dask.array as da
 from scipy.optimize import fmin_l_bfgs_b
+import sklearn.utils.random

 from dask_glm.utils import dot, normalize
@@ -138,6 +140,119 @@ def gradient_descent(X, y, max_iter=100, tol=1e-14, family=Logistic, **kwargs):
     return beta


+def _get_n(n, kwargs):
+    """Return the number of examples, falling back to the ``n=`` keyword
+    argument when ``X.shape[0]`` is unknown (NaN)."""
+    if np.isnan(n):
+        if 'n' in kwargs:
+            return kwargs.get('n')
+        raise ValueError('Could not get the number of examples `n`. Pass the '
+                         'number of examples in as a keyword argument: '
+                         '`sgd(..., n=n)`. If using a distributed dataframe, '
+                         '`sgd(..., n=len(ddf))` works.')
+    return n
+
+
+def _shuffle_blocks(x, random_state=None):
+    """Randomly permute the rows of ``x`` (meant to be mapped over the
+    blocks of a dask array)."""
+    rng = sklearn.utils.random.check_random_state(random_state)
+    i = rng.permutation(x.shape[0]).astype(int)
+    return x[i]
+
+
+def _index_full_to_chunk(index, chunks):
+    """Group global row indices by the chunk of ``chunks`` they fall into."""
+    index.sort()
+    # Include the leading zero so indices in the first chunk are not dropped.
+    boundaries = np.concatenate([[0], np.cumsum(chunks)])
+    out = []
+    for lower, upper in zip(boundaries, boundaries[1:]):
+        i = (lower <= index) & (index < upper)
+        out += [index[i]]
+    return out
+
+
+@normalize
+def sgd(X, y, epochs=100, tol=1e-3, family=Logistic, batch_size=1000,
+        initial_step=1e-4, callback=None, average=True, maxiter=np.inf,
+        **kwargs):
+    r"""Stochastic Gradient Descent.
+
+    Parameters
+    ----------
+    X : array-like, shape (n_samples, n_features)
+    y : array-like, shape (n_samples,)
+    epochs : int or float
+        Maximum number of passes through the dataset.
+    tol : float
+        Maximum allowed relative change from the prior iteration required
+        to declare convergence.
+    batch_size : int
+        The batch size used to approximate the gradient. Larger batch sizes
+        approximate the gradient better.
+    initial_step : float
+        The initial step size. The step size decays like 1 / sqrt(k).
+    callback : callable
+        A callback to call every iteration that accepts keyword arguments
+        `X`, `y`, `beta`, `grad`, `nit` (number of iterations) and `family`.
+    average : bool
+        Whether to return the average of the iterates rather than the final
+        iterate. See [1]_.
+    maxiter : int or float
+        Hard cap on the number of mini-batch updates, regardless of `epochs`.
+    family : Family
+        The distribution family whose gradient drives the updates.
+    n : int, optional keyword argument
+        Number of examples. Required when ``X.shape[0]`` is NaN, e.g. for
+        arrays backed by a dask dataframe: ``sgd(..., n=len(ddf))``.
+
+    Returns
+    -------
+    beta : array-like, shape (n_features,)
+    """
+    gradient = family.gradient
+    n, p = X.shape
+    n = _get_n(n, kwargs)
+
+    beta = np.zeros(p)
+    beta_sum = np.zeros_like(beta)
+    beta = da.from_array(beta, chunks=p)
+    beta_sum = da.from_array(beta_sum, chunks=p)
+    iters = 0
+
+    # step_size = O(1/sqrt(k)) from "Non-asymptotic analysis of
+    # stochastic approximation algorithms for machine learning" by
+    # Moulines, Eric and Bach, Francis.
+    # This decay may require many iterations to converge. A schedule like
+    #     step_size = lambda init, nit, decay: init * decay**(nit // n)
+    # is often used in practice, but is not tested here.
+    step_size = lambda init, nit: init / np.sqrt(nit + 1)
+    rng = np.random.RandomState(42)
+    finished = False
+    while not finished:
+        for k in range(n // batch_size):
+            beta_old = beta.copy()
+            iters += 1
+
+            i = rng.choice(n, size=batch_size).astype(int)
+            i.sort()
+            i = da.from_array(i, chunks=len(i))
+            X_batch = X[i]
+            y_batch = y[i]
+
+            Xbeta = dot(X_batch, beta)
+            grad = gradient(Xbeta, X_batch, y_batch)
+            beta -= step_size(initial_step, iters) * (n / batch_size) * grad
+            if average:
+                beta_sum += beta
+
+            # One pass over the data is n / batch_size mini-batch updates.
+            too_long = (iters * batch_size / n > epochs) or (iters > maxiter)
+            if too_long:
+                finished = True
+                break
+            if iters % 1000 == 0:
+                rel_error = da.linalg.norm(beta_old - beta)
+                rel_error /= da.linalg.norm(beta)
+                converged = rel_error < tol
+                if converged.compute():
+                    finished = True
+                    break
+
+    if average:
+        return beta_sum.compute() / iters
+    return beta.compute()
+
+
 @normalize
 def newton(X, y, max_iter=50, tol=1e-8, family=Logistic, **kwargs):
     """Newtons Method for Logistic Regression.
@@ -430,5 +545,6 @@ def proximal_grad(X, y, regularizer='l1', lamduh=0.1, family=Logistic,
     'gradient_descent': gradient_descent,
     'newton': newton,
     'lbfgs': lbfgs,
-    'proximal_grad': proximal_grad
+    'proximal_grad': proximal_grad,
+    'sgd': sgd
 }
diff --git a/dask_glm/estimators.py b/dask_glm/estimators.py
index aa05425..4833b05 100644
--- a/dask_glm/estimators.py
+++ b/dask_glm/estimators.py
@@ -32,7 +32,7 @@ def family(self):

     def __init__(self, fit_intercept=True, solver='admm', regularizer='l2',
                  max_iter=100, tol=1e-4, lamduh=1.0, rho=1,
-                 over_relax=1, abstol=1e-4, reltol=1e-2):
+                 over_relax=1, abstol=1e-4, reltol=1e-2, **kwargs):
         self.fit_intercept = fit_intercept
         self.solver = solver
         self.regularizer = regularizer
@@ -61,9 +61,10 @@ def __init__(self, fit_intercept=True, solver='admm', regularizer='l2',

         self._fit_kwargs = {k: getattr(self, k) for k in fit_kwargs}

-    def fit(self, X, y=None):
+    def fit(self, X, y=None, **kwargs):
         X_ = self._maybe_add_intercept(X)
-        self._coef = algorithms._solvers[self.solver](X_, y, **self._fit_kwargs)
+        self._coef = algorithms._solvers[self.solver](X_, y, **self._fit_kwargs,
+                                                      **kwargs)

         if self.fit_intercept:
             self.coef_ = self._coef[:-1]
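Below is a minimal usage sketch (not part of the patch) of calling the new `sgd` solver directly on a dask array. The toy data, chunk sizes, and hyperparameter values are illustrative assumptions, not values taken from the diff:

# Usage sketch: fit a logistic model with the new sgd solver on toy data.
import numpy as np
import dask.array as da

from dask_glm.algorithms import sgd
from dask_glm.families import Logistic

n, p = 10000, 5
X = da.random.normal(size=(n, p), chunks=(1000, p))
true_beta = np.linspace(-1, 1, p)
prob = 1 / (1 + da.exp(-X.dot(true_beta)))
y = (da.random.random(n, chunks=1000) < prob).astype(float)

# batch_size trades gradient noise for per-iteration cost; the step size
# starts at initial_step and decays like 1 / sqrt(k) inside sgd.
beta = sgd(X, y, family=Logistic, batch_size=500, epochs=5)
print(beta)  # NumPy array of length p

Because each mini-batch gradient is rescaled by n / batch_size inside the loop, it estimates the full-data gradient, so `initial_step` plays the same role as a step size in full-batch gradient descent.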
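A second sketch, for the `fit(**kwargs)` pass-through added in estimators.py. Two loud assumptions here: that the estimator's solver-name validation also accepts 'sgd' (that change is not shown in the hunks above), and that the CSV path and column names exist; both are made up for illustration:

# Sketch only: pass n through fit() when the chunk sizes are unknown.
import dask.dataframe as dd

from dask_glm.estimators import LogisticRegression

df = dd.read_csv('training-*.csv')      # hypothetical input files
X = df[['x0', 'x1', 'x2']].values       # dask array with unknown chunk sizes
y = df['label'].values

est = LogisticRegression(solver='sgd')  # assumes 'sgd' passes the solver check
# X.shape[0] is NaN here, so the row count is passed explicitly; fit()
# forwards it to sgd(..., n=...) via the new **kwargs.
est.fit(X, y, n=len(df))
print(est.coef_, est.intercept_)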