diff --git a/bfast/models.py b/bfast/models.py index 6e8bc16..e9e7ca7 100644 --- a/bfast/models.py +++ b/bfast/models.py @@ -1,13 +1,12 @@ -from bfast.monitor import BFASTMonitorPython -from bfast.monitor import BFASTMonitorOpenCL - +from bfast.monitor import * class BFASTMonitor(): """ BFASTMonitor implements the BFASTMonitor approach and provides two backends/implementations: - - A pure Python implementation (based on the Numpy package). + - A pure Numpy implementation (based on the Numpy package) + - A pure Cupy implementation suited for GPU devices - An optimized OpenCL implementation suited for massively-parallel devices. The interface follows the one of the corresponding R package, @@ -52,8 +51,8 @@ class BFASTMonitor(): Specifies the OpenCL platform id. device_id int, default 0 - Only relevant if backend='opencl'. - Specified the OpenCL device id. + Only relevant if backend='opencl' or 'cupy'. + Specified the GPU device id. detailed_results : bool, default False Only relevant if backend='opencl'. @@ -129,6 +128,18 @@ def __init__( period=self.period, verbose=self.verbose, ) + + elif self.backend == 'cupy' and gpu_available: + self._model = BFASTMonitorCuPy( + start_monitor=self.start_monitor, + freq=self.freq, + k=self.k, + hfrac=self.hfrac, + trend=self.trend, + level=self.level, + period=self.period, + verbose=self.verbose, + ) elif self.backend == 'python-mp': self._model = BFASTMonitorPython( diff --git a/bfast/monitor/__init__.py b/bfast/monitor/__init__.py index 736de20..a816f42 100644 --- a/bfast/monitor/__init__.py +++ b/bfast/monitor/__init__.py @@ -1,2 +1,13 @@ from .opencl import BFASTMonitorOpenCL from .python import BFASTMonitorPython + +# only import cupy related bfast if a gpu is available +try: + import cupy + cupy.cuda.Device() + gpu_available = True +except: + gpu_available = False + +if gpu_available: + from .cupy import BFASTMonitorCuPy diff --git a/bfast/monitor/cupy/__init__.py b/bfast/monitor/cupy/__init__.py new file mode 100644 index 0000000..f0ac7c4 --- /dev/null +++ b/bfast/monitor/cupy/__init__.py @@ -0,0 +1 @@ +from .base import BFASTMonitorCuPy \ No newline at end of file diff --git a/bfast/monitor/cupy/base.py b/bfast/monitor/cupy/base.py new file mode 100644 index 0000000..a4b39d7 --- /dev/null +++ b/bfast/monitor/cupy/base.py @@ -0,0 +1,269 @@ +''' +Created on June 15, 2021 + +@author: Pierrick Rambaud +''' + +import multiprocessing as mp +from functools import partial + +import cupy as cp +from sklearn import linear_model + +from bfast.base import BFASTMonitorBase +from bfast.monitor.utils import compute_end_history, compute_lam, map_indices + + +class BFASTMonitorCuPy(BFASTMonitorBase): + """ BFAST Monitor implementation based on Python and Numpy. The + interface follows the one of the corresponding R package + (see https://cran.r-project.org/web/packages/bfast) + + def __init__(self, + start_monitor, + freq=365, + k=3, + hfrac=0.25, + trend=True, + level=0.05, + verbose=0, + device_id=0 + ): + + Parameters + ---------- + + start_monitor : datetime object + A datetime object specifying the start of + the monitoring phase. + + freq : int, default 365 + The frequency for the seasonal model. + + k : int, default 3 + The number of harmonic terms. + + hfrac : float, default 0.25 + Float in the interval (0,1) specifying the + bandwidth relative to the sample size in + the MOSUM/ME monitoring processes. + + trend : bool, default True + Whether a tend offset term shall be used or not + + level : float, default 0.05 + Significance level of the monitoring (and ROC, + if selected) procedure, i.e., probability of + type I error. + + period : int, default 10 + Maximum time (relative to the history period) + that will be monitored. + + verbose : int, optional (default=0) + The verbosity level (0=no output, 1=output) + + device_id : int, optional (default=1) + The GPU device id number + """ + def __init__(self, + start_monitor, + freq=365, + k=3, + hfrac=0.25, + trend=True, + level=0.05, + period=10, + verbose=0, + device_id=0 + ): + + super().__init__(start_monitor, + freq, + k=k, + hfrac=hfrac, + trend=trend, + level=level, + period=period, + verbose=verbose) + + self._timers = {} + + # use the specified cuda device (default to 0) + cp.cuda.device.Device(device_id).use() + + def fit(self, data, dates, nan_value=0, **kwargs): + """ Fits the models for the ndarray 'data' + + Parameters + ---------- + data: ndarray of shape (N, W, H), + where N is the number of time + series points per pixel and W + and H the width and the height + of the image, respectively. + dates : list of datetime objects + Specifies the dates of the elements + in data indexed by the first axis + nan_value : int, default 0 + Specified the NaN value used in + the array data + + Returns + ------- + self : instance of BFASTMonitor + The object itself. + """ + + data_ints = cp.copy(data) + data = cp.array(cp.copy(data_ints)).astype(cp.float32) + + # set NaN values + data[data_ints==nan_value] = cp.nan + + self.n = compute_end_history(dates, self.start_monitor) + + # create (complete) seasonal matrix ("patterns" as columns here!) + self.mapped_indices = np.array(map_indices(dates)).astype(cp.int32) + self.X = self._create_data_matrix(self.mapped_indices) + + # period = data.shape[0] / cp.float(self.n) + self.lam = cp.array(compute_lam(data.shape[0], self.hfrac, self.level, self.period)) + + results = cp.apply_along_axis(self.fit_single, axis=0, arr=data) + + self.breaks = results[0].get() + self.means = results[1].get() + self.magnitudes = results[2].get() + self.valids = results[3].get() + + return self + + def fit_single(self, y): + """ Fits the BFAST model for the 1D array y. + + Parameters + ---------- + y : array + 1d array of length N + + Returns + ------- + self : instance of BFASTCPU + The object itself + """ + + N = y.shape[0] + + # compute nan mappings + nans = cp.isnan(y) + num_nans = cp.cumsum(nans) + val_inds = cp.array(range(N))[~nans] + + # compute new limits (in data NOT containing missing values) + # ns = n - num_nans[self.n] + ns = self.n - num_nans[self.n - 1] + h = cp.int(float(ns) * self.hfrac) + Ns = N - num_nans[N - 1] + + if ns <= 5 or Ns - ns <= 5: + brk = -2 + mean = 0.0 + magnitude = 0.0 + if self.verbose > 1: + print("WARNING: Not enough observations: ns={ns}, Ns={Ns}".format(ns=ns, Ns=Ns)) + + return cp.array([brk, mean, magnitude, Ns], dtype=cp.float) + + val_inds = val_inds[ns:] + val_inds -= self.n + + # remove nan values from patterns+targets + X_nn = self.X[:, ~nans] + y_nn = y[~nans] + + # split data into history and monitoring phases + X_nn_h = X_nn[:, :ns] + X_nn_m = X_nn[:, ns:] + y_nn_h = y_nn[:ns] + y_nn_m = y_nn[ns:] + + # (1) fit linear regression model for history period + coef = cp.linalg.pinv(X_nn_h@X_nn_h.T)@X_nn_h@y_nn_h + + # get predictions for all non-nan points + y_pred = X_nn.T@coef + + y_error = y_nn - y_pred + + # (2) evaluate model on monitoring period mosum_nn process + err_cs = cp.cumsum(y_error[ns - h:Ns + 1]) + mosum_nn = err_cs[h:] - err_cs[:-h] + + sigma = cp.sqrt(cp.sum(y_error[:ns] ** 2) / (ns - (2 + 2 * self.k))) + mosum_nn = 1.0 / (sigma * cp.sqrt(ns)) * mosum_nn + + mosum = cp.full(N - self.n, cp.nan) + mosum[val_inds[:Ns - ns]] = mosum_nn + if self.verbose: + print("MOSUM process", mosum_nn.shape) + + # compute mean + mean = cp.mean(mosum_nn) + + # compute magnitude + magnitude = cp.median(y_error[ns:]) + + # boundary and breaks + a = self.mapped_indices[self.n:] / self.mapped_indices[self.n - 1].astype(cp.float) + bounds = self.lam * cp.sqrt(self._log_plus(a)) + + if self.verbose: + print("lambda", self.lam) + print("bounds", bounds) + + breaks = cp.abs(mosum) > bounds + first_break = cp.where(breaks)[0] + + if first_break.shape[0] > 0: + first_break = first_break[0].item() + else: + first_break = -1 + + return cp.array([first_break, mean.item(), magnitude.item(), Ns.item()], dtype=cp.float) + + def get_timers(self): + """ Returns runtime measurements for the + different phases of the fitting process. + + Returns + ------- + dict : An array containing the runtimes + for the different phases. + """ + return self._timers + + def _create_data_matrix(self, mapped_indices): + + # cast to cp array + mapped_indices = cp.array(mapped_indices) + N = mapped_indices.shape[0] + temp = 2 * cp.pi * mapped_indices / cp.float(self.freq) + + if self.trend: + X = cp.vstack((cp.ones(N), mapped_indices)) + else: + X = cp.ones(N) + + for j in cp.arange(1, self.k + 1): + X = cp.vstack((X, cp.sin(j * temp))) + X = cp.vstack((X, cp.cos(j * temp))) + + return X + + def _log_plus(self, a): + retval = cp.ones(a.shape, dtype=cp.float) + fl = a > cp.e + retval[fl] = cp.log(a[fl]) + + return retval diff --git a/bfast/monitor/python/base.py b/bfast/monitor/python/base.py index 622ec8c..03447d2 100644 --- a/bfast/monitor/python/base.py +++ b/bfast/monitor/python/base.py @@ -162,15 +162,22 @@ def fit(self, data, dates, nan_value=0): for j in range(data.shape[2]): y = data[:,i,j] + + print(y.shape) + (pix_break, pix_mean, pix_magnitude, pix_num_valid) = self.fit_single(y) + print(self.fit_single(y)) + breaks_global[i,j] = pix_break means_global[i,j] = pix_mean magnitudes_global[i,j] = pix_magnitude valids_global[i,j] = pix_num_valid - + + raise Exception('stop') + self.breaks = breaks_global self.means = means_global self.magnitudes = magnitudes_global @@ -257,7 +264,7 @@ def fit_single(self, y): sigma = np.sqrt(np.sum(y_error[:ns] ** 2) / (ns - (2 + 2 * self.k))) mosum_nn = 1.0 / (sigma * np.sqrt(ns)) * mosum_nn - mosum = np.repeat(np.nan, N - self.n) + mosum = np.full(N - self.n, np.nan) mosum[val_inds[:Ns - ns]] = mosum_nn if self.verbose: print("MOSUM process", mosum_nn.shape) diff --git a/requirements.txt b/requirements.txt index 4d0a45b..89e7d92 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,11 @@ -numpy>=1.16.3 -pandas>=1.0.0 -pyopencl>=2018.2.5 -scikit-learn>=0.20.3 -scipy>=1.2.1 -matplotlib>=2.2.2 -wget>=3.2 -Sphinx>=2.2.0 -sphinx-bootstrap-theme>=0.7.1 -numpydoc>=1.0.0 +numpy==1.16.3 +pandas==1.0.0 +pyopencl==2018.2.5 +scikit-learn==0.20.3 +scipy==1.2.1 +matplotlib==2.2.2 +wget==3.2 +Sphinx==2.2.0 +sphinx-bootstrap-theme==0.7.1 +numpydoc==1.0.0 +cupy==9.2.0 diff --git a/setup.py b/setup.py index 2426520..1a4ff26 100644 --- a/setup.py +++ b/setup.py @@ -93,6 +93,7 @@ def setup_package(): 'scipy>=1.2.1', 'matplotlib>=2.2.2', 'wget>=3.2', + 'cupy>=9.2.0' ], classifiers=['Intended Audience :: Science/Research', 'Intended Audience :: Developers',