Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 45 additions & 12 deletions ARX/Regressors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@
from scipy.sparse.linalg import cg
from sklearn.linear_model import LinearRegression as SK_LinearRegression
from sklearn.linear_model import BayesianRidge
from sklearn.cluster import KMeans

class Base:

def __init__(self, N_AR):
def __init__(self, N_AR, basis=None):
"""
Initialises the Regressor object.

Parameters:
N_AR (int): Number of auto-regressive terms to include. Default is 0.
N_AR (int) : Number of auto-regressive terms to include.
basis : is None or "se" for squared exponential
"""
if not isinstance(N_AR, int):
raise ValueError("N_AR must be an integer")
self.N_AR = N_AR
self.basis = basis

def _prepare_arx_data(self, X, y):
"""
Expand Down Expand Up @@ -58,24 +61,33 @@ def predict(self, X, y0=None):
numpy.ndarray: Predicted target values.
"""
assert np.shape(X)[1] == self.D


# Apply basis function
if self.basis is None:
Phi = X
elif self.basis == "se":
Phi = self._se_basis(X, self.centres)
else:
raise ValueError(f"Unknown basis function: {self.basis}. Must be None or 'se'.")
basis_dim = Phi.shape[1]


if self.N_AR == 0:
y_pred = self.model.predict(X)
y_pred = self.model.predict(Phi)
else:
assert len(y0) == self.N_AR

y_pred = []
for t in range(self.N_AR, np.shape(X)[0] + self.N_AR):
for t in range(self.N_AR, np.shape(Phi)[0] + self.N_AR):

# First time step
if t == self.N_AR:
u = np.hstack([X[0], y0])
u = np.hstack([Phi[0], y0])

# Remaining time steps
else:
u[:self.D] = X[t - self.N_AR]
u[self.D:] = np.roll(u[self.D:], 1)
u[:basis_dim] = Phi[t - self.N_AR]
u[basis_dim:] = np.roll(u[basis_dim:], 1)
u[-1] = y

y = self.model.predict(u.reshape(1, -1))[0]
Expand All @@ -86,27 +98,48 @@ def predict(self, X, y0=None):

return y_pred

def _se_basis(self, X, centres, width):
""" Squared exponential basis function
"""
dists = np.linalg.norm(X[:, np.newaxis, :] - centres[np.newaxis, :, :], axis=2)
return np.exp(-0.5 * (dists / width) ** 2)

class Linear(Base):

def train(self, X, y, positive=False):
def train(self, X, y, positive=False, **kwargs):
"""
Trains the regressor using the provided data.

Parameters:
X (numpy.ndarray): Input data of shape (N, D).
y (numpy.ndarray): Target data of shape (N,).
positive : if true, restricts all parameters to be positive
n_clusters : only relevant if using the squared exponential basis function;
is the number of cluster centres we look for in k-means

Returns:
None
"""
# Size checks
self.N, self.D = np.shape(X)
assert y.shape == (self.N,)

# Apply basis function
if self.basis is None:
Phi = X
if self.basis == "se":
self.centres = kwargs['centres']
Phi = self._se_basis(X, centres=self.centres, width=kwargs['width'])

# Get data into auto-regressive format
if self.N_AR > 0:
X, y = self._prepare_arx_data(X, y)
Phi, y = self._prepare_arx_data(Phi, y)

# Initialise
self.model = SK_LinearRegression(positive=positive)
self.model.fit(X, y)

# Train model parameters
self.model.fit(Phi, y)

class LinearBayes(Base):

Expand Down
131 changes: 131 additions & 0 deletions examples/example3_basis_functions.ipynb

Large diffs are not rendered by default.

141 changes: 141 additions & 0 deletions examples/example4_basis_functions_autoregressive.ipynb

Large diffs are not rendered by default.

83 changes: 83 additions & 0 deletions examples/example5_linear_regression_to_Gaussian_Process.ipynb

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions tests/test_regressor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import numpy as np
from ARX.Regressors import Linear, LinearBayes
from sklearn.metrics import mean_squared_error

def generate_AR_data():

Expand Down Expand Up @@ -105,6 +106,23 @@ def test_arx_linear():
y_pred = regressor.predict(X[N_AR:], y0=y[:N_AR])
assert np.allclose(y[N_AR:], y_pred)

def test_arx_se_basis():
    """
    Tests the Regressor's ability to handle ARX models with auto-regressive
    terms while using squared-exponential basis functions.
    """

    # Generate example data
    X, y, N_AR, true_theta = generate_AR_data()

    # Initialize and train the regressor
    regressor = Linear(N_AR=N_AR, basis='se')
    regressor.train(X, y)

    # Check full model predictions. The basis-function model is only an
    # approximation of the generating process, so require agreement to a
    # loose tolerance. NOTE: the original check was missing `assert` and
    # therefore never failed.
    y_pred = regressor.predict(X[N_AR:], y0=y[:N_AR])
    assert np.allclose(y[N_AR:], y_pred, atol=0.1)

def test_arx_linear_Bayes():

# Generate example data
Expand All @@ -125,3 +143,19 @@ def test_arx_linear_Bayes():
assert np.allclose(y[N_AR:], y_mean, atol=1e-4)
assert np.sum(y[N_AR:] < y_mean - 3 * y_std) == 0
assert np.sum(y[N_AR:] > y_mean + 3 * y_std) == 0

def test_se_basis_linear():
    """
    Tests fitting a smooth 2D function with the squared-exponential basis
    and a plain (non-autoregressive) linear regressor.
    """

    # Fix seed for reproducibility
    np.random.seed(42)

    # Smooth 2D target with a small amount of observation noise
    X = np.random.uniform(0, 5, size=(100, 2))
    y = np.sin(X[:, 0]) + np.cos(X[:, 1]) + 0.1 * np.random.randn(100)

    # Train a basis-function regressor on the sampled data
    regressor = Linear(N_AR=0, basis='se')
    regressor.train(X, y)

    # The fit should recover most of the structure in the data
    y_pred = regressor.predict(X)
    mse = mean_squared_error(y_pred, y)
    assert mse < 0.05