diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 99bbc905..0f76d434 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased changes (available on the `dev` branch) ### Added +- Added functionality and examples for constrained ICA methods (arc-ERBM, arc-EBM), by [Jacqueline Behrendt](https://github.com/jackybehrendt12). ([#133](https://github.com/ibs-lab/cedalion/pull/133)) - An example notebook for ICA source extraction was added, by [Jacqueline Behrendt](https://github.com/jackybehrendt12). ([#112](https://github.com/ibs-lab/cedalion/pull/112)) - Added `TwoSurfaceHeadmodel.scale_to_headsize` and `TwoSurfaceHeadmodel.scale_to_landmarks` to adjust the head model's size to the head circumferences or digitized landmarks, respectively. By [Eike Middell](https://github.com/emiddell). diff --git a/docs/examples/Makefile b/docs/examples/Makefile index 32832bc8..c361fa3d 100644 --- a/docs/examples/Makefile +++ b/docs/examples/Makefile @@ -7,6 +7,7 @@ EXAMPLE_NOTEBOOKS = getting_started_io/00_test_installation.ipynb \ plots_visualization/12_plots_example.ipynb \ machine_learning/50_finger_tapping_lda_classification.ipynb \ machine_learning/52_ica_erbm_fingertapping_example.ipynb \ + machine_learning/53_constrained_ICA_example.ipynb \ modeling/31_glm_basis_functions.ipynb \ modeling/32_glm_fingertapping_example.ipynb \ modeling/33_glm_illustrative_example.ipynb \ diff --git a/docs/machine_learning/index.rst b/docs/machine_learning/index.rst index e63d4646..219c73b4 100644 --- a/docs/machine_learning/index.rst +++ b/docs/machine_learning/index.rst @@ -21,8 +21,7 @@ Decomposition Methods :recursive: :nosignatures: - cedalion.sigdecomp.ERBM - cedalion.sigdecomp.ICA_EBM + cedalion.sigdecomp.unimodal cedalion.sigdecomp.multimodal Examples diff --git a/docs/references.bib b/docs/references.bib index af906259..ef65f5fb 100644 --- a/docs/references.bib +++ b/docs/references.bib @@ -427,3 +427,15 @@ @article{Schaefer2018 year={2018}, publisher={Oxford University Press} } + + +@article{yang2025flexible, + title={A Flexible Constrained ICA Approach for Multisubject fMRI Analysis}, + author={Yang, Hanlu and Vu, Trung and Dhrubo, Ehsan Ahmed and Calhoun, Vince D and Adali, T{\"u}lay}, + journal={International Journal of Biomedical Imaging}, + volume={2025}, + number={1}, + pages={2064944}, + year={2025}, + publisher={Wiley Online Library} +} \ No newline at end of file diff --git a/examples/machine_learning/53_constrained_ICA_example.ipynb b/examples/machine_learning/53_constrained_ICA_example.ipynb new file mode 100644 index 00000000..98ad74aa --- /dev/null +++ b/examples/machine_learning/53_constrained_ICA_example.ipynb @@ -0,0 +1,481 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "0", + "metadata": {}, + "outputs": [], + "source": [ + "# This cells setups the environment when executed in Google Colab.\n", + "try:\n", + " import google.colab\n", + " !curl -s https://raw.githubusercontent.com/ibs-lab/cedalion/dev/scripts/colab_setup.py -o colab_setup.py\n", + " # Select branch with --branch \"branch name\" (default is \"dev\")\n", + " %run colab_setup.py\n", + "except ImportError:\n", + " pass" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1", + "metadata": {}, + "outputs": [], + "source": [ + "import cedalion\n", + "import cedalion.sigproc.quality as quality\n", + "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "import scipy as sp\n", + "import xarray as xr\n", + "from cedalion import units\n", + "from cedalion.sigdecomp.multimodal import arc_ebm, arc_erbm\n", + "import cedalion.data" + ] + }, + { + "cell_type": "markdown", + "id": "2", + "metadata": {}, + "source": [ + "# Constrained Independent Component Analysis (ICA)\n", + "\n", + "In this notebook, we demonstrate how constrained ICA methods can be applied to separate physiological sources from resting-state fNIRS data using auxiliary measurements. Specifically, we focus on adaptive-reverse constrained ICA-ERBM (arc-ERBM) and adaptive-reverse constrained ICA-EBM (arc-EBM).\n", + "\n", + "arc-ERBM and arc-EBM are constrained versions of the methods [Independent Component Analysis by Entropy Rate Bound Minimization](https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=6845364) (ICA-ERBM) and [by Entropy Bound Minimization](https://ieeexplore.ieee.org/abstract/document/5499122) (ICA-EBM). The general assumption in [Independent Component Analysis](https://en.wikipedia.org/wiki/Independent_component_analysis) is that the dataset $X \\in \\mathbb R^{N\\times T}$, with $N$ channels and $T$ sample points, is generated from a set of independent latent sources $S \\in \\mathbb R^{N\\times T}$, mixed by an unknown mixing matrix $A \\in \\mathbb R^{N \\times N}$.\n", + "\n", + "$$\n", + "X = A \\cdot S.\n", + "$$\n", + "\n", + "ICA methods aim to undo this mixing by determining a demixing matrix $W \\in \\mathbb{R}^{N \\times N}$, such that the estimated underlying sources $Y = W \\cdot X$ are maximally independent. The optimization of the demixing matrix is based on minimizing the mutual information $I$ in the case of ICA-EBM, and the mutual information rate $I_r$ in the case of ICA-ERBM. In both methods, this is done by minimizing a cost function $J$ that is equivalent to either $I$ or $I_r$ for each row vector $w_n$, $n = 1, ..., N$.\n", + "\n", + "In the constrained methods arc-EBM and arc-ERBM, we assume that there are $M \\leq N$ reference signals $r_n$, $n = 1, ..., M$, that correspond to $M$ latent sources. For each source estimate $y_n = w_n^T X$ that corresponds to a reference signal, the minimization of the cost function $J$ is extended through a constraint that uses a reference signal $r_n$:\n", + "\n", + "$$\n", + "\\min_{w_n} J(w_n) \\quad \\text{s.t.} \\quad \\varepsilon(r_n, y_n) \\geq \\rho_n\n", + "$$\n", + "\n", + "Here, $\\varepsilon$ is a similarity measure that operates in the frequency domain and enforces similar spectral characteristics between the source estimate $y_n$ and the reference signal $r_n$.\n", + "\n", + "In the following example, $X$ represents our resting-state fNIRS data, and as reference signals $r_n$, we use auxiliary PPG, respiration, and mean arterial pressure (MAP) measurements. After applying the constrained ICA methods and obtaining $W$, we can compute estimates of the separated sources as $y_n = w_n^T X$.\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "id": "3", + "metadata": {}, + "source": [ + "## Loading Resting-State fNIRS Data\n", + "\n", + "We load the resting-state fNIRS data, including the auxiliary physiological measurements from the SNIRF file. For the demixing problem, we use middle-distance channels of approximately 15 mm in length to ensure that physiological noise signals are present in the data." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4", + "metadata": {}, + "outputs": [], + "source": [ + "# Load data\n", + "rec = cedalion.data.get_spa_fnirs()\n", + " \n", + "# Read fnirs data \n", + "fnirs_amp = rec['amp']\n", + "\n", + "\n", + "# Define middle distance channels \n", + "middle_channels = ['S1D7', 'S1D8', 'S1D13', 'S1D14', 'S1D15', 'S1D16', 'S2D8', 'S2D11', 'S2D12', \n", + " 'S3D7', 'S3D9', 'S3D10', 'S4D1', 'S4D5', 'S4D10', 'S4D16', 'S5D4', 'S5D5', 'S5D11', \n", + " 'S5D15', 'S6D3', 'S6D6', 'S6D12', 'S6D14', 'S7D2', 'S7D6', 'S7D9', 'S7D13', 'S8D22', \n", + " 'S8D23', 'S8D24', 'S8D29', 'S8D30', 'S8D31', 'S9D24', 'S9D27', 'S9D28', 'S10D23', 'S10D25',\n", + " 'S10D26', 'S11D19', 'S11D26', 'S11D31', 'S12D18', 'S12D19', 'S12D22', 'S12D28', 'S13D17', \n", + " 'S13D20', 'S13D27', 'S13D29', 'S14D20', 'S14D21', 'S14D25', 'S14D30']\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [ + "# Plot three middle distance channels\n", + "fig, ax = plt.subplots(3, 1, sharex=True, figsize=(10, 5))\n", + "for i, ch in enumerate(['S1D7', 'S1D8', 'S1D13']):\n", + " ax[i].plot(fnirs_amp.time, fnirs_amp.sel(channel=ch, wavelength=\"760\"), \"r-\", label=\"760nm\")\n", + " ax[i].plot(fnirs_amp.time, fnirs_amp.sel(channel=ch, wavelength=\"850\"), \"b-\", label=\"850nm\")\n", + " ax[i].set_title(f\"Channel {ch}\")\n", + "\n", + "ax[0].legend()\n", + "ax[2].set_xlim(2400,2500)\n", + "ax[2].set_xlabel(\"Time (seconds)\")\n", + "plt.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "id": "6", + "metadata": {}, + "source": [ + "## Conversion to Optical Density" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7", + "metadata": {}, + "outputs": [], + "source": [ + "# Convert to OD\n", + "fnirs_od = cedalion.nirs.cw.int2od(fnirs_amp)" + ] + }, + { + "cell_type": "markdown", + "id": "8", + "metadata": {}, + "source": [ + "## Select Resting-State Session\n", + "\n", + "Our data contain a resting-state session of 75 seconds. We select this session and crop the first 10 seconds to remove non-stationarities in the data. From the remaining session, we select a 60-second interval for our analysis using the middle-distance channels." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9", + "metadata": {}, + "outputs": [], + "source": [ + "# Select the onset of the resting state interval (pre_sitting) \n", + "onset_resting = rec.stim[rec.stim.trial_type == 'pre_sitting'].onset.values[0]\n", + "\n", + "# We cropp the first 10 seconds of the resting state interval to \n", + "# avoid transient effects and select a 60 second interval for the analysis.\n", + "interval = [onset_resting + 10, onset_resting + 70]\n", + "\n", + "# Select interval and channels \n", + "interval_fnirs_od = fnirs_od.sel(time=slice(interval[0], interval[1]))\n", + "interval_fnirs_od = interval_fnirs_od.sel(channel= middle_channels)" + ] + }, + { + "cell_type": "markdown", + "id": "10", + "metadata": {}, + "source": [ + "## Channel Quality Assessment and Pruning\n", + "\n", + "We compute the Scalp Coupling Index (SCI) and Peak Spectral Power (PSP) for channel quality assessment. SCI and PSP are computed for each channel, and we then select the 40 channels with the highest percentage of clean time." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11", + "metadata": {}, + "outputs": [], + "source": [ + "# Define parameters for quality metrics\n", + "window_length = 5 * units.s\n", + "sci_thresh = 0.75\n", + "psp_thresh = 0.1\n", + "sci_psp_percentage_thresh = 0.75\n", + "\n", + "# Compute SCI and PSP \n", + "sci, sci_mask = quality.sci(interval_fnirs_od, window_length, sci_thresh)\n", + "psp, psp_mask = quality.psp(interval_fnirs_od, window_length, psp_thresh)\n", + "sci_x_psp_mask = sci_mask & psp_mask\n", + "perc_time_clean = sci_x_psp_mask.sum(dim=\"time\") / len(sci.time)\n", + "\n", + "# Set the number of channels to include in the ICA analysis\n", + "num_ch = 40\n", + "\n", + "# Select the best channels \n", + "id_best_channels = np.argsort(perc_time_clean)[-num_ch:]\n", + "best_channels = id_best_channels['channel']\n", + "best_middle_channels = interval_fnirs_od.sel(channel=best_channels)" + ] + }, + { + "cell_type": "markdown", + "id": "12", + "metadata": {}, + "source": [ + "## Convert Optical Density to Concentration Changes " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "13", + "metadata": {}, + "outputs": [], + "source": [ + "# Convert optical density to concentration changes \n", + "montage = rec.geo3d\n", + "dpf = xr.DataArray(\n", + " [6, 6],\n", + " dims=\"wavelength\",\n", + " coords={\"wavelength\": fnirs_od.wavelength},)\n", + " \n", + "fnirs_con = cedalion.nirs.cw.od2conc(fnirs_od, montage, dpf)" + ] + }, + { + "cell_type": "markdown", + "id": "14", + "metadata": {}, + "source": [ + "## High-Pass Filtering and Selection of HbO" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "15", + "metadata": {}, + "outputs": [], + "source": [ + "# Apply high-pass filter\n", + "y_filt = fnirs_con.cd.freq_filter(fmin= 0.01, fmax= 0, butter_order=4)\n", + "\n", + "# Select resting state interval\n", + "y_filt = y_filt.sel(time = slice(interval[0], interval[1]))\n", + "\n", + "# Select middle distance channels\n", + "y_filt = y_filt.sel(channel=best_middle_channels.channel.values)\n", + "\n", + "# Select only HbO signal \n", + "y_filt = y_filt.sel(chromo = 'HbO')\n", + "\n", + "# Turn to numpy array \n", + "data = y_filt.values" + ] + }, + { + "cell_type": "markdown", + "id": "16", + "metadata": {}, + "source": [ + "## Prepare the Auxiliary Signals\n", + "\n", + "We now extract the respiration ('Resp'), PPG ('Pleth'), or mean arterial pressure ('MAP') signals from the recording. These signals must be downsampled to match the fNIRS sampling frequency. We first select the resting-state interval with an additional buffer and apply a band-pass filter to the data to avoid aliasing effects. The MAP signal may contain missing samples, which we address using an interpolation step." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "17", + "metadata": {}, + "outputs": [], + "source": [ + "# Select the auxiliary signal from the recording \n", + "aux_name = 'Resp' # use 'MAP', 'Pleth' or 'Resp' \n", + "aux_signal = rec.aux_ts[aux_name]\n", + "\n", + "# Select the interval of the auxiliary signal with a 100 second buffer \n", + "# before and after the resting state interval to avoid edge effects in the filtering step\n", + "buffer = 100 \n", + "aux_signal = aux_signal.sel(time = slice(interval[0]- buffer, interval[1] + buffer ))\n", + "\n", + "# Add a new coordinate called samples and add unit \n", + "aux_signal['time'].attrs['units'] = 'seconds'\n", + "samples = np.arange(aux_signal.sizes['time'])\n", + "aux_signal = aux_signal.assign_coords(samples=('time', samples))\n", + "\n", + "# Fix missing samples in the MAP signal \n", + "if aux_name == 'MAP':\n", + " aux_signal = aux_signal.interpolate_na(dim = 'time' ,method = 'cubic',\n", + " fill_value='extrapolate')\n", + "\n", + "# Apply bandpass filter to the auxiliary signal to avaoid aliasing effects after the downsampling step.\n", + "aux_signal = aux_signal.cd.freq_filter(fmin= 0.01, fmax= 2.5 , butter_order=4) \n", + "\n", + "# Downsample the auxiliary signal by interpolating it to the time points of the fNIRS signal\n", + "time_line = fnirs_con.sel(time = slice(interval[0]- buffer,interval[1]+buffer))\n", + "aux_signal = aux_signal.drop_duplicates(dim='time')\n", + "aux_signal = aux_signal.interp(time=time_line.time)\n", + "aux_signal = aux_signal.dropna(dim=\"time\", how=\"any\")\n", + "\n", + "# Remove buffer \n", + "aux_signal = aux_signal.sel(time = slice(interval[0], interval[1]))\n", + "\n", + "# Turn to numpy array and reshape \n", + "aux_signal = np.array(aux_signal.values, dtype=np.float64).T\n", + "aux_signal = aux_signal.reshape(1, -1) \n" + ] + }, + { + "cell_type": "markdown", + "id": "18", + "metadata": {}, + "source": [ + "## Z-Transform Normalization" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "19", + "metadata": {}, + "outputs": [], + "source": [ + "# z-transform the data and auxiliary signal\n", + "data = sp.stats.zscore(data, axis=1) \n", + "aux_signal = sp.stats.zscore(aux_signal, axis=1) " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "20", + "metadata": {}, + "outputs": [], + "source": [ + "# Plot the data and the auxiliary signal\n", + "fig, ax = plt.subplots(2, 1, figsize=(10, 5))\n", + "\n", + "x_time = np.arange(data.shape[1]) * 1/(7.4)\n", + "ax[0].plot(x_time, data.T)\n", + "ax[0].set_title('fNIRS data (HbO)')\n", + "ax[1].plot(x_time, aux_signal[0])\n", + "ax[1].set_title(f'Auxiliary signal ({aux_name})')\n", + "ax[1].set_xlabel('Time (seconds)')\n", + "\n", + "plt.tight_layout()\n", + "plt.show()\n" + ] + }, + { + "cell_type": "markdown", + "id": "21", + "metadata": {}, + "source": [ + "## Apply Constrained ICA Methods\n", + "\n", + "We define a frequency reference signal by computing the power spectral density of the reference signal. We then apply the constrained ICA methods to the data." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "22", + "metadata": {}, + "outputs": [], + "source": [ + "# Create the time domain and frequency domain reference signals \n", + "ref = np.copy(aux_signal)\n", + "ref_psd = (2/ ref.shape[1] ) * np.abs(np.fft.rfft(ref, axis = 1 )**2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "23", + "metadata": {}, + "outputs": [], + "source": [ + "# Set the filter length for arc-ERBM \n", + "p = 11\n", + "\n", + "# Apply ICA methods \n", + "W1 = arc_erbm.arc_erbm(data, ref_psd.T, p)\n", + "W2 = arc_erbm.arc_erbm(data, ref_psd.T, p, ref.T)\n", + "W3 = arc_ebm.arc_ebm(data, ref_psd.T, 'psd')" + ] + }, + { + "cell_type": "markdown", + "id": "24", + "metadata": {}, + "source": [ + "## Compute Source Estimates\n", + "\n", + "For each constrained method, the first row of the demixing matrix corresponds to the referenced source. We therefore select the first row and compute the source estimate as $y = w_0^T X$." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "25", + "metadata": {}, + "outputs": [], + "source": [ + "# Compute the estimated sources\n", + "source_arc_erbm = W1[0].dot(data)\n", + "source_arc_erbm_pr = W2[0].dot(data)\n", + "source_arc_ebm = W3[0].dot(data)\n", + "\n", + "# z-transform the estimated sources\n", + "source_arc_erbm = sp.stats.zscore(source_arc_erbm)\n", + "source_arc_erbm_pr = sp.stats.zscore(source_arc_erbm_pr)\n", + "source_arc_ebm = sp.stats.zscore(source_arc_ebm)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "26", + "metadata": {}, + "outputs": [], + "source": [ + "# Plot source estimates and reference signal\n", + "fig, ax = plt.subplots(3, 1, figsize=(10, 7))\n", + "\n", + "estimates = [source_arc_erbm, source_arc_erbm_pr, source_arc_ebm]\n", + "labels = ['arc-ERBM estimate', 'arc-ERBM (PR) estimate', 'arc-EBM estimate']\n", + "for i in range(3):\n", + " # Compute peak cross correlation for +/- 2 seconds lag \n", + " lags = np.arange(-15, 16, 1)\n", + " cross_corr = [np.corrcoef(ref, np.roll(estimates[i], lag, axis=0))[0, 1] for lag in lags]\n", + " max_corr = np.max(np.abs(cross_corr))\n", + " best_lag = lags[np.argmax(np.abs(cross_corr))]\n", + "\n", + " # Copmute RMSE between reference and estimate \n", + " rmse = np.sqrt(np.mean((ref - np.roll(estimates[i], best_lag, axis=0))**2)) \n", + " ax[i].set_title(f'{labels[i]} (correlation with reference: {max_corr:.2f}, RMSE: {rmse:.2f} )', \n", + " fontsize=10)\n", + "\n", + " # Plot estimate and reference \n", + " signal = np.roll(estimates[i], best_lag, axis=0)\n", + " signal = np.sign(np.corrcoef(ref, signal)[0, 1]) * signal\n", + " ax[i].plot(x_time, signal, label = labels[i])\n", + " ax[i].plot(x_time, ref.T, label = 'Reference signal', alpha = 0.7)\n", + " ax[i].legend( loc='upper left', bbox_to_anchor=(1, 1))\n", + "\n", + "\n", + "ax[2].set_xlabel('Time (seconds)')\n", + "plt.tight_layout()\n", + "plt.show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "cedalion_250922", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/cedalion/data/__init__.py b/src/cedalion/data/__init__.py index 9b7f91d9..1e7df754 100644 --- a/src/cedalion/data/__init__.py +++ b/src/cedalion/data/__init__.py @@ -57,6 +57,8 @@ "snirf2bids_example_dataset.zip" : "f14508e332c7d259c13b9717ac3c490ab2cabfd7b30fdf97b347d5ba59b783d1", # noqa:E501 "fieldtrip_standard1005.elc" : "sha256:1ee59197946d62de872db2ac7f2243a596662c231427366f6dc5d84ed237f853", # noqa:E501 + + "spafNIRS_example_sub179.zip" : "sha256:0a247be5bfa3c7b5bc12d19203e2bd5432df964d72646945891601d0ba944141", # noqa:E501 }, urls={ "fieldtrip_standard1005.elc" : "https://raw.githubusercontent.com/fieldtrip/fieldtrip/refs/heads/master/template/electrode/standard_1005.elc" @@ -300,3 +302,11 @@ def get_snirf2bids_example_dataset() -> tuple[Path, Path]: def get_fieldtrip_colin27_landmarks() -> cdt.LabeledPoints: fname = DATASETS.fetch("fieldtrip_standard1005.elc") return cedalion.io.read_fieldtrip_elc(fname) + + +def get_spa_fnirs() -> cdc.Recording: + fnames = DATASETS.fetch("spafNIRS_example_sub179.zip", processor=pooch.Unzip()) + fname = [Path(i) for i in fnames if i.endswith(".snirf")][0] + rec = cedalion.io.read_snirf(fname)[0] + + return rec diff --git a/src/cedalion/sigdecomp/multimodal/arc_ebm.py b/src/cedalion/sigdecomp/multimodal/arc_ebm.py new file mode 100644 index 00000000..d2f7ef40 --- /dev/null +++ b/src/cedalion/sigdecomp/multimodal/arc_ebm.py @@ -0,0 +1,1191 @@ +"""Independent Component Analysis by Entropy Bound Minimization (ICA-EBM). + +This code is based on :cite:t:`Li2010A` and converted matlab versions provided by the +MLSP Lab at the University of Maryland, which is available here: +https://mlsp.umbc.edu/resources.html. + + +(:cite:t:`yang2025flexible`) H. Yang, T. Vu, Ehsan Ahmed Dhrubo, V. D. Calhoun, and Tülay Adali, +“A Flexible Constrained ICA Approach for Multisubject fMRI Analysis,” +International Journal of Biomedical Imaging, vol. 2025, no. 1, Jan. 2025, +doi: https://doi.org/10.1155/ijbi/2064944. +""" + + +import numpy as np +import cedalion.data + +def arc_ebm(X: np.ndarray, guess_mat, constraint = 'correlation') -> np.ndarray: + """Adaptive-reverse Constrained ICA by Entropy Bound Minimization (arc-EBM) is a constrained ICA algorithm. + arc-EBM calculates the blind source separation demixing matrix corresponding to X, + using the reference signals in guess_mat and the constraint specified by constraint. + + Args: + X (np.ndarray, (Channels, Time Points)): the [N x T] input multivariate time series with dimensionality N observations/channels and T time points + guess_mat (np.ndarray, (Time Points, Referenced Channels)), (np.ndarray, (Time Points/2, Referenced Channels)): Time or frequency domain reference signals. The number of reference signals should be less than or equal to the number of channels in X. The first dimension should be T for time domain signals and T/2 for frequency domain signals. + constraint (str): the constraint to be used for the gradient step, either 'correlation' (default) or 'psd' + + Returns: + W (np.ndarray, (Channels, Channels)): the [N x N] demixing matrix with weights for N channels/sources. + To obtain the independent components, the demixed signals can be calculated as S = W @ X. + + Initial Contributors: + - Jacqueline Behrendt | j.behrendt@tu-berlin.de | 2026 + + References: + This code is based on the matlab version by Xi-Lin Li (:cite:t:`Li2010A`) + Xi-Lin Li and Tulay Adali, "Independent component analysis by entropy bound minimization," + IEEE Trans. Signal Processing, vol. 58, no. 10, pp. 5151-5164, Oct. 2010. + + (:cite:t:`yang2025flexible`) H. Yang, T. Vu, Ehsan Ahmed Dhrubo, V. D. Calhoun, and Tülay Adali, + “A Flexible Constrained ICA Approach for Multisubject fMRI Analysis,” + International Journal of Biomedical Imaging, vol. 2025, no. 1, Jan. 2025, + doi: https://doi.org/10.1155/ijbi/2064944. + + """ + + ############################################################################################################### + # Part 0: Preprocessing + ############################################################################################################### + + rho = np.arange(0, 1.01, 0.01) + max_iter_fastica = 100 + max_iter_orth = 1000 + max_iter_orth_refine = 1000 + max_iter_nonorth = 1000 + saddle_test_enable = True + tolerance = 1e-6 + max_cost_increase_number = 5 + stochastic_search_factor = 1 + eps = np.finfo(np.float64).eps + gam = 100 + + # report the progress if verbose == True + verbose = False + + # Load 8 measuring functions. But we only use 4 of them. + K = 8 + file_path = cedalion.data.get("measfunc_table.npy") + table = np.load(file_path, allow_pickle=True) + + nf1, nf3, nf5, nf7 = table[0], table[2], table[4], table[6] + + N = X.shape[0] # number of channels + T = X.shape[1] # number of time points + X, P = pre_processing(X) + + # Define the epsilon function based on the constraint + if constraint == 'correlation': + def epsilon(a,b): + # correlation coefficient + return np.corrcoef(a, b)[0, 1] + + def epsilon_grad(): + # gradient for correlation constraint + mu_signed[n] = np.sign(e_pair) * mu_c[n] + c_grad = mu_signed[n] * (X.dot(r_n_c) ) * (1/np.sqrt(T)) + return np.reshape(c_grad, (-1, 1)) + + + if constraint == 'psd': + # compute psd of X + X_hat = (2/T) * np.fft.rfft(X, axis = 1) + + # compute cross psd of X + C_hat = np.zeros((X_hat.shape[1], N , N ), dtype=complex) + C_hat = (X_hat[:, None, :] * np.conjugate(X_hat[None, :, :])).transpose(2, 0, 1) + + # center C_hat + C_hat_mean = np.mean(C_hat, axis = 0) + C_hat = C_hat - np.reshape(C_hat_mean, (1, N, N)) + + # store real matrix for gradient computation + C_tilde = np.real(C_hat + np.transpose(C_hat, (0, 2, 1)) ) + + # define correlation between psd of estimated source and reference psd + def epsilon(a,b): + # power spectral density + # compute psd for real signal a + # b is already a psd + psd_a = (2/T) * np.abs(np.fft.rfft(a))**2 + psd_correlation = np.corrcoef(psd_a, b)[0,1] + return psd_correlation + + # define dot product between psd of estimated source and reference psd + def epsilon_dot(a,b): + # abs of dot product + a = a / np.linalg.norm(a) + psd_a = (2/T) * np.abs(np.fft.rfft(a))**2 + psd_a = psd_a - np.mean(psd_a) + b = b - np.mean(b) + b = b / np.linalg.norm(b) + abs_dot = np.abs(np.dot(psd_a, b)) + return abs_dot + + # define gradient function for psd constraint + def epsilon_grad(): + # compute gradient of epsilon for the psd constraint + psd_s = (2/T) * np.abs(np.fft.rfft(w.T.dot(X)))**2 + sign = np.sign(np.corrcoef(psd_s, r_n_c)[0,1]) + r = np.reshape(r_n_c, (-1, 1, 1)) + c_grad = sign * mu_c[n] * np.sum( np.multiply(np.dot(C_tilde, w), r), axis = 0 ) + return c_grad + + + # make initial guess for demixing matrix W + W = np.random.rand(N, N) + + # symmetric decorrelation + W = symdecor(W) + + num_guess = guess_mat.shape[1] # number of reference signals + mu_c = np.ones((num_guess, 1)) + corr_w_guess = np.zeros((num_guess, N)) + num_W = np.shape(W)[0] + corr_w_guess = np.zeros((num_guess, num_W)) + + # resort W based on correlation with reference signals + for kl in range(num_guess): + r_n_c = guess_mat[:, kl] + for lp in range(num_W): + w = W[lp, :].T + corr_w_guess[kl, lp] = epsilon(X.T.dot(w), r_n_c) + + + # may need auction to auction to choose order + max_index = np.argmax(np.abs(corr_w_guess), axis = 1) + if len(np.unique(max_index)) != num_guess: + colsol, _ = auction((1- np.abs(corr_w_guess)).T) + max_index = colsol.T + + c = np.arange(0, num_W) + c = np.setdiff1d(c, max_index) + sort_order = np.concatenate((max_index, c)) + W = W[sort_order, :] + + last_W = np.copy(W) + best_W = np.copy(W) + Cost = np.zeros((max_iter_fastica, 1)) + min_cost = np.inf + cost_increaser_counter = 0 + negentropy_array = np.zeros((N,1 )) + + + for iter in range(max_iter_fastica): + Y = np.copy(W.dot(X)) + for n in range(N): + y = np.copy(Y[n, :]) + # evaluate the upper bound of negentropy of the nth component + NE_Bound = np.zeros((K, 1)) + EGx = np.zeros((K, 1)) + # we only need to calculate these quantities once + yy = y* y + sign_y = np.sign(y) + abs_y = np.abs(y) + inv_pabs_y = 1/(1 + abs_y) + inv_pabs_yy = 1/(1+ yy) + inv_p10abs_y = 1/(10+abs_y) + + # G1(x) = x^4 + EGx[0] = np.sum(yy*yy)/T + if EGx[0] < nf1['min_EGx']: + NE_Bound[0] = simplified_ppval(nf1['pp_slope'], nf1['min_EGx'] ) * (EGx[0] - nf1['min_EGx']) + NE_Bound[0] = simplified_ppval(nf1['pp'],nf1['min_EGx']) + np.abs(NE_Bound[0] ) + else: + if EGx[0] > nf1['max_EGx']: + NE_Bound[0] = 0 + else: + NE_Bound[0] = simplified_ppval(nf1['pp'], EGx[0] ) + + # G3(x) = np.abs(x)/ (1 + np.abs(x)) + EGx[2] = 1 - np.sum(inv_pabs_y)/T + if EGx[2] < nf3['min_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['min_EGx'] ) * (EGx[2] - nf3['min_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['min_EGx']) + np.abs(NE_Bound[2]) + else: + if EGx[2] > nf3['max_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['max_EGx'] ) * (EGx[2] - nf3['max_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['max_EGx']) + np.abs(NE_Bound[2]) + + else: + NE_Bound[2] = simplified_ppval(nf3['pp'], EGx[2] ) + + # G5(x) = x* np.abs(x) /(10 + np.abs(x)) + EGx[4] = np.sum( y * abs_y * inv_p10abs_y )/T + if EGx[4] < nf5['min_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['min_EGx'] ) * (EGx[4] - nf5['min_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['min_EGx']) + np.abs(NE_Bound[4]) + else: + if EGx[4] > nf5['max_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['max_EGx'] ) * (EGx[4] - nf5['max_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['max_EGx']) + np.abs(NE_Bound[4]) + else: + NE_Bound[4] = simplified_ppval(nf5['pp'], EGx[4] ) + + # G7(x) = x / (1 + x**2) + EGx[6] = np.sum(y*inv_pabs_yy)/T + if EGx[6] < nf7['min_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['min_EGx'] ) * (EGx[6] - nf7['min_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['min_EGx']) + np.abs(NE_Bound[6]) + else: + if EGx[6] > nf7['max_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['max_EGx'] ) * (EGx[6] - nf7['max_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['max_EGx']) + np.abs(NE_Bound[6]) + else: + NE_Bound[6] = simplified_ppval(nf7['pp'], EGx[6] ) + + # select the tightest upper bound + max_NE, max_i = np.max(NE_Bound), np.argmax(NE_Bound) + negentropy_array[n] = np.copy(max_NE) + Cost[iter] = np.copy(Cost[iter] - max_NE) + + + if Cost[iter] < min_cost: + min_cost = np.copy(Cost[iter]) + best_W = np.copy(last_W) + cost_increaser_counter = 0 + else: + cost_increaser_counter = cost_increaser_counter + 1 + + W = np.multiply(np.multiply(Y, Y), Y).dot(X.T) / T - 3 * W + W = symdecor(W) + + + if 1 - np.min(np.abs(np.diag(W.dot(last_W.T)))) < tolerance: + break + else : + last_W = np.copy(W) + if cost_increaser_counter > max_cost_increase_number: + break + + W = np.copy(best_W) +############################################################################################################## +# Part 1: Orthogonal ICA +# varying step size, stochastic gradient search +############################################################################################################## + + if verbose: + print('Orthogonal ICA stage.') + + + # resort existing W based on correlation with reference signals + for kl in range(num_guess): + r_n_c = guess_mat[:, kl] + for lp in range(num_W): + w = W[lp, :].T + corr_w_guess[kl, lp] = epsilon(X.T.dot(w), r_n_c) + + # may need auction to auction to choose order + max_index = np.argmax(np.abs(corr_w_guess), axis = 1) + + if len(np.unique(max_index)) != num_guess: + colsol, _ = auction((1- np.abs(corr_w_guess)).T) + max_index = colsol.T + + c = np.arange(0, num_W) + c = np.setdiff1d(c, max_index) + sort_order = np.concatenate((max_index, c)) + + W = W[sort_order, :] + + + last_W = np.copy(W) + best_W = np.copy(W) + Cost = np.zeros((max_iter_orth, 1)) + min_cost = np.inf + min_cost_queue = np.copy(min_cost* np.ones((max_iter_orth, 1))) + mu = 1/6.25 + min_mu = 1/50 + cost_increaser_counter = 0 + fastica_on = True + max_negentropy = np.zeros((N, 1)) + negentropy_array = np.zeros((N, 1)) + + for iter in range(max_iter_orth): + Y = np.copy(W.dot(X)) + for n in range(N): + w = np.copy(W[n, :].T) + y = np.copy(Y[n, :] ) + + # evaluate the upper bound of negentropy of the nth component + NE_Bound = np.zeros((K, 1)) + EGx = np.zeros((K, 1)) + # we only need to calculate these quantities once + yy = y* y + sign_y = np.sign(y) + abs_y = np.abs(y) + inv_pabs_y = 1/(1 + abs_y) + inv_pabs_yy = 1/(1+ yy) + inv_p10abs_y = 1/(10+abs_y) + + # G1(x) = x^4 + EGx[0] = np.sum(yy*yy)/T + if EGx[0] < nf1['min_EGx']: + NE_Bound[0] = simplified_ppval(nf1['pp_slope'], nf1['min_EGx'] ) * (EGx[0] - nf1['min_EGx']) + NE_Bound[0] = simplified_ppval(nf1['pp'],nf1['min_EGx']) + np.abs(NE_Bound[0] ) + else: + if EGx[0] > nf1['max_EGx']: + NE_Bound[0] = 0 + else: + NE_Bound[0] = simplified_ppval(nf1['pp'], EGx[0] ) + + # G3(x) = np.abs(x)/ (1 + np.abs(x)) + EGx[2] = 1 - np.sum(inv_pabs_y)/T + if EGx[2] < nf3['min_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['min_EGx'] ) * (EGx[2] - nf3['min_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['min_EGx']) + np.abs(NE_Bound[2]) + else: + if EGx[2] > nf3['max_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['max_EGx'] ) * (EGx[2] - nf3['max_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['max_EGx']) + np.abs(NE_Bound[2]) + + else: + NE_Bound[2] = simplified_ppval(nf3['pp'], EGx[2] ) + + # G5(x) = x* np.abs(x) /(10 + np.abs(x)) + EGx[4] = np.sum( y * abs_y * inv_p10abs_y )/T + if EGx[4] < nf5['min_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['min_EGx'] ) * (EGx[4] - nf5['min_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['min_EGx']) + np.abs(NE_Bound[4]) + else: + if EGx[4] > nf5['max_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['max_EGx'] ) * (EGx[4] - nf5['max_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['max_EGx']) + np.abs(NE_Bound[4]) + else: + NE_Bound[4] = simplified_ppval(nf5['pp'], EGx[4] ) + + # G7(x) = x / (1 + x**2) + EGx[6] = np.sum(y*inv_pabs_yy)/T + if EGx[6] < nf7['min_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['min_EGx'] ) * (EGx[6] - nf7['min_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['min_EGx']) + np.abs(NE_Bound[6]) + else: + if EGx[6] > nf7['max_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['max_EGx'] ) * (EGx[6] - nf7['max_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['max_EGx']) + np.abs(NE_Bound[6]) + else: + NE_Bound[6] = simplified_ppval(nf7['pp'], EGx[6] ) + + # select the tightest upper bound + max_NE, max_i = np.max(NE_Bound), np.argmax(NE_Bound) + negentropy_array[n] = np.copy(max_NE) + Cost[iter] = np.copy(Cost[iter] - max_NE) + + if ~fastica_on: + weight = np.random.rand(1, T) + + # Perform orthogonal ICA + if max_i == 0: + # G1(x) = x^4 + if fastica_on : + grad = X.dot( (4* y* yy).T )/T + Edgx = 12 + else : + grad = X.dot((4 * weight * y * yy ).T ) / np.sum(weight) + vEGx = 2 * (EGx[0] > nf1['critical_point']) -1 + elif max_i == 2: + # G3(x) = np.abs(x)/ (1 + np.abs(x)) + if fastica_on : + grad = X.dot( (sign_y * inv_pabs_y * inv_pabs_y).T )/T + Edgx = np.sum(-2 * inv_pabs_y * inv_pabs_y * inv_pabs_y)/T + else : + grad = X.dot((weight * sign_y * inv_pabs_y * inv_pabs_y).T ) / np.sum(weight) + vEGx = 2 * (EGx[2] > nf3['critical_point']) -1 + elif max_i == 4: + # G5(x) = x* np.abs(x) /(10 + np.abs(x)) + if fastica_on : + grad = X.dot((abs_y *(20 + abs_y) * inv_p10abs_y * inv_p10abs_y).T )/T + Edgx = np.sum(200 * sign_y * inv_p10abs_y * inv_p10abs_y * inv_p10abs_y)/T + else : + grad = X.dot((weight * abs_y * (20 + abs_y) * inv_p10abs_y**2 ).T ) / np.sum(weight) + vEGx = 2 * (EGx[4] > nf5['critical_point']) -1 + elif max_i == 6: + # G7(x) = x / (1 + x**2) + if fastica_on : + grad = X.dot(((1 - yy)* inv_pabs_yy**2).T )/T + Edgx = np.sum(2 * y * (yy-3)* inv_pabs_yy* inv_pabs_yy* inv_pabs_yy)/T + else : + grad = X.dot((weight * (1 - yy) * inv_pabs_yy**2 ).T ) / np.sum(weight) + vEGx = 2 * (EGx[6] > nf7['critical_point']) -1 + if fastica_on : + w1 = grad - Edgx * w + else : + grad = vEGx * grad + w = np.reshape(w, (-1, 1)) + grad = grad - ((w.T).dot(grad)) * w + grad = grad / np.linalg.norm(grad) + w1 = w + mu * grad + + W[n, :] = np.copy(w1.T) + + W = np.copy(symdecor(W)) + + if Cost[iter] < min_cost: + cost_increaser_counter = 0 + min_cost = np.copy(Cost[iter]) + best_W = np.copy(last_W) + max_negentropy = np.copy(negentropy_array) + else: + cost_increaser_counter = cost_increaser_counter + 1 + + min_cost_queue[iter] = np.copy(min_cost) + + if fastica_on : + if cost_increaser_counter >= max_cost_increase_number or 1- np.min(np.abs(np.diag(W.dot(last_W.T)))) < tolerance: + cost_increaser_counter = 0 + W = np.copy(best_W ) + last_W = np.copy(W) + fastica_on = False + continue + else : + if cost_increaser_counter > stochastic_search_factor * max_cost_increase_number: + if mu > min_mu: + cost_increaser_counter = 0 + W = np.copy(best_W ) + last_W = np.copy(W) + mu = mu/2 + continue + else: + break + last_W = np.copy(W) + + # End of Part 1 + W = np.copy(best_W) + ############################################################################################################## + # Part 2: check for saddle points + ############################################################################################################## + if saddle_test_enable : + if verbose: + print('Saddle point detection.') + SADDLE_TESTED = False + saddle_tested = True + + while saddle_tested: + saddle_tested = False + Y = np.copy(W.dot(X)) + for m in range(N): + w1 = np.copy(W[m, :].T ) + ym = np.copy(Y[m, :]) + for n in range(m+1, N): + w2 = np.copy(W[n, :].T ) + yn = np.copy(Y[n, :]) + + yr1 = (ym + yn)/ np.sqrt(2) + yr2 = (ym - yn)/ np.sqrt(2) + y = np.copy(yr1) + # we only need to calculate these quantities once + yy = y* y + sign_y = np.sign(y) + abs_y = np.abs(y) + inv_pabs_y = 1/(1 + abs_y) + inv_pabs_yy = 1/(1+ yy) + inv_p10abs_y = 1/(10+abs_y) + + # G1(x) = x^4 + EGx[0] = np.sum(yy*yy)/T + if EGx[0] < nf1['min_EGx']: + NE_Bound[0] = simplified_ppval(nf1['pp_slope'], nf1['min_EGx'] ) * (EGx[0] - nf1['min_EGx']) + NE_Bound[0] = simplified_ppval(nf1['pp'],nf1['min_EGx']) + np.abs(NE_Bound[0] ) + else: + if EGx[0] > nf1['max_EGx']: + NE_Bound[0] = 0 + else: + NE_Bound[0] = simplified_ppval(nf1['pp'], EGx[0] ) + + # G3(x) = np.abs(x)/ (1 + np.abs(x)) + EGx[2] = 1 - np.sum(inv_pabs_y)/T + if EGx[2] < nf3['min_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['min_EGx'] ) * (EGx[2] - nf3['min_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['min_EGx']) + np.abs(NE_Bound[2]) + else: + if EGx[2] > nf3['max_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['max_EGx'] ) * (EGx[2] - nf3['max_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['max_EGx']) + np.abs(NE_Bound[2]) + + else: + NE_Bound[2] = simplified_ppval(nf3['pp'], EGx[2] ) + + # G5(x) = x* np.abs(x) /(10 + np.abs(x)) + EGx[4] = np.sum( y * abs_y * inv_p10abs_y )/T + if EGx[4] < nf5['min_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['min_EGx'] ) * (EGx[4] - nf5['min_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['min_EGx']) + np.abs(NE_Bound[4]) + else: + if EGx[4] > nf5['max_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['max_EGx'] ) * (EGx[4] - nf5['max_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['max_EGx']) + np.abs(NE_Bound[4]) + else: + NE_Bound[4] = simplified_ppval(nf5['pp'], EGx[4] ) + + # G7(x) = x / (1 + x**2) + EGx[6] = np.sum(y*inv_pabs_yy)/T + if EGx[6] < nf7['min_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['min_EGx'] ) * (EGx[6] - nf7['min_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['min_EGx']) + np.abs(NE_Bound[6]) + else: + if EGx[6] > nf7['max_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['max_EGx'] ) * (EGx[6] - nf7['max_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['max_EGx']) + np.abs(NE_Bound[6]) + else: + NE_Bound[6] = simplified_ppval(nf7['pp'], EGx[6] ) + + # select the tightest upper bound + max_NE, max_i = np.max(NE_Bound), np.argmax(NE_Bound) + negentropy1 = max_NE + + y = np.copy(yr2) + # we only need to calculate these quantities once + yy = y* y + sign_y = np.sign(y) + abs_y = np.abs(y) + inv_pabs_y = 1/(1 + abs_y) + inv_pabs_yy = 1/(1+ yy) + inv_p10abs_y = 1/(10+abs_y) + + # G1(x) = x^4 + EGx[0] = np.sum(yy*yy)/T + if EGx[0] < nf1['min_EGx']: + NE_Bound[0] = simplified_ppval(nf1['pp_slope'], nf1['min_EGx'] ) * (EGx[0] - nf1['min_EGx']) + NE_Bound[0] = simplified_ppval(nf1['pp'],nf1['min_EGx']) + np.abs(NE_Bound[0] ) + else: + if EGx[0] > nf1['max_EGx']: + NE_Bound[0] = 0 + else: + NE_Bound[0] = simplified_ppval(nf1['pp'], EGx[0] ) + + # G3(x) = np.abs(x)/ (1 + np.abs(x)) + EGx[2] = 1 - np.sum(inv_pabs_y)/T + if EGx[2] < nf3['min_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['min_EGx'] ) * (EGx[2] - nf3['min_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['min_EGx']) + np.abs(NE_Bound[2]) + else: + if EGx[2] > nf3['max_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['max_EGx'] ) * (EGx[2] - nf3['max_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['max_EGx']) + np.abs(NE_Bound[2]) + + else: + NE_Bound[2] = simplified_ppval(nf3['pp'], EGx[2] ) + + # G5(x) = x* np.abs(x) /(10 + np.abs(x)) + EGx[4] = np.sum( y * abs_y * inv_p10abs_y )/T + if EGx[4] < nf5['min_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['min_EGx'] ) * (EGx[4] - nf5['min_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['min_EGx']) + np.abs(NE_Bound[4]) + else: + if EGx[4] > nf5['max_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['max_EGx'] ) * (EGx[4] - nf5['max_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['max_EGx']) + np.abs(NE_Bound[4]) + else: + NE_Bound[4] = simplified_ppval(nf5['pp'], EGx[4] ) + + # G7(x) = x / (1 + x**2) + EGx[6] = np.sum(y*inv_pabs_yy)/T + if EGx[6] < nf7['min_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['min_EGx'] ) * (EGx[6] - nf7['min_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['min_EGx']) + np.abs(NE_Bound[6]) + else: + if EGx[6] > nf7['max_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['max_EGx'] ) * (EGx[6] - nf7['max_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['max_EGx']) + np.abs(NE_Bound[6]) + else: + NE_Bound[6] = simplified_ppval(nf7['pp'], EGx[6] ) + + # select the tightest upper bound + max_NE, max_i = np.max(NE_Bound), np.argmax(NE_Bound) + negentropy2 = max_NE + + if negentropy1 + negentropy2 > max_negentropy[m] + max_negentropy[n]+ eps : + if verbose: + print('Rotationg %d and %d.' % (m, n)) + max_negentropy[m] = np.copy(negentropy1) + max_negentropy[n] = np.copy(negentropy2) + W[m, : ] = np.copy((w1+ w2).T/np.sqrt(2)) + W[n, : ] = np.copy((w1- w2).T/np.sqrt(2) ) + Y[m, :] = yr1 + Y[n, :] = yr2 + ym = yr1 + w1 = np.copy(W[m, :].T ) + saddle_tested = True + SADDLE_TESTED = True + + + else: + SADDLE_TESTED = False + + if SADDLE_TESTED : + ############################################################################################################## + # Part 3: if saddle points are detected, refine orthogonal ICA + # fix step size gradient search + ############################################################################################################## + if verbose: + print('Orthogonal ICA refinement is required because saddle points are detected.') + last_W = np.copy(W) + best_W = np.copy(W) + Cost = np.zeros((max_iter_orth_refine, 1)) + min_cost = np.inf + min_cost_queue = min_cost * np.ones((max_iter_orth_refine, 1)) + mu = 1/ 50 + cost_increaser_counter = 0 + fastica_on = True + + for iter in range(max_iter_orth_refine): + for n in range(N): + w = np.copy(W[n, :].T) + y = np.copy(w.T.dot(X)) + # evaluate the upper bound of negentropy of the nth component + NE_Bound = np.zeros((K, 1)) + EGx = np.zeros((K, 1)) + # we only need to calculate these quantities once + yy = y* y + sign_y = np.sign(y) + abs_y = np.abs(y) + inv_pabs_y = 1/(1 + abs_y) + inv_pabs_yy = 1/(1+ yy) + inv_p10abs_y = 1/(10+abs_y) + + # G1(x) = x^4 + EGx[0] = np.sum(yy*yy)/T + if EGx[0] < nf1['min_EGx']: + NE_Bound[0] = simplified_ppval(nf1['pp_slope'], nf1['min_EGx'] ) * (EGx[0] - nf1['min_EGx']) + NE_Bound[0] = simplified_ppval(nf1['pp'],nf1['min_EGx']) + np.abs(NE_Bound[0] ) + else: + if EGx[0] > nf1['max_EGx']: + NE_Bound[0] = 0 + else: + NE_Bound[0] = simplified_ppval(nf1['pp'], EGx[0] ) + + # G3(x) = np.abs(x)/ (1 + np.abs(x)) + EGx[2] = 1 - np.sum(inv_pabs_y)/T + if EGx[2] < nf3['min_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['min_EGx'] ) * (EGx[2] - nf3['min_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['min_EGx']) + np.abs(NE_Bound[2]) + else: + if EGx[2] > nf3['max_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['max_EGx'] ) * (EGx[2] - nf3['max_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['max_EGx']) + np.abs(NE_Bound[2]) + + else: + NE_Bound[2] = simplified_ppval(nf3['pp'], EGx[2] ) + + # G5(x) = x* np.abs(x) /(10 + np.abs(x)) + EGx[4] = np.sum( y * abs_y * inv_p10abs_y )/T + if EGx[4] < nf5['min_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['min_EGx'] ) * (EGx[4] - nf5['min_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['min_EGx']) + np.abs(NE_Bound[4]) + else: + if EGx[4] > nf5['max_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['max_EGx'] ) * (EGx[4] - nf5['max_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['max_EGx']) + np.abs(NE_Bound[4]) + else: + NE_Bound[4] = simplified_ppval(nf5['pp'], EGx[4] ) + + # G7(x) = x / (1 + x**2) + EGx[6] = np.sum(y*inv_pabs_yy)/T + if EGx[6] < nf7['min_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['min_EGx'] ) * (EGx[6] - nf7['min_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['min_EGx']) + np.abs(NE_Bound[6]) + else: + if EGx[6] > nf7['max_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['max_EGx'] ) * (EGx[6] - nf7['max_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['max_EGx']) + np.abs(NE_Bound[6]) + else: + NE_Bound[6] = simplified_ppval(nf7['pp'], EGx[6] ) + + # select the tightest upper bound + max_NE, max_i = np.max(NE_Bound), np.argmax(NE_Bound) + negentropy_array[n] = max_NE + Cost[iter] = np.copy(Cost[iter] - max_NE) + + # Perform orthogonal ICA + if max_i == 0: + # G1(x) = x^4 + if fastica_on : + grad = X.dot( (4* y* yy).T )/T + Edgx = 12 + else : + grad = X.dot((4 * weight * y * yy ).T ) / np.sum(weight) + vEGx = 2 * (EGx[0] > nf1['critical_point']) -1 + elif max_i == 2: + # G3(x) = np.abs(x)/ (1 + np.abs(x)) + if fastica_on : + grad = X.dot( (sign_y * inv_pabs_y * inv_pabs_y).T )/T + Edgx = np.sum(-2 * inv_pabs_y * inv_pabs_y * inv_pabs_y)/T + else : + grad = X.dot((weight * sign_y * inv_pabs_y * inv_pabs_y).T ) / np.sum(weight) + vEGx = 2 * (EGx[2] > nf3['critical_point']) -1 + elif max_i == 4: + # G5(x) = x* np.abs(x) /(10 + np.abs(x)) + if fastica_on : + grad = X.dot((abs_y *(20 + abs_y) * inv_p10abs_y * inv_p10abs_y).T )/T + Edgx = np.sum(200 * sign_y * inv_p10abs_y * inv_p10abs_y * inv_p10abs_y)/T + else : + grad = X.dot((weight * abs_y * (20 + abs_y) * inv_p10abs_y**2 ).T ) / np.sum(weight) + vEGx = 2 * (EGx[4] > nf5['critical_point']) -1 + elif max_i == 6: + # G7(x) = x / (1 + x**2) + if fastica_on : + grad = X.dot(((1 - yy)* inv_pabs_yy**2).T )/T + Edgx = np.sum(2 * y * (yy-3)* inv_pabs_yy* inv_pabs_yy* inv_pabs_yy)/T + else : + grad = X.dot((weight * (1 - yy) * inv_pabs_yy**2 ).T ) / np.sum(weight) + vEGx = 2 * (EGx[6] > nf7['critical_point']) -1 + if fastica_on : + w1 = grad - Edgx * w + else : + grad = vEGx * grad + w = np.reshape(w, (-1, 1)) + grad = grad - ((w.T).dot(grad)) * w + grad = grad / np.linalg.norm(grad) + w1 = w + mu * grad + + W[n, :] = np.copy(w1.T) + + W = np.copy(symdecor(W) ) + + if Cost[iter] < min_cost: + cost_increaser_counter = 0 + min_cost = np.copy(Cost[iter]) + best_W = np.copy(last_W) + max_negentropy = np.copy(negentropy_array) + else: + cost_increaser_counter = cost_increaser_counter + 1 + + min_cost_queue[iter] = np.copy(min_cost) + + + if fastica_on : + if cost_increaser_counter >= max_cost_increase_number or 1- np.min(np.abs(np.diag(W.dot(last_W.T)))) < tolerance: + cost_increaser_counter = 0 + W = np.copy(best_W ) + last_W = np.copy(W) + fastica_on = False + continue + else : + if cost_increaser_counter > stochastic_search_factor * max_cost_increase_number: + break + last_W = np.copy(W) + + W = np.copy(best_W) +############################################################################################################## +# Part 4: non-orthogonal ICA +# fix small step size for refinement, gradient search +############################################################################################################## + if verbose: + print('Non-orthogonal ICA stage.') + + # resort W based on correlation with reference signals + for kl in range(num_guess): + r_n_c = guess_mat[:, kl] + for lp in range(num_W): + w = W[lp, :].T + corr_w_guess[kl, lp] = epsilon( X.T.dot(w), r_n_c) + + + # may need auction to to choose order + max_index = np.argmax(np.abs(corr_w_guess), axis = 1) + + if len(np.unique(max_index)) != num_guess: + colsol, _ = auction((1- np.abs(corr_w_guess)).T) + max_index = colsol.T + + c = np.arange(0, num_W) + c = np.setdiff1d(c, max_index) + sort_order = np.concatenate((max_index, c)) + W = np.copy(W[sort_order, :]) + + last_W = np.copy(W) + best_W = np.copy(W) + Cost = np.zeros((max_iter_nonorth, 1)) + min_cost_queue = np.copy(min_cost * np.ones((max_iter_nonorth, 1))) + mu = 1 + min_mu = 1/200 + max_cost_increase_number = 3 + cost_increaser_counter = 0 + mu_idx = np.full(mu_c.shape, False) + + decaying_factor = 0.95 + min_change = 1e-4 + min_iter = 100 + mu_old = np.copy(mu_c) + rho_n_arr = np.zeros((max_iter_nonorth, N)) + mu_signed = np.zeros((N, 1 )) + for iter in range(max_iter_nonorth): + Cost[iter] = np.copy(- np.log(np.abs(np.linalg.det(W)))) + + for n in range(N): + if N > 7: + if n == 0: + Wn = np.copy(W[1:N, :]) + inv_Q = np.copy(np.linalg.inv(Wn.dot(Wn.T))) + else: + n_last = np.copy(n-1) + Wn_last = np.copy(np.delete(W, n_last, axis = 0)) + w_current = np.copy(W[n, :].T ) + w_last = np.copy(W[n_last, :].T) + + c = Wn_last.dot(w_last- w_current) + c[n_last ] = 0.5* ((w_last.T).dot(w_last) - (w_current.T).dot(w_current) ) + e_last = np.zeros((N-1, 1)) + e_last[n_last] = 1 + + temp1 = np.reshape(inv_Q.dot(c), (-1, 1 )) + temp2 = np.reshape(inv_Q[:, n_last ], (-1, 1)) + inv_Q_plus = inv_Q - (temp1.dot(temp2.T) / (1 + temp1[n_last])) + + temp1 = np.reshape(inv_Q_plus.T.dot(c), (-1, 1)) + temp2 = np.reshape(inv_Q_plus[:, n_last ], (-1, 1 )) + inv_Q = inv_Q_plus - (temp1.dot(temp2.T) / (1 + c.T.dot(temp2))) + # make inv_Q hermitian + inv_Q = np.copy((inv_Q + inv_Q.T )/2 ) + + + + temp1 = np.random.rand(N, 1) + W_n = np.copy(np.delete(W, n, axis = 0)) + h = temp1 - W_n.T.dot(inv_Q.dot(W_n.dot(temp1))) + + else: + temp1 = np.random.rand(N, 1) + temp2 = np.copy(np.delete(W, n, axis = 0) ) + h = temp1 - temp2.T.dot(np.linalg.inv(temp2.dot(temp2.T)).dot(temp2.dot(temp1))) + + w = np.copy(W[n, :].T ) + y = np.copy(w.T.dot(X)) + + # evaluate the upper bound of negentropy of the nth component + NE_Bound = np.zeros((K, 1)) + EGx = np.zeros((K, 1)) + + # we only need to calculate these quantities once + yy = y* y + sign_y = np.sign(y) + abs_y = np.abs(y) + inv_pabs_y = 1/(1 + abs_y) + inv_pabs_yy = 1/(1+ yy) + inv_p10abs_y = 1/(10+abs_y) + + # G1(x) = x^4 + EGx[0] = np.sum(yy*yy)/T + if EGx[0] < nf1['min_EGx']: + NE_Bound[0] = simplified_ppval(nf1['pp_slope'], nf1['min_EGx'] ) * (EGx[0] - nf1['min_EGx']) + NE_Bound[0] = simplified_ppval(nf1['pp'],nf1['min_EGx']) + np.abs(NE_Bound[0] ) + else: + if EGx[0] > nf1['max_EGx']: + NE_Bound[0] = 0 + else: + NE_Bound[0] = simplified_ppval(nf1['pp'], EGx[0] ) + + # G3(x) = np.abs(x)/ (1 + np.abs(x)) + EGx[2] = 1 - np.sum(inv_pabs_y)/T + + if EGx[2] < nf3['min_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['min_EGx'] ) * (EGx[2] - nf3['min_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['min_EGx']) + np.abs(NE_Bound[2]) + else: + if EGx[2] > nf3['max_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['max_EGx'] ) * (EGx[2] - nf3['max_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['max_EGx']) + np.abs(NE_Bound[2]) + + else: + NE_Bound[2] = simplified_ppval(nf3['pp'], EGx[2] ) + + + # G5(x) = x* np.abs(x) /(10 + np.abs(x)) + EGx[4] = np.sum( y * abs_y * inv_p10abs_y )/T + if EGx[4] < nf5['min_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['min_EGx'] ) * (EGx[4] - nf5['min_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['min_EGx']) + np.abs(NE_Bound[4]) + else: + if EGx[4] > nf5['max_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['max_EGx'] ) * (EGx[4] - nf5['max_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['max_EGx']) + np.abs(NE_Bound[4]) + else: + NE_Bound[4] = simplified_ppval(nf5['pp'], EGx[4] ) + + # G7(x) = x / (1 + x**2) + EGx[6] = np.sum(y*inv_pabs_yy)/T + if EGx[6] < nf7['min_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['min_EGx'] ) * (EGx[6] - nf7['min_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['min_EGx']) + np.abs(NE_Bound[6]) + else: + if EGx[6] > nf7['max_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['max_EGx'] ) * (EGx[6] - nf7['max_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['max_EGx']) + np.abs(NE_Bound[6]) + else: + NE_Bound[6] = simplified_ppval(nf7['pp'], EGx[6] ) + + # select the tightest upper bound + max_NE, max_i = np.max(NE_Bound), np.argmax(NE_Bound) + Cost[iter] = np.copy(Cost[iter] - max_NE ) + + # Include constraint here: + + if n < num_guess: + # choose reference signal + r_n_c = guess_mat[:, n] + # compute correlation + if constraint == 'psd' : + e_pair = epsilon_dot(y.T, r_n_c ) + else : + e_pair = epsilon(y.T, r_n_c ) + dis_wr = np.abs(e_pair) + + if mu_idx[n] == 1: + rho_n = np.max(rho[rho <= dis_wr]) + else: + rho_n = np.min(rho[rho > dis_wr]) + + if rho.size == 0: + rho_n = 0.01 + # store rho + rho_n_arr[iter, n] = np.copy(rho_n) + # update mu + mu_old[n] = np.copy(mu_c[n]) + + mu_idx[n] = mu_idx[n] or (mu_c[n] >= 1) + mu_idx[n] = mu_idx[n] and (mu_c[n] > 0) + mu_c[n] = np.minimum(1, mu_c[n]) + mu_c[n] = np.maximum(0, mu_c[n] + gam * (rho_n - dis_wr)) + + if constraint == 'psd' : + r_n_c = r_n_c - np.mean(r_n_c) + + r_n_c = r_n_c / np.linalg.norm(r_n_c) + + + if max_i == 0: + # G1(x) = x^4 + vEGx = 2 * (EGx[0] > nf1['critical_point']) - 1 + grad = X.dot((4 * y * yy).T) / T + EGx[0] = np.maximum(np.minimum(EGx[0], nf1['max_EGx']), nf1['min_EGx']) + grad = (h / (h.T.dot(w))) + np.reshape(X.dot((4 * y * yy).T) * simplified_ppval(nf1['pp_slope'], EGx[0]) / T, (-1, 1)) + elif max_i == 2: + # G3(x) = np.abs(x)/ (1 + np.abs(x)) + vEGx = 2 * (EGx[2] > nf3['critical_point']) - 1 + grad = X.dot((sign_y * inv_pabs_y * inv_pabs_y).T) / T + EGx[2] = np.maximum(np.minimum(EGx[2], nf3['max_EGx']), nf3['min_EGx']) + grad = (h / (h.T.dot(w))) + np.reshape(X.dot((sign_y * inv_pabs_y * inv_pabs_y).T) * simplified_ppval(nf3['pp_slope'], EGx[2]) / T, (-1, 1)) + elif max_i == 4: + # G5(x) = x* np.abs(x) /(10 + np.abs(x)) + vEGx = 2 * (EGx[4] > nf5['critical_point']) - 1 + grad = X.dot((abs_y * (20 + abs_y) * inv_p10abs_y * inv_p10abs_y).T) / T + EGx[4] = np.maximum(np.minimum(EGx[4], nf5['max_EGx']), nf5['min_EGx']) + grad = (h / (h.T.dot(w))) + np.reshape(X.dot((abs_y * (20 + abs_y) * inv_p10abs_y * inv_p10abs_y).T) * simplified_ppval(nf5['pp_slope'], EGx[4]) / T, (-1, 1)) + elif max_i == 6: + # G7(x) = x / (1 + x**2) + vEGx = 2 * (EGx[6] > nf7['critical_point']) - 1 + grad = X.dot(((1 - yy) * inv_pabs_yy ** 2).T) / T + EGx[6] = np.maximum(np.minimum(EGx[6], nf7['max_EGx']), nf7['min_EGx']) + grad = (h / (h.T.dot(w))) + np.reshape(X.dot(((1 - yy) * inv_pabs_yy ** 2).T) * simplified_ppval(nf7['pp_slope'], EGx[6]) / T, (-1, 1)) + + w = np.reshape(w, (-1, 1 )) + + # adapt gradient to include constraints + if n < num_guess: + grad = grad + epsilon_grad() + + grad = grad - ((w.T).dot(grad)) * w + grad = grad / np.linalg.norm(grad) + w1 = w + mu * grad + w1 = w1 / np.linalg.norm(w1) + W[n, :] = np.copy(w1.T ) + + + Cost[iter] = Cost[iter]- (np.sum(np.power(mu_c, 2 ) - np.power(mu_old, 2)) / (2* gam )) + mu = np.copy(np.maximum((decaying_factor**(iter + 1)) , min_mu)) + currentChange = np.maximum(0, np.max(1- np.abs(np.diag(last_W.dot(W.T))))) + + if currentChange < min_change and iter > min_iter: + best_W = np.copy(W) + break + else: + last_W = np.copy(W) + + W = best_W + W = W.dot(P) + + return W + + +############################################################################################################### +# These functions are used in the arc-EBM algorithm. +############################################################################################################### + + +def simplified_ppval(pp: dict, xs: float) -> float: + """Helper function for ICA EBM: simplified version of ppval. + This function evaluates a piecewise polynomial at a specific point. + + Args: + pp (dict): a dictionary containing the piecewise polynomial representation of a function + xs (float): the evaluation point + + Returns: + v (float): the value of the function at xs + """ + b = pp['breaks'][0] + c = pp['coefs'] + l_pieces = int(pp['pieces'] ) + k = 4 + # find index + index = float('nan ') + middle_index = float('nan ') + if xs > b[l_pieces-1]: + index = l_pieces-1 + else: + if xs < b[1]: + index = 0 + else : + low_index = 0 + high_index = l_pieces-1 + + while True : + middle_index = int(np.ceil(((0.6* low_index + 0.4* high_index)))) + if xs < b[middle_index]: + high_index = middle_index + else: + low_index = middle_index + if low_index == high_index -1: + index = low_index + break + # now go to local coordinates + xs = xs - b[index] + # nested multiplication + v = c[index, 0] + for i in range(1, k ): + v = v*xs + c[index, i] + return v + +def inv_sqrtmH(B: np.ndarray) -> np.ndarray: + """Helper function for ICA EBM: computes the inverse square root of a matrix. + + Args: + B (np.ndarray): a square matrix + + Returns: + A (np.ndarray): the inverse square root of B + """ + + D, V = np.linalg.eig(B) + order = np.argsort(D) + D = D[order] + V = V[:, order] + d = 1/np.sqrt(D) + A = np.dot(np.dot(V, np.diag(d)), V.T) + return A + +def pre_processing(X: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + """Helper function for ICA EBM: pre-processing (DC removal & spatial pre-whitening). + + Args: + X (np.ndarray, (Channels, Time Points) ): the data matrix [N x T] + + Returns: + X (np.ndarray, (Channels, Time Points)): the pre-processed data matrix [N x T] + P (np.ndarray, (Channels, Channels)): the pre-whitening matrix [N x N] + """ + + # pre-processing program + T = X.shape[1] + # remove DC + Xmean = np.mean(X, axis=1) + X = X - np.tile(Xmean, (T, 1)).T + # spatio pre-whitening + R = np.dot(X, X.T) / T + P = inv_sqrtmH(R) + X = np.dot(P, X) + return X, P + +def symdecor(M: np.ndarray) -> np.ndarray: + """Helper function for ICA EBM: fast symmetric orthogonalization. + + Args: + M (np.ndarray, (Channels, Channels)): the matrix to be orthogonalized [N x N] + + Returns: + W (np.ndarray, (Channels, Channels)): the orthogonalized matrix [N x N] + """ + + D, V = np.linalg.eig(M.dot(M.T)) + order = np.argsort(D) + D = D[order] + V = V[:, order] + B = np.dot(np.ones((M.shape[1], 1)), np.reshape((1/np.sqrt(D)).T, (1, M.shape[1]) )) + W = np.multiply(V, B ).dot(V.T.dot(M)) + return W + + +def auction(assignCost, guard=None): + """ + Performs assignment using Bertsekas' auction algorithm. + + Parameters: + assignCost (ndarray): m x n matrix of costs for associating each row with each column. m >= n. + guard (float, optional): Cost of column non-assignment. All assignments will have cost < guard. + + Returns: + colsol (ndarray): Column assignments, where colsol[j] gives the row assigned to column j. + rowsol (ndarray): Row assignments, where rowsol[i] gives the column assigned to row i. + """ + + m, n = assignCost.shape + + if m < n: + raise ValueError('Cost matrix must have no more columns than rows.') + + # Augment cost matrix with a guard row if specified. + m0 = m + if guard is not None and np.isfinite(guard): + m += 1 + assignCost = np.vstack((assignCost, np.full((1, n), guard))) + + # Initialize return arrays + colsol = np.zeros(n, dtype=int) + rowsol = np.zeros(m, dtype=int) + price = np.zeros(m) + EPS = np.sqrt(np.finfo(float).eps) / (n + 1) + + # 1st step is a full parallel solution. Get bids for all columns + jp = np.arange(n) + f = assignCost.copy() + b1 = np.min(f, axis=0) # cost of the best choice for each column + ip = np.argmin(f, axis=0) # row index of the best choice for each column + f[ip, jp] = np.inf # eliminate the best from contention + + bids = np.min(f, axis=0) - b1 # cost of runner-up choice hence bid + ibid = np.argsort(bids) # Arrange bids so highest are last + + # Now assign best bids (lesser bids are overwritten by better ones). + price[ip[ibid]] += EPS + bids[ibid] + rowsol[ip[ibid]] = jp[ibid] + 1 # +1 to convert to 1-based indexing + iy = np.nonzero(rowsol)[0] + colsol[rowsol[iy] - 1] = iy + 1 # -1 to convert back to 0-based indexing for Python + + # The guard row cannot be assigned (always available) + if m0 < m: + price[m - 1] = 0 + rowsol[m - 1] = 0 + + # Now Continue with non-parallel code handling any contentions. + while not np.all(colsol): + for jp in np.where(colsol == 0)[0]: + f = assignCost[:, jp] + price # costs + b1 = np.min(f) # cost and row of the best choice + ip = np.argmin(f) + if ip >= m0: + colsol[jp] = m + else: + f[ip] = np.inf # eliminate from contention + price[ip] += EPS + np.min(f) - b1 # runner-up choice hence bid + if rowsol[ip]: # take the row away if already assigned + colsol[rowsol[ip] - 1] = 0 + rowsol[ip] = jp + 1 # +1 to convert to 1-based indexing + colsol[jp] = ip + 1 # +1 to convert to 1-based indexing + + # Screen out infeasible assignments + if m > m0: + colsol[colsol == m] = 0 + rowsol = rowsol[:m0] + + return colsol - 1, rowsol - 1 # -1 to convert back to 0-based indexing for Python + diff --git a/src/cedalion/sigdecomp/multimodal/arc_erbm.py b/src/cedalion/sigdecomp/multimodal/arc_erbm.py new file mode 100644 index 00000000..17dfd342 --- /dev/null +++ b/src/cedalion/sigdecomp/multimodal/arc_erbm.py @@ -0,0 +1,919 @@ +"""Independent Component Analysis by Entropy Bound Rate Minimization (ICA-ERBM). + +This code is based on :cite:t:`Li2010B` and :cite:t:`Fu2014`. It was converted from +matlab versions provided by the MLSP Lab at the University of Maryland, which is +available here: https://mlsp.umbc.edu/resources.html. +""" + + +import scipy as sp +import numpy as np +from cedalion.sigdecomp.multimodal import arc_ebm as arc_ebm +import cedalion.data + +def arc_erbm(X: np.ndarray, guess_mat, p: int = None , pr_guess_mat = None) -> np.ndarray: + """ Adaptive-reverse Constrained ICA by Entropy Rate Bound Minimization (arc-ERBM) is a spectrally constrained ICA algorithm. + + Args: + X (np.ndarray, (Channels, Time Points)): the [N x T] input multivariate time series with dimensionality N observations/channels and T time points + + guess_mat (np.ndarray, (Time Points/2 , Referenced Channels)): Frequency reference signal for the reconstruction + + p (int): the filter length for the invertible filter source model, does not need to be specified. Default is p = minimum(11, T/50). + + pr_guess_mat (np.ndarray, (Time Points, Referenced Channels)): Optional time domain reference signal for the reconstruction, however, only frequency characteristics are used. Only needed if Phase Retrieval Projection constraint should be included. + + Returns: + W (np.ndarray, (Channels, Channels)): the [N x N] demixing matrix with weights for N channels/sources. To obtain the independent components, + the demixed signals can be calculated as S = W @ X. + + Initial Contributors: + - Jacqueline Behrendt | j.behrendt@tu-berlin.de | 2026 + + References: + This code is based on the matlab version of bss by Xi-Lin Li (:cite:t:`Li2010B`) + Xi-Lin Li, Tulay Adali, "Blind spatiotemporal separation of second and/or + higher-order correlated sources by entropy rate minimization," + IEEE International Conference on Acoustics, Speech and Signal Processing 2010. + The original matlab version is available at https://mlsp.umbc.edu/resources.html + under the name "Real-valued ICA by entropy bound minimization (ICA-EBM)" + """ + +################# Part 0: pre-processing ################# + + # load measuring functions as global variables + global nf1, nf3, nf5, nf7 + + file_path = cedalion.data.get("measfunc_table.npy") + table = np.load(file_path, allow_pickle=True) + + K = 8 + nf1, nf3, nf5, nf7 = table[0], table[2], table[4], table[6] + + # Apply pre-processing to data + N, T = X.shape + X, P = pre_processing(X) + + # initialize p if it is not provided + if p is None: + p = int(np.minimum(11, T/ 50)) + + if pr_guess_mat is None: + constraint = 'psd' + else: + constraint = 'phase_retrieval' + + # Define similarity measures and gradients for constraints + # normalize reference signals + for i in range(guess_mat.shape[1]): + r_n_c = guess_mat[:, i] + r_n_c = r_n_c - np.mean(r_n_c) + guess_mat[:, i] = np.copy(r_n_c / np.linalg.norm(r_n_c) ) + + # compute psd of X + X_hat = (2/T) * np.fft.rfft(X, axis = 1) + + # compute cross psd of X + C_hat = np.zeros((X_hat.shape[1], N , N ), dtype=complex) + C_hat = (X_hat[:, None, :] * np.conjugate(X_hat[None, :, :])).transpose(2, 0, 1) + + # center C_hat + C_hat_mean = np.mean(C_hat, axis = 0) + C_hat = C_hat - np.reshape(C_hat_mean, (1, N, N)) + + # store real matrix for gradient computation + C_tilde = np.real(C_hat + np.transpose(C_hat, (0, 2, 1)) ) + + # define similarity measure for psd constraint + def epsilon(a,b): + # power spectral density + # b is already a psd + psd_a = (2/T) * np.abs(np.fft.rfft(a))**2 + psd_correlation = np.corrcoef(psd_a, b)[0,1] + return psd_correlation + + def epsilon_grad(r_n_c): + # compute gradient of epsilon for the psd constraint + # compute psd of estimated source + psd_s = (2/T) * np.abs(np.fft.rfft(w.T.dot(X)))**2 + + # compute correlation between estimated and reference psd + current_corr = np.corrcoef(psd_s, r_n_c)[0,1] + sign = np.sign(current_corr) + + # compute gradient vector + vec = (sign / np.linalg.norm(psd_s, 2)) * r_n_c - (np.abs(current_corr) / np.linalg.norm(psd_s, 2)**2) * psd_s + vec = vec.reshape((-1, 1, 1)) + c_grad = (T/2) * mu_c[n] * np.sum( np.multiply(np.dot(C_tilde, w), vec), axis = 0 ) + return c_grad + + if constraint == 'phase_retrieval' : + amplitude = pr_guess_mat + + def pr_update(amp, y, filter, X_filtered): + # this function applies a phase retrieval update step + + # filter reference amplitude + amp_filtered = sp.signal.lfilter(filter, 1, amp , axis = 0 ) + amp_filtered = np.abs(np.fft.rfft(amp_filtered)) + + # compute fft of estimated source + y_hat = np.fft.rfft(y) + + # project onto magnitude constraint + g_hat = amp_filtered * np.exp(1j * np.angle(y_hat)) + g_hat = g_hat.reshape((-1, 1)) + + # inverse fft to time domain + g = np.fft.irfft(g_hat, axis = 0 , n =X_filtered.shape[1] ) + + # compute corresponding weights + returns = np.linalg.lstsq(X_filtered.T, g.flatten()) + v_tilde = returns[0] + + return v_tilde + + + # initialize W + W = arc_ebm.arc_ebm(X, guess_mat, 'psd') + + gam = 2500 + decaying_factor = 0.99 + + # Choose set of threshold values + rho = np.arange(0, 1.01, 0.01) + + + # Number of reference signals + num_guess = guess_mat.shape[1] + + mu_c = np.ones((num_guess, 1)) + corr_w_guess = np.zeros((num_guess, N)) + num_W = np.shape(W)[0] + corr_w_guess = np.zeros((num_guess, num_W)) + + # Resort W based on correlation with reference signals + for kl in range(num_guess): + r_n_c = guess_mat[:, kl] + for lp in range(num_W): + w = W[lp, :].T + corr_w_guess[kl, lp] = epsilon(X.T.dot(w), r_n_c) + + # May need auction to auction to choose order + max_index = np.argmax(np.abs(corr_w_guess), axis = 1) + if len(np.unique(max_index)) != num_guess: + colsol, _ = auction((1- np.abs(corr_w_guess)).T) + max_index = colsol.T + c = np.arange(0, num_W) + c = np.setdiff1d(c, max_index) + sort_order = np.concatenate((max_index, c)) + W = W[sort_order, :] + + + if p == 1: + W = W.dot(P) + return W + + # prediction coefficients + a = np.zeros((p, N )) + for n in range(N): + a[int(np.floor((p+1)/2) - 1 ), n ] = 1 + + Rz = np.zeros((N, N, N)) + temp5 = np.zeros((T, N)) + Z = np.zeros((N,T,N)) + + # Prepare the data used in integral + calculate_cos_sin_mtx(p) + + + last_W = np.copy(W) + best_W = np.copy(W) + best_a = np.copy(a) + + ################# Part 1: ################# + for stochastic_search in range(1,-1, -1): + if stochastic_search ==1 : + mu = 1/5 + max_cost_increase = 5 + max_iter_north = 500 + tolerance = 1e-3 + else: + mu = 1/50 + max_cost_increase = 3 + max_iter_north = 200 + tolerance = 1e-5 + + cost_increase_counter = 0 + mu_idx = np.full(mu_c.shape, False) + + min_mu = 1/200 + mu_old = np.copy(mu_c) + rho_n_arr = np.zeros((max_iter_north+1, N)) + + W = np.copy(best_W) + a = np.copy(best_a) + last_W = np.copy(best_W) + Cost = np.zeros((max_iter_north+1, 1)) + min_cost = np.inf + min_cost_queue = min_cost * np.ones((max_iter_north+1, 1)) + negentropy_array = np.zeros((N,1)) + + for iter in range(1, max_iter_north+1): + + if stochastic_search == 1: + # estimate AR coefficients + Y = np.copy(np.dot(W, X) ) + for n in range(N): + + if iter%6 == 1 or iter<= 5: + + a1, min_ere1 = lfc(Y[n,:], p , 'unknown', []) + a2, min_ere2 = lfc(Y[n, :], p, [], a[:, n]) + + # choose the best model + min_ere = np.inf + if min_ere > min_ere1: + min_ere = min_ere1 + a[:, n] = np.copy(a1) + if min_ere > min_ere2: + min_ere = min_ere2 + a[:, n] = np.copy(a2) + + elif iter%6 == 4 : + a3, _ = lfc(Y[n, :], p, [], a[:, n]) + a[:, n ] = np.copy(a3) + + temp5 = sp.signal.lfilter(a[:, n].T, 1, X.T , axis = 0 ) + Rz[ :, :, n] = np.dot(temp5.T, temp5) / T + Z[:, :, n] = np.copy(temp5.T) + + Cost[iter-1] = np.copy(- np.log(np.abs(np.linalg.det(W)))) + + # estimate W + for n in range(N): + temp1 = np.random.rand(N, 1) + temp2 = np.delete(W, n, axis = 0) + h = temp1 - temp2.T.dot( np.linalg.solve( np.dot(temp2, temp2.T), temp2)).dot(temp1 ) + v = np.copy(W[n, :].T ) + sigma2 = v.T.dot(Rz[:, :, n]).dot(v) + Cost[iter-1] = np.copy(Cost[iter-1] + np.log(sigma2)/2 ) + v = np.copy(v / np.sqrt(sigma2)) + + # prediction error + y = np.copy(v.T.dot(Z[:, :, n ])) + + # evaluate the upper bound of negentropy of the n-th component + NE_Bound = np.zeros((K, 1)) + EGx = np.zeros((K, 1)) + # we only need to calculate these quantities once + yy = y* y + sign_y = np.sign(y) + abs_y = np.abs(y) + inv_pabs_y = 1/(1 + abs_y) + inv_pabs_yy = 1/(1+ yy) + inv_p10abs_y = 1/(10+abs_y) + + # G1(x) = x^4 + EGx[0] = np.sum(yy*yy)/T + if EGx[0] < nf1['min_EGx']: + NE_Bound[0] = simplified_ppval(nf1['pp_slope'], nf1['min_EGx'] ) * (EGx[0] - nf1['min_EGx']) + NE_Bound[0] = simplified_ppval(nf1['pp'],nf1['min_EGx']) + np.abs(NE_Bound[0] ) + else: + if EGx[0] > nf1['max_EGx']: + NE_Bound[0] = 0 + else: + NE_Bound[0] = simplified_ppval(nf1['pp'], EGx[0] ) + + # G3(x) = np.abs(x)/ (1 + np.abs(x)) + EGx[2] = 1 - np.sum(inv_pabs_y)/T + if EGx[2] < nf3['min_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['min_EGx'] ) * (EGx[2] - nf3['min_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['min_EGx']) + np.abs(NE_Bound[2]) + else: + if EGx[2] > nf3['max_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['max_EGx'] ) * (EGx[2] - nf3['max_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['max_EGx']) + np.abs(NE_Bound[2]) + + else: + NE_Bound[2] = simplified_ppval(nf3['pp'], EGx[2] ) + + # G5(x) = x* np.abs(x) /(10 + np.abs(x)) + EGx[4] = np.sum( y * abs_y * inv_p10abs_y )/T + if EGx[4] < nf5['min_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['min_EGx'] ) * (EGx[4] - nf5['min_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['min_EGx']) + np.abs(NE_Bound[4]) + else: + if EGx[4] > nf5['max_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['max_EGx'] ) * (EGx[4] - nf5['max_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['max_EGx']) + np.abs(NE_Bound[4]) + else: + NE_Bound[4] = simplified_ppval(nf5['pp'], EGx[4] ) + + # G7(x) = x / (1 + x**2) + EGx[6] = np.sum(y*inv_pabs_yy)/T + if EGx[6] < nf7['min_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['min_EGx'] ) * (EGx[6] - nf7['min_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['min_EGx']) + np.abs(NE_Bound[6]) + else: + if EGx[6] > nf7['max_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['max_EGx'] ) * (EGx[6] - nf7['max_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['max_EGx']) + np.abs(NE_Bound[6]) + else: + NE_Bound[6] = simplified_ppval(nf7['pp'], EGx[6] ) + + # select the tightest upper bound + max_NE, max_i = np.max(NE_Bound), np.argmax(NE_Bound) + negentropy_array[n] = np.copy(max_NE) + Cost[iter -1] = np.copy(Cost[iter-1] - max_NE) + + if n < num_guess: + # choose reference signal + r_n_c = guess_mat[:, n] + # compute correlation + w = np.copy(W[n, :].T ) + y_tilde = np.copy(w.T.dot(X)) + w = w.reshape((-1, 1)) + + e_pair = epsilon(y_tilde.T, r_n_c ) + # abs of similarity measure + dis_wr = np.abs(e_pair) + + # choose rho_n based on update scheme + if mu_idx[n] == 1: + if np.size(rho[rho < dis_wr]) != 0: + rho_n = np.max(rho[rho < dis_wr]) + else: + if np.size(rho[rho > dis_wr]) != 0: + rho_n = np.min(rho[rho > dis_wr]) + + if rho.size == 0: + rho_n = 0.01 + + # store rho + rho_n_arr[iter, n] = np.copy(e_pair) + # update mu + mu_old[n] = np.copy(mu_c[n]) + + mu_idx[n] = mu_idx[n] or (mu_c[n] >= 1) + mu_idx[n] = mu_idx[n] and (mu_c[n] > 0) + mu_c[n] = np.minimum(1, mu_c[n]) + mu_c[n] = np.maximum(0, mu_c[n] + gam * (rho_n - dis_wr)) + + + + if stochastic_search == 1: + weight = np.random.rand(1, T) + else: + weight = np.ones((1, T)) + + if max_i == 0: + EGx[0] = np.maximum(np.minimum(EGx[0], nf1['max_EGx']), nf1['min_EGx']) + grad = h / (np.dot(h.T, v)) + Z[:, :, n].dot((4* weight*y*yy).T) * simplified_ppval(nf1['pp_slope'], EGx[0]) / np.sum(weight) + if max_i == 2: + EGx[2] = np.maximum(np.minimum(EGx[2], nf3['max_EGx']), nf3['min_EGx']) + grad = h / (np.dot(h.T, v)) + Z[:, :, n].dot((weight* sign_y*inv_pabs_y**2).T) * simplified_ppval(nf3['pp_slope'], EGx[2]) / np.sum(weight) + if max_i == 4: + EGx[4] = np.maximum(np.minimum(EGx[4], nf5['max_EGx']), nf5['min_EGx']) + grad = h / (np.dot(h.T, v)) + Z[:, :, n].dot((weight* abs_y*(20+abs_y)*inv_p10abs_y**2).T) * simplified_ppval(nf5['pp_slope'], EGx[4]) / np.sum(weight) + if max_i == 6: + EGx[6] = np.maximum(np.minimum(EGx[6], nf7['max_EGx']), nf7['min_EGx']) + grad = h / (np.dot(h.T, v)) + Z[:, :, n].dot((weight*(1-yy)*inv_pabs_yy**2).T) * simplified_ppval(nf7['pp_slope'], EGx[6]) / np.sum(weight) + + # Constant direction + cnstd = Rz[:, :, n].dot(v) + + if n < num_guess: + constraint_grad = epsilon_grad(r_n_c) + grad = grad + constraint_grad + + + # projected gradient + grad = grad - (cnstd.T.dot(grad) * cnstd /(np.dot(cnstd.T, cnstd))).reshape(-1, 1) + check = inv_sqrtmH(Rz[:, :, n]) + grad = check.dot(grad) + + # Normalized gradient + grad = grad / np.sqrt(grad.T.dot(Rz[:, :, n].dot(grad))) + + v = v.reshape(-1,1) + mu * grad + + + if constraint == 'phase_retrieval' and stochastic_search == 1: + if n < amplitude.shape[1]: + v_tilde = pr_update(amplitude[:, n], y, a[:, n], Z[:, :, n]) + v_tilde = np.reshape(v_tilde, v.shape) + v = 0.8* v + 0.2* v_tilde + + W[n, :] = np.copy(v.T ) + + + Cost[iter] = Cost[iter] - (np.sum(np.power(mu_c, 2 ) - np.power(mu_old, 2)) / (2 * gam )) + + + if Cost[iter-1] < min_cost: + cost_increase_counter = 0 + min_cost = np.copy(Cost[iter-1]) + best_W = np.copy(last_W) + best_a = np.copy(a) + else: + cost_increase_counter = cost_increase_counter + 1 + + min_cost_queue[iter-1] = np.copy(min_cost) + + if cost_increase_counter > max_cost_increase: + if stochastic_search == 1: + W1 = np.copy(W) + last_W1 = np.copy(last_W) + for n in range(N): + W1[n, :] = W1[n, :] / np.linalg.norm(W1[n, :]) + last_W1[n, :] = last_W1[n, :] / np.linalg.norm(last_W1[n, :]) + if 1 - np.min(np.abs(np.diag(np.dot(W1, last_W1.T)))) < tolerance: + break + else: + mu = np.copy(np.maximum((decaying_factor**(iter + 1)) , min_mu)) + W = np.copy(best_W) + last_W = np.copy(best_W) + a = np.copy(best_a) + cost_increase_counter = 0 + continue + else: + W1 = np.copy(W) + last_W1 = np.copy(last_W) + for n in range(N): + W1[n, :] = W1[n, :] / np.linalg.norm(W1[n, :]) + last_W1[n, :] = last_W1[n, :] / np.linalg.norm(last_W1[n, :]) + if 1 - np.min(np.abs(np.diag(np.dot(W1, last_W1.T)))) < tolerance: + break + else: + mu = np.copy(np.maximum((decaying_factor**(iter + 1)) , min_mu)) + W = np.copy(best_W) + last_W = np.copy(best_W) + a = np.copy(best_a) + cost_increase_counter = 0 + continue + + last_W = np.copy(W) + + W = np.copy(best_W) + W = np.dot(W, P) + + return W + + +############################################################################################################### +# These functions are used in the ERBM algorithm. +############################################################################################################### + + +def lfc(x: np.ndarray, p: int , choice, a0) -> tuple[np.ndarray, np.ndarray]: + """Helper function for ERBM ICA: computes the linear filtering coefficients (LFC) with length p for entropy rate estimation, and the estimated entropy rate. + + Args: + x (np.ndarray, (Time Points, 1)): the source estimate [T x 1] + p (int): the filter length for the source model + choice : can be 'sub', 'super' or 'unknown'; any other input is handled as 'unknown' + a0 (np.ndarray or empty list): is the intial guess [p x 1] or an empty list [] + + Returns: + a (np.ndarray, (p, 1)): the filter coefficients [p x 1] + min_cost (np.ndarray, (1, 1)): the entropy rate estimation [1 x 1] + """ + + global nf1, nf3, nf5, nf7, cosmtx, sinmtx, Simpson_c + + tolerance = 1e-4 + T = x.shape[0] + X0 = sp.linalg.convolution_matrix(x, p, 'full').T + # remove tail so outliers have less effect + X = X0[:, : T ] + # remove DC + X = X - np.mean(X, axis = 1).reshape(-1, 1) + # pre-whitening + R = np.dot(X, X.T) / T + D, V = np.linalg.eig(R) + order = np.argsort(D) + d = D[order] + V = V[:, order] + eps = np.finfo(np.float64).eps + d[d < 10 * eps]= 10 * eps + P = np.dot(np.dot(V, np.diag(1/np.sqrt(d))), V.T) + X = np.dot(P, X) + + if np.size(a0) == 0: + # use SEA to provide the initial guess + if choice == 'sub': + # we don't need this case + # TO DO + pass + if choice == 'super': + # TO DO + pass + else: + a = np.random.rand(p,1) + a = a / np.linalg.norm(a) + last_a = np.copy(a) + for iter in range(100): + y = np.dot(a.T, X) + a = X.dot((y**3).T) / T - 3 * a + a = np.copy(a / np.linalg.norm(a)) + if 1 - np.abs(a.T.dot(last_a)) < tolerance: + break + else: + last_a = np.copy(a) + + else: + a = np.linalg.solve(P, a0) + + min_cost = np.inf + K = 8 # number of measuring functions + best_a = np.copy(a) + last_a = np.copy(a) + min_mu = 1/128 + if np.size(a0) == 0: + max_iter = 100 + mu = 4* min_mu + else: + max_iter = 100 + mu = 16* min_mu + cost_increase_counter = 0 + Cost = np.zeros((max_iter, 1)) + + for iter in range(max_iter): + a = np.copy(np.reshape(a, (-1, 1)) ) + a_original = np.copy(P.dot(a)) + b_original, G_original = cnstd_and_gain(a_original) + + a = a.dot(np.exp(- G_original/2)) + b = P.dot(b_original) + y = np.copy(np.dot(a.T, X)) + sigma2 = np.dot(a.T, a) + # normalized y + y = np.copy(y / np.sqrt(sigma2)) + + Cost[iter] = np.copy(0.5 * np.log(2 * np.pi * sigma2) + 0.5) + + NE_Bound = np.zeros((K, 1)) + EGx = np.zeros((K, 1)) + # we only need to calculate these quantities once + yy = y* y + sign_y = np.sign(y) + abs_y = np.abs(y) + inv_pabs_y = 1/(1 + abs_y) + inv_pabs_yy = 1/(1+ yy) + inv_p10abs_y = 1/(10+abs_y) + + # G1(x) = x^4 + EGx[0] = np.sum(yy*yy)/T + if EGx[0] < nf1['min_EGx']: + NE_Bound[0] = simplified_ppval(nf1['pp_slope'], nf1['min_EGx'] ) * (EGx[0] - nf1['min_EGx']) + NE_Bound[0] = simplified_ppval(nf1['pp'],nf1['min_EGx']) + np.abs(NE_Bound[0] ) + else: + if EGx[0] > nf1['max_EGx']: + NE_Bound[0] = 0 + else: + NE_Bound[0] = simplified_ppval(nf1['pp'], EGx[0] ) + + # G3(x) = np.abs(x)/ (1 + np.abs(x)) + EGx[2] = 1 - np.sum(inv_pabs_y)/T + if EGx[2] < nf3['min_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['min_EGx'] ) * (EGx[2] - nf3['min_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['min_EGx']) + np.abs(NE_Bound[2]) + else: + if EGx[2] > nf3['max_EGx']: + NE_Bound[2] = simplified_ppval(nf3['pp_slope'], nf3['max_EGx'] ) * (EGx[2] - nf3['max_EGx']) + NE_Bound[2] = simplified_ppval(nf3['pp'], nf3['max_EGx']) + np.abs(NE_Bound[2]) + + else: + NE_Bound[2] = simplified_ppval(nf3['pp'], EGx[2] ) + + # G5(x) = x* np.abs(x) /(10 + np.abs(x)) + EGx[4] = np.sum( y * abs_y * inv_p10abs_y )/T + if EGx[4] < nf5['min_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['min_EGx'] ) * (EGx[4] - nf5['min_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['min_EGx']) + np.abs(NE_Bound[4]) + else: + if EGx[4] > nf5['max_EGx']: + NE_Bound[4] = simplified_ppval(nf5['pp_slope'], nf5['max_EGx'] ) * (EGx[4] - nf5['max_EGx']) + NE_Bound[4] = simplified_ppval(nf5['pp'], nf5['max_EGx']) + np.abs(NE_Bound[4]) + else: + NE_Bound[4] = simplified_ppval(nf5['pp'], EGx[4] ) + + # G7(x) = x / (1 + x**2) + EGx[6] = np.sum(y*inv_pabs_yy)/T + if EGx[6] < nf7['min_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['min_EGx'] ) * (EGx[6] - nf7['min_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['min_EGx']) + np.abs(NE_Bound[6]) + else: + if EGx[6] > nf7['max_EGx']: + NE_Bound[6] = simplified_ppval(nf7['pp_slope'], nf7['max_EGx'] ) * (EGx[6] - nf7['max_EGx']) + NE_Bound[6] = simplified_ppval(nf7['pp'], nf7['max_EGx']) + np.abs(NE_Bound[6]) + else: + NE_Bound[6] = simplified_ppval(nf7['pp'], EGx[6] ) + + # select the tightest upper bound + max_NE, max_i = np.max(NE_Bound), np.argmax(NE_Bound) + Cost[iter] = np.copy(Cost[iter] - max_NE) + last_a = np.copy(a) + + if Cost[iter] < min_cost: + cost_increase_counter = 0 + min_cost = np.copy(Cost[iter]) + best_a = np.copy(a) + else: + cost_increase_counter = cost_increase_counter + 1 + + if cost_increase_counter > 0: + if mu > min_mu: + mu = mu / 2 + cost_increase_counter = 0 + a = np.copy(best_a) + last_a = np.copy(best_a) + continue + else: + break + + grad = a / sigma2 + if max_i == 0: + EGx[0] = np.maximum(np.minimum(EGx[0], nf1['max_EGx']), nf1['min_EGx']) + grad = grad - X.dot((4*y * yy).T) * simplified_ppval(nf1['pp_slope'], EGx[0]) / T /np.sqrt(sigma2) + grad = grad + np.sum(4* y* yy* y ) * simplified_ppval(nf1['pp_slope'], EGx[0])* a / T / sigma2 + if max_i == 2: + EGx[2] = np.maximum(np.minimum(EGx[2], nf3['max_EGx']), nf3['min_EGx']) + grad = grad - X.dot( sign_y *inv_pabs_y**2) * simplified_ppval(nf3['pp_slope'], EGx[2]) / T / np.sqrt(sigma2) + grad = grad + np.sum(sign_y*inv_pabs_y**2*y) * simplified_ppval(nf3['pp_slope'], EGx[2]) * a / T / sigma2 + if max_i == 4: + EGx[4] = np.maximum(np.minimum(EGx[4], nf5['max_EGx']), nf5['min_EGx']) + grad = grad - X.dot( abs_y*(20+abs_y)*inv_p10abs_y**2) * simplified_ppval(nf5['pp_slope'], EGx[4]) / T / np.sqrt(sigma2) + grad = grad + np.sum( abs_y*(20+abs_y)*inv_p10abs_y**2*y ) * simplified_ppval(nf5['pp_slope'], EGx[4]) * a / T / sigma2 + if max_i == 6: + EGx[6] = np.maximum(np.minimum(EGx[6], nf7['max_EGx']), nf7['min_EGx']) + grad = grad - X.dot( (1-yy)*inv_pabs_yy**2) * simplified_ppval(nf7['pp_slope'], EGx[6]) / T / np.sqrt(sigma2) + grad = grad + np.sum( (1-yy)*inv_pabs_yy**2*y) * simplified_ppval(nf7['pp_slope'], EGx[6]) * a / T / sigma2 + + + grad = grad- np.reshape(np.dot(grad.T, b)*b/(np.dot(b.T, b)) , (1, -1)) + grad = np.sqrt(sigma2) * grad/ np.linalg.norm(grad) + a = np.copy(a - mu * grad) + + a = np.reshape(a ,(-1, 1)) + a = np.copy(best_a) + a = np.dot(P,a) + + return a, min_cost + + +def simplified_ppval(pp: dict, xs: float) -> float: + """Helper function for ERBM ICA: simplified version of ppval. + This function evaluates a piecewise polynomial at a specific point. + + Args: + pp (dict): a dictionary containing the piecewise polynomial representation of a function + xs (float): the evaluation point + + Returns: + v (float): the value of the function at xs + """ + + b = pp['breaks'][0] + c = pp['coefs'] + l_pieces = int(pp['pieces'] ) + k = 4 + # find index + index = float('nan ') + middle_index = float('nan ') + if xs > b[l_pieces-1]: + index = l_pieces-1 + else: + if xs < b[1]: + index = 0 + else : + low_index = 0 + high_index = l_pieces-1 + + while True : + middle_index = int(np.ceil(((0.6* low_index + 0.4* high_index)))) + if xs < b[middle_index]: + high_index = middle_index + else: + low_index = middle_index + if low_index == high_index -1: + index = low_index + break + # now go to local coordinates + xs = xs - b[index] + # nested multiplication + v = c[index, 0] + for i in range(1, k ): + v = v*xs + c[index, i] + return v + +def cnstd_and_gain(a: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + """Helper function for ERBM ICA: returns constraint direction used for calculating projected gradient and gain of filter a. + + Args: + a (np.ndarray, (p, 1)): the filter coefficients [p x 1] + + Returns: + b (np.ndarray, (p, 1)): the constraint direction [p x 1] + G (np.ndarray, (1,)): the gain of the filter a + """ + + global cosmtx, sinmtx, Simpson_c + + + eps = np.finfo(np.float64).eps + p = a.shape[0] + # calculate the integral + # sample omega from 0 to pi + n = 10*p + h = np.pi / n + + # calculate |A(w)|^2 + Awr = np.zeros((1, n+1)) # real part + Awi = np.zeros((1, n+1)) # imaginary part + for q in range(p): + Awr = Awr + a[q] * cosmtx[q, :] + Awi = Awi + a[q] * sinmtx[q, :] + + Aw2 = 10*eps+ Awr**2 + Awi**2 + + # calculate the vector + v = np.zeros((p+1, n+1)) + inv_Aw2 = 1 / Aw2 + for q in range(p): + v[q, :] = cosmtx[q, :] * inv_Aw2 + v[p,:] = np.log(Aw2)/np.pi + + # this is the integral + u = h * v.dot(Simpson_c/3) + b = sp.linalg.toeplitz(u[:p].ravel()).dot(a) + + # gain + G = u[p] + return b, G + + + +def calculate_cos_sin_mtx(p: int) -> None : + """Helper function for ERBM ICA: calculates the cos and sin matrix for integral calculation in ERBM ICA. + + Args: + p (int): the filter length for the invertible filter source model + + Returns: + None + """ + + # prepare the cos and sin matrix for integral calculation + global cosmtx, sinmtx, Simpson_c + + # sample omega from 0 to pi + n = 10*p + h = np.pi / n + omega = np.arange(0, n+1, 1) * h + + cosmtx = np.zeros((p, n+1)) + sinmtx = np.zeros((p, n+1)) + for q in range(p): + cosmtx[q, :] = np.cos(q * omega) + sinmtx[q, :] = np.sin(q * omega) + # c ist the vetcor used in Simpson's rule + Simpson_c = np.zeros((n+1, 1)) + Simpson_c[np.arange(0, n+1, 2)] = 2 + Simpson_c[np.arange(1, n, 2)] = 4 + Simpson_c[0] = 1 + Simpson_c[n] = 1 + + +def pre_processing(X: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + """Helper function for ERBM ICA: Preprocessing (removal of mean, patial pre-whitening, temporal pre-filtering) + + Args: + X (np.ndarray, (Channels, Time Points)): the [N x T] input multivariate time series with dimensionality N observations/channels and T time points + + Returns: + X (np.ndarray, (Channels, Time Points)): the pre-processed input multivariate time series + P (np.ndarray, (Channels, Channels)): the pre-whitening matrix + """ + # pre-processing of the data + N, T = X.shape + # remove mean + X = X - np.mean(X, axis = 1).reshape(N, 1) + # spatio pre-whitening + R = np.dot(X, X.T) / T + P1 = inv_sqrtmH(R) + X = np.dot(P1, X) + # temporal pre-filtering for colored signals + q = 3 + r = np.zeros((q, 1)) + for p in range(q): + r[p] = np.trace(X[:, : T-p].dot(X[:, p: T].T)) / T / N + + af = np.linalg.solve(sp.linalg.toeplitz(r[:q-1].ravel()), np.conjugate(r[1:]) ) + for n in range(N): + X[n, :] = sp.signal.lfilter(np.concatenate((np.ones((1,1)), -af), axis = 0)[:,0], 1 ,X[n, :]) + + # spatio pre-whitening + R = np.dot(X, X.T) / T + P2 = inv_sqrtmH(R) + X = np.dot(P2, X) + P = np.dot(P2, P1) + + return X, P + +def inv_sqrtmH(B: np.ndarray) -> np.ndarray: + """Helper function for ERBM ICA: computes the inverse square root of a matrix. + + Args: + B (np.ndarray): a square matrix + + Returns: + A (np.ndarray): the inverse square root of B + """ + D, V = np.linalg.eig(B) + order = np.argsort(D) + D = D[order] + V = V[:, order] + #print('D', D) + d = 1/np.sqrt(D) + A = np.dot(np.dot(V, np.diag(d)), V.T) + return A + +def auction(assignCost, guard=None): + """ + Performs assignment using Bertsekas' auction algorithm. + + Parameters: + assignCost (ndarray): m x n matrix of costs for associating each row with each column. m >= n. + guard (float, optional): Cost of column non-assignment. All assignments will have cost < guard. + + Returns: + colsol (ndarray): Column assignments, where colsol[j] gives the row assigned to column j. + rowsol (ndarray): Row assignments, where rowsol[i] gives the column assigned to row i. + """ + + m, n = assignCost.shape + + if m < n: + raise ValueError('Cost matrix must have no more columns than rows.') + + # Augment cost matrix with a guard row if specified. + m0 = m + if guard is not None and np.isfinite(guard): + m += 1 + assignCost = np.vstack((assignCost, np.full((1, n), guard))) + + # Initialize return arrays + colsol = np.zeros(n, dtype=int) + rowsol = np.zeros(m, dtype=int) + price = np.zeros(m) + EPS = np.sqrt(np.finfo(float).eps) / (n + 1) + + # 1st step is a full parallel solution. Get bids for all columns + jp = np.arange(n) + f = assignCost.copy() + b1 = np.min(f, axis=0) # cost of the best choice for each column + ip = np.argmin(f, axis=0) # row index of the best choice for each column + f[ip, jp] = np.inf # eliminate the best from contention + + bids = np.min(f, axis=0) - b1 # cost of runner-up choice hence bid + ibid = np.argsort(bids) # Arrange bids so highest are last + + # Now assign best bids (lesser bids are overwritten by better ones). + price[ip[ibid]] += EPS + bids[ibid] + rowsol[ip[ibid]] = jp[ibid] + 1 # +1 to convert to 1-based indexing + iy = np.nonzero(rowsol)[0] + colsol[rowsol[iy] - 1] = iy + 1 # -1 to convert back to 0-based indexing for Python + + # The guard row cannot be assigned (always available) + if m0 < m: + price[m - 1] = 0 + rowsol[m - 1] = 0 + + # Now Continue with non-parallel code handling any contentions. + while not np.all(colsol): + for jp in np.where(colsol == 0)[0]: + f = assignCost[:, jp] + price # costs + b1 = np.min(f) # cost and row of the best choice + ip = np.argmin(f) + if ip >= m0: + colsol[jp] = m + else: + f[ip] = np.inf # eliminate from contention + price[ip] += EPS + np.min(f) - b1 # runner-up choice hence bid + if rowsol[ip]: # take the row away if already assigned + colsol[rowsol[ip] - 1] = 0 + rowsol[ip] = jp + 1 # +1 to convert to 1-based indexing + colsol[jp] = ip + 1 # +1 to convert to 1-based indexing + + # Screen out infeasible assignments + if m > m0: + colsol[colsol == m] = 0 + rowsol = rowsol[:m0] + + return colsol - 1, rowsol - 1 # -1 to convert back to 0-based indexing for Python + + + + \ No newline at end of file diff --git a/src/cedalion/sigdecomp/unimodal/ica_ebm.py b/src/cedalion/sigdecomp/unimodal/ica_ebm.py index 65620c19..80bc19d5 100644 --- a/src/cedalion/sigdecomp/unimodal/ica_ebm.py +++ b/src/cedalion/sigdecomp/unimodal/ica_ebm.py @@ -52,21 +52,13 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: verbose = False # report the progress if verbose== True - # show the cost values vs. iterations at each stage if show_cost== True - # not implemented yet - show_cost = False - # Load 8 measuring functions. But we only use 4 of them. K = 8 - # table = np.load('measfunc_table.npy', allow_pickle= True) - file_path = cedalion.data.get("measfunc_table.npy") table = np.load(file_path, allow_pickle=True) - nf1, nf2, nf3, nf4, nf5, nf6, nf7, nf8 = ( - table[0], table[1], table[2], table[3], table[4], table[5], table[6], table[7] - ) + nf1, nf3, nf5, nf7 = table[0], table[2], table[4], table[6] N = X.shape[0] @@ -189,7 +181,6 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: min_mu = 1/50 cost_increaser_counter = 0 fastica_on = True - error = 0 max_negentropy = np.zeros((N, 1)) negentropy_array = np.zeros((N, 1)) for iter in range(max_iter_orth): @@ -268,7 +259,7 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: # Perform orthogonal ICA if max_i == 0: # G1(x) = x^4 - if fastica_on == True : + if fastica_on : grad = X.dot( (4* y* yy).T )/T Edgx = 12 else : @@ -276,7 +267,7 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: vEGx = 2 * (EGx[0] > nf1['critical_point']) -1 elif max_i == 2: # G3(x) = np.abs(x)/ (1 + np.abs(x)) - if fastica_on == True : + if fastica_on : grad = X.dot( (sign_y * inv_pabs_y * inv_pabs_y).T )/T Edgx = np.sum(-2 * inv_pabs_y * inv_pabs_y * inv_pabs_y)/T else : @@ -284,7 +275,7 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: vEGx = 2 * (EGx[2] > nf3['critical_point']) -1 elif max_i == 4: # G5(x) = x* np.abs(x) /(10 + np.abs(x)) - if fastica_on == True : + if fastica_on : grad = X.dot((abs_y *(20 + abs_y) * inv_p10abs_y * inv_p10abs_y).T )/T Edgx = np.sum(200 * sign_y * inv_p10abs_y * inv_p10abs_y * inv_p10abs_y)/T else : @@ -292,13 +283,13 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: vEGx = 2 * (EGx[4] > nf5['critical_point']) -1 elif max_i == 6: # G7(x) = x / (1 + x**2) - if fastica_on == True : + if fastica_on : grad = X.dot(((1 - yy)* inv_pabs_yy**2).T )/T Edgx = np.sum(2 * y * (yy-3)* inv_pabs_yy* inv_pabs_yy* inv_pabs_yy)/T else : grad = X.dot((weight * (1 - yy) * inv_pabs_yy**2 ).T ) / np.sum(weight) vEGx = 2 * (EGx[6] > nf7['critical_point']) -1 - if fastica_on == True : + if fastica_on : w1 = grad - Edgx * w else : grad = vEGx * grad @@ -321,12 +312,11 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: min_cost_queue[iter] = np.copy(min_cost) - if fastica_on == True : + if fastica_on : if cost_increaser_counter >= max_cost_increase_number or 1- np.min(np.abs(np.diag(W.dot(last_W.T)))) < tolerance: cost_increaser_counter = 0 W = np.copy(best_W ) last_W = np.copy(W) - iter_fastica = np.copy(iter) fastica_on = False continue else : @@ -348,7 +338,7 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: ############################################################################################################## # Part 2: check for saddle points ############################################################################################################## - if saddle_test_enable == True : + if saddle_test_enable : if verbose: print('Saddle point detection.') SADDLE_TESTED = False @@ -507,7 +497,7 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: SADDLE_TESTED = False - if SADDLE_TESTED == True : + if SADDLE_TESTED : ############################################################################################################## # Part 3: if saddle points are detected, refine orthogonal ICA # fix step size gradient search @@ -522,7 +512,6 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: mu = 1/ 50 cost_increaser_counter = 0 fastica_on = True - error = 0 for iter in range(max_iter_orth_refine): for n in range(N): @@ -595,7 +584,7 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: # Perform orthogonal ICA if max_i == 0: # G1(x) = x^4 - if fastica_on == True : + if fastica_on : grad = X.dot( (4* y* yy).T )/T Edgx = 12 else : @@ -603,7 +592,7 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: vEGx = 2 * (EGx[0] > nf1['critical_point']) -1 elif max_i == 2: # G3(x) = np.abs(x)/ (1 + np.abs(x)) - if fastica_on == True : + if fastica_on : grad = X.dot( (sign_y * inv_pabs_y * inv_pabs_y).T )/T Edgx = np.sum(-2 * inv_pabs_y * inv_pabs_y * inv_pabs_y)/T else : @@ -611,7 +600,7 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: vEGx = 2 * (EGx[2] > nf3['critical_point']) -1 elif max_i == 4: # G5(x) = x* np.abs(x) /(10 + np.abs(x)) - if fastica_on == True : + if fastica_on : grad = X.dot((abs_y *(20 + abs_y) * inv_p10abs_y * inv_p10abs_y).T )/T Edgx = np.sum(200 * sign_y * inv_p10abs_y * inv_p10abs_y * inv_p10abs_y)/T else : @@ -619,13 +608,13 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: vEGx = 2 * (EGx[4] > nf5['critical_point']) -1 elif max_i == 6: # G7(x) = x / (1 + x**2) - if fastica_on == True : + if fastica_on : grad = X.dot(((1 - yy)* inv_pabs_yy**2).T )/T Edgx = np.sum(2 * y * (yy-3)* inv_pabs_yy* inv_pabs_yy* inv_pabs_yy)/T else : grad = X.dot((weight * (1 - yy) * inv_pabs_yy**2 ).T ) / np.sum(weight) vEGx = 2 * (EGx[6] > nf7['critical_point']) -1 - if fastica_on == True : + if fastica_on : w1 = grad - Edgx * w else : grad = vEGx * grad @@ -649,12 +638,11 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: min_cost_queue[iter] = np.copy(min_cost) - if fastica_on == True : + if fastica_on : if cost_increaser_counter >= max_cost_increase_number or 1- np.min(np.abs(np.diag(W.dot(last_W.T)))) < tolerance: cost_increaser_counter = 0 W = np.copy(best_W ) last_W = np.copy(W) - iter_fastica = iter fastica_on = False continue else : @@ -678,7 +666,6 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: best_W = np.copy(W) Cost = np.zeros((max_iter_nonorth, 1)) min_cost_queue = min_cost * np.ones((max_iter_nonorth, 1)) - error = np.inf mu = 1 / 25 min_mu = 1/ 200 max_cost_increase_number = 3 @@ -848,7 +835,6 @@ def ICA_EBM(X: np.ndarray) -> np.ndarray: W = best_W W = W.dot(P) - # if show cost: to do later return W @@ -871,20 +857,19 @@ def simplified_ppval(pp: dict, xs: float) -> float: """ b = pp['breaks'][0] c = pp['coefs'] - l = int(pp['pieces'] ) + l_pieces = int(pp['pieces'] ) k = 4 - dd = 1 # find index index = float('nan ') middle_index = float('nan ') - if xs > b[l-1]: - index = l-1 + if xs > b[l_pieces-1]: + index = l_pieces-1 else: if xs < b[1]: index = 0 else : low_index = 0 - high_index = l-1 + high_index = l_pieces-1 while True : middle_index = int(np.ceil(((0.6* low_index + 0.4* high_index)))) @@ -933,7 +918,6 @@ def pre_processing(X: np.ndarray) -> tuple[np.ndarray, np.ndarray]: """ # pre-processing program - N = X.shape[0] T = X.shape[1] # remove DC Xmean = np.mean(X, axis=1) diff --git a/src/cedalion/sigdecomp/unimodal/ica_erbm.py b/src/cedalion/sigdecomp/unimodal/ica_erbm.py index 857070ca..a283eefc 100644 --- a/src/cedalion/sigdecomp/unimodal/ica_erbm.py +++ b/src/cedalion/sigdecomp/unimodal/ica_erbm.py @@ -39,24 +39,20 @@ def ICA_ERBM(X: np.ndarray, p: int = None ) -> np.ndarray: ################# Part 0: pre-processing ################# # load measuring functions as global variables - global nf1, nf2, nf3, nf4, nf5, nf6, nf7, nf8 - # table = np.load('measfunc_table.npy', allow_pickle= True) + global nf1, nf3, nf5, nf7 file_path = cedalion.data.get("measfunc_table.npy") table = np.load(file_path, allow_pickle=True) K = 8 - nf1, nf2, nf3, nf4, nf5, nf6, nf7, nf8 = ( - table[0], table[1], table[2], table[3], table[4], table[5], table[6], table[7] - ) + nf1, nf3, nf5, nf7 = table[0], table[2], table[4], table[6] - show_cost = False N, T = X.shape X, P = pre_processing(X) # initialize p if it is not provided if p is None: - p = int(np.min(11, T/ 50)) + p = int(np.minimum(11, T/ 50)) # initialize W W = ica_ebm.ICA_EBM(X) @@ -248,7 +244,7 @@ def ICA_ERBM(X: np.ndarray, p: int = None ) -> np.ndarray: min_cost = np.copy(Cost[iter-1]) best_W = np.copy(last_W) best_a = np.copy(a) - max_negentropy = np.copy(negentropy_array) + #max_negentropy = np.copy(negentropy_array) else: cost_increase_counter = cost_increase_counter + 1 @@ -318,7 +314,7 @@ def lfc(x: np.ndarray, p: int , choice, a0) -> tuple[np.ndarray, np.ndarray]: min_cost (np.ndarray, (1, 1)): the entropy rate estimation [1 x 1] """ - global nf1, nf2, nf3, nf4, nf5, nf6, nf7, nf8 + global nf1, nf3, nf5, nf7 tolerance = 1e-4 T = x.shape[0] X0 = sp.linalg.convolution_matrix(x, p, 'full').T @@ -516,20 +512,20 @@ def simplified_ppval(pp: dict, xs: float) -> float: b = pp['breaks'][0] c = pp['coefs'] - l = int(pp['pieces'] ) + l_pieces = int(pp['pieces'] ) k = 4 - dd = 1 + #dd = 1 # find index index = float('nan ') middle_index = float('nan ') - if xs > b[l-1]: - index = l-1 + if xs > b[l_pieces-1]: + index = l_pieces-1 else: if xs < b[1]: index = 0 else : low_index = 0 - high_index = l-1 + high_index = l_pieces-1 while True : middle_index = int(np.ceil(((0.6* low_index + 0.4* high_index)))) @@ -568,7 +564,7 @@ def cnstd_and_gain(a: np.ndarray) -> tuple[np.ndarray, np.ndarray]: # sample omega from 0 to pi n = 10*p h = np.pi / n - w = np.arange(0, n+1, 1) * h + #w = np.arange(0, n+1, 1) * h # calculate |A(w)|^2 Awr = np.zeros((1, n+1)) # real part