Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 89 additions & 1 deletion aopy/analysis/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import numpy as np
from matplotlib import pyplot as plt
import copy

from sklearn.decomposition import PCA, FactorAnalysis
from sklearn.cluster import KMeans
Expand Down Expand Up @@ -1734,4 +1735,91 @@ def calc_confidence_interval_overlap(CI1, CI2):
# Calculate overlap ratio
overlap = (overlap_width / min(width1, width2))

return overlap
return overlap


def calc_max_coh_and_lags(lags, lfp_array1, lfp_array2, frequency_range):
"""
Calculate the maximum coherence and corresponding lags between pairs of channels
in two local field potential (LFP) arrays within a specified frequency range.

Parameters:
- lags: List of time lags (in seconds) to apply to the channels in lfp_array1.
- lfp_array1: First LFP array (e.g., motor cortex data) with shape (n_channels, n_timepoints).
- lfp_array2: Second LFP array (e.g., pre-motor cortex data) with shape (n_channels, n_timepoints).
Comment on lines +1748 to +1749
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lab convention is to put the time axis first (even if this isn't necessarily the optimal way)

- frequency_range: Frequency range of interest (e.g., [0, 4] Hz for delta band).

Returns:
- max_coherence_matrix: Matrix of maximum coherence values for each channel pair.
- lag_of_max_coherence_matrix: Matrix of corresponding lags (in ms) for max coherence values.
Comment on lines +1746 to +1754
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make the docstring google format or else the readthedocs won't work. (https://sphinxcontrib-napoleon.readthedocs.io/en/latest/example_google.html)

"""
# Taper parameters
NW, BW, step, fk, fs = 0.5, 4, 0.005, 200, 2500
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make these input arguments to the function?

n, half_bw, num_tapers = precondition.convert_taper_parameters(NW, BW)
print(f"Using {num_tapers} tapers, taper length {n}, half-bandwidth {half_bw}")

# Get channel counts
num_channels_lfp1 = lfp_array1.shape[0]
num_channels_lfp2 = lfp_array2.shape[0]

# Initialize matrices for max coherence values and their corresponding lags
max_coherence_matrix = np.zeros((num_channels_lfp1, num_channels_lfp2))
lag_of_max_coherence_matrix = np.zeros((num_channels_lfp1, num_channels_lfp2))

Comment on lines +1766 to +1768
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you initialize these arrays as np.nan values to make it obvious if the answer is 0 or if the value just didn't get filled in? You can just add "*np.nan" to the end of each of these lines.

def apply_lag(signal, lag, fs):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rename this function to something more specific? Something like "circle_shift_by_time"

"""
Shift a signal by a specified lag in seconds, converting to samples.
Positive lag shifts forward, negative lag shifts backward.
"""
Comment on lines +1770 to +1773
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add information about the function args and what it returns to the docstring.

lag_samples = int(lag * fs)
return np.roll(signal, lag_samples)

def compute_coherence_for_pair(channel_lfp1, channel_lfp2):
"""
Compute coherence between a specific pair of channels for all specified lags.
Returns coherence values for each lag at time zero.
"""
Comment on lines +1778 to +1781
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update the docstring as above? Also can you pull this function out of the wrapper function so that it can easily be used alone?

Copy link
Collaborator

@rcanfield2 rcanfield2 Nov 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then each input value to the array can be a 1D array, which should also simplify the function.

coherence_at_zero_lag = np.empty(len(lags))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to before, please initialize this as np.nan values. Initializing as an empty array will do weird things.


for idx, lag in enumerate(lags):
# Process signal from lfp_array1 and normalize
signal1 = lfp_array1[channel_lfp1, :]
signal1 = copy.deepcopy((signal1 - np.mean(signal1)) / np.std(signal1))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zscoring should be done outside of the function to make it more flexible.

signal1 = apply_lag(signal1, lag, fs)

# Process signal from lfp_array2 and normalize
signal2 = lfp_array2[channel_lfp2, :]
signal2 = copy.deepcopy((signal2 - np.mean(signal2)) / np.std(signal2))

# Stack signals for coherence computation
data = np.vstack((signal1, signal2))
frequencies, timepoints, coherence = calc_mt_tfcoh(
data.T, [0, 1], n, half_bw, num_tapers, fs, step, fk=fk, pad=2, ref=False, imaginary=False
)

# Find the time index closest to zero
zero_time_idx = np.argmin(np.abs(timepoints))

# Average coherence within the specified frequency range
freq_indices = np.where((frequencies > frequency_range[0]) & (frequencies < frequency_range[1]))[0]
mean_coherence = np.mean(coherence[freq_indices, :], axis=0)
coherence_at_zero_lag[idx] = mean_coherence[zero_time_idx]

return coherence_at_zero_lag

# Calculate max coherence and lags for each channel pair
for ch_lfp1 in range(num_channels_lfp1):
for ch_lfp2 in range(num_channels_lfp2):
print(f"Processing LFP1 channel {ch_lfp1}, LFP2 channel {ch_lfp2}")
delta_coherence = compute_coherence_for_pair(ch_lfp1, ch_lfp2)

# Find maximum coherence and corresponding lag
max_coherence = np.max(delta_coherence)
max_lag_index = np.argmax(delta_coherence)
max_lag = lags[max_lag_index] * 1000 # Convert lag to milliseconds

# Store max coherence and lag in matrices
max_coherence_matrix[ch_lfp1, ch_lfp2] = max_coherence
lag_of_max_coherence_matrix[ch_lfp1, ch_lfp2] = max_lag

return max_coherence_matrix, lag_of_max_coherence_matrix
Comment on lines +1810 to +1825
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part can be a separate wrapper function.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And can you add an optional argument called "verbose". Then, only run print statements if verbose=True

87 changes: 86 additions & 1 deletion aopy/preproc/neuropixel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import os
import glob
from ibldsp.voltage import destripe_lfp
from .. import preproc
from .. import data
import pickle

def sync_neuropixel_ecube(raw_timestamp,on_times_np,off_times_np,on_times_ecube,off_times_ecube,inter_barcode_interval=20,bar_duration=0.02,verbose=False):
'''
Expand Down Expand Up @@ -275,4 +278,86 @@ def destripe_lfp_batch(lfp_data, save_path, sample_rate, bit_volts, max_memory_g
for ibatch in range(Nbatches):
tmp = destripe_lfp((lfp_data[ibatch*batch_size:(ibatch+1)*batch_size, :]*bit_volts).T/1e6, sample_rate)*1e6
lfp_destriped[ibatch*batch_size:(ibatch+1)*batch_size, :] = (tmp/bit_volts).T.astype(dtype)
lfp_destriped.flush()
lfp_destriped.flush()


def make_average_neuropixel_array(te_id, date, subject, aligning_index, tbefore, tafter):
"""
Comment on lines +284 to +285
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for including this function but Tomo and I are in the middle of restructuring how LFP preprocessing is done. This will change how data is loaded and stuff so it isn't necessary to include this function in github quite yet.

Generates an averaged neuropixel LFP array around specified alignment events.

Parameters:
te_id (int): Identifier for the experimental trial.
date (str): Date of the experiment.
subject (str): Subject ID.
aligning_index (int): Index of the event to align on.
index 0 = cursor enter center target

index 1 = peripheral target on

index 2 = Go-Cue

index 3 = cursor enter peripheral target

index 4 = reward

index 5 = trial end
tbefore (int): Time before the alignment event in seconds.
tafter (int): Time after the alignment event in seconds.

Returns:
average_array (np.array): Averaged LFP array across trials.
stacked_arrays_3d (np.array): 3D array with trial-wise, aligned LFP data.
"""

# Define paths and file names
data_path_preproc = "/data/preprocessed/"
type = "lfp"
filename_opto = f"preproc_{date}_{subject}_{te_id}_{type}.hdf"

# Load LFP data
with open(f"/media/moor-data/postprocessed/beignet/neuropixel_lfp_destriped/lfp_destriped_{te_id}", 'rb') as file:
lfp_data_destriped = pickle.load(file)
lfp_data = data.load_hdf_group(os.path.join(data_path_preproc, subject), filename_opto, f"{type}")

# Load behavior times and filter for rewarded trials
subject, te_id, date = [subject], [te_id], [date]
times = data.bmi3d.tabulate_behavior_data_center_out(data_path_preproc, subject, te_id, date)
rewarded_times = times[times["reward"] == True].reset_index()

# Extract and transpose LFP data
lfp = np.array(lfp_data_destriped[0]).T

# Align times based on selected index and compute time window
selected_event_times = [events[aligning_index] for events in rewarded_times["event_times"]]
total_time = tbefore + tafter

# Get trial-aligned indices for LFP data
align_times, trial_indices = preproc.base.trial_align_times(lfp_data['sync_timestamp'], selected_event_times, tbefore, tafter)

# Filter out trials with mismatched lengths
valid_trial_indices = [indices for indices in trial_indices if len(indices) == (2500 * total_time)]

# Initialize 3D array to hold aligned trial data (time x channels x trials)
num_trials = len(valid_trial_indices)
num_channels = 384
num_samples = 2500 * total_time
stacked_arrays_3d = np.full((num_samples, num_channels, num_trials), np.nan)

# Process each trial and subtract baseline from each channel
for j, indices in enumerate(valid_trial_indices):
trial_data = lfp[indices]
adjusted_channels = []

for ch in range(num_channels):
channel_data = trial_data[:, ch]
baseline = np.mean(channel_data[:1000]) # Baseline is the mean of the first 1000 samples
adjusted_channel = channel_data - baseline
adjusted_channels.append(adjusted_channel)

# Stack adjusted channel data into the 3D array
stacked_arrays_3d[:, :, j] = np.array(adjusted_channels).T

# Calculate the average across trials
average_array = np.nanmean(stacked_arrays_3d, axis=2)

return average_array, stacked_arrays_3d
44 changes: 44 additions & 0 deletions tests/test_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import os
import matplotlib.pyplot as plt
from scipy import signal
from unittest.mock import MagicMock

test_dir = os.path.dirname(__file__)
data_dir = os.path.join(test_dir, 'data')
Expand Down Expand Up @@ -1905,6 +1906,49 @@ def test_calc_confidence_interval_overlap(self):
overlap = aopy.analysis.calc_confidence_interval_overlap(CI1,CI2)
self.assertEqual(overlap,(20-17)/(20-10))


class TestCalcMaxCohAndLags(unittest.TestCase):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make the name of the test class "test_{function_name_to_be_tested}"? This way it is easy to identify which set of tests belongs to which function.

def setUp(self):
# Define the parameters for the test
self.lags = [-0.1, 0, 0.1] # Lags in seconds
self.frequency_range = [0, 4] # Frequency range (0 to 4 Hz)

# Simulate some LFP data with random values
self.num_channels = 3
self.num_timepoints = 5000
self.lfp_array1 = np.random.randn(self.num_channels, self.num_timepoints)
self.lfp_array2 = np.random.randn(self.num_channels, self.num_timepoints)

# Mock taper parameters and the coherence calculation function
global precondition
precondition = MagicMock()
precondition.convert_taper_parameters = MagicMock(return_value=(256, 2, 3)) # Mock return values for tapers
Comment on lines +1922 to +1925
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just import the actual function from aopy.precondition.


global calc_mt_tfcoh
calc_mt_tfcoh = MagicMock(return_value=(
np.linspace(0, 200, 100), # Mock frequency array (0-200 Hz)
np.linspace(-0.5, 0.5, 50), # Mock time array (centered around 0)
np.random.rand(100, 50) # Mock coherence array (freq x time)
))
Comment on lines +1927 to +1932
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to before, I'm also not sure why importing this is necessary.


def test_calc_max_coh_and_lags(self):
# Run the function
max_coherence_matrix, lag_of_max_coherence_matrix = aopy.analysis.calc_max_coh_and_lags(
self.lags, self.lfp_array1, self.lfp_array2, self.frequency_range
)

# Check the shapes of the output matrices
self.assertEqual(max_coherence_matrix.shape, (self.num_channels, self.num_channels))
self.assertEqual(lag_of_max_coherence_matrix.shape, (self.num_channels, self.num_channels))

# Verify the values fall within expected ranges
self.assertTrue(np.all(max_coherence_matrix >= 0) and np.all(max_coherence_matrix <= 1),
"Coherence values should be between 0 and 1")

# Check if lags are converted to milliseconds in lag_of_max_coherence_matrix
self.assertTrue(np.all(np.isin(lag_of_max_coherence_matrix, np.array(self.lags) * 1000)),
"Lags should be in milliseconds and match the provided lags")

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests are good for testing that the structure of the outputs are as expected, but it doesn't require the actual values to be as expected. After pulling out the single lagged coherence function, the test will be easier to write. You can create fake signals and then circle shift them before inputting them into your function. Then you can test is the max lag is the same as the circle shift. And if it is the same signal, it should be very high. I would start something with a single hump and zeros elsewhere, but play around with it. Let me know if you have questions.

if __name__ == "__main__":

unittest.main()
Expand Down
63 changes: 63 additions & 0 deletions tests/test_preproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import numpy as np
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
import pickle
import io


test_dir = os.path.dirname(__file__)
Expand Down Expand Up @@ -1754,5 +1757,65 @@ def test_calibrate_sensor(self):
powers = preproc.laser.calibrate_sensor(voltage, 12.)
np.testing.assert_allclose(powers, [8.57, 5.07, 10.32], atol=1e-2)


class TestMakeAverageNeuropixelArray(unittest.TestCase):

@patch("builtins.open", new_callable=MagicMock)
@patch("aopy.data.load_hdf_group")
@patch("aopy.data.bmi3d.tabulate_behavior_data_center_out")
@patch("aopy.preproc.base.trial_align_times")
def test_make_average_neuropixel_array(self, mock_trial_align_times, mock_tabulate_behavior_data, mock_load_hdf_group, mock_open):
# Define mock inputs
te_id = 123
date = "2023-01-01"
subject = "subject_1"
aligning_index = 2 # Example aligning index for "Go-Cue"
tbefore = 2 # 2 seconds before
tafter = 2 # 2 seconds after

# Set up mock LFP data and behavior times
mock_lfp_data_destriped = [np.random.randn(384, 10000)]

# Simulate binary file content for pickle
mock_file = io.BytesIO(pickle.dumps(mock_lfp_data_destriped))
mock_open.return_value.__enter__.return_value = mock_file

mock_lfp_data = {"sync_timestamp": np.linspace(0, 10000, 10000)}
mock_load_hdf_group.return_value = mock_lfp_data

# Mock behavior times, with 'reward' and event times for alignment
mock_behavior_data = pd.DataFrame({
"reward": [True, False, True], # Only reward trials are used
"event_times": [
[0.5, 1.0, 2.0, 3.0, 4.0, 5.0],
[1.5, 2.0, 3.0, 4.0, 5.0, 6.0],
[2.5, 3.0, 4.0, 5.0, 6.0, 7.0]
]
})

# Mock trial alignment indices based on sync timestamps and event times
mock_trial_align_times.return_value = (
[1.5, 2.5, 3.5], # Aligned event times
[np.arange(2500 * 4), np.arange(2500 * 4), np.arange(2500 * 4)]
)

# Call the function with mock data
average_array, stacked_arrays_3d = make_average_neuropixel_array(
te_id, date, subject, aligning_index, tbefore, tafter
)

# Assert output shapes
expected_num_samples = 2500 * (tbefore + tafter)
self.assertEqual(average_array.shape, (expected_num_samples, 384), "Average array shape is incorrect")
self.assertEqual(stacked_arrays_3d.shape, (expected_num_samples, 384, 3), "Stacked arrays 3D shape is incorrect")

# Check for baseline adjustment by asserting that the mean of the baseline window is near zero for each channel
baseline_means = np.mean(stacked_arrays_3d[:1000, :, :], axis=0) # Baseline is first 1000 samples
self.assertTrue(np.allclose(baseline_means, 0, atol=1e-1), "Baseline not adjusted correctly across channels")

# Check if NaN mean handling is correct (should have no NaNs in the final average)
self.assertFalse(np.isnan(average_array).any(), "average_array should not contain NaNs")


if __name__ == "__main__":
unittest.main()