diff --git a/aopy/analysis/base.py b/aopy/analysis/base.py index 70393662..8039a013 100644 --- a/aopy/analysis/base.py +++ b/aopy/analysis/base.py @@ -3,6 +3,7 @@ import numpy as np from matplotlib import pyplot as plt +import copy from sklearn.decomposition import PCA, FactorAnalysis from sklearn.cluster import KMeans @@ -1734,4 +1735,91 @@ def calc_confidence_interval_overlap(CI1, CI2): # Calculate overlap ratio overlap = (overlap_width / min(width1, width2)) - return overlap \ No newline at end of file + return overlap + + +def calc_max_coh_and_lags(lags, lfp_array1, lfp_array2, frequency_range): + """ + Calculate the maximum coherence and corresponding lags between pairs of channels + in two local field potential (LFP) arrays within a specified frequency range. + + Parameters: + - lags: List of time lags (in seconds) to apply to the channels in lfp_array1. + - lfp_array1: First LFP array (e.g., motor cortex data) with shape (n_channels, n_timepoints). + - lfp_array2: Second LFP array (e.g., pre-motor cortex data) with shape (n_channels, n_timepoints). + - frequency_range: Frequency range of interest (e.g., [0, 4] Hz for delta band). + + Returns: + - max_coherence_matrix: Matrix of maximum coherence values for each channel pair. + - lag_of_max_coherence_matrix: Matrix of corresponding lags (in ms) for max coherence values. + """ + # Taper parameters + NW, BW, step, fk, fs = 0.5, 4, 0.005, 200, 2500 + n, half_bw, num_tapers = precondition.convert_taper_parameters(NW, BW) + print(f"Using {num_tapers} tapers, taper length {n}, half-bandwidth {half_bw}") + + # Get channel counts + num_channels_lfp1 = lfp_array1.shape[0] + num_channels_lfp2 = lfp_array2.shape[0] + + # Initialize matrices for max coherence values and their corresponding lags + max_coherence_matrix = np.zeros((num_channels_lfp1, num_channels_lfp2)) + lag_of_max_coherence_matrix = np.zeros((num_channels_lfp1, num_channels_lfp2)) + + def apply_lag(signal, lag, fs): + """ + Shift a signal by a specified lag in seconds, converting to samples. + Positive lag shifts forward, negative lag shifts backward. + """ + lag_samples = int(lag * fs) + return np.roll(signal, lag_samples) + + def compute_coherence_for_pair(channel_lfp1, channel_lfp2): + """ + Compute coherence between a specific pair of channels for all specified lags. + Returns coherence values for each lag at time zero. + """ + coherence_at_zero_lag = np.empty(len(lags)) + + for idx, lag in enumerate(lags): + # Process signal from lfp_array1 and normalize + signal1 = lfp_array1[channel_lfp1, :] + signal1 = copy.deepcopy((signal1 - np.mean(signal1)) / np.std(signal1)) + signal1 = apply_lag(signal1, lag, fs) + + # Process signal from lfp_array2 and normalize + signal2 = lfp_array2[channel_lfp2, :] + signal2 = copy.deepcopy((signal2 - np.mean(signal2)) / np.std(signal2)) + + # Stack signals for coherence computation + data = np.vstack((signal1, signal2)) + frequencies, timepoints, coherence = calc_mt_tfcoh( + data.T, [0, 1], n, half_bw, num_tapers, fs, step, fk=fk, pad=2, ref=False, imaginary=False + ) + + # Find the time index closest to zero + zero_time_idx = np.argmin(np.abs(timepoints)) + + # Average coherence within the specified frequency range + freq_indices = np.where((frequencies > frequency_range[0]) & (frequencies < frequency_range[1]))[0] + mean_coherence = np.mean(coherence[freq_indices, :], axis=0) + coherence_at_zero_lag[idx] = mean_coherence[zero_time_idx] + + return coherence_at_zero_lag + + # Calculate max coherence and lags for each channel pair + for ch_lfp1 in range(num_channels_lfp1): + for ch_lfp2 in range(num_channels_lfp2): + print(f"Processing LFP1 channel {ch_lfp1}, LFP2 channel {ch_lfp2}") + delta_coherence = compute_coherence_for_pair(ch_lfp1, ch_lfp2) + + # Find maximum coherence and corresponding lag + max_coherence = np.max(delta_coherence) + max_lag_index = np.argmax(delta_coherence) + max_lag = lags[max_lag_index] * 1000 # Convert lag to milliseconds + + # Store max coherence and lag in matrices + max_coherence_matrix[ch_lfp1, ch_lfp2] = max_coherence + lag_of_max_coherence_matrix[ch_lfp1, ch_lfp2] = max_lag + + return max_coherence_matrix, lag_of_max_coherence_matrix diff --git a/aopy/preproc/neuropixel.py b/aopy/preproc/neuropixel.py index 31c852e1..3f4c4fd1 100644 --- a/aopy/preproc/neuropixel.py +++ b/aopy/preproc/neuropixel.py @@ -4,6 +4,9 @@ import os import glob from ibldsp.voltage import destripe_lfp +from .. import preproc +from .. import data +import pickle def sync_neuropixel_ecube(raw_timestamp,on_times_np,off_times_np,on_times_ecube,off_times_ecube,inter_barcode_interval=20,bar_duration=0.02,verbose=False): ''' @@ -275,4 +278,86 @@ def destripe_lfp_batch(lfp_data, save_path, sample_rate, bit_volts, max_memory_g for ibatch in range(Nbatches): tmp = destripe_lfp((lfp_data[ibatch*batch_size:(ibatch+1)*batch_size, :]*bit_volts).T/1e6, sample_rate)*1e6 lfp_destriped[ibatch*batch_size:(ibatch+1)*batch_size, :] = (tmp/bit_volts).T.astype(dtype) - lfp_destriped.flush() \ No newline at end of file + lfp_destriped.flush() + + +def make_average_neuropixel_array(te_id, date, subject, aligning_index, tbefore, tafter): + """ + Generates an averaged neuropixel LFP array around specified alignment events. + + Parameters: + te_id (int): Identifier for the experimental trial. + date (str): Date of the experiment. + subject (str): Subject ID. + aligning_index (int): Index of the event to align on. + index 0 = cursor enter center target + + index 1 = peripheral target on + + index 2 = Go-Cue + + index 3 = cursor enter peripheral target + + index 4 = reward + + index 5 = trial end + tbefore (int): Time before the alignment event in seconds. + tafter (int): Time after the alignment event in seconds. + + Returns: + average_array (np.array): Averaged LFP array across trials. + stacked_arrays_3d (np.array): 3D array with trial-wise, aligned LFP data. + """ + + # Define paths and file names + data_path_preproc = "/data/preprocessed/" + type = "lfp" + filename_opto = f"preproc_{date}_{subject}_{te_id}_{type}.hdf" + + # Load LFP data + with open(f"/media/moor-data/postprocessed/beignet/neuropixel_lfp_destriped/lfp_destriped_{te_id}", 'rb') as file: + lfp_data_destriped = pickle.load(file) + lfp_data = data.load_hdf_group(os.path.join(data_path_preproc, subject), filename_opto, f"{type}") + + # Load behavior times and filter for rewarded trials + subject, te_id, date = [subject], [te_id], [date] + times = data.bmi3d.tabulate_behavior_data_center_out(data_path_preproc, subject, te_id, date) + rewarded_times = times[times["reward"] == True].reset_index() + + # Extract and transpose LFP data + lfp = np.array(lfp_data_destriped[0]).T + + # Align times based on selected index and compute time window + selected_event_times = [events[aligning_index] for events in rewarded_times["event_times"]] + total_time = tbefore + tafter + + # Get trial-aligned indices for LFP data + align_times, trial_indices = preproc.base.trial_align_times(lfp_data['sync_timestamp'], selected_event_times, tbefore, tafter) + + # Filter out trials with mismatched lengths + valid_trial_indices = [indices for indices in trial_indices if len(indices) == (2500 * total_time)] + + # Initialize 3D array to hold aligned trial data (time x channels x trials) + num_trials = len(valid_trial_indices) + num_channels = 384 + num_samples = 2500 * total_time + stacked_arrays_3d = np.full((num_samples, num_channels, num_trials), np.nan) + + # Process each trial and subtract baseline from each channel + for j, indices in enumerate(valid_trial_indices): + trial_data = lfp[indices] + adjusted_channels = [] + + for ch in range(num_channels): + channel_data = trial_data[:, ch] + baseline = np.mean(channel_data[:1000]) # Baseline is the mean of the first 1000 samples + adjusted_channel = channel_data - baseline + adjusted_channels.append(adjusted_channel) + + # Stack adjusted channel data into the 3D array + stacked_arrays_3d[:, :, j] = np.array(adjusted_channels).T + + # Calculate the average across trials + average_array = np.nanmean(stacked_arrays_3d, axis=2) + + return average_array, stacked_arrays_3d diff --git a/tests/test_analysis.py b/tests/test_analysis.py index f798ff6a..4c0e15c7 100644 --- a/tests/test_analysis.py +++ b/tests/test_analysis.py @@ -11,6 +11,7 @@ import os import matplotlib.pyplot as plt from scipy import signal +from unittest.mock import MagicMock test_dir = os.path.dirname(__file__) data_dir = os.path.join(test_dir, 'data') @@ -1905,6 +1906,49 @@ def test_calc_confidence_interval_overlap(self): overlap = aopy.analysis.calc_confidence_interval_overlap(CI1,CI2) self.assertEqual(overlap,(20-17)/(20-10)) + +class TestCalcMaxCohAndLags(unittest.TestCase): + def setUp(self): + # Define the parameters for the test + self.lags = [-0.1, 0, 0.1] # Lags in seconds + self.frequency_range = [0, 4] # Frequency range (0 to 4 Hz) + + # Simulate some LFP data with random values + self.num_channels = 3 + self.num_timepoints = 5000 + self.lfp_array1 = np.random.randn(self.num_channels, self.num_timepoints) + self.lfp_array2 = np.random.randn(self.num_channels, self.num_timepoints) + + # Mock taper parameters and the coherence calculation function + global precondition + precondition = MagicMock() + precondition.convert_taper_parameters = MagicMock(return_value=(256, 2, 3)) # Mock return values for tapers + + global calc_mt_tfcoh + calc_mt_tfcoh = MagicMock(return_value=( + np.linspace(0, 200, 100), # Mock frequency array (0-200 Hz) + np.linspace(-0.5, 0.5, 50), # Mock time array (centered around 0) + np.random.rand(100, 50) # Mock coherence array (freq x time) + )) + + def test_calc_max_coh_and_lags(self): + # Run the function + max_coherence_matrix, lag_of_max_coherence_matrix = aopy.analysis.calc_max_coh_and_lags( + self.lags, self.lfp_array1, self.lfp_array2, self.frequency_range + ) + + # Check the shapes of the output matrices + self.assertEqual(max_coherence_matrix.shape, (self.num_channels, self.num_channels)) + self.assertEqual(lag_of_max_coherence_matrix.shape, (self.num_channels, self.num_channels)) + + # Verify the values fall within expected ranges + self.assertTrue(np.all(max_coherence_matrix >= 0) and np.all(max_coherence_matrix <= 1), + "Coherence values should be between 0 and 1") + + # Check if lags are converted to milliseconds in lag_of_max_coherence_matrix + self.assertTrue(np.all(np.isin(lag_of_max_coherence_matrix, np.array(self.lags) * 1000)), + "Lags should be in milliseconds and match the provided lags") + if __name__ == "__main__": unittest.main() diff --git a/tests/test_preproc.py b/tests/test_preproc.py index 9d8417d4..8e21db48 100644 --- a/tests/test_preproc.py +++ b/tests/test_preproc.py @@ -10,6 +10,9 @@ import numpy as np import unittest from pathlib import Path +from unittest.mock import patch, MagicMock +import pickle +import io test_dir = os.path.dirname(__file__) @@ -1754,5 +1757,65 @@ def test_calibrate_sensor(self): powers = preproc.laser.calibrate_sensor(voltage, 12.) np.testing.assert_allclose(powers, [8.57, 5.07, 10.32], atol=1e-2) + +class TestMakeAverageNeuropixelArray(unittest.TestCase): + + @patch("builtins.open", new_callable=MagicMock) + @patch("aopy.data.load_hdf_group") + @patch("aopy.data.bmi3d.tabulate_behavior_data_center_out") + @patch("aopy.preproc.base.trial_align_times") + def test_make_average_neuropixel_array(self, mock_trial_align_times, mock_tabulate_behavior_data, mock_load_hdf_group, mock_open): + # Define mock inputs + te_id = 123 + date = "2023-01-01" + subject = "subject_1" + aligning_index = 2 # Example aligning index for "Go-Cue" + tbefore = 2 # 2 seconds before + tafter = 2 # 2 seconds after + + # Set up mock LFP data and behavior times + mock_lfp_data_destriped = [np.random.randn(384, 10000)] + + # Simulate binary file content for pickle + mock_file = io.BytesIO(pickle.dumps(mock_lfp_data_destriped)) + mock_open.return_value.__enter__.return_value = mock_file + + mock_lfp_data = {"sync_timestamp": np.linspace(0, 10000, 10000)} + mock_load_hdf_group.return_value = mock_lfp_data + + # Mock behavior times, with 'reward' and event times for alignment + mock_behavior_data = pd.DataFrame({ + "reward": [True, False, True], # Only reward trials are used + "event_times": [ + [0.5, 1.0, 2.0, 3.0, 4.0, 5.0], + [1.5, 2.0, 3.0, 4.0, 5.0, 6.0], + [2.5, 3.0, 4.0, 5.0, 6.0, 7.0] + ] + }) + + # Mock trial alignment indices based on sync timestamps and event times + mock_trial_align_times.return_value = ( + [1.5, 2.5, 3.5], # Aligned event times + [np.arange(2500 * 4), np.arange(2500 * 4), np.arange(2500 * 4)] + ) + + # Call the function with mock data + average_array, stacked_arrays_3d = make_average_neuropixel_array( + te_id, date, subject, aligning_index, tbefore, tafter + ) + + # Assert output shapes + expected_num_samples = 2500 * (tbefore + tafter) + self.assertEqual(average_array.shape, (expected_num_samples, 384), "Average array shape is incorrect") + self.assertEqual(stacked_arrays_3d.shape, (expected_num_samples, 384, 3), "Stacked arrays 3D shape is incorrect") + + # Check for baseline adjustment by asserting that the mean of the baseline window is near zero for each channel + baseline_means = np.mean(stacked_arrays_3d[:1000, :, :], axis=0) # Baseline is first 1000 samples + self.assertTrue(np.allclose(baseline_means, 0, atol=1e-1), "Baseline not adjusted correctly across channels") + + # Check if NaN mean handling is correct (should have no NaNs in the final average) + self.assertFalse(np.isnan(average_array).any(), "average_array should not contain NaNs") + + if __name__ == "__main__": unittest.main()