diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b5bb35ad..f49814fc 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,21 +15,21 @@ repos: - id: trailing-whitespace - id: end-of-file-fixer - repo: https://github.com/asottile/pyupgrade - rev: v3.19.1 + rev: v3.20.0 hooks: - id: pyupgrade args: [--py39-plus] - repo: https://github.com/pycqa/isort - rev: 5.13.2 + rev: 6.0.1 hooks: - id: isort args: ["--profile", "black"] - repo: https://github.com/psf/black - rev: 24.10.0 + rev: 25.1.0 hooks: - id: black args: ["--line-length", "120"] - repo: https://github.com/pycqa/flake8 - rev: 7.1.1 + rev: 7.2.0 hooks: - id: flake8 diff --git a/docs/device_object.rst b/docs/device_object.rst index 9b1116c4..3bbb8b8b 100644 --- a/docs/device_object.rst +++ b/docs/device_object.rst @@ -194,7 +194,7 @@ This stores all parameter values (of parameters that can be set). With you can reset it to the stored configuration. Alternatively you can use "device.save_state(name)" and "device.set_state(name)" to store and set multiple working points with -custom names. They can also be accessed via "device.states" in case you forgot the name. To store a state in json-file, use device.save_to_file(name, path), you can load it +custom names. They can also be accessed via "device.states" in case you forgot the name. To store a state in json-file, use device.save_to_file(name, path), you can load it via "device.load_state_from_file(name, path)" to load it (use "device.set_state(name)" afterwards to set it) or with "device.set_state_from_file(name, path)" to directly apply the configuration to the device. For all of those methods the parameters are ramped to the final state by default (with the default QuMada ramp rate), again depending on the "device.ramp" setting. diff --git a/src/qumada/instrument/custom_drivers/ZI/MFLI.py b/src/qumada/instrument/custom_drivers/ZI/MFLI.py index 3294c0e1..ca37f189 100644 --- a/src/qumada/instrument/custom_drivers/ZI/MFLI.py +++ b/src/qumada/instrument/custom_drivers/ZI/MFLI.py @@ -41,11 +41,13 @@ class MFLI(Instrument): """ def __init__( - self, name: str, device: str, + self, + name: str, + device: str, serverhost: str = "localhost", - existing_session: Session = None, - allow_version_mismatch = False, - **kwargs + existing_session: Session = None, + allow_version_mismatch=False, + **kwargs, ): super().__init__(name, **kwargs) if isinstance(existing_session, Session): diff --git a/src/qumada/instrument/mapping/Dummies/DummyDac.py b/src/qumada/instrument/mapping/Dummies/DummyDac.py index 3c8c97fe..ffe04a1c 100644 --- a/src/qumada/instrument/mapping/Dummies/DummyDac.py +++ b/src/qumada/instrument/mapping/Dummies/DummyDac.py @@ -60,8 +60,8 @@ def ramp( if not start_values: start_values = [param.get() for param in parameters] - - print("Ramping...") + + print("Ramping...") instrument._triggered_ramp_channels( [param._instrument for param in parameters], start_values, end_values, ramp_time, num_points ) @@ -96,7 +96,7 @@ def pulse( def setup_trigger_in(): pass - + def force_trigger(self): self._instrument.force_trigger() - self._instrument._is_triggered.clear() \ No newline at end of file + self._instrument._is_triggered.clear() diff --git a/src/qumada/instrument/mapping/Harvard/Decadac.py b/src/qumada/instrument/mapping/Harvard/Decadac.py index 28430dca..ed3dc20f 100644 --- a/src/qumada/instrument/mapping/Harvard/Decadac.py +++ b/src/qumada/instrument/mapping/Harvard/Decadac.py @@ -19,12 +19,14 @@ # - Till Huckeman +import logging + from qcodes.instrument.parameter import Parameter from qumada.instrument.custom_drivers.Harvard.Decadac import Decadac from qumada.instrument.mapping import DECADAC_MAPPING from qumada.instrument.mapping.base import InstrumentMapping -import logging + logger = logging.getLogger(__name__) @@ -159,6 +161,6 @@ def trigger_in(self, trigger: str | None) -> None: def force_trigger(self): string = "" for b in range(0, 6): - for c in range(0,6): - string+=f"B{b};C{c};G0;" - self._instrument.write(string) \ No newline at end of file + for c in range(0, 6): + string += f"B{b};C{c};G0;" + self._instrument.write(string) diff --git a/src/qumada/instrument/mapping/QDevil/qdac.py b/src/qumada/instrument/mapping/QDevil/qdac.py index 7c65d786..a96da913 100644 --- a/src/qumada/instrument/mapping/QDevil/qdac.py +++ b/src/qumada/instrument/mapping/QDevil/qdac.py @@ -30,6 +30,7 @@ class QDacMapping(InstrumentMapping): def __init__(self): super().__init__(QDAC_MAPPING) self.max_ramp_channels = 8 + def ramp( self, parameters: list[Parameter], @@ -72,7 +73,7 @@ def ramp( def setup_trigger_in(self): raise Exception("QDac does not have a trigger input!") - + def force_trigger(self): pass # Not required as QDac has no trigger input and starts ramps instantly. diff --git a/src/qumada/instrument/mapping/QDevil/qdac2.py b/src/qumada/instrument/mapping/QDevil/qdac2.py index ace4d7e2..65f30136 100644 --- a/src/qumada/instrument/mapping/QDevil/qdac2.py +++ b/src/qumada/instrument/mapping/QDevil/qdac2.py @@ -182,7 +182,7 @@ def setup_trigger_in(self): "QDac2 does not have a trigger input \ not yet supported!" ) - + def force_trigger(self): pass # Currently no trigger inputs are supported, thus all ramps are started diff --git a/src/qumada/instrument/mapping/base.py b/src/qumada/instrument/mapping/base.py index 99dd7fe9..7d41a2cd 100644 --- a/src/qumada/instrument/mapping/base.py +++ b/src/qumada/instrument/mapping/base.py @@ -76,8 +76,10 @@ def pulse( **kwargs, ) -> None: """Defining qumada pulse. Requires proper implementation for each instrument""" - raise Exception("Pulse not properly implemented for this instrument!\ - You cannot use pulsed measurements with this instrument.") + raise Exception( + "Pulse not properly implemented for this instrument!\ + You cannot use pulsed measurements with this instrument." + ) @abstractmethod def setup_trigger_in(self, trigger_settings: dict) -> None: diff --git a/src/qumada/measurement/device_object.py b/src/qumada/measurement/device_object.py index 4c5a410b..7e785232 100644 --- a/src/qumada/measurement/device_object.py +++ b/src/qumada/measurement/device_object.py @@ -1,12 +1,11 @@ from __future__ import annotations +import json import logging from abc import ABC from copy import deepcopy -from typing import Any from time import sleep -import json - +from typing import Any import numpy as np from qcodes import Station @@ -15,7 +14,10 @@ from qumada.instrument.buffers.buffer import map_triggers, save_trigger_mapping from qumada.instrument.mapping import map_terminals_gui -from qumada.instrument.mapping.base import load_mapped_terminal_parameters, save_mapped_terminal_parameters +from qumada.instrument.mapping.base import ( + load_mapped_terminal_parameters, + save_mapped_terminal_parameters, +) from qumada.measurement.measurement import MeasurementScript, load_param_whitelist from qumada.measurement.scripts import ( Generic_1D_Hysteresis_buffered, @@ -120,40 +122,40 @@ def set_state(self, name: str, ramp=None, **kwargs): Returns ------- None - """ + """ self.update_terminal_parameters() if ramp is None: ramp = self.ramp self.load_from_dict(self.states[name]) self.set_stored_values(ramp=ramp, **kwargs) - + def save_state_to_file(self, name: str, path: str): """ - Saves the specified state to a json file. - - Parameters - ---------- - name : str - The name of the state to save. - path : str - The file path where the state will be saved. Has to be a json file! - - Returns - ------- - None - - Notes - ----- - Before saving, any setpoints in terminal parameters are cleared by setting them to `None` - to avoid problems with json.dump. - """ + Saves the specified state to a json file. + + Parameters + ---------- + name : str + The name of the state to save. + path : str + The file path where the state will be saved. Has to be a json file! + + Returns + ------- + None + + Notes + ----- + Before saving, any setpoints in terminal parameters are cleared by setting them to `None` + to avoid problems with json.dump. + """ for t in self.terminals: for param in self.terminals[t].terminal_parameters: self.terminals[t].terminal_parameters[param].properties["setpoints"] = None state = self.states[name] with open(file=path, mode="w") as f: - json.dump(state, f) - + json.dump(state, f) + def load_state_from_file(self, name: str, path: str): """ Loads a state from a json file and stores it in the device object. @@ -169,10 +171,10 @@ def load_state_from_file(self, name: str, path: str): ------- None """ - with open(file=path, mode="r") as f: + with open(file=path) as f: state = json.load(f) self.states[name] = state - + def set_state_from_file(self, name: str, path: str): """ Sets the devices state by loading it from a json file. @@ -319,17 +321,17 @@ def save_to_dict(self, priorize_stored_value=False): logger.warning(f"Couldn't find value for {terminal_name} {param_name}") return return_dict - - def map_terminals(self, - terminal_parameters: None | dict = None, - path: None | str = None, - skip_gui_if_mapped: bool = True, - ): + def map_terminals( + self, + terminal_parameters: None | dict = None, + path: None | str = None, + skip_gui_if_mapped: bool = True, + ): """ Maps devices terminal parameters using map_terminals_gui. You can pass an existing mapping as terminal_parameters. If a path is provided it first tries to use the provided mapping file. - + Parameters ---------- terminal_parameters : None | dict, optional @@ -355,33 +357,35 @@ def map_terminals(self, if not isinstance(self.station, Station): raise TypeError("No valid qcodes station found. Make sure you have set the station attribute correctly!") if path is not None: - load_mapped_terminal_parameters(terminal_parameters, self.station, path) - map_terminals_gui(self.station.components, - self.terminal_parameters, - terminal_parameters, - skip_gui_if_mapped = skip_gui_if_mapped) + load_mapped_terminal_parameters(terminal_parameters, self.station, path) + map_terminals_gui( + self.station.components, + self.terminal_parameters, + terminal_parameters, + skip_gui_if_mapped=skip_gui_if_mapped, + ) self.update_terminal_parameters() - - def mapping(self, - terminal_parameters: None | dict = None, - path: None | str = None, - skip_gui_if_mapped = True, - ): - #TODO: Remove! - logger.warning("Deprecation Warning: device.mapping was renamed to \ + def mapping( + self, + terminal_parameters: None | dict = None, + path: None | str = None, + skip_gui_if_mapped=True, + ): + # TODO: Remove! + logger.warning( + "Deprecation Warning: device.mapping was renamed to \ device.map_terminals. Device.mapping will be removed \ - in a future release!") + in a future release!" + ) self.map_terminals(terminal_parameters, path, skip_gui_if_mapped) - - - def save_terminal_mapping(self, - path: str): + + def save_terminal_mapping(self, path: str): """ Save terminal mapping to specified file (json). """ save_mapped_terminal_parameters(self.terminal_parameters, path) - + def map_triggers( self, components: None | dict = None, @@ -394,7 +398,7 @@ def map_triggers( Uses components of assigned station by default. Ignores already mapped triggers by default. You can provide a path in order to load and existing mapping. - + Parameters ---------- components : None|dict, optional @@ -410,14 +414,9 @@ def map_triggers( """ if components is None: components = self.station.components - map_triggers(components=components, - skip_mapped=skip_mapped, - path=path, - kwargs=kwargs) - - def save_trigger_mapping( - self, - path: str): + map_triggers(components=components, skip_mapped=skip_mapped, path=path, kwargs=kwargs) + + def save_trigger_mapping(self, path: str): """ Save trigger mapping to json file. @@ -518,7 +517,7 @@ def timetrace( map_triggers(station.components) data = script.run() return data - + def sweep_1d( self, params: Parameter | list[Parameter], @@ -526,12 +525,12 @@ def sweep_1d( num_points: int = 100, dynamic_values: None | list[float] = None, backsweep: bool = False, - name = None, - metadata = None, - station = None, - buffered = False, + name=None, + metadata=None, + station=None, + buffered=False, buffer_settings: dict | None = None, - priorize_stored_value = False, + priorize_stored_value=False, **kwargs, ): """ @@ -552,8 +551,8 @@ def sweep_1d( Number of points measured in each sweep. Doubled if backsweep is True. dynamic_values : None | list[float], optional List of values for the dynamic parameters (only if a list of params is provided). - Ignored if only one parameter is passed. - Parameters are kept at this value during the ramps of the other parameters. + Ignored if only one parameter is passed. + Parameters are kept at this value during the ramps of the other parameters. Current values of the parameters are used if it is None. Default is None. backsweep : bool, optional @@ -588,7 +587,7 @@ def sweep_1d( Notes ----- - Again: This measurement can only do linear ramps! - - Does one measurement for each param provided. + - Does one measurement for each param provided. - Ignores other dynamic parameters that are not in params - Records only gettable parameters. """ @@ -603,49 +602,55 @@ def sweep_1d( if dynamic_values is None: dynamic_values = [param() for param in params] assert len(params) == len(dynamic_values) - else: + else: assert len(params) == len(dynamic_values) for param, val in zip(params, dynamic_values): param(val) for param, setpoint, val in zip(params, sweep_range, dynamic_values): - data.append(*param.measured_ramp( - value = setpoint[-1], - num_points=len(setpoint), - start=setpoint[0], - station=station, - name=name, - metadata=metadata, - backsweep=backsweep, - buffered=buffered, - buffer_settings=buffer_settings, - priorize_stored_value=priorize_stored_value, - )) + data.append( + *param.measured_ramp( + value=setpoint[-1], + num_points=len(setpoint), + start=setpoint[0], + station=station, + name=name, + metadata=metadata, + backsweep=backsweep, + buffered=buffered, + buffer_settings=buffer_settings, + priorize_stored_value=priorize_stored_value, + ) + ) param(val) - + elif isinstance(params, Terminal_Parameter): assert dynamic_values != list if dynamic_values is not None: val = dynamic_values else: val = params() - data.append(*params.measured_ramp( - value=sweep_range[-1], - num_points=num_points, - start=sweep_range[0], - station=station, - name=name, - metadata=metadata, - backsweep=backsweep, - buffered=buffered, - buffer_settings=buffer_settings, - priorize_stored_value=priorize_stored_value, - )) + data.append( + *params.measured_ramp( + value=sweep_range[-1], + num_points=num_points, + start=sweep_range[0], + station=station, + name=name, + metadata=metadata, + backsweep=backsweep, + buffered=buffered, + buffer_settings=buffer_settings, + priorize_stored_value=priorize_stored_value, + ) + ) params(val) return data - + def sweep_2D(): - logger.exception("Deprecation Warning: sweep_2D was renamed to sweep_2d \ - for better naming consistency!") + logger.exception( + "Deprecation Warning: sweep_2D was renamed to sweep_2d \ + for better naming consistency!" + ) def sweep_2d( self, @@ -1256,11 +1261,10 @@ def value(self, value): self._value = value self.instrument_parameter(value) - @value.getter def value(self): return self.instrument_parameter() - + @property def instrument_parameter(self): return self._instrument_parameter diff --git a/src/qumada/measurement/measurement.py b/src/qumada/measurement/measurement.py index 4be66abd..94227ee3 100644 --- a/src/qumada/measurement/measurement.py +++ b/src/qumada/measurement/measurement.py @@ -22,6 +22,8 @@ from __future__ import annotations import copy +import functools +import importlib import inspect import json import logging @@ -31,20 +33,22 @@ from contextlib import suppress from datetime import datetime from functools import wraps -from typing import Any, Callable from time import sleep +from typing import Any, Callable import numpy as np import qcodes as qc from qcodes import Station from qcodes.dataset import AbstractSweep, LinSweep from qcodes.dataset.dond.do_nd_utils import ActionsT +from qcodes.dataset.measurements import Measurement as QCoDeSMeasurement from qcodes.parameters import Parameter, ParameterBase from qumada.instrument.buffers import is_bufferable, is_triggerable from qumada.metadata import Metadata +from qumada.utils.liveplot import MeasurementAndPlot from qumada.utils.ramp_parameter import ramp_or_set_parameter, ramp_or_set_parameters -from qumada.utils.utils import flatten_array, _validate_mapping +from qumada.utils.utils import _validate_mapping, flatten_array logger = logging.getLogger(__name__) @@ -104,6 +108,7 @@ class MeasurementScript(ABC): """ PARAMETER_NAMES: set[str] = load_param_whitelist() + DEFAULT_LIVE_PLOTTER: callable = None def __init__(self): # Create function hooks for metadata @@ -112,10 +117,30 @@ def __init__(self): self.run = create_hook(self.run, self._add_data_to_metadata) self.run = create_hook(self.run, self._add_current_datetime_to_metadata) + self.live_plotter = self.DEFAULT_LIVE_PLOTTER + self.properties: dict[Any, Any] = {} self.terminal_parameters: dict[Any, dict[Any, Parameter | None] | Parameter | None] = {} self._buffered_num_points: int | None = None + def _new_measurement(self, name, **kwargs) -> Union[MeasurementAndPlot, QCoDeSMeasurement]: + if self.live_plotter is None: + return QCoDeSMeasurement(name=name, **kwargs) + else: + return MeasurementAndPlot(script=self, name=name, gui=self.live_plotter, **kwargs) + + def _dond(self, *args, **kwargs): + """This is a wrapper around qcodes dond function that monkeypatches the live plotter in the datasaver""" + # we need to use importlib here because the dond function shadows the qcodes.dataset.dond package + do_nd = importlib.import_module("qcodes.dataset.dond.do_nd") + + prev_meas_cls = do_nd.Measurement + try: + do_nd.Measurement = self._new_measurement + return do_nd.dond(*args, **kwargs) + finally: + do_nd.Measurement = prev_meas_cls + def add_terminal_parameter(self, parameter_name: str, gate_name: str = None, parameter: Parameter = None) -> None: """ Adds a gate parameter to self.terminal_parameters. @@ -449,7 +474,7 @@ def generate_lists(self) -> None: } self.trigger_ins = { param.root_instrument._qumada_mapping for param in self.dynamic_channels if is_triggerable(param) - } #Independent of self.buffered to allow parallel ramping for unbuffered measurement initialization. + } # Independent of self.buffered to allow parallel ramping for unbuffered measurement initialization. self.sort_by_priority() self._lists_created = True self._relabel_instruments() @@ -495,7 +520,7 @@ def initialize(self, dyn_ramp_to_val=False, inactive_dyn_channels: list | None = setpoint_intervall = self.settings.get("setpoint_intervall", 0.1) trigger_start = self.settings.get("trigger_start", "software") # TODO: this should be set elsewhere trigger_reset = self.settings.get("trigger_reset", None) - trigger_type = self.settings.get("trigger_type", None), + trigger_type = (self.settings.get("trigger_type", None),) if not self._lists_created: self.generate_lists() # for item in self.compensated_parameters: @@ -663,10 +688,16 @@ def initialize(self, dyn_ramp_to_val=False, inactive_dyn_channels: list | None = else: raise Exception(f"{gettable_param} is not bufferable.") self.ready_triggers() - ramp_or_set_parameters(ramp_params, ramp_targets, ramp_rate, - ramp_time, setpoint_intervall, trigger_start=trigger_start, - trigger_type=trigger_type, trigger_reset=trigger_reset) - + ramp_or_set_parameters( + ramp_params, + ramp_targets, + ramp_rate, + ramp_time, + setpoint_intervall, + trigger_start=trigger_start, + trigger_type=trigger_type, + trigger_reset=trigger_reset, + ) @abstractmethod def run(self) -> list: @@ -676,7 +707,6 @@ def run(self) -> list: """ return [] - def clean_up(self, additional_actions: list[Callable] | None = None, **kwargs) -> None: """ Things to do after the measurement is complete. Cleans up subscribed paramteres for @@ -704,7 +734,7 @@ def ready_buffers(self, **kwargs) -> None: buffer.setup_buffer(settings=self.buffer_settings) buffer.start() self.ready_triggers() - + def ready_triggers(self, **kwargs): """ Prepare trigger inputs for not buffered instruments. @@ -785,8 +815,8 @@ def _insert_metadata_into_db(self, *args, insert_metadata_into_db: bool = True, metadata.save() except Exception as ex: print(f"Metadata could not inserted into database: {ex}") - - def trigger_measurement(self, parameters, setpoints, method = "ramp" ,sync_trigger=None): + + def trigger_measurement(self, parameters, setpoints, method="ramp", sync_trigger=None): TRIGGER_TYPES = ["software", "hardware", "manual"] trigger_start = self.settings.get("trigger_start", "manual") # TODO: this should be set elsewhere @@ -797,28 +827,28 @@ def trigger_measurement(self, parameters, setpoints, method = "ramp" ,sync_trigg default="software", default_key_error="software", ) - setpoints_mapping = {param : setpoint for param, setpoint in zip(parameters, setpoints)} + setpoints_mapping = {param: setpoint for param, setpoint in zip(parameters, setpoints)} if sync_trigger is None: sync_trigger = () buffer_timeout_multiplier = self.settings.get("buffer_timeout_multiplier", 20) # Some logic to sort instruments. Instruments with sync-triggers have to be added last, # as executing _qumada_pulse/_ramp with them instantly runs the pulse/ramp, before other instruments - # that wait for a trigger signal are added and prepared. + # that wait for a trigger signal are added and prepared. instruments_set = {param.root_instrument for param in parameters} instruments = [instrument for instrument in instruments_set if instrument not in sync_trigger] for instrument in instruments_set: if instrument in sync_trigger: instruments.append(instrument) - + for instr in instruments: instr_params = [param for param in parameters if param.root_instrument is instr] if method == "ramp": try: instr._qumada_ramp( - parameters = instr_params, - end_values = [setpoints_mapping[param][-1] for param in instr_params], - ramp_time = self._burst_duration, - sync_trigger = sync_trigger, + parameters=instr_params, + end_values=[setpoints_mapping[param][-1] for param in instr_params], + ramp_time=self._burst_duration, + sync_trigger=sync_trigger, ) except AttributeError as ex: logger.error( @@ -828,14 +858,14 @@ def trigger_measurement(self, parameters, setpoints, method = "ramp" ,sync_trigg Use the unbuffered script!" ) raise ex - + elif method == "pulse": try: instr._qumada_pulse( - parameters = instr_params, - setpoints = [setpoints_mapping[param] for param in instr_params], - delay = self._burst_duration / self.buffered_num_points, - sync_trigger = sync_trigger, + parameters=instr_params, + setpoints=[setpoints_mapping[param] for param in instr_params], + delay=self._burst_duration / self.buffered_num_points, + sync_trigger=sync_trigger, ) except AttributeError as ex: logger.error( @@ -849,7 +879,7 @@ def trigger_measurement(self, parameters, setpoints, method = "ramp" ,sync_trigg with NameError as ex: logger.error("Argument 'method' has to be eiter 'ramp' or 'pulse'") raise ex - + if trigger_type == "manual": logger.warning( "You are using manual triggering. If you want to pulse parameters on multiple" diff --git a/src/qumada/measurement/scripts/generic_measurement.py b/src/qumada/measurement/scripts/generic_measurement.py index 0156fa50..26f94802 100644 --- a/src/qumada/measurement/scripts/generic_measurement.py +++ b/src/qumada/measurement/scripts/generic_measurement.py @@ -26,7 +26,6 @@ import numpy as np from qcodes.dataset import dond -from qcodes.dataset.measurements import Measurement from qcodes.parameters.specialized_parameters import ElapsedTimeParameter from qumada.instrument.buffers import is_bufferable @@ -96,7 +95,7 @@ def run(self, **dond_kwargs) -> list: self.initialize(inactive_dyn_channels=inactive_channels) sleep(wait_time) data.append( - dond( + self._dond( sweep, *measured_channels, measurement_name=self._measurement_name, @@ -154,7 +153,7 @@ def run(self, **dond_kwargs): for sweep in self.dynamic_sweeps: ramp_or_set_parameters([sweep._param], [sweep.get_setpoints()[0]]) sleep(wait_time) - data = dond( + data = self._dond( *tuple(self.dynamic_sweeps), *tuple(self.gettable_channels), measurement_name=measurement_name, @@ -248,7 +247,7 @@ def run(self): timestep = self.settings.get("timestep", 1) timer = ElapsedTimeParameter("time") naming_helper(self, default_name="Timetrace") - meas = Measurement(name=self.measurement_name) + meas = self._new_measurement(name=self.measurement_name) meas.register_parameter(timer) for parameter in [*self.gettable_channels, *self.dynamic_channels]: meas.register_parameter( @@ -331,7 +330,7 @@ def run(self): self.generate_lists() naming_helper(self, default_name="Timetrace") - meas = Measurement(name=self.measurement_name) + meas = self._new_measurement(name=self.measurement_name) meas.register_parameter(timer) for parameter in [*self.gettable_channels, *self.dynamic_channels]: @@ -443,7 +442,7 @@ def run(self): timestep = self.settings.get("timestep", 1) # backsweeps = self.settings.get("backsweeps", False) timer = ElapsedTimeParameter("time") - meas = Measurement(name=self.metadata.measurement.name or "timetrace") + meas = self._new_measurement(name=self.metadata.measurement.name or "timetrace") meas.register_parameter(timer) setpoints = [timer] for parameter in self.dynamic_channels: @@ -529,7 +528,7 @@ def run(self): datasets = [] self.generate_lists() naming_helper(self, default_name="Timetrace with sweeps") - meas = Measurement(name=self.measurement_name) + meas = self._new_measurement(name=self.measurement_name) meas.register_parameter(timer) for dynamic_param in self.dynamic_parameters: @@ -558,17 +557,19 @@ def run(self): self.ready_buffers() t = time() - start try: - self.trigger_measurement(parameters = self.dynamic_channels, - setpoints = [sweep.get_setpoints() for sweep in self.dynamic_sweeps], - method="ramp", - sync_trigger=sync_trigger, - ) - + self.trigger_measurement( + parameters=self.dynamic_channels, + setpoints=[sweep.get_setpoints() for sweep in self.dynamic_sweeps], + method="ramp", + sync_trigger=sync_trigger, + ) + results = self.readout_buffers(timestamps=True) dynamic_param_results = [ - (dyn_channel, sweep.get_setpoints()) for dyn_channel, sweep in zip( - self.dynamic_channels, self.dynamic_sweeps)] - results.pop(-1) #removes timestamps from results + (dyn_channel, sweep.get_setpoints()) + for dyn_channel, sweep in zip(self.dynamic_channels, self.dynamic_sweeps) + ] + results.pop(-1) # removes timestamps from results datasaver.add_result( (timer, t), *dynamic_param_results, @@ -585,7 +586,7 @@ def run(self): raise ex except TimeoutError: logger.error(f"A timeout error occured. Skipping line at time {t}.") - #results = self.readout_buffers(timestamps=True) + # results = self.readout_buffers(timestamps=True) self.clean_up() datasets.append(datasaver.dataset) return datasets @@ -665,7 +666,7 @@ def run(self): dynamic_param = self.dynamic_sweeps[i].param inactive_channels = [chan for chan in self.dynamic_channels if chan != dynamic_param] self.initialize(inactive_dyn_channels=inactive_channels) - meas = Measurement(name=self.measurement_name) + meas = self._new_measurement(name=self.measurement_name) meas.register_parameter(dynamic_param) for c_param in self.active_compensating_channels: meas.register_parameter( @@ -736,13 +737,15 @@ def run(self): results = [] self.ready_buffers() self.trigger_measurement( - parameters = [dynamic_param, *self.active_compensating_channels], - setpoints = [dynamic_sweep.get_setpoints(), - *[sweep.get_setpoints for sweep in active_comping_sweeps]], - method = "ramp", - sync_trigger=sync_trigger - ) - + parameters=[dynamic_param, *self.active_compensating_channels], + setpoints=[ + dynamic_sweep.get_setpoints(), + *[sweep.get_setpoints for sweep in active_comping_sweeps], + ], + method="ramp", + sync_trigger=sync_trigger, + ) + results = self.readout_buffers() comp_results = [] for ch, sw in zip(self.active_compensating_channels, active_comping_sweeps): @@ -834,7 +837,7 @@ def run(self): self.measurement_name += f" {dynamic_parameter['gate']}" self.properties[dynamic_parameter["gate"]][dynamic_parameter["parameter"]]["_is_triggered"] = True dynamic_param = dynamic_sweep.param - meas = Measurement(name=self.measurement_name) + meas = self._new_measurement(name=self.measurement_name) meas.register_parameter(dynamic_param) # This next block is required to log static and idle dynamic # parameters that cannot be buffered. @@ -1027,7 +1030,7 @@ def run(self): gate_names = [gate["gate"] for gate in self.dynamic_parameters] self.measurement_name += f" {gate_names}" - meas = Measurement(name=self.measurement_name) + meas = self._new_measurement(name=self.measurement_name) if reverse_param_order: slow_param = self.dynamic_parameters[1] @@ -1283,7 +1286,7 @@ def run(self): gate_names = [gate["gate"] for gate in self.dynamic_parameters] self.measurement_name += f" {gate_names}" - meas = Measurement(name=self.measurement_name) + meas = self._new_measurement(name=self.measurement_name) meas.register_parameter(timer) for parameter in self.dynamic_parameters: self.properties[parameter["gate"]][parameter["parameter"]]["_is_triggered"] = True @@ -1486,7 +1489,7 @@ def run(self): gate_names = [gate["gate"] for gate in self.dynamic_parameters] self.measurement_name += f" {gate_names}" - meas = Measurement(name=self.measurement_name) + meas = self._new_measurement(name=self.measurement_name) meas.register_parameter(timer) for parameter in self.dynamic_parameters: self.properties[parameter["gate"]][parameter["parameter"]]["_is_triggered"] = True diff --git a/src/qumada/utils/liveplot.py b/src/qumada/utils/liveplot.py new file mode 100644 index 00000000..30b42f41 --- /dev/null +++ b/src/qumada/utils/liveplot.py @@ -0,0 +1,88 @@ +import contextlib +import functools +import warnings +from collections.abc import Sequence +from typing import Optional, Protocol, Union + +from qcodes import Measurement +from qcodes.dataset.data_set import DataSet +from qcodes.parameters import ParameterBase + + +class MeasurementAndPlot: + UPDATE_PERIOD = 0.01 + + def __init__(self, *, script: "MeasurementScript", name: str, gui=None, **kwargs): + self.qcodes_measurement = Measurement(name=name, **kwargs) + self.qcodes_measurement.write_period = self.UPDATE_PERIOD + self.gui = gui + self.script = script + self._shapes = None + + def register_parameter( + self, parameter: ParameterBase, setpoints: Optional[Sequence[Union[str, ParameterBase]]] = None, **kwargs + ): + self.qcodes_measurement.register_parameter(parameter, setpoints, **kwargs) + + def set_shapes(self, shapes): + self.qcodes_measurement.set_shapes(shapes=shapes) + self._shapes = shapes + + @contextlib.contextmanager + def run(self, **kwargs): + if self.gui is not None: + # here we could add some more arguments in the future + plot_target = self.gui + else: + plot_target = None + + with self.qcodes_measurement.run(**kwargs) as qcodes_datasaver: + yield DataSaverAndPlotter(self, qcodes_datasaver, plot_target=plot_target, shapes=self._shapes) + + +class DataSaverAndPlotter: + def __init__(self, parent: MeasurementAndPlot, qcodes_datasaver, shapes, plot_target: callable): + self._parent = parent + self.qcodes_datasaver = qcodes_datasaver + self.plot_target = plot_target + self._shapes = shapes + self._last_plot_call = None + + def _process_xarr(self, xarr): + terminal_parameters = self._parent.script.terminal_parameters + rename_dict = { + parameter.full_name: parameter.label + for parameters in terminal_parameters.values() + for parameter in parameters.values() + if parameter.full_name in xarr.variables + } + renamed = xarr.rename(rename_dict) + return renamed + + def add_result(self, *args): + self.qcodes_datasaver.add_result(*args) + if self.plot_target is not None: + # the following logic only generates a dataset and sends data to the plotter + # if the QCoDeS internal _last_save_time attribute was updated. + last_save_time = getattr(self.qcodes_datasaver, "_last_save_time", None) + if last_save_time is None: + warnings.warn( + "Current QCoDeS version is not compatible with efficient live plotting. " + "The plot is updated even if the data did not change.", + category=RuntimeWarning, + ) + update_plot = True + elif last_save_time != self._last_plot_call: + update_plot = True + self._last_plot_call = last_save_time + else: + update_plot = False + + if update_plot: + xarr = self.dataset.to_xarray_dataset() + processed = self._process_xarr(xarr) + self.plot_target(processed) + + @property + def dataset(self) -> DataSet: + return self.qcodes_datasaver.dataset diff --git a/src/qumada/utils/load_from_sqlite_db.py b/src/qumada/utils/load_from_sqlite_db.py index 74fa6235..2d7d946f 100644 --- a/src/qumada/utils/load_from_sqlite_db.py +++ b/src/qumada/utils/load_from_sqlite_db.py @@ -24,6 +24,7 @@ """ from __future__ import annotations +import re from os import path import numpy as np @@ -32,7 +33,6 @@ from qcodes.dataset.plotting import plot_dataset from qumada.utils.browsefiles import browsefiles -import re # %% @@ -216,7 +216,7 @@ def pick_measurements(sample_name: str = None, preview_dialogue=False, measureme load_db() return pick_measurements(preview_dialogue=preview_dialogue, measurement_list=measurement_list) elif re.match(pattern, chosen): - for i in range(int(match.group(1)), int(match.group(2))+1): + for i in range(int(match.group(1)), int(match.group(2)) + 1): measurement_list.append(measurements[i]) else: chosen = int(chosen) @@ -289,14 +289,17 @@ def get_parameter_data(dataset=None, parameter_name=None, **kwargs): ) return zip(params, data, units, labels) + # %% -def get_parameter_name_by_label(dataset=None, parameter_label=None, appendix = ""): + +def get_parameter_name_by_label(dataset=None, parameter_label=None, appendix=""): for param in dataset.get_parameters(): if param.label == parameter_label + appendix: return param.name return None + # %% def separate_up_down(x_data, y_data): grad = np.gradient(x_data) diff --git a/src/qumada/utils/plotting.py b/src/qumada/utils/plotting.py index fdd7e7b0..9571b1c2 100644 --- a/src/qumada/utils/plotting.py +++ b/src/qumada/utils/plotting.py @@ -171,7 +171,7 @@ def plot_2D( if args: x_data, y_data, z_data = _handle_overload(x_data, y_data, z_data, *args, output_dimension=3) if ax is None or fig is None: - fig, ax = plt.subplots(figsize = kwargs.get("figsize", (10,10))) + fig, ax = plt.subplots(figsize=kwargs.get("figsize", (10, 10))) # Skalierung der Achsendaten und Einheiten x_values, y_values, z_values = x_data[1], y_data[1], z_data[1] @@ -196,7 +196,7 @@ def plot_2D( # Plotten der 2D-Daten im = ax.pcolormesh(x, y, z, shading="auto") cbar = fig.colorbar(im, ax=ax) - + if z_label is None: cbar.set_label(f"{z_data[3]} ({z_unit})") else: @@ -341,12 +341,12 @@ def plot_multiple_datasets( scale_axis=True, legend=True, exclude_string_from_legend: list = ["1D Sweep"], - legend_entries: None|list = None, - save_to_file = None, - close = False, - x_label = None, - y_label = None, - color_map = None, + legend_entries: None | list = None, + save_to_file=None, + close=False, + x_label=None, + y_label=None, + color_map=None, **kwargs, ): """ @@ -442,15 +442,15 @@ def plot_multiple_datasets( if font is not None: matplotlib.rc("font", 30) matplotlib.rc("font", size=40) - default_colors = plt.rcParams['axes.prop_cycle'].by_key()['color'] + default_colors = plt.rcParams["axes.prop_cycle"].by_key()["color"] if ax is None or fig is None: - fig, ax = plt.subplots(figsize = kwargs.get("figsize", (10, 10))) + fig, ax = plt.subplots(figsize=kwargs.get("figsize", (10, 10))) x_labels = [] y_labels = [] for i in range(len(datasets)): if color_map is None: - color = default_colors[i % len(default_colors)] + color = default_colors[i % len(default_colors)] else: color = color_map[i] if legend_entries is None: @@ -464,12 +464,12 @@ def plot_multiple_datasets( x_name = x_axis_parameters_name[i] else: x_name = x_axis_parameters_name - + if isinstance(y_axis_parameters_name, list): y_name = y_axis_parameters_name[i] else: y_name = y_axis_parameters_name - + x, y = _handle_overload( *get_parameter_data(datasets[i], y_axis_parameters_name), x_name=x_name, @@ -497,10 +497,10 @@ def plot_multiple_datasets( if not optimize_hysteresis_legend: f_label += " backsweep" if optimize_hysteresis_legend is True: - # Only one legend entry per dataset (instead of one for each fore-/backsweep) + # Only one legend entry per dataset (instead of one for each fore-/backsweep) if j > 0: f_label = None - if j%2 == False: # ;-) Ensuring the first sweep marker is always filled + if j % 2 == False: # ;-) Ensuring the first sweep marker is always filled fill_style = "full" else: fill_style = "none" @@ -512,8 +512,8 @@ def plot_multiple_datasets( linestyle=kwargs.get("linestyle", ""), label=f_label, markersize=kwargs.get("markersize", 20), - color = color, - fillstyle = fill_style + color=color, + fillstyle=fill_style, ) else: plt.plot( @@ -523,14 +523,14 @@ def plot_multiple_datasets( linestyle=kwargs.get("linestyle", ""), label=label, markersize=kwargs.get("markersize", 20), - color = color, + color=color, ) # Scale axes and update labels if scale_axis is True: x_scaling_factor, x_units[0] = _rescale_axis(ax.xaxis, np.concatenate(x_data), x_units[0], "x") y_scaling_factor, y_units[0] = _rescale_axis(ax.yaxis, np.concatenate(y_data), y_units[0], "y") - + if x_label is None: plt.xlabel(f"{x_labels[0]} ({x_units[0]})") else: @@ -539,7 +539,7 @@ def plot_multiple_datasets( plt.ylabel(f"{y_labels[0]} ({y_units[0]})") else: plt.ylabel(f"{y_label} ({y_units[0]})") - + # Update x and y labels leg_entries = ax.legend().get_texts() if legend is True: @@ -547,9 +547,9 @@ def plot_multiple_datasets( loc=kwargs.get("legend_position", "upper left"), fontsize=kwargs.get("legend_fontsize", 35), markerscale=kwargs.get("legend_markerscale", 1), - ncol = kwargs.get("legend_ncols", int(len(leg_entries)/9)+1), - columnspacing = kwargs.get("legend_columnspacing", 0.2), - handletextpad = kwargs.get("legend_handletextpad", -0.7), + ncol=kwargs.get("legend_ncols", int(len(leg_entries) / 9) + 1), + columnspacing=kwargs.get("legend_columnspacing", 0.2), + handletextpad=kwargs.get("legend_handletextpad", -0.7), ) plt.tight_layout() if save_to_file is not None: diff --git a/src/qumada/utils/ramp_parameter.py b/src/qumada/utils/ramp_parameter.py index 3c93aa72..ae0e7d7a 100644 --- a/src/qumada/utils/ramp_parameter.py +++ b/src/qumada/utils/ramp_parameter.py @@ -23,13 +23,14 @@ import logging import time +from copy import deepcopy from math import isclose from qumada.utils.generate_sweeps import generate_sweep -from copy import deepcopy LOG = logging.getLogger(__name__) + def has_ramp_method(parameter): try: parameter.root_instrument._qumada_ramp @@ -37,13 +38,15 @@ def has_ramp_method(parameter): except AttributeError: return False + def has_pulse_method(parameter): try: parameter.root_instrument._qumada_pulse return True except AttributeError: return False - + + def has_force_trigger_method(parameter): try: parameter.root_instrument._qumada_mapping.force_trigger @@ -51,9 +54,11 @@ def has_force_trigger_method(parameter): except AttributeError: return False + class Unsweepable_parameter(Exception): pass + class Unsettable_parameter(Exception): pass @@ -119,7 +124,7 @@ def ramp_parameter( LOG.debug(f"ramp rate: {ramp_rate}") LOG.debug(f"ramp time: {ramp_time}") - if isinstance(current_value, float|int) and not isinstance(current_value, bool): + if isinstance(current_value, float | int) and not isinstance(current_value, bool): LOG.debug(f"target: {target}") if isclose(current_value, target, rel_tol=tolerance): LOG.debug("Target value is sufficiently close to current_value, no need to ramp") @@ -151,7 +156,7 @@ def ramp_parameter( LOG.debug(f"sweep: {sweep}") for value in sweep: parameter.set(value) - time.sleep(ramp_time/num_points) + time.sleep(ramp_time / num_points) return True else: raise Unsweepable_parameter("Parameter has non-float values") @@ -176,70 +181,78 @@ def ramp_or_set_parameter( parameter.set(target) except Unsettable_parameter: pass - + + def ramp_or_set_parameters( - parameters: list, - targets: list[float], - ramp_rate: float | list[float] = 0.3, - ramp_time: float | list[float] = 5, - setpoint_interval: float |list[float] = 0.1, - tolerance: float = 1e-5, - trigger_start = None, - trigger_type = "software", - trigger_reset = None, - sync_trigger = None): + parameters: list, + targets: list[float], + ramp_rate: float | list[float] = 0.3, + ramp_time: float | list[float] = 5, + setpoint_interval: float | list[float] = 0.1, + tolerance: float = 1e-5, + trigger_start=None, + trigger_type="software", + trigger_reset=None, + sync_trigger=None, +): instruments = {param.root_instrument for param in parameters} - instruments_dict = {} #Will contain instruments as keys and their params with targets as vals. - #Check requirements for parallel ramps. + instruments_dict = {} # Will contain instruments as keys and their params with targets as vals. + # Check requirements for parallel ramps. if trigger_type is not None: for instr in instruments: - if has_ramp_method(instr) and has_force_trigger_method(instr) and hasattr(instr._qumada_mapping, "max_ramp_channels"): - instruments_dict[instr] = [] #Only instruments supporting ramps are added! + if ( + has_ramp_method(instr) + and has_force_trigger_method(instr) + and hasattr(instr._qumada_mapping, "max_ramp_channels") + ): + instruments_dict[instr] = [] # Only instruments supporting ramps are added! # Loop groups params according to their instruments for later execution of ramps. for param, target in zip(parameters, targets): if param._settable is False: LOG.warning(f"{param} is not _settable and cannot be ramped!") continue - - current_value = param.get() - #TODO: Possibly further improvements with cached val or known start. + + current_value = param.get() + # TODO: Possibly further improvements with cached val or known start. # Check if parameter should be ramped or set. - if isinstance(current_value, float|int) and not isinstance(current_value, bool): + if isinstance(current_value, float | int) and not isinstance(current_value, bool): LOG.debug(f"current value: {current_value}, target: {target}") if isclose(current_value, target, rel_tol=tolerance): LOG.debug("Target value is sufficiently close to current_value, no need to ramp") continue - if param.root_instrument in instruments_dict.keys(): #Only instruments supporting ramps - instruments_dict[param.root_instrument].append((param,target)) - else: #Everything that cannot be ramped with instrument ramp is ramped/set here + if param.root_instrument in instruments_dict.keys(): # Only instruments supporting ramps + instruments_dict[param.root_instrument].append((param, target)) + else: # Everything that cannot be ramped with instrument ramp is ramped/set here ramp_or_set_parameter(param, target, ramp_rate, ramp_time, setpoint_interval) # Now go through all instruments supporting ramps and start the ramps for instr, values in instruments_dict.items(): counter = 1 param_helper = [] target_helper = [] - #Params and targets are added until the max number of simultaneously rampable - #channels is reached. Then ramp is started and new params are added. + # Params and targets are added until the max number of simultaneously rampable + # channels is reached. Then ramp is started and new params are added. for param, target in values: param_helper.append(param) target_helper.append(target) - #TODO: Triggering logic won't work if sync trigger is used to trigger another + # TODO: Triggering logic won't work if sync trigger is used to trigger another # DAC (e.g. QDac in combination with Decadac) - if counter%instr._qumada_mapping.max_ramp_channels == 0 or counter == len(values): - LOG.debug(f"Ramping {param_helper} to {target_helper}") + if counter % instr._qumada_mapping.max_ramp_channels == 0 or counter == len(values): + LOG.debug(f"Ramping {param_helper} to {target_helper}") if sync_trigger is not None: - LOG.exception("You are using a sync trigger for ramps outside measurements. \ + LOG.exception( + "You are using a sync trigger for ramps outside measurements. \ If the sync trigger is required to start an another DACs/AWGs ramp \ this will not work. (E.g. QDac and Decadac). If you only need it to \ - start data acquisitions you're fine.") + start data acquisitions you're fine." + ) instr._qumada_ramp( param_helper, - end_values = target_helper, - ramp_time = min(ramp_time, 1/ramp_rate), #TODO: Is that fine/Safe enough? - sync_trigger=None - ) + end_values=target_helper, + ramp_time=min(ramp_time, 1 / ramp_rate), # TODO: Is that fine/Safe enough? + sync_trigger=None, + ) instr._qumada_mapping.force_trigger() - #TODO: Force trigger for AWGs/DACs? + # TODO: Force trigger for AWGs/DACs? time.sleep(ramp_time) try: trigger_reset() @@ -247,11 +260,4 @@ def ramp_or_set_parameters( LOG.info("No method to reset the trigger defined.") param_helper = [] target_helper = [] - counter +=1 - - - - - - - + counter += 1 diff --git a/src/tests/conftest.py b/src/tests/conftest.py new file mode 100644 index 00000000..c3b52717 --- /dev/null +++ b/src/tests/conftest.py @@ -0,0 +1,55 @@ +import dataclasses +import pathlib +import tempfile +import threading +import time + +import pytest +from qcodes.dataset.experiment_container import load_or_create_experiment +from qcodes.station import Station + +from qumada.instrument.buffered_instruments import BufferedDummyDMM as DummyDmm +from qumada.instrument.custom_drivers.Dummies.dummy_dac import DummyDac +from qumada.instrument.mapping import ( + DUMMY_DMM_MAPPING, + add_mapping_to_instrument, +) +from qumada.instrument.mapping.Dummies.DummyDac import DummyDacMapping +from qumada.utils.load_from_sqlite_db import load_db + + +@dataclasses.dataclass +class MeasurementTestSetup: + trigger: threading.Event + + station: Station + dmm: DummyDmm + dac: DummyDac + + db_path: pathlib.Path + + +@pytest.fixture +def measurement_test_setup(tmp_path): + trigger = threading.Event() + + # Setup qcodes station + station = Station() + + # The dummy instruments have a trigger_event attribute as replacement for + # the trigger inputs of real instruments. + + dmm = DummyDmm("dmm", trigger_event=trigger) + add_mapping_to_instrument(dmm, mapping=DUMMY_DMM_MAPPING) + station.add_component(dmm) + + dac = DummyDac("dac", trigger_event=trigger) + add_mapping_to_instrument(dac, mapping=DummyDacMapping()) + station.add_component(dac) + + db_path = tmp_path / "test.db" + load_db(str(db_path)) + load_or_create_experiment("test", "dummy_sample") + + yield MeasurementTestSetup(trigger, station, dmm, dac, db_path) + station.close_all_registered_instruments() diff --git a/src/tests/device_test.py b/src/tests/device_test.py new file mode 100644 index 00000000..b9efc50a --- /dev/null +++ b/src/tests/device_test.py @@ -0,0 +1,132 @@ +import dataclasses +import itertools + +import numpy as np +import pytest + +from qumada.measurement.device_object import QumadaDevice + +from .conftest import MeasurementTestSetup + + +@dataclasses.dataclass +class DeviceTestSetup: + measurement_test_setup: MeasurementTestSetup + device: QumadaDevice + parameters: dict + namespace: dict + + +@pytest.fixture +def device_test_setup(measurement_test_setup): + """This fixture is derived from device_object_example""" + + parameters = { + "ohmic": { + "current": {"type": "gettable"}, + }, + "gate1": {"voltage": {"type": "static"}}, + "gate2": {"voltage": {"type": "static"}}, + } + namespace = {} + device = QumadaDevice.create_from_dict(parameters, station=measurement_test_setup.station, namespace=namespace) + + buffer_settings = { + "sampling_rate": 512, + "num_points": 12, + "delay": 0, + } + + mapping = { + "ohmic": { + "current": measurement_test_setup.dmm.current, + }, + "gate1": { + "voltage": measurement_test_setup.dac.ch01.voltage, + }, + "gate2": { + "voltage": measurement_test_setup.dac.ch02.voltage, + }, + } + + # This tells a measurement script how to start a buffered measurement. + # "Hardware" means that you want to use a hardware trigger. To start a measurement, + # the method provided as "trigger_start" is called. The "trigger_reset" method is called + # at the end of each buffered line, in our case resetting the trigger flag. + # For real instruments, you might have to define a method that sets the output of your instrument + # to a desired value as "trigger_start". For details on other ways to setup your triggers, + # check the documentation. + + buffer_script_settings = { + "trigger_type": "hardware", + "trigger_start": measurement_test_setup.trigger.set, + "trigger_reset": measurement_test_setup.trigger.clear, + } + + device.buffer_script_setup = buffer_script_settings + device.buffer_settings = buffer_settings + + # device.map_terminals() + # - map_terminals_gui(self.station.components, self.instrument_parameters, instrument_parameters) + device.terminal_parameters = mapping + # - self.update_terminal_parameters() + device.update_terminal_parameters() + + # device.map_triggers() + measurement_test_setup.dac._qumada_mapping.trigger_in = None + (measurement_test_setup.dmm._qumada_buffer.trigger,) = measurement_test_setup.dmm._qumada_buffer.AVAILABLE_TRIGGERS + + return DeviceTestSetup( + measurement_test_setup, + device, + parameters, + namespace, + ) + + +@pytest.mark.parametrize( + "buffered,backsweep", + itertools.product( + # buffered + [True, False], + # backsweep + [False, True], + ), +) +def test_measured_ramp(device_test_setup, buffered, backsweep): + gate1 = device_test_setup.namespace["gate1"] + + plot_args = [] + + def plot_backend(*args, **kwargs): + plot_args.append((args, kwargs)) + + from qumada.measurement.measurement import MeasurementScript + + MeasurementScript.DEFAULT_LIVE_PLOTTER = plot_backend + + (qcodes_data,) = gate1.voltage.measured_ramp(0.4, start=-0.3, buffered=buffered, backsweep=backsweep) + if backsweep: + assert gate1.voltage() == pytest.approx(-0.3, abs=0.001) + else: + assert gate1.voltage() == pytest.approx(0.4, abs=0.001) + + if not buffered: + # TODO: Why is this necessary??? + (qcodes_data, _, _) = qcodes_data + xarr = qcodes_data.to_xarray_dataset() + + set_points = xarr.dac_ch01_voltage.values + + if backsweep: + fwd = np.linspace(-0.3, 0.4, len(set_points) // 2) + expected = np.concatenate((fwd, fwd[::-1])) + else: + expected = np.linspace(-0.3, 0.4, len(set_points)) + + if buffered: + assert len(plot_args) == 1 + int(backsweep) + else: + assert len(plot_args) == len(set_points) + + np.testing.assert_almost_equal(expected, set_points) diff --git a/src/tests/mapping_test.py b/src/tests/mapping_test.py index 2db714d5..14274ff8 100644 --- a/src/tests/mapping_test.py +++ b/src/tests/mapping_test.py @@ -52,34 +52,38 @@ from qumada.measurement.scripts.generic_measurement import Generic_1D_Sweep -@pytest.fixture(name="dmm", scope="session") +@pytest.fixture(name="dmm") def fixture_dmm(): dmm = DummyDmm("dmm") add_mapping_to_instrument(dmm, mapping=mapping.DUMMY_DMM_MAPPING) - return dmm + yield dmm + dmm.close() -@pytest.fixture(name="dac", scope="session") +@pytest.fixture(name="dac") def fixture_dac(): dac = DummyDac("dac") add_mapping_to_instrument(dac, mapping=DummyDacMapping()) - return dac + yield dac + dac.close() -@pytest.fixture(name="dci", scope="session") +@pytest.fixture(name="dci") def fixture_dci(): dci = DummyChannelInstrument("dci") add_mapping_to_instrument(dci, mapping=mapping.DUMMY_CHANNEL_MAPPING) - return dci + yield dci + dci.close() -@pytest.fixture(name="station_with_instruments", scope="session") +@pytest.fixture(name="station_with_instruments") def fixture_station_with_instruments(dmm, dac, dci): station = Station() station.add_component(dmm) station.add_component(dac) station.add_component(dci) - return station + yield station + station.close_all_registered_instruments() @pytest.fixture(name="script") diff --git a/src/tests/measurement_test.py b/src/tests/measurement_test.py new file mode 100644 index 00000000..2b106423 --- /dev/null +++ b/src/tests/measurement_test.py @@ -0,0 +1,102 @@ +import dataclasses +import tempfile +import threading + +import numpy as np +import pytest +import yaml +from qcodes.dataset import ( + Measurement, + experiments, + initialise_or_create_database_at, + load_by_run_spec, + load_or_create_experiment, +) +from qcodes.station import Station + +from qumada.instrument.buffered_instruments import BufferedDummyDMM as DummyDmm +from qumada.instrument.buffers.buffer import ( + load_trigger_mapping, + map_triggers, + save_trigger_mapping, +) +from qumada.instrument.custom_drivers.Dummies.dummy_dac import DummyDac +from qumada.instrument.mapping import ( + DUMMY_DMM_MAPPING, + add_mapping_to_instrument, + map_terminals_gui, +) +from qumada.instrument.mapping.Dummies.DummyDac import DummyDacMapping +from qumada.measurement.scripts import ( + Generic_1D_parallel_asymm_Sweep, + Generic_1D_parallel_Sweep, + Generic_1D_Sweep, + Generic_1D_Sweep_buffered, + Generic_2D_Sweep_buffered, + Generic_nD_Sweep, + Timetrace, +) +from qumada.utils.generate_sweeps import generate_sweep, replace_parameter_settings +from qumada.utils.GUI import open_web_gui +from qumada.utils.load_from_sqlite_db import load_db +from qumada.utils.ramp_parameter import * + + +@pytest.fixture +def buffer_settings(): + return { + "sampling_rate": 512, + "duration": 12 / 512, + "burst_duration": 12 / 512, + "delay": 0, + } + + +@pytest.fixture +def parameters(): + return { + "ohmic": { + "voltage": {"type": "gettable"}, + "current": {"type": "gettable"}, + }, + "gate1": {"voltage": {"type": "dynamic", "setpoints": np.linspace(0, np.pi, 12), "value": 0}}, + "gate2": {"voltage": {"type": "dynamic", "setpoints": np.linspace(0, np.pi, 12), "value": 0}}, + } + + +def test_1d_buffered(measurement_test_setup, buffer_settings, parameters): + script = Generic_1D_Sweep_buffered() + script.setup( + parameters, + metadata=None, + buffer_settings=buffer_settings, + trigger_type="hardware", + trigger_start=measurement_test_setup.trigger.set, + trigger_reset=measurement_test_setup.trigger.clear, + ) + + mapping = { + "ohmic": { + "voltage": measurement_test_setup.dmm.voltage, + "current": measurement_test_setup.dmm.current, + }, + "gate1": { + "voltage": measurement_test_setup.dac.ch01.voltage, + }, + "gate2": { + "voltage": measurement_test_setup.dac.ch02.voltage, + }, + } + script.gate_parameters = mapping + ds1, ds2 = script.run() + ds1 = ds1.to_xarray_dataset() + ds2 = ds2.to_xarray_dataset() + + np.testing.assert_almost_equal( + parameters["gate1"]["voltage"]["setpoints"], + ds1.dac_ch01_voltage.values, + ) + np.testing.assert_almost_equal( + parameters["gate2"]["voltage"]["setpoints"], + ds2.dac_ch02_voltage.values, + )