diff --git a/acq4/devices/PatchPipette/statemanager.py b/acq4/devices/PatchPipette/statemanager.py index 0ba4475bd..f846a3d3e 100644 --- a/acq4/devices/PatchPipette/statemanager.py +++ b/acq4/devices/PatchPipette/statemanager.py @@ -38,6 +38,7 @@ class PatchPipetteStateManager(Qt.QObject): states.CellAttachedState, states.BreakInState, states.WholeCellState, + states.ClearState, states.ResealState, states.BlowoutState, states.BrokenState, diff --git a/acq4/devices/PatchPipette/states/__init__.py b/acq4/devices/PatchPipette/states/__init__.py index 4196e8538..2a16541f5 100644 --- a/acq4/devices/PatchPipette/states/__init__.py +++ b/acq4/devices/PatchPipette/states/__init__.py @@ -7,6 +7,7 @@ from .cell_attached import CellAttachedState from .cell_detect import CellDetectAnalysis, CellDetectState from .clean import CleanState +from .clear import ClearState from .fouled import FouledState from .move_nucleus_to_home import MoveNucleusToHomeState from .nucleus_collect import NucleusCollectState @@ -32,6 +33,7 @@ 'BreakInState', 'ResealAnalysis', 'ResealState', + 'ClearState', 'MoveNucleusToHomeState', 'BlowoutState', 'CleanState', diff --git a/acq4/devices/PatchPipette/states/clear.py b/acq4/devices/PatchPipette/states/clear.py new file mode 100644 index 000000000..aaf69e458 --- /dev/null +++ b/acq4/devices/PatchPipette/states/clear.py @@ -0,0 +1,89 @@ +import numpy as np +import scipy + +from acq4.devices.PatchPipette.states import PatchPipetteState +from acq4.util import ptime + + +class ClearState(PatchPipetteState): + """A cross between Break In and Seal that scans pressures for possible recovery of a cell loss. + + Parameters + ---------- + recoveryTimeout : float + Time (s) to spend trying to recover from a cell loss before giving up (default 30 s). + recoveryResistanceThresholdAbsolute : float + Access resistance (Ohms) below which to consider the cell loss successfully reversed and + transition to 'whole cell' state (default 10 MΩ). + recoverySustainedTime : float + Time (s) that resistance must be below the recovery threshold before considering the cell loss successfully + reversed and transitioning to 'whole cell' state (default 2 s). + """ + + stateName = 'clear' + + _parameterDefaultOverrides = { + 'initialPressureSource': 'atmosphere', + 'initialClampMode': 'VC', + 'initialVCHolding': -70e-3, + 'initialTestPulseEnable': True, + 'fallbackState': 'fouled', + } + _parameterTreeConfig = { + 'recoveryTimeout': {'type': 'float', 'default': 60.0, 'suffix': 's'}, + 'recoveryResistanceThresholdAbsolute': {'type': 'float', 'default': 10e6, 'suffix': 'Ω', 'siPrefix': True}, + 'recoverySustainedTime': {'type': 'float', 'default': 2.0, 'suffix': 's'}, + } + + def run(self): + # TODO relative R_acc threshold? + start = ptime.time() + tps = [] + while start + 2 > ptime.time(): + tps.extend(self.processAtLeastOneTestPulse()) + best_r_acc = min(tp['access_resistance'] for tp in tps) + pressure = -500 # Pa + start = ptime.time() + sign_has_flipped = False + while start + self.config['recoveryTimeout'] > ptime.time(): + first_recovery_time = None + while ( + self.processAtLeastOneTestPulse()[-1].analysis['access_resistance'] + < self.config['recoveryResistanceThresholdAbsolute'] + ): + if first_recovery_time is None: + first_recovery_time = ptime.time() + elif first_recovery_time + self.config['recoverySustainedTime'] < ptime.time(): + self.setState("cell recovered") + # todo make sure when this lands in `main` we use the dict-style return + return "whole cell" + + pulse_start = ptime.time() + tps_during = [] + self.waitFor(self.dev.pressureDevice.setPressure(pressure, source='regulator')) + while pulse_start + 2 > ptime.time(): + tps_during.extend(self.processAtLeastOneTestPulse()) + self.dev.pressureDevice.setPressure(0, source='atmosphere') + r_acc_during = np.asarray([tp['access_resistance'] for tp in tps_during]) + time_during = np.asarray([tp['event_time'] for tp in tps_during]) + slope = scipy.stats.linregress(r_acc_during, time_during).slope + if slope < 0: + tps_after = [] + while pulse_start + 2 > ptime.time(): + tps_after.extend(self.processAtLeastOneTestPulse()) + r_acc_after = np.asarray([tp['access_resistance'] for tp in tps_after]) + if r_acc_after.mean() < best_r_acc: + best_r_acc = r_acc_after.mean() + # and repeat the same pressure + else: + # increase the pressure magnitude + pressure = np.sign(pressure) * (abs(pressure) + 500) + else: + if sign_has_flipped: + # increase pressure magnitude and flip sign + pressure = -1 * np.sign(pressure) * (abs(pressure) + 500) + else: + # first flip keeps magnitude the same + sign_has_flipped = True + pressure = -1 * pressure + return self.config['fallbackState'] diff --git a/acq4/devices/PatchPipette/states/whole_cell.py b/acq4/devices/PatchPipette/states/whole_cell.py index 14bf5353f..c648b785d 100644 --- a/acq4/devices/PatchPipette/states/whole_cell.py +++ b/acq4/devices/PatchPipette/states/whole_cell.py @@ -5,6 +5,19 @@ class WholeCellState(PatchPipetteState): + """State representing a successful break-in, with the pipette dialed in for whole-cell recording. + + Parameters + ---------- + cellLossResistanceThresholdAbsolute : float + If the pipette resistance (Ra) rises above this threshold (Ω), transition to + `cellLossState` (default 20 MΩ). + cellLossState : str + Name of state to transition to if possible cell loss is detected (default 'clear'). + cellLossSustainedTime : float + Time (s) that resistance must be above threshold before transitioning to `cellLossState` + (default 2 s). + """ stateName = 'whole cell' _parameterDefaultOverrides = { 'initialPressureSource': 'atmosphere', @@ -14,6 +27,11 @@ class WholeCellState(PatchPipetteState): 'initialAutoBiasEnable': True, 'initialAutoBiasTarget': -70e-3, } + _parameterTreeConfig = { + 'cellLossResistanceThresholdAbsolute': {'type': 'float', 'value': 20e6, 'suffix': 'Ω', 'siPrefix': True, 'step': 1e6}, + 'cellLossState': {'type': 'str', 'value': 'clear'}, + 'cellLossSustainedTime': {'type': 'float', 'value': 2.0, 'suffix': 's', 'step': 0.5}, + } def run(self): patchrec = self.dev.patchRecord() @@ -22,8 +40,26 @@ def run(self): # TODO: Option to switch to I=0 for a few seconds to get initial RMP decay + # TODO relative R_acc threshold? + # tps = [] + # start = ptime.time() + # while start + 5 > ptime.time(): + # tps.extend(self.processAtLeastOneTestPulse()) + + threshold = self.config['cellLossResistanceThresholdAbsolute'] + first_loss_time = None while True: - # TODO: monitor for cell loss + tps = self.processAtLeastOneTestPulse() + r_access = tps[-1].analysis['access_resistance'] + if r_access > threshold: + if first_loss_time is None: + first_loss_time = ptime.time() + elif first_loss_time + self.config['cellLossSustainedTime'] < ptime.time(): + self.setState(f"cell loss in progress (R_acc rose to {r_access / 1e6:.1f} MΩ)") + # TODO make sure when this lands in `main` we use the dict-style return + return "clear" + else: + first_loss_time = None self.sleep(0.1) def cleanup(self): diff --git a/acq4/util/ptime.py b/acq4/util/ptime.py index 99769ccd1..51e197c75 100644 --- a/acq4/util/ptime.py +++ b/acq4/util/ptime.py @@ -31,3 +31,14 @@ def unixTime(): time = winTime else: time = unixTime + + +def loop(duration=None, end_time=None): + """Generator that yields the current time in a loop until the specified duration has elapsed or + end_time is reached.""" + if duration is None and end_time is None: + raise ValueError("Must specify either duration or end_time") + if end_time is None: + end_time = time() + duration + while (now := time()) < end_time: + yield now