diff --git a/.gitignore b/.gitignore
index 8ac6f36..79e93bd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,5 @@ __pycache__
.DS_Store
*.egg-info/
.idea
+test*
+.env*
\ No newline at end of file
diff --git a/brpylib/brMiscFxns.py b/brpylib/brMiscFxns.py
index 33d4019..884df7f 100644
--- a/brpylib/brMiscFxns.py
+++ b/brpylib/brMiscFxns.py
@@ -28,17 +28,19 @@
    pass
-def openfilecheck(open_mode, file_name="", file_ext="", file_type=""):
+def openfilecheck(open_mode, file_name="", file_ext="", file_type="", verbose=True, interactive=True):
"""
:param open_mode: {str} method to open the file (e.g., 'rb' for binary read only)
:param file_name: [optional] {str} full path of file to open
:param file_ext: [optional] {str} file extension (e.g., '.nev')
:param file_type: [optional] {str} file type for use when browsing for file (e.g., 'Blackrock NEV Files')
+    :param verbose: [optional] {bool} whether to print status information
+    :param interactive: [optional] {bool} prompt the user for a file only if True; otherwise allow the open to fail
    :return: {file} opened file
    """
    while True:
- if not file_name: # no file name passed
+ if interactive and not file_name: # no file name passed
            if not HAS_QT:
                raise ModuleNotFoundError(
                    "Qt required for file dialog. Install PySide + qtpy or provide file_name."
@@ -75,19 +77,23 @@ def openfilecheck(open_mode, file_name="", file_ext="", file_type=""):
                test_extension = file_ext
            if fext[0 : len(test_extension)] != test_extension:
-                file_name = ""
-                print(
-                    "\n*** File given is not a "
-                    + file_ext
-                    + " file, try again ***\n"
-                )
-                continue
+                if interactive:
+                    file_name = ""
+                    print(
+                        "\n*** File given is not a "
+                        + file_ext
+                        + " file, try again ***\n"
+                    )
+                    continue
+                else:
+                    break
            break
        else:
            file_name = ""
-            print("\n*** File given does exist, try again ***\n")
+            print("\n*** File given does not exist, try again ***\n")
+ if verbose:
+ print("\n" + file_name.split("/")[-1] + " opened")
- print("\n" + file_name.split("/")[-1] + " opened")
    return open(file_name, open_mode)
diff --git a/brpylib/brpylib.py b/brpylib/brpylib.py
index d4537d2..2461ee9 100644
--- a/brpylib/brpylib.py
+++ b/brpylib/brpylib.py
@@ -512,7 +512,7 @@ class NevFile:
    basic header information.
    """
- def __init__(self, datafile=""):
+ def __init__(self, datafile="", verbose=True, interactive=True):
        self.datafile = datafile
        self.basic_header = {}
        self.extended_headers = []
@@ -523,6 +523,8 @@ def __init__(self, datafile=""):
            file_name=self.datafile,
            file_ext=".nev",
            file_type="Blackrock NEV Files",
+ verbose=verbose,
+ interactive=interactive
        )
        # extract basic header information
@@ -732,16 +734,17 @@ def getdata(self, elec_ids="all", wave_read="read"):
            # extract only the "true" comments, distinct from ROI packets
            trueComments = np.setdiff1d(
-                list(range(0, len(commentPackets) - 1)), ROIPackets
-            )
+                list(range(0, len(commentPackets))), ROIPackets
+            ).astype(int)
            trueCommentsidx = np.asarray(commentPackets)[trueComments]
            textComments = comments[trueCommentsidx]
-            textComments[:, -1] = "$"
-            stringarray = textComments.tostring()
-            stringvector = stringarray.decode("latin-1")
-            stringvector = stringvector[0:-1]
-            validstrings = stringvector.replace("\x00", "")
-            commentsFinal = validstrings.split("$")
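+            # Decode each comment packet on its own: drop the trailing byte and
+            # strip NUL padding, instead of joining all packets on a "$" sentinel.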
+ commentsFinal = []
+ for text in textComments:
+ stringarray = text.astype(str).tobytes()
+ stringvector = stringarray.decode("latin-1")
+ stringvector = stringvector[0:-1]
+ validstring = stringvector.replace("\x00", "")
+ commentsFinal.append(validstring)
            # Remove the ROI comments from the list
            subsetInds = list(
@@ -761,7 +764,7 @@ def getdata(self, elec_ids="all", wave_read="read"):
                nmCommentsidx = np.asarray(commentPackets)[ROIPackets]
                nmcomments = comments[nmCommentsidx]
                nmcomments[:, -1] = ":"
-                nmstringarray = nmcomments.tostring()
+ nmstringarray = nmcomments.astype(str).tobytes()
                nmstringvector = nmstringarray.decode("latin-1")
                nmstringvector = nmstringvector[0:-1]
                nmvalidstrings = nmstringvector.replace("\x00", "")
@@ -1020,7 +1023,7 @@ class NsxFile:
    basic header information.
    """
- def __init__(self, datafile=""):
+ def __init__(self, datafile="", interactive=True, verbose=True):
        self.datafile = datafile
        self.basic_header = {}
@@ -1032,6 +1035,8 @@ def __init__(self, datafile=""):
            file_name=self.datafile,
            file_ext=".ns*",
            file_type="Blackrock NSx Files",
+ interactive=interactive,
+ verbose=verbose
        )
        # Determine File ID to determine if File Spec 2.1
@@ -1219,11 +1224,8 @@ def getdata(
                # memmap moves the file pointer inconsistently depending on platform and numpy version
                curr_loc = self.datafile.tell()
                expected_loc = bod + num_data_pts * data_pt_size
-                if curr_loc == bod:
-                    # It did not move the pointer at all. Move it manually.
-                    self.datafile.seek(expected_loc - bod, 1)
-                elif curr_loc > expected_loc:
-                    # Moved it too far (probably to end of file); move manually from beginning to expected.
+                if curr_loc != expected_loc:
+                    # memmap moved the cursor to an unexpected location, update it manually
+                    self.datafile.seek(expected_loc, 0)
            else:
                # 1 sample per packet. Reuse struct_arr.
diff --git a/build/lib/brpylib/__init__.py b/build/lib/brpylib/__init__.py
new file mode 100644
index 0000000..f87edc1
--- /dev/null
+++ b/build/lib/brpylib/__init__.py
@@ -0,0 +1 @@
+from .brpylib import NevFile, NsxFile, brpylib_ver
diff --git a/build/lib/brpylib/brMiscFxns.py b/build/lib/brpylib/brMiscFxns.py
new file mode 100644
index 0000000..884df7f
--- /dev/null
+++ b/build/lib/brpylib/brMiscFxns.py
@@ -0,0 +1,106 @@
+"""
+Random functions that may be useful elsewhere (or necessary)
+current version: 1.2.0 --- 08/04/2016
+
+@author: Mitch Frankel - Blackrock Microsystems
+
+Version History:
+v1.0.0 - 07/05/2016 - initial release
+v1.1.0 - 07/12/2016 - minor editing changes to print statements and addition of version control
+v1.2.0 - 08/04/2016 - minor modifications to allow use of Python 2.6+
+"""
+from os import getcwd, path
+
+try:
+ from qtpy.QtWidgets import QApplication, QFileDialog
+
+ HAS_QT = True
+except ModuleNotFoundError:
+ HAS_QT = False
+
+# Version control
+brmiscfxns_ver = "1.2.0"
+
+# Patch for use with Python 2.6+
+try:
+ input = raw_input
+except NameError:
+ pass
+
+
+def openfilecheck(open_mode, file_name="", file_ext="", file_type="", verbose=True, interactive=True):
+ """
+ :param open_mode: {str} method to open the file (e.g., 'rb' for binary read only)
+ :param file_name: [optional] {str} full path of file to open
+ :param file_ext: [optional] {str} file extension (e.g., '.nev')
+ :param file_type: [optional] {str} file type for use when browsing for file (e.g., 'Blackrock NEV Files')
+    :param verbose: [optional] {bool} whether to print status information
+    :param interactive: [optional] {bool} prompt the user for a file only if True; otherwise allow the open to fail
+ :return: {file} opened file
+ """
+
+ while True:
+ if interactive and not file_name: # no file name passed
+ if not HAS_QT:
+ raise ModuleNotFoundError(
+ "Qt required for file dialog. Install PySide + qtpy or provide file_name."
+ )
+
+ # Ask user to specify a file path or browse
+ file_name = input(
+ "Enter complete " + file_ext + " file path or hit enter to browse: "
+ )
+
+ if not file_name:
+ if "app" not in locals():
+ app = QApplication([])
+ if not file_ext:
+ file_type = "All Files"
+ file_name = QFileDialog.getOpenFileName(
+ QFileDialog(),
+ "Select File",
+ getcwd(),
+ file_type + " (*" + file_ext + ")",
+ )
+ file_name = file_name[0]
+
+    # Ensure the file exists (mainly needed when the user typed the path by hand)
+ if path.isfile(file_name):
+ # Ensure given file matches file_ext
+ if file_ext:
+ _, fext = path.splitext(file_name)
+
+ # check for * in extension
+ if file_ext[-1] == "*":
+ test_extension = file_ext[:-1]
+ else:
+ test_extension = file_ext
+
+ if fext[0 : len(test_extension)] != test_extension:
+ if interactive:
+ file_name = ""
+ print(
+ "\n*** File given is not a "
+ + file_ext
+ + " file, try again ***\n"
+ )
+ continue
+ else:
+ break
+ break
+ else:
+ file_name = ""
+ print("\n*** File given does exist, try again ***\n")
+ if verbose:
+ print("\n" + file_name.split("/")[-1] + " opened")
+
+ return open(file_name, open_mode)
+
+
+def checkequal(iterator):
+ try:
+ iterator = iter(iterator)
+ first = next(iterator)
+ return all(first == rest for rest in iterator)
+ except StopIteration:
+ return True
diff --git a/build/lib/brpylib/brpylib.py b/build/lib/brpylib/brpylib.py
new file mode 100644
index 0000000..c2d53ec
--- /dev/null
+++ b/build/lib/brpylib/brpylib.py
@@ -0,0 +1,1668 @@
+# -*- coding: utf-8 -*-
+"""
+Collection of classes used for reading headers and data from Blackrock files
+current version: 2.0.1 --- 11/12/2021
+
+@author: Mitch Frankel - Blackrock Microsystems
+ Stephen Hou - v1.4.0 edits
+ David Kluger - v2.0.0 overhaul
+
+Version History:
+v1.0.0 - 07/05/2016 - initial release - requires brMiscFxns v1.0.0
+v1.1.0 - 07/08/2016 - inclusion of NsxFile.savesubsetnsx() for saving subset of Nsx data to disk
+v1.1.1 - 07/09/2016 - update to NsxFile.savesubsetnsx() for option (not)overwriting subset files if already exist
+ bug fixes in NsxFile class as reported from beta user
+v1.2.0 - 07/12/2016 - bug fixes in NsxFile.savesubsetnsx()
+ added version control and checking for brMiscFxns
+ requires brMiscFxns v1.1.0
+v1.3.0 - 07/22/2016 - added 'samp_per_s' to NsxFile.getdata() output
+ added close() method to NsxFile and NevFile objects
+ NsxFile.getdata() now pre-allocates output['data'] as zeros - speed and safety
+v1.3.1 - 08/02/2016 - bug fixes to NsxFile.getdata() for usability with Python 2.7 as reported from beta user
+ patch for use with multiple NSP sync (overwriting of initial null data from initial data packet)
+ __future__ import for use with Python 2.7 (division)
+ minor modifications to allow use of Python 2.6+
+v1.3.2 - 08/12/2016 - bug fixes to NsxFile.getdata()
+v1.4.0 - 06/22/2017 - inclusion of wave_read parameter to NevFile.getdata() for including/excluding waveform data
+v2.0.0 - 04/27/2021 - numpy-based architecture rebuild of NevFile.getdata()
+v2.0.1 - 11/12/2021 - fixed indexing error in NevFile.getdata()
+ Added numpy architecture to NsxFile.getdata()
+v2.0.2 - 03/21/2023 - added logic to NsxFile.getdata() for where PTP timestamps are applied to every continuous sample
+v2.0.3 - 05/11/2023 - Fixed bug with memmap and file.seek
+"""
+
+
+from __future__ import division # for those using Python 2.6+
+
+from collections import namedtuple
+from datetime import datetime
+from math import ceil
+from os import path as ospath
+from struct import calcsize, pack, unpack, unpack_from
+
+import numpy as np
+
+from .brMiscFxns import brmiscfxns_ver, openfilecheck
+
+# Version control set/check
+brpylib_ver = "2.0.3"
+brmiscfxns_ver_req = "1.2.0"
+if [int(p) for p in brmiscfxns_ver.split(".")] < [int(p) for p in brmiscfxns_ver_req.split(".")]:
+ raise Exception(
+ "brpylib requires brMiscFxns "
+ + brmiscfxns_ver_req
+ + " or higher, please use latest version"
+ )
+
+# Patch for use with Python 2.6+
+try:
+ input = raw_input
+except NameError:
+ pass
+
+# Define global variables to remove magic numbers
+#
+WARNING_SLEEP_TIME = 5
+DATA_PAGING_SIZE = 1024**3
+DATA_FILE_SIZE_MIN = 1024**2 * 10
+STRING_TERMINUS = "\x00"
+UNDEFINED = 0
+ELEC_ID_DEF = "all"
+START_TIME_DEF = 0
+DATA_TIME_DEF = "all"
+DOWNSAMPLE_DEF = 1
+START_OFFSET_MIN = 0
+STOP_OFFSET_MIN = 0
+
+UV_PER_BIT_21 = 0.25
+WAVEFORM_SAMPLES_21 = 48
+NSX_BASIC_HEADER_BYTES_22 = 314
+NSX_EXT_HEADER_BYTES_22 = 66
+DATA_BYTE_SIZE = 2
+TIMESTAMP_NULL_21 = 0
+MAX_SAMP_PER_S = 30000
+
+NO_FILTER = 0
+BUTTER_FILTER = 1
+SERIAL_MODE = 0
+
+RB2D_MARKER = 1
+RB2D_BLOB = 2
+RB3D_MARKER = 3
+BOUNDARY_2D = 4
+MARKER_SIZE = 5
+
+DIGITAL_PACKET_ID = 0
+NEURAL_PACKET_ID_MIN = 1
+NEURAL_PACKET_ID_MAX = 16384
+COMMENT_PACKET_ID = 65535
+VIDEO_SYNC_PACKET_ID = 65534
+TRACKING_PACKET_ID = 65533
+BUTTON_PACKET_ID = 65532
+CONFIGURATION_PACKET_ID = 65531
+
+PARALLEL_REASON = 1
+PERIODIC_REASON = 64
+SERIAL_REASON = 129
+LOWER_BYTE_MASK = 255
+FIRST_BIT_MASK = 1
+SECOND_BIT_MASK = 2
+
+CLASSIFIER_MIN = 1
+CLASSIFIER_MAX = 16
+CLASSIFIER_NOISE = 255
+
+CHARSET_ANSI = 0
+CHARSET_UTF = 1
+CHARSET_ROI = 255
+
+COMM_RGBA = 0
+COMM_TIME = 1
+
+BUTTON_PRESS = 1
+BUTTON_RESET = 2
+
+CHG_NORMAL = 0
+CHG_CRITICAL = 1
+
+ENTER_EVENT = 1
+EXIT_EVENT = 2
+#
+
+# Define a named tuple that has information about header/packet fields
+FieldDef = namedtuple("FieldDef", ["name", "formatStr", "formatFnc"])
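+# e.g., FieldDef("Period", "I", format_none) describes a little-endian uint32
+# field that is passed through unformatted by format_none (defined below).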
+
+
+#
+def processheaders(curr_file, packet_fields):
+ """
+ :param curr_file: {file} the current BR datafile to be processed
+ :param packet_fields : {named tuple} the specific binary fields for the given header
+ :return: a fully unpacked and formatted tuple set of header information
+
+ Read a packet from a binary data file and return a list of fields
+ The amount and format of data read will be specified by the
+ packet_fields container
+ """
+
+ # This is a lot in one line. First I pull out all the format strings from
+ # the basic_header_fields named tuple, then concatenate them into a string
+ # with '<' at the front (for little endian format)
+ packet_format_str = "<" + "".join([fmt for name, fmt, fun in packet_fields])
+
+ # Calculate how many bytes to read based on the format strings of the header fields
+ bytes_in_packet = calcsize(packet_format_str)
+ packet_binary = curr_file.read(bytes_in_packet)
+
+ # unpack the binary data from the header based on the format strings of each field.
+ # This returns a list of data, but it's not always correctly formatted (eg, FileSpec
+ # is read as ints 2 and 3 but I want it as '2.3'
+ packet_unpacked = unpack(packet_format_str, packet_binary)
+
+ # Create a iterator from the data list. This allows a formatting function
+ # to use more than one item from the list if needed, and the next formatting
+ # function can pick up on the correct item in the list
+ data_iter = iter(packet_unpacked)
+
+ # create an empty dictionary from the name field of the packet_fields.
+ # The loop below will fill in the values with formatted data by calling
+ # each field's formatting function
+ packet_formatted = dict.fromkeys([name for name, fmt, fun in packet_fields])
+ for name, fmt, fun in packet_fields:
+ packet_formatted[name] = fun(data_iter)
+
+ return packet_formatted
+
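+# Illustrative sketch (hypothetical field list): parse a 6-byte packet holding
+# a uint16 ID and a float32 rate from an in-memory stream:
+#   import io, struct
+#   demo_fields = [FieldDef("ID", "H", format_none),
+#                  FieldDef("Rate", "f", format_none)]
+#   buf = io.BytesIO(struct.pack("<Hf", 7, 30000.0))
+#   processheaders(buf, demo_fields)  # -> {"ID": 7, "Rate": 30000.0}
+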
+
+def format_filespec(header_list):
+ return str(next(header_list)) + "." + str(next(header_list)) # eg 2.3
+
+
+def format_timeorigin(header_list):
+ year = next(header_list)
+ month = next(header_list)
+ _ = next(header_list)
+ day = next(header_list)
+ hour = next(header_list)
+ minute = next(header_list)
+ second = next(header_list)
+ millisecond = next(header_list)
+ return datetime(year, month, day, hour, minute, second, millisecond * 1000)
+
+
+def format_stripstring(header_list):
+ string = bytes.decode(next(header_list), "latin-1")
+ return string.split(STRING_TERMINUS, 1)[0]
+
+
+def format_none(header_list):
+ return next(header_list)
+
+
+def format_freq(header_list):
+ return str(float(next(header_list)) / 1000) + " Hz"
+
+
+def format_filter(header_list):
+ filter_type = next(header_list)
+ if filter_type == NO_FILTER:
+ return "none"
+ elif filter_type == BUTTER_FILTER:
+ return "butterworth"
+
+
+def format_charstring(header_list):
+ return int(next(header_list))
+
+
+def format_digconfig(header_list):
+ config = next(header_list) & FIRST_BIT_MASK
+ if config:
+ return "active"
+ else:
+ return "ignored"
+
+
+def format_anaconfig(header_list):
+ config = next(header_list)
+ if config & FIRST_BIT_MASK:
+ return "low_to_high"
+ if config & SECOND_BIT_MASK:
+ return "high_to_low"
+ else:
+ return "none"
+
+
+def format_digmode(header_list):
+ dig_mode = next(header_list)
+ if dig_mode == SERIAL_MODE:
+ return "serial"
+ else:
+ return "parallel"
+
+
+def format_trackobjtype(header_list):
+ trackobj_type = next(header_list)
+ if trackobj_type == UNDEFINED:
+ return "undefined"
+ elif trackobj_type == RB2D_MARKER:
+ return "2D RB markers"
+ elif trackobj_type == RB2D_BLOB:
+ return "2D RB blob"
+ elif trackobj_type == RB3D_MARKER:
+ return "3D RB markers"
+ elif trackobj_type == BOUNDARY_2D:
+ return "2D boundary"
+ elif trackobj_type == MARKER_SIZE:
+ return "marker size"
+ else:
+ return "error"
+
+
+def getdigfactor(ext_headers, idx):
+ max_analog = ext_headers[idx]["MaxAnalogValue"]
+ min_analog = ext_headers[idx]["MinAnalogValue"]
+ max_digital = ext_headers[idx]["MaxDigitalValue"]
+ min_digital = ext_headers[idx]["MinDigitalValue"]
+ return float(max_analog - min_analog) / float(max_digital - min_digital)
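+# e.g., an analog range of +/-8191 mV spanning a digital range of +/-32767
+# counts gives a factor of 16382/65534, i.e. roughly 0.25 mV per count.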
+
+
+#
+
+
+#
+nev_header_dict = {
+ "basic": [
+ FieldDef("FileTypeID", "8s", format_stripstring), # 8 bytes - 8 char array
+ FieldDef("FileSpec", "2B", format_filespec), # 2 bytes - 2 unsigned char
+ FieldDef("AddFlags", "H", format_none), # 2 bytes - uint16
+ FieldDef("BytesInHeader", "I", format_none), # 4 bytes - uint32
+ FieldDef("BytesInDataPackets", "I", format_none), # 4 bytes - uint32
+ FieldDef("TimeStampResolution", "I", format_none), # 4 bytes - uint32
+ FieldDef("SampleTimeResolution", "I", format_none), # 4 bytes - uint32
+ FieldDef("TimeOrigin", "8H", format_timeorigin), # 16 bytes - 8 x uint16
+ FieldDef(
+ "CreatingApplication", "32s", format_stripstring
+ ), # 32 bytes - 32 char array
+ FieldDef("Comment", "256s", format_stripstring), # 256 bytes - 256 char array
+ FieldDef("NumExtendedHeaders", "I", format_none),
+ ], # 4 bytes - uint32
+ "ARRAYNME": FieldDef(
+ "ArrayName", "24s", format_stripstring
+ ), # 24 bytes - 24 char array
+ "ECOMMENT": FieldDef(
+ "ExtraComment", "24s", format_stripstring
+ ), # 24 bytes - 24 char array
+ "CCOMMENT": FieldDef(
+ "ContComment", "24s", format_stripstring
+ ), # 24 bytes - 24 char array
+ "MAPFILE": FieldDef(
+ "MapFile", "24s", format_stripstring
+ ), # 24 bytes - 24 char array
+ "NEUEVWAV": [
+ FieldDef("ElectrodeID", "H", format_none), # 2 bytes - uint16
+ FieldDef(
+ "PhysicalConnector", "B", format_charstring
+ ), # 1 byte - 1 unsigned char
+ FieldDef("ConnectorPin", "B", format_charstring), # 1 byte - 1 unsigned char
+ FieldDef("DigitizationFactor", "H", format_none), # 2 bytes - uint16
+ FieldDef("EnergyThreshold", "H", format_none), # 2 bytes - uint16
+ FieldDef("HighThreshold", "h", format_none), # 2 bytes - int16
+ FieldDef("LowThreshold", "h", format_none), # 2 bytes - int16
+ FieldDef(
+ "NumSortedUnits", "B", format_charstring
+ ), # 1 byte - 1 unsigned char
+ FieldDef(
+ "BytesPerWaveform", "B", format_charstring
+ ), # 1 byte - 1 unsigned char
+ FieldDef("SpikeWidthSamples", "H", format_none), # 2 bytes - uint16
+ FieldDef("EmptyBytes", "8s", format_none),
+ ], # 8 bytes - empty
+ "NEUEVLBL": [
+ FieldDef("ElectrodeID", "H", format_none), # 2 bytes - uint16
+ FieldDef("Label", "16s", format_stripstring), # 16 bytes - 16 char array
+ FieldDef("EmptyBytes", "6s", format_none),
+ ], # 6 bytes - empty
+ "NEUEVFLT": [
+ FieldDef("ElectrodeID", "H", format_none), # 2 bytes - uint16
+ FieldDef("HighFreqCorner", "I", format_freq), # 4 bytes - uint32
+ FieldDef("HighFreqOrder", "I", format_none), # 4 bytes - uint32
+ FieldDef("HighFreqType", "H", format_filter), # 2 bytes - uint16
+ FieldDef("LowFreqCorner", "I", format_freq), # 4 bytes - uint32
+ FieldDef("LowFreqOrder", "I", format_none), # 4 bytes - uint32
+ FieldDef("LowFreqType", "H", format_filter), # 2 bytes - uint16
+ FieldDef("EmptyBytes", "2s", format_none),
+ ], # 2 bytes - empty
+ "DIGLABEL": [
+ FieldDef("Label", "16s", format_stripstring), # 16 bytes - 16 char array
+ FieldDef("Mode", "?", format_digmode), # 1 byte - boolean
+ FieldDef("EmptyBytes", "7s", format_none),
+ ], # 7 bytes - empty
+ "NSASEXEV": [
+ FieldDef("Frequency", "H", format_none), # 2 bytes - uint16
+ FieldDef(
+ "DigitalInputConfig", "B", format_digconfig
+ ), # 1 byte - 1 unsigned char
+ FieldDef(
+ "AnalogCh1Config", "B", format_anaconfig
+ ), # 1 byte - 1 unsigned char
+ FieldDef("AnalogCh1DetectVal", "h", format_none), # 2 bytes - int16
+ FieldDef(
+ "AnalogCh2Config", "B", format_anaconfig
+ ), # 1 byte - 1 unsigned char
+ FieldDef("AnalogCh2DetectVal", "h", format_none), # 2 bytes - int16
+ FieldDef(
+ "AnalogCh3Config", "B", format_anaconfig
+ ), # 1 byte - 1 unsigned char
+ FieldDef("AnalogCh3DetectVal", "h", format_none), # 2 bytes - int16
+ FieldDef(
+ "AnalogCh4Config", "B", format_anaconfig
+ ), # 1 byte - 1 unsigned char
+ FieldDef("AnalogCh4DetectVal", "h", format_none), # 2 bytes - int16
+ FieldDef(
+ "AnalogCh5Config", "B", format_anaconfig
+ ), # 1 byte - 1 unsigned char
+ FieldDef("AnalogCh5DetectVal", "h", format_none), # 2 bytes - int16
+ FieldDef("EmptyBytes", "6s", format_none),
+    ], # 6 bytes - empty
+ "VIDEOSYN": [
+ FieldDef("VideoSourceID", "H", format_none), # 2 bytes - uint16
+ FieldDef("VideoSource", "16s", format_stripstring), # 16 bytes - 16 char array
+ FieldDef("FrameRate", "f", format_none), # 4 bytes - single float
+ FieldDef("EmptyBytes", "2s", format_none),
+ ], # 2 bytes - empty
+ "TRACKOBJ": [
+ FieldDef("TrackableType", "H", format_trackobjtype), # 2 bytes - uint16
+ FieldDef("TrackableID", "I", format_none), # 4 bytes - uint32
+ # FieldDef('PointCount', 'H', format_none), # 2 bytes - uint16
+ FieldDef("VideoSource", "16s", format_stripstring), # 16 bytes - 16 char array
+ FieldDef("EmptyBytes", "2s", format_none),
+ ], # 2 bytes - empty
+}
+
+nsx_header_dict = {
+ "basic_21": [
+ FieldDef("Label", "16s", format_stripstring), # 16 bytes - 16 char array
+ FieldDef("Period", "I", format_none), # 4 bytes - uint32
+ FieldDef("ChannelCount", "I", format_none),
+ ], # 4 bytes - uint32
+ "basic": [
+ FieldDef("FileSpec", "2B", format_filespec), # 2 bytes - 2 unsigned char
+ FieldDef("BytesInHeader", "I", format_none), # 4 bytes - uint32
+ FieldDef("Label", "16s", format_stripstring), # 16 bytes - 16 char array
+ FieldDef("Comment", "256s", format_stripstring), # 256 bytes - 256 char array
+ FieldDef("Period", "I", format_none), # 4 bytes - uint32
+ FieldDef("TimeStampResolution", "I", format_none), # 4 bytes - uint32
+ FieldDef("TimeOrigin", "8H", format_timeorigin), # 16 bytes - 8 uint16
+ FieldDef("ChannelCount", "I", format_none),
+ ], # 4 bytes - uint32
+ "extended": [
+ FieldDef("Type", "2s", format_stripstring), # 2 bytes - 2 char array
+ FieldDef("ElectrodeID", "H", format_none), # 2 bytes - uint16
+ FieldDef(
+ "ElectrodeLabel", "16s", format_stripstring
+ ), # 16 bytes - 16 char array
+ FieldDef("PhysicalConnector", "B", format_none), # 1 byte - uint8
+ FieldDef("ConnectorPin", "B", format_none), # 1 byte - uint8
+ FieldDef("MinDigitalValue", "h", format_none), # 2 bytes - int16
+ FieldDef("MaxDigitalValue", "h", format_none), # 2 bytes - int16
+ FieldDef("MinAnalogValue", "h", format_none), # 2 bytes - int16
+ FieldDef("MaxAnalogValue", "h", format_none), # 2 bytes - int16
+ FieldDef("Units", "16s", format_stripstring), # 16 bytes - 16 char array
+ FieldDef("HighFreqCorner", "I", format_freq), # 4 bytes - uint32
+ FieldDef("HighFreqOrder", "I", format_none), # 4 bytes - uint32
+ FieldDef("HighFreqType", "H", format_filter), # 2 bytes - uint16
+ FieldDef("LowFreqCorner", "I", format_freq), # 4 bytes - uint32
+ FieldDef("LowFreqOrder", "I", format_none), # 4 bytes - uint32
+ FieldDef("LowFreqType", "H", format_filter),
+ ], # 2 bytes - uint16
+ "data": [
+ FieldDef("Header", "B", format_none), # 1 byte - uint8
+ FieldDef("Timestamp", "I", format_none), # 4 bytes - uint32
+ FieldDef("NumDataPoints", "I", format_none),
+    ], # 4 bytes - uint32
+}
+#
+
+
+#
+def check_elecid(elec_ids):
+ if type(elec_ids) is str and elec_ids != ELEC_ID_DEF:
+ print(
+ "\n*** WARNING: Electrode IDs must be 'all', a single integer, or a list of integers."
+ )
+ print(" Setting elec_ids to 'all'")
+ elec_ids = ELEC_ID_DEF
+ if elec_ids != ELEC_ID_DEF and type(elec_ids) is not list:
+ if type(elec_ids) == range:
+ elec_ids = list(elec_ids)
+ elif type(elec_ids) == int:
+ elec_ids = [elec_ids]
+ return elec_ids
+
+
+def check_starttime(start_time_s):
+ if not isinstance(start_time_s, (int, float)) or (
+ isinstance(start_time_s, (int, float)) and start_time_s < START_TIME_DEF
+ ):
+ print("\n*** WARNING: Start time is not valid, setting start_time_s to 0")
+ start_time_s = START_TIME_DEF
+ return start_time_s
+
+
+def check_datatime(data_time_s):
+ if (type(data_time_s) is str and data_time_s != DATA_TIME_DEF) or (
+ isinstance(data_time_s, (int, float)) and data_time_s < 0
+ ):
+ print("\n*** WARNING: Data time is not valid, setting data_time_s to 'all'")
+ data_time_s = DATA_TIME_DEF
+ return data_time_s
+
+
+def check_downsample(downsample):
+ if not isinstance(downsample, int) or downsample < DOWNSAMPLE_DEF:
+ print(
+ "\n*** WARNING: downsample must be an integer value greater than 0. "
+ " Setting downsample to 1 (no downsampling)"
+ )
+ downsample = DOWNSAMPLE_DEF
+ if downsample > 1:
+ print(
+ "\n*** WARNING: downsample will be deprecated in a future version."
+ " Set downsample to 1 (default) to match future behavior."
+ "\n*** WARNING: downsample does not perform anti-aliasing."
+ )
+ return downsample
+
+
+def check_dataelecid(elec_ids, all_elec_ids):
+ unique_elec_ids = set(elec_ids)
+ all_elec_ids = set(all_elec_ids)
+
+ # if some electrodes asked for don't exist, reset list with those that do, or throw error and return
+ if not unique_elec_ids.issubset(all_elec_ids):
+ if not unique_elec_ids & all_elec_ids:
+ print("\nNone of the elec_ids passed exist in the data, returning None")
+ return None
+ else:
+ print(
+ "\n*** WARNING: Channels "
+ + str(sorted(list(unique_elec_ids - all_elec_ids)))
+ + " do not exist in the data"
+ )
+ unique_elec_ids = unique_elec_ids & all_elec_ids
+
+ return sorted(list(unique_elec_ids))
+
+
+def check_filesize(file_size):
+ if file_size < DATA_FILE_SIZE_MIN:
+ print("\n file_size must be larger than 10 Mb, setting file_size=10 Mb")
+ return DATA_FILE_SIZE_MIN
+ else:
+ return int(file_size)
+
+
+#
+
+
+class NevFile:
+ """
+ attributes and methods for all BR event data files. Initialization opens the file and extracts the
+ basic header information.
+ """
+
+ def __init__(self, datafile="", verbose=True, interactive=True):
+ self.datafile = datafile
+ self.basic_header = {}
+ self.extended_headers = []
+
+ # Run openfilecheck and open the file passed or allow user to browse to one
+ self.datafile = openfilecheck(
+ "rb",
+ file_name=self.datafile,
+ file_ext=".nev",
+ file_type="Blackrock NEV Files",
+ verbose=verbose,
+ interactive=interactive
+ )
+
+ # extract basic header information
+ self.basic_header = processheaders(self.datafile, nev_header_dict["basic"])
+
+ # Extract extended headers
+ for i in range(self.basic_header["NumExtendedHeaders"]):
+ self.extended_headers.append({})
+ header_string = bytes.decode(
+ unpack("<8s", self.datafile.read(8))[0], "latin-1"
+ )
+ self.extended_headers[i]["PacketID"] = header_string.split(
+ STRING_TERMINUS, 1
+ )[0]
+ self.extended_headers[i].update(
+ processheaders(
+ self.datafile, nev_header_dict[self.extended_headers[i]["PacketID"]]
+ )
+ )
+
+ # Must set this for file spec 2.1 and 2.2
+ if (
+ header_string == "NEUEVWAV"
+ and float(self.basic_header["FileSpec"]) < 2.3
+ ):
+ self.extended_headers[i]["SpikeWidthSamples"] = WAVEFORM_SAMPLES_21
+
+ def getdata(self, elec_ids="all", wave_read="read"):
+ """
+ This function is used to return a set of data from the NEV datafile.
+
+ :param elec_ids: [optional] {list} User selection of elec_ids to extract specific spike waveforms (e.g., [13])
+        :param wave_read: [optional] {str} 'read' or 'no_read' - whether to read waveforms or not
+ :return: output: {Dictionary} with one or more of the following dictionaries (all include TimeStamps)
+ dig_events: Reason, Data, [for file spec 2.2 and below, AnalogData and AnalogDataUnits]
+ spike_events: Units='nV', ChannelID, NEUEVWAV_HeaderIndices, Classification, Waveforms
+ comments: CharSet, Flag, Data, Comment
+ video_sync_events: VideoFileNum, VideoFrameNum, VideoElapsedTime_ms, VideoSourceID
+ tracking_events: ParentID, NodeID, NodeCount, TrackingPoints
+ button_trigger_events: TriggerType
+ configuration_events: ConfigChangeType
+
+ Note: For digital and neural data - TimeStamps, Classification, and Data can be lists of lists when more
+ than one digital type or spike event exists for a channel
+ """
+        """
+
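+        # Usage sketch (hypothetical file and channel): read one channel's spike
+        # events without waveforms:
+        #   out = NevFile("session1.nev").getdata(elec_ids=[13], wave_read="no_read")
+        #   out["spike_events"]["TimeStamps"]
+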
+ # Initialize output dictionary and reset position in file (if read before, may not be here anymore)
+ output = dict()
+
+ # Safety checks
+ elec_ids = check_elecid(elec_ids)
+
+ ######
+ # extract raw data
+ self.datafile.seek(0, 2)
+ lData = self.datafile.tell()
+ nPackets = int(
+ (lData - self.basic_header["BytesInHeader"])
+ / self.basic_header["BytesInDataPackets"]
+ )
+ self.datafile.seek(self.basic_header["BytesInHeader"], 0)
+ rawdata = self.datafile.read()
+ # rawdataArray = np.reshape(np.fromstring(rawdata,'B'),(nPackets,self.basic_header['BytesInDataPackets']))
+
+ # Find all timestamps and PacketIDs
+ if self.basic_header["FileTypeID"] == "BREVENTS":
+ tsBytes = 8
+ ts = np.ndarray(
+ (nPackets,),
+ " 0:
+ ChannelID = PacketID
+ if type(elec_ids) is list:
+ elecindices = [
+ idx
+ for idx, element in enumerate(ChannelID[neuralPackets])
+ if element in elec_ids
+ ]
+ neuralPackets = [neuralPackets[index] for index in elecindices]
+
+ spikeUnit = np.ndarray(
+ (nPackets,),
+ " 0:
+ insertionReason = np.ndarray(
+ (nPackets,),
+ " 0:
+ charSet = np.ndarray(
+ (nPackets,),
+ " 0:
+ charSetList[ANSIPackets] = "ANSI"
+ UTFPackets = [
+ idx for idx, element in enumerate(charSet) if element == CHARSET_UTF
+ ]
+ if len(UTFPackets) > 0:
+ charSetList[UTFPackets] = "UTF "
+
+            # Need to transfer comments from NeuroMotive. Identify region of interest (ROI) events...
+ ROIPackets = [
+ idx for idx, element in enumerate(charSet) if element == CHARSET_ROI
+ ]
+
+ lcomment = self.basic_header["BytesInDataPackets"] - tsBytes - 10
+ comments = np.chararray(
+ (nPackets, lcomment),
+ 1,
+ False,
+ rawdata,
+ tsBytes + 8,
+ (self.basic_header["BytesInDataPackets"], 1),
+ )
+
+ # extract only the "true" comments, distinct from ROI packets
+ trueComments = np.setdiff1d(
+                list(range(0, len(commentPackets))), ROIPackets
+ ).astype(int)
+ trueCommentsidx = np.asarray(commentPackets)[trueComments]
+ textComments = comments[trueCommentsidx]
+ commentsFinal = []
+ for text in textComments:
+ stringarray = text.astype(str).tobytes()
+ stringvector = stringarray.decode("latin-1")
+ stringvector = stringvector[0:-1]
+ validstring = stringvector.replace("\x00", "")
+ commentsFinal.append(validstring)
+
+ # Remove the ROI comments from the list
+ subsetInds = list(
+ set(list(range(0, len(charSetList) - 1))) - set(ROIPackets)
+ )
+
+ output["comments"] = {
+ "TimeStamps": list(ts[trueCommentsidx]),
+ "TimeStampsStarted": list(tsStarted[trueCommentsidx]),
+ "Data": commentsFinal,
+ "CharSet": list(charSetList[subsetInds]),
+ }
+
+                # parsing and outputting ROI events
+ if len(ROIPackets) > 0:
+ nmPackets = np.asarray(ROIPackets)
+ nmCommentsidx = np.asarray(commentPackets)[ROIPackets]
+ nmcomments = comments[nmCommentsidx]
+ nmcomments[:, -1] = ":"
+ nmstringarray = nmcomments.astype(str).tobytes()
+ nmstringvector = nmstringarray.decode("latin-1")
+ nmstringvector = nmstringvector[0:-1]
+ nmvalidstrings = nmstringvector.replace("\x00", "")
+ nmcommentsFinal = nmvalidstrings.split(":")
+ ROIfields = [l.split(":") for l in ":".join(nmcommentsFinal).split(":")]
+ ROIfieldsRS = np.reshape(ROIfields, (len(ROIPackets), 5))
+ output["tracking_events"] = {
+ "TimeStamps": list(ts[nmCommentsidx]),
+ "ROIName": list(ROIfieldsRS[:, 0]),
+ "ROINumber": list(ROIfieldsRS[:, 1]),
+ "Event": list(ROIfieldsRS[:, 2]),
+ "Frame": list(ROIfieldsRS[:, 3]),
+ }
+
+            # NeuroMotive video synchronization packets
+ vidsyncPackets = [
+ idx
+ for idx, element in enumerate(PacketID)
+ if element == VIDEO_SYNC_PACKET_ID
+ ]
+ if len(vidsyncPackets) > 0:
+ fileNumber = np.ndarray(
+ (nPackets,),
+ " 0:
+ trackerObjs = [
+ sub["VideoSource"]
+ for sub in self.extended_headers
+ if sub["PacketID"] == "TRACKOBJ"
+ ]
+ trackerIDs = [
+ sub["TrackableID"]
+ for sub in self.extended_headers
+ if sub["PacketID"] == "TRACKOBJ"
+ ]
+ output["tracking"] = {
+ "TrackerIDs": trackerIDs,
+ "TrackerTypes": [
+ sub["TrackableType"]
+ for sub in self.extended_headers
+ if sub["PacketID"] == "TRACKOBJ"
+ ],
+ }
+ parentID = np.ndarray(
+ (nPackets,),
+ " 0:
+ trigType = np.ndarray(
+ (nPackets,),
+ " 0:
+ changeType = np.ndarray(
+ (nPackets,),
+ "= 3.x with PTP timestamping.
+ :return: output: {Dictionary} of: data_headers: {list} dictionaries of all data headers, 1 per segment
+ [seg_id]["Timestamp"]: timestamps of each sample in segment
+ if full_timestamps, else timestamp of first sample in segment
+ [seg_id]["NumDataPoints"]: number of samples in segment
+ [seg_id]["data_time_s"]: duration in segment
+ elec_ids: {list} elec_ids that were extracted (sorted)
+ start_time_s: {float} starting time for data extraction
+ data_time_s: {float} length of time of data returned
+ downsample: {int} data downsampling factor
+ samp_per_s: {float} output data samples per second
+ data: {numpy array} continuous data in a 2D elec x samps numpy array
+ (or samps x elec if elec_rows is False).
+ """
+ # Safety checks
+ start_time_s = check_starttime(start_time_s)
+ data_time_s = check_datatime(data_time_s)
+ downsample = check_downsample(downsample)
+ elec_ids = check_elecid(elec_ids)
+ if zeropad and self.basic_header["TimeStampResolution"] == 1e9:
+ print("zeropad does not work with ptp-timestamped data. Ignoring zeropad argument.\n")
+ zeropad = False
+ if force_srate and self.basic_header["TimeStampResolution"] != 1e9:
+ print("force_srate only works with ptp timestamps in filespec >= 3.x. Ignoring force_srate argument.\n")
+ force_srate = False
+
+ # initialize parameters
+ output = dict()
+ output["start_time_s"] = float(start_time_s)
+ output["data_time_s"] = data_time_s
+ output["downsample"] = downsample
+ output["elec_ids"] = []
+ output["data_headers"] = [] # List of dicts with fields Timestamp, NumDataPoints, data_time_s, BoH, BoD
+ output["data"] = [] # List of ndarrays
+ output["samp_per_s"] = self.basic_header["SampleResolution"] / self.basic_header["Period"]
+
+ # Pull some useful variables from the basic_header
+ data_pt_size = self.basic_header["ChannelCount"] * DATA_BYTE_SIZE
+ clk_per_samp = self.basic_header["Period"] * self.basic_header["TimeStampResolution"] / self.basic_header["SampleResolution"]
+ filespec_maj, filespec_min = tuple([int(_) for _ in self.basic_header["FileSpec"].split(".")][:2])
+
+ # Timestamp is 64-bit for filespec >= 3.0
+ ts_type, ts_size = (" 2 else ("= 3:
+            # Start by assuming that these files are from firmware >= 7.6, and thus have 1 sample per packet.
+ npackets = int((eof - eoh) / np.dtype(ptp_dt).itemsize)
+ struct_arr = np.memmap(self.datafile, dtype=ptp_dt, shape=npackets, offset=eoh, mode="r")
+ self.datafile.seek(eoh, 0) # Reset to end-of-header in case memmap moved the pointer.
+ samp_per_pkt = np.all(struct_arr["num_data_points"] == 1) # Confirm 1 sample per packet
+
+ if not samp_per_pkt:
+ # Multiple samples per packet; 1 packet == 1 uninterrupted segment.
+ while 0 < self.datafile.tell() < ospath.getsize(self.datafile.name):
+ # boh = self.datafile.tell() # Beginning of segment header
+ self.datafile.seek(1, 1) # Skip the reserved 0x01
+ timestamp = unpack(ts_type, self.datafile.read(ts_size))[0]
+ num_data_pts = unpack(" seg_thresh_clk).flatten()))
+ for seg_ix, seg_start_idx in enumerate(seg_starts):
+ seg_stop_idx = seg_starts[seg_ix + 1] if seg_ix < (len(seg_starts) - 1) else (len(struct_arr) - 1)
+ offset = eoh + seg_start_idx * struct_arr.dtype.itemsize
+ num_data_pts = seg_stop_idx - seg_start_idx
+ seg_struct_arr = np.memmap(self.datafile, dtype=ptp_dt, shape=num_data_pts, offset=offset, mode="r")
+ output["data_headers"].append({
+ "Timestamp": seg_struct_arr["timestamps"],
+ "NumDataPoints": num_data_pts,
+ "data_time_s": num_data_pts / output["samp_per_s"]
+ })
+ output["data"].append(seg_struct_arr["samples"])
+
+ ## Post-processing ##
+
+ # Drop segments that are not within the requested time window
+ ts_0 = output["data_headers"][0]["Timestamp"][0]
+ start_time_ts = start_time_s * self.basic_header["TimeStampResolution"]
+ test_start_ts = ts_0 + start_time_ts
+ test_stop_ts = np.inf # Will update below
+ if start_time_s != START_TIME_DEF:
+ # Keep segments with at least one sample on-or-after test_start_ts
+ b_keep = [_["Timestamp"][-1] >= test_start_ts for _ in output["data_headers"]]
+ output["data_headers"] = [_ for _, b in zip(output["data_headers"], b_keep) if b]
+ output["data"] = [_ for _, b in zip(output["data"], b_keep) if b]
+ if data_time_s != DATA_TIME_DEF:
+ # Keep segments with at least one sample on-or-before test_stop_ts
+ data_time_ts = data_time_s * self.basic_header["TimeStampResolution"]
+ test_stop_ts = test_start_ts + data_time_ts
+ b_keep = [_["Timestamp"][0] <= test_stop_ts for _ in output["data_headers"]]
+ output["data_headers"] = [_ for _, b in zip(output["data_headers"], b_keep) if b]
+ output["data"] = [_ for _, b in zip(output["data"], b_keep) if b]
+
+ # Post-process segments for start_time_s, data_time_s, zeropad
+ for ix, data_header in enumerate(output["data_headers"]):
+ data = output["data"][ix]
+ # start_time_s and data_time_s
+ b_keep = np.ones((data.shape[0],), dtype=bool)
+ if start_time_s > START_TIME_DEF and data_header["Timestamp"][0] < test_start_ts:
+ # if segment begins before test_start_ts, slice it to begin at test_start_ts.
+ b_keep &= data_header["Timestamp"] >= test_start_ts
+ if data_time_s != DATA_TIME_DEF and data_header["Timestamp"][-1] > test_stop_ts:
+ # if segment finishes after start_time_s + data_time_s, slice it to finish at start_time_s + data_time_s
+ b_keep &= data_header["Timestamp"] <= test_stop_ts
+ if np.any(~b_keep):
+ data_header["Timestamp"] = data_header["Timestamp"][b_keep]
+ data = data[b_keep]
+
+ # zeropad: Prepend the data with zeros so its first timestamp is nsp_time=0.
+ if ix == 0 and zeropad and data_header["Timestamp"][0] != 0:
+ # Calculate how many samples we need.
+ padsize = ceil(data_header["Timestamp"][0] / self.basic_header["Period"])
+ pad_dat = np.zeros((padsize, data.shape[1]), dtype=data.dtype)
+ # Stack pad_dat in front of output["data"][ix]. Slow! Might run out of memory!
+ try:
+ data = np.vstack((pad_dat, data))
+ except MemoryError as err:
+ err.args += (
+ " Output data size requested is larger than available memory. Use the parameters\n"
+ " for getdata(), e.g., 'elec_ids', to request a subset of the data or use\n"
+ " NsxFile.savesubsetnsx() to create subsets of the main nsx file\n",
+ )
+ raise
+ pad_ts = data_header["Timestamp"][0] - (clk_per_samp * np.arange(1, padsize + 1)).astype(np.int64)[::-1]
+ data_header["Timestamp"] = np.hstack((pad_ts, data_header["Timestamp"]))
+
+ # force_srate: Force the returned arrays to have exactly the expected number of samples per elapsed ptp time.
+ if force_srate:
+ # Dur of segment in ts-clks (nanoseconds)
+ seg_clks = data_header["Timestamp"][-1] - data_header["Timestamp"][0] + np.uint64(clk_per_samp)
+ # Number of samples in segment
+ npoints = data.shape[0]
+ # Expected number of samples based on duration.
+ n_expected = seg_clks / clk_per_samp
+ # How many are we missing? -ve number means we have too many.
+ n_insert = int(np.round(n_expected - npoints))
+ # identify where in the segments the data should be added/removed
+ insert_inds = np.linspace(0, npoints, num=abs(n_insert) + 1, endpoint=False, dtype=int)[1:]
+ if n_insert > 0:
+                    # Average neighboring samples at the evenly spaced indices, then insert.
+ insert_vals = (data[insert_inds] + data[insert_inds + 1]) / 2
+ data = np.insert(data, insert_inds, insert_vals, axis=0)
+ elif n_insert < 0:
+ data = np.delete(data, insert_inds, axis=0)
+
+ # Replace data_header["Timestamp"] with ideal timestamps
+ data_header["Timestamp"] = data_header["Timestamp"][0] + (clk_per_samp * np.arange(data.shape[0])).astype(np.int64)
+
+ if downsample > 1:
+ data = data[::downsample]
+
+ data_header["NumDataPoints"] = data.shape[0]
+ data_header["data_time_s"] = data_header["NumDataPoints"] / output["samp_per_s"]
+
+ if elec_rows:
+ data = data.T
+
+ output["data"][ix] = data
+
+ if not full_timestamps:
+ data_header["Timestamp"] = data_header["Timestamp"][0]
+
+ return output
+
+ def savesubsetnsx(
+ self, elec_ids="all", file_size=None, file_time_s=None, file_suffix=""
+ ):
+ """
+ This function is used to save a subset of data based on electrode IDs, file sizing, or file data time. If
+ both file_time_s and file_size are passed, it will default to file_time_s and determine sizing accordingly.
+
+ :param elec_ids: [optional] {list} List of elec_ids to extract (e.g., [13])
+ :param file_size: [optional] {int} Byte size of each subset file to save (e.g., 1024**3 = 1 Gb). If nothing
+ is passed, file_size will be all data points.
+ :param file_time_s: [optional] {float} Time length of data for each subset file, in seconds (e.g. 60.0). If
+ nothing is passed, file_size will be used as default.
+ :param file_suffix: [optional] {str} Suffix to append to NSx datafile name for subset files. If nothing is
+ passed, default will be "_subset".
+ :return: None - None of the electrodes requested exist in the data
+ SUCCESS - All file subsets extracted and saved
+ """
+
+ # Initializations
+ elec_id_indices = []
+ file_num = 1
+ pausing = False
+ datafile_datapt_size = self.basic_header["ChannelCount"] * DATA_BYTE_SIZE
+ self.datafile.seek(0, 0)
+
+ # Run electrode id checks and set num_elecs
+ elec_ids = check_elecid(elec_ids)
+ if self.basic_header["FileSpec"] == "2.1":
+ all_elec_ids = self.basic_header["ChannelID"]
+ else:
+ all_elec_ids = [x["ElectrodeID"] for x in self.extended_headers]
+
+ if elec_ids == ELEC_ID_DEF:
+ elec_ids = all_elec_ids
+ else:
+ elec_ids = check_dataelecid(elec_ids, all_elec_ids)
+ if not elec_ids:
+ return None
+ else:
+ elec_id_indices = [all_elec_ids.index(x) for x in elec_ids]
+
+ num_elecs = len(elec_ids)
+
+ # If file_size or file_time_s passed, check it and set file_sizing accordingly
+ if file_time_s:
+ if file_time_s and file_size:
+ print(
+ "\nWARNING: Only one of file_size or file_time_s can be passed, defaulting to file_time_s."
+ )
+ file_size = int(
+ num_elecs
+ * DATA_BYTE_SIZE
+ * file_time_s
+ * self.basic_header["TimeStampResolution"]
+ / self.basic_header["Period"]
+ )
+ if self.basic_header["FileSpec"] == "2.1":
+ file_size += 32 + 4 * num_elecs
+ else:
+ file_size += (
+ NSX_BASIC_HEADER_BYTES_22 + NSX_EXT_HEADER_BYTES_22 * num_elecs + 5
+ )
+ print(
+ "\nBased on timing request, file size will be {0:d} Mb".format(
+ int(file_size / 1024**2)
+ )
+ )
+ elif file_size:
+ file_size = check_filesize(file_size)
+
+ # Create and open subset file as writable binary, if it already exists ask user for overwrite permission
+ file_name, file_ext = ospath.splitext(self.datafile.name)
+ if file_suffix:
+ file_name += "_" + file_suffix
+ else:
+ file_name += "_subset"
+
+ if ospath.isfile(file_name + "_000" + file_ext):
+ if "y" != input(
+ "\nFile '"
+ + file_name.split("/")[-1]
+ + "_xxx"
+ + file_ext
+ + "' already exists, overwrite [y/n]: "
+ ):
+ print("\nExiting, no overwrite, returning None")
+ return None
+ else:
+ print("\n*** Overwriting existing subset files ***")
+
+ subset_file = open(file_name + "_000" + file_ext, "wb")
+ print("\nWriting subset file: " + ospath.split(subset_file.name)[1])
+
+ # For file spec 2.1:
+ # 1) copy the first 28 bytes from the datafile (these are unchanged)
+ # 2) write subset channel count and channel ID to file
+ # 3) skip ahead in datafile the number of bytes in datafile ChannelCount(4) plus ChannelID (4*ChannelCount)
+ if self.basic_header["FileSpec"] == "2.1":
+ subset_file.write(self.datafile.read(28))
+ subset_file.write(np.array(num_elecs).astype(np.uint32).tobytes())
+ subset_file.write(np.array(elec_ids).astype(np.uint32).tobytes())
+ self.datafile.seek(4 + 4 * self.basic_header["ChannelCount"], 1)
+
+ # For file spec 2.2 and above
+ # 1) copy the first 10 bytes from the datafile (unchanged)
+ # 2) write subset bytes-in-headers and skip 4 bytes in datafile, noting position of this for update later
+ # 3) copy the next 296 bytes from datafile (unchanged)
+ # 4) write subset channel-count value and skip 4 bytes in datafile
+ # 5) append extended headers based on the channel ID. Must read the first 4 bytes, determine if correct
+ # Channel ID, repack first 4 bytes, write to disk, then copy remaining 62 (66-4) bytes
+ else:
+ subset_file.write(self.datafile.read(10))
+ bytes_in_headers = (
+ NSX_BASIC_HEADER_BYTES_22 + NSX_EXT_HEADER_BYTES_22 * num_elecs
+ )
+ num_pts_header_pos = bytes_in_headers + 5
+ subset_file.write(np.array(bytes_in_headers).astype(np.uint32).tobytes())
+ self.datafile.seek(4, 1)
+ subset_file.write(self.datafile.read(296))
+ subset_file.write(np.array(num_elecs).astype(np.uint32).tobytes())
+ self.datafile.seek(4, 1)
+
+ for i in range(len(self.extended_headers)):
+ h_type = self.datafile.read(2)
+ chan_id = self.datafile.read(2)
+ if unpack("