diff --git a/docs/device_object.rst b/docs/device_object.rst index 9b1116c4..3bbb8b8b 100644 --- a/docs/device_object.rst +++ b/docs/device_object.rst @@ -194,7 +194,7 @@ This stores all parameter values (of parameters that can be set). With you can reset it to the stored configuration. Alternatively you can use "device.save_state(name)" and "device.set_state(name)" to store and set multiple working points with -custom names. They can also be accessed via "device.states" in case you forgot the name. To store a state in json-file, use device.save_to_file(name, path), you can load it +custom names. They can also be accessed via "device.states" in case you forgot the name. To store a state in json-file, use device.save_to_file(name, path), you can load it via "device.load_state_from_file(name, path)" to load it (use "device.set_state(name)" afterwards to set it) or with "device.set_state_from_file(name, path)" to directly apply the configuration to the device. For all of those methods the parameters are ramped to the final state by default (with the default QuMada ramp rate), again depending on the "device.ramp" setting. diff --git a/pyproject.toml b/pyproject.toml index 780b1ae4..604483f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,12 +24,14 @@ dependencies = [ "jsonschema", "zhinst-toolkit >= 0.3.3", "pyqt5", + "shapely", "versioningit ~= 2.2.0", ] [project.optional-dependencies] gui = ["plottr"] spyder = ["spyder"] +monitor = ["dash", "ezdxf", "dash-extensions"] [project.urls] Repository = "https://github.com/qutech/qumada" diff --git a/src/qumada-monitor/README.md b/src/qumada-monitor/README.md new file mode 100644 index 00000000..88486418 --- /dev/null +++ b/src/qumada-monitor/README.md @@ -0,0 +1,30 @@ +# QuMADA Device Monitor + +This is a dash app that is intended to monitor a qumada device. To use it you need to install the `monitor` dependencies for example via +`pip install qumada[monitor]` or with a local editable install `pip install -e .[monitor]`. + +The feature that separates it from the qcodes monitor is the ability to display a proper layout. + +```python +from qumada.utils.device_server import start_monitor_socket + +device: 'QumadaDevice' = ... +my_dxf_file = ... + +start_monitor_socket(device, my_dxf_file) +``` + +This does the following steps under the hood: + +1. Create a list of `qumada.utils.geometry.Gate` objects with the correct labels + - Load a dxf file with `doc = ezdxf.readfile('my_layout.dxf')` + - Crop it to the desired region with `raw_gates = qumada.utils.dxf.get_gates_from_cropped_region` + - Merge gates from the same layer that overlap (necessary for some reason) with `merged_gates = qumada.utils.dxf.auto_merge` + - Give the gates the proper labels with a helper GUI: `gates = qumada.utils.dxf.label_gates(merged_gates)` + - Save the gate list to json with `qumada.utils.geometry.store_to_file(gates, 'my_gates.json')` +2. Start the device monitor server and set its gate_geometry + +To view the monitor you need to start the webserver by running `python -m qumada-monitor` in a shell + +The webserver will try to match parameters to gates based on the gate labels being present in the parameter labels. +Un-matched gates are nan. diff --git a/src/qumada-monitor/__init__.py b/src/qumada-monitor/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/qumada-monitor/__main__.py b/src/qumada-monitor/__main__.py new file mode 100644 index 00000000..123a86e4 --- /dev/null +++ b/src/qumada-monitor/__main__.py @@ -0,0 +1,13 @@ +import argparse + +from .app import make_app + +p = argparse.ArgumentParser(description="Dash front-end for your measurement WebSocket") +p.add_argument("--ws", default="ws://127.200.200.9:6789", help="WebSocket URL, e.g. ws://localhost:8765") +p.add_argument("--host", default="127.0.0.1", help="Dash host to bind") +p.add_argument("--port", default=8050, type=int, help="Dash port (default: 8050)") +p.add_argument("--debug", default=False, action="store_true", help="Enable debug mode") +args = p.parse_args() + +app = make_app(args.ws) +app.run(host=args.host, port=args.port, debug=args.debug) diff --git a/src/qumada-monitor/app.py b/src/qumada-monitor/app.py new file mode 100644 index 00000000..3bc52dde --- /dev/null +++ b/src/qumada-monitor/app.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +""" +Live quantum‑device monitor: geometry + voltages ➜ interactive SVG plot. + +Start with: + python dash_device_monitor.py --ws ws://localhost:8765 --colorscale Cividis +""" +import argparse +import itertools +import json +import os +import warnings +from collections import defaultdict + +import dash.exceptions +import plotly.express as px +import plotly.graph_objects as go +from dash import Dash, Input, Output, State, dash_table, dcc, html, no_update +from dash_extensions import WebSocket # pip install dash-extensions +from plotly.colors import sample_colorscale +from shapely.geometry import Polygon + +# ---------------- helpers -------------------------------------------------- # +from qumada.utils.geometry import Gate, string_to_gate_list + + +def layer_palette(layers): + """Return dict layer -> rgba border colour (qualitative palette, repeats if needed).""" + palette = itertools.cycle(px.colors.qualitative.Plotly) + return {layer: next(palette) for layer in layers} + + +def voltage_colour(v, vabs_max, colorscale): + """Map voltage to rgba string using the chosen Plotly colourscale.""" # flat value + ratio = 0.5 + v / vabs_max / 2 + return sample_colorscale(colorscale, [ratio])[0] # :contentReference[oaicite:9]{index=9} + + +def _set_alpha(color: str, value) -> str: + if color.startswith("rgb("): + result = color.replace("rgb(", "rgba(") + result = result.replace(")", f",{value})") + elif color.startswith("rgba("): + result = color.rsplit(",", 1)[0] + result = f"{result},{value})" + else: + raise NotImplementedError(color) + return result + + +def gates_to_figure(gates: list[Gate], voltages: list[dict], colorscale): + """Build a Plotly figure from gate list + latest voltages dict.""" + if not gates: + return go.Figure() + + gate_runtime_info = [] + for gate in gates: + candidates = [] + for data in voltages: + if gate.label in data["label"]: + candidates.append(data) + if not candidates: + gate_runtime_info.append(None) + else: + if len(candidates) > 1: + data = min(candidates, key=lambda d: d["label"]) + else: + (data,) = candidates + gate_runtime_info.append(data) + + # 1) derive colour mapping + v_values = [info["value"] for info in gate_runtime_info if info is not None] + vabs_max = max(map(abs, v_values + [1.0])) + layers = sorted({g.layer for g in gates}) + border_col = layer_palette(layers) # :contentReference[oaicite:10]{index=10} + + fig = go.Figure() + for gate, info in zip(gates, gate_runtime_info): + poly: Polygon = gate.polygon + x, y = poly.exterior.xy + x = list(x) + y = list(y) + if info: + v = info["value"] + unit = info["unit"] + text = f"{gate.label}:
{v:.3f} {unit}" + fill_col = voltage_colour(v, vabs_max, colorscale) + else: + text = f"{gate.label}:
NaN" + fill_col = None + line_col = border_col[gate.layer] + + if fill_col is None: + fill_col = "rgba(0,0,0,0)" + else: + fill_col = _set_alpha(fill_col, 0.3) + + fig.add_trace( + go.Scatter( + x=x, + y=y, + fill="toself", # polygon fill trick :contentReference[oaicite:11]{index=11} + fillcolor=fill_col, + line=dict(color=line_col, width=1), + hoverinfo="text", + text=text, + showlegend=False, + mode="lines", + ) + ) + # add static label at predefined position + if getattr(gate, "label_position", None): + lx, ly = gate.label_position + fig.add_annotation(x=lx, y=ly, text=text, showarrow=False, font=dict(size=10, color="black")) + + fig.update_layout( + xaxis=dict(scaleanchor="y", visible=False), + yaxis=dict(visible=False), + margin=dict(l=0, r=0, t=0, b=0), + plot_bgcolor="white", + ) + return fig + + +# ---------------- Dash app -------------------------------------------------- # +def make_app(ws_url: str) -> Dash: + app = Dash(__name__, title="Quantum‑dot voltage monitor") + + app.layout = html.Div( + [ + html.H3("Live device layout"), + WebSocket(id="ws", url=ws_url), + dcc.Store(id="gate-geometry"), + dcc.Store(id="voltages"), + dcc.Store(id="parameters"), + dcc.Graph(id="layout-graph", style={"height": "700px"}), + dcc.Dropdown( + id="colorscale-dropdown", + value="rdbu", + options=[{"label": cs, "value": cs} for cs in px.colors.named_colorscales()], + clearable=False, + style={"width": "250px"}, + ), + html.Hr(), + dash_table.DataTable( + id="parameter-table", + columns=[ + {"name": "Label", "id": "label"}, + {"name": "Value", "id": "value"}, + {"name": "Unit", "id": "unit"}, + {"name": "Timestamp", "id": "timestamp"}, + {"name": "Name", "id": "name"}, + ], + style_cell={"fontFamily": "monospace", "padding": "2px 6px"}, + style_table={"max-height": "400px", "overflowY": "auto"}, + ), + ], + style={"font-family": "Source Sans Pro, sans-serif", "margin": "0 20px"}, + ) + + # ---------- 1) unpack every WebSocket message -------------------------- # + @app.callback( + Output("gate-geometry", "data"), + Output("parameters", "data"), + Input("ws", "message"), + prevent_initial_call=True, + ) + def _unpack_ws(msg): + if msg is None: + return no_update, no_update + + data = json.loads(msg["data"]) + + parameters = gate_geometry = no_update + if "parameters" in data: + parameters = data["parameters"] + if "gate_geometry" in data: + gate_geometry = data["gate_geometry"] + return gate_geometry, parameters + + @app.callback( + Output("voltages", "data"), + Input("parameters", "data"), + prevent_initial_call=True, + ) + def _select_voltages(parameters): + voltages = [] + for parameter in parameters: + assert "unit" in parameter, f"{parameter!r}" + if "V" in parameter["unit"]: + value = {attr: parameter[attr] for attr in ["name", "value", "label", "unit"]} + voltages.append(value) + return voltages + + @app.callback( + Output("parameter-table", "data"), + Input("parameters", "data"), + ) + def _parameter_table(parameters): + if not parameters: + return [] + + cols = ["label", "value", "unit", "timestamp", "name"] + return [{col: parameter[col] for col in cols} for parameter in parameters] + + @app.callback( + Output("layout-graph", "figure"), + Input("gate-geometry", "data"), + Input("voltages", "data"), + Input("colorscale-dropdown", "value"), + ) + def _draw_layout(geom_str, volts, colorscale_selected): + if geom_str is None or volts is None: + fig = go.Figure() + else: + gates = string_to_gate_list(geom_str) + fig = gates_to_figure(gates, volts, colorscale_selected) + + fig.update_layout( + xaxis=dict(scaleanchor="y", visible=False), + yaxis=dict(visible=False), + margin=dict(l=0, r=0, t=0, b=0), + plot_bgcolor="white", + uirevision=hash(geom_str), + ) + return fig + + return app diff --git a/src/qumada/instrument/custom_drivers/ZI/MFLI.py b/src/qumada/instrument/custom_drivers/ZI/MFLI.py index 3294c0e1..ca37f189 100644 --- a/src/qumada/instrument/custom_drivers/ZI/MFLI.py +++ b/src/qumada/instrument/custom_drivers/ZI/MFLI.py @@ -41,11 +41,13 @@ class MFLI(Instrument): """ def __init__( - self, name: str, device: str, + self, + name: str, + device: str, serverhost: str = "localhost", - existing_session: Session = None, - allow_version_mismatch = False, - **kwargs + existing_session: Session = None, + allow_version_mismatch=False, + **kwargs, ): super().__init__(name, **kwargs) if isinstance(existing_session, Session): diff --git a/src/qumada/instrument/mapping/Dummies/DummyDac.py b/src/qumada/instrument/mapping/Dummies/DummyDac.py index 3c8c97fe..ffe04a1c 100644 --- a/src/qumada/instrument/mapping/Dummies/DummyDac.py +++ b/src/qumada/instrument/mapping/Dummies/DummyDac.py @@ -60,8 +60,8 @@ def ramp( if not start_values: start_values = [param.get() for param in parameters] - - print("Ramping...") + + print("Ramping...") instrument._triggered_ramp_channels( [param._instrument for param in parameters], start_values, end_values, ramp_time, num_points ) @@ -96,7 +96,7 @@ def pulse( def setup_trigger_in(): pass - + def force_trigger(self): self._instrument.force_trigger() - self._instrument._is_triggered.clear() \ No newline at end of file + self._instrument._is_triggered.clear() diff --git a/src/qumada/instrument/mapping/Harvard/Decadac.py b/src/qumada/instrument/mapping/Harvard/Decadac.py index 28430dca..ed3dc20f 100644 --- a/src/qumada/instrument/mapping/Harvard/Decadac.py +++ b/src/qumada/instrument/mapping/Harvard/Decadac.py @@ -19,12 +19,14 @@ # - Till Huckeman +import logging + from qcodes.instrument.parameter import Parameter from qumada.instrument.custom_drivers.Harvard.Decadac import Decadac from qumada.instrument.mapping import DECADAC_MAPPING from qumada.instrument.mapping.base import InstrumentMapping -import logging + logger = logging.getLogger(__name__) @@ -159,6 +161,6 @@ def trigger_in(self, trigger: str | None) -> None: def force_trigger(self): string = "" for b in range(0, 6): - for c in range(0,6): - string+=f"B{b};C{c};G0;" - self._instrument.write(string) \ No newline at end of file + for c in range(0, 6): + string += f"B{b};C{c};G0;" + self._instrument.write(string) diff --git a/src/qumada/instrument/mapping/QDevil/qdac.py b/src/qumada/instrument/mapping/QDevil/qdac.py index 7c65d786..a96da913 100644 --- a/src/qumada/instrument/mapping/QDevil/qdac.py +++ b/src/qumada/instrument/mapping/QDevil/qdac.py @@ -30,6 +30,7 @@ class QDacMapping(InstrumentMapping): def __init__(self): super().__init__(QDAC_MAPPING) self.max_ramp_channels = 8 + def ramp( self, parameters: list[Parameter], @@ -72,7 +73,7 @@ def ramp( def setup_trigger_in(self): raise Exception("QDac does not have a trigger input!") - + def force_trigger(self): pass # Not required as QDac has no trigger input and starts ramps instantly. diff --git a/src/qumada/instrument/mapping/QDevil/qdac2.py b/src/qumada/instrument/mapping/QDevil/qdac2.py index ace4d7e2..65f30136 100644 --- a/src/qumada/instrument/mapping/QDevil/qdac2.py +++ b/src/qumada/instrument/mapping/QDevil/qdac2.py @@ -182,7 +182,7 @@ def setup_trigger_in(self): "QDac2 does not have a trigger input \ not yet supported!" ) - + def force_trigger(self): pass # Currently no trigger inputs are supported, thus all ramps are started diff --git a/src/qumada/instrument/mapping/base.py b/src/qumada/instrument/mapping/base.py index 99dd7fe9..7d41a2cd 100644 --- a/src/qumada/instrument/mapping/base.py +++ b/src/qumada/instrument/mapping/base.py @@ -76,8 +76,10 @@ def pulse( **kwargs, ) -> None: """Defining qumada pulse. Requires proper implementation for each instrument""" - raise Exception("Pulse not properly implemented for this instrument!\ - You cannot use pulsed measurements with this instrument.") + raise Exception( + "Pulse not properly implemented for this instrument!\ + You cannot use pulsed measurements with this instrument." + ) @abstractmethod def setup_trigger_in(self, trigger_settings: dict) -> None: diff --git a/src/qumada/measurement/device_object.py b/src/qumada/measurement/device_object.py index 4c5a410b..7e785232 100644 --- a/src/qumada/measurement/device_object.py +++ b/src/qumada/measurement/device_object.py @@ -1,12 +1,11 @@ from __future__ import annotations +import json import logging from abc import ABC from copy import deepcopy -from typing import Any from time import sleep -import json - +from typing import Any import numpy as np from qcodes import Station @@ -15,7 +14,10 @@ from qumada.instrument.buffers.buffer import map_triggers, save_trigger_mapping from qumada.instrument.mapping import map_terminals_gui -from qumada.instrument.mapping.base import load_mapped_terminal_parameters, save_mapped_terminal_parameters +from qumada.instrument.mapping.base import ( + load_mapped_terminal_parameters, + save_mapped_terminal_parameters, +) from qumada.measurement.measurement import MeasurementScript, load_param_whitelist from qumada.measurement.scripts import ( Generic_1D_Hysteresis_buffered, @@ -120,40 +122,40 @@ def set_state(self, name: str, ramp=None, **kwargs): Returns ------- None - """ + """ self.update_terminal_parameters() if ramp is None: ramp = self.ramp self.load_from_dict(self.states[name]) self.set_stored_values(ramp=ramp, **kwargs) - + def save_state_to_file(self, name: str, path: str): """ - Saves the specified state to a json file. - - Parameters - ---------- - name : str - The name of the state to save. - path : str - The file path where the state will be saved. Has to be a json file! - - Returns - ------- - None - - Notes - ----- - Before saving, any setpoints in terminal parameters are cleared by setting them to `None` - to avoid problems with json.dump. - """ + Saves the specified state to a json file. + + Parameters + ---------- + name : str + The name of the state to save. + path : str + The file path where the state will be saved. Has to be a json file! + + Returns + ------- + None + + Notes + ----- + Before saving, any setpoints in terminal parameters are cleared by setting them to `None` + to avoid problems with json.dump. + """ for t in self.terminals: for param in self.terminals[t].terminal_parameters: self.terminals[t].terminal_parameters[param].properties["setpoints"] = None state = self.states[name] with open(file=path, mode="w") as f: - json.dump(state, f) - + json.dump(state, f) + def load_state_from_file(self, name: str, path: str): """ Loads a state from a json file and stores it in the device object. @@ -169,10 +171,10 @@ def load_state_from_file(self, name: str, path: str): ------- None """ - with open(file=path, mode="r") as f: + with open(file=path) as f: state = json.load(f) self.states[name] = state - + def set_state_from_file(self, name: str, path: str): """ Sets the devices state by loading it from a json file. @@ -319,17 +321,17 @@ def save_to_dict(self, priorize_stored_value=False): logger.warning(f"Couldn't find value for {terminal_name} {param_name}") return return_dict - - def map_terminals(self, - terminal_parameters: None | dict = None, - path: None | str = None, - skip_gui_if_mapped: bool = True, - ): + def map_terminals( + self, + terminal_parameters: None | dict = None, + path: None | str = None, + skip_gui_if_mapped: bool = True, + ): """ Maps devices terminal parameters using map_terminals_gui. You can pass an existing mapping as terminal_parameters. If a path is provided it first tries to use the provided mapping file. - + Parameters ---------- terminal_parameters : None | dict, optional @@ -355,33 +357,35 @@ def map_terminals(self, if not isinstance(self.station, Station): raise TypeError("No valid qcodes station found. Make sure you have set the station attribute correctly!") if path is not None: - load_mapped_terminal_parameters(terminal_parameters, self.station, path) - map_terminals_gui(self.station.components, - self.terminal_parameters, - terminal_parameters, - skip_gui_if_mapped = skip_gui_if_mapped) + load_mapped_terminal_parameters(terminal_parameters, self.station, path) + map_terminals_gui( + self.station.components, + self.terminal_parameters, + terminal_parameters, + skip_gui_if_mapped=skip_gui_if_mapped, + ) self.update_terminal_parameters() - - def mapping(self, - terminal_parameters: None | dict = None, - path: None | str = None, - skip_gui_if_mapped = True, - ): - #TODO: Remove! - logger.warning("Deprecation Warning: device.mapping was renamed to \ + def mapping( + self, + terminal_parameters: None | dict = None, + path: None | str = None, + skip_gui_if_mapped=True, + ): + # TODO: Remove! + logger.warning( + "Deprecation Warning: device.mapping was renamed to \ device.map_terminals. Device.mapping will be removed \ - in a future release!") + in a future release!" + ) self.map_terminals(terminal_parameters, path, skip_gui_if_mapped) - - - def save_terminal_mapping(self, - path: str): + + def save_terminal_mapping(self, path: str): """ Save terminal mapping to specified file (json). """ save_mapped_terminal_parameters(self.terminal_parameters, path) - + def map_triggers( self, components: None | dict = None, @@ -394,7 +398,7 @@ def map_triggers( Uses components of assigned station by default. Ignores already mapped triggers by default. You can provide a path in order to load and existing mapping. - + Parameters ---------- components : None|dict, optional @@ -410,14 +414,9 @@ def map_triggers( """ if components is None: components = self.station.components - map_triggers(components=components, - skip_mapped=skip_mapped, - path=path, - kwargs=kwargs) - - def save_trigger_mapping( - self, - path: str): + map_triggers(components=components, skip_mapped=skip_mapped, path=path, kwargs=kwargs) + + def save_trigger_mapping(self, path: str): """ Save trigger mapping to json file. @@ -518,7 +517,7 @@ def timetrace( map_triggers(station.components) data = script.run() return data - + def sweep_1d( self, params: Parameter | list[Parameter], @@ -526,12 +525,12 @@ def sweep_1d( num_points: int = 100, dynamic_values: None | list[float] = None, backsweep: bool = False, - name = None, - metadata = None, - station = None, - buffered = False, + name=None, + metadata=None, + station=None, + buffered=False, buffer_settings: dict | None = None, - priorize_stored_value = False, + priorize_stored_value=False, **kwargs, ): """ @@ -552,8 +551,8 @@ def sweep_1d( Number of points measured in each sweep. Doubled if backsweep is True. dynamic_values : None | list[float], optional List of values for the dynamic parameters (only if a list of params is provided). - Ignored if only one parameter is passed. - Parameters are kept at this value during the ramps of the other parameters. + Ignored if only one parameter is passed. + Parameters are kept at this value during the ramps of the other parameters. Current values of the parameters are used if it is None. Default is None. backsweep : bool, optional @@ -588,7 +587,7 @@ def sweep_1d( Notes ----- - Again: This measurement can only do linear ramps! - - Does one measurement for each param provided. + - Does one measurement for each param provided. - Ignores other dynamic parameters that are not in params - Records only gettable parameters. """ @@ -603,49 +602,55 @@ def sweep_1d( if dynamic_values is None: dynamic_values = [param() for param in params] assert len(params) == len(dynamic_values) - else: + else: assert len(params) == len(dynamic_values) for param, val in zip(params, dynamic_values): param(val) for param, setpoint, val in zip(params, sweep_range, dynamic_values): - data.append(*param.measured_ramp( - value = setpoint[-1], - num_points=len(setpoint), - start=setpoint[0], - station=station, - name=name, - metadata=metadata, - backsweep=backsweep, - buffered=buffered, - buffer_settings=buffer_settings, - priorize_stored_value=priorize_stored_value, - )) + data.append( + *param.measured_ramp( + value=setpoint[-1], + num_points=len(setpoint), + start=setpoint[0], + station=station, + name=name, + metadata=metadata, + backsweep=backsweep, + buffered=buffered, + buffer_settings=buffer_settings, + priorize_stored_value=priorize_stored_value, + ) + ) param(val) - + elif isinstance(params, Terminal_Parameter): assert dynamic_values != list if dynamic_values is not None: val = dynamic_values else: val = params() - data.append(*params.measured_ramp( - value=sweep_range[-1], - num_points=num_points, - start=sweep_range[0], - station=station, - name=name, - metadata=metadata, - backsweep=backsweep, - buffered=buffered, - buffer_settings=buffer_settings, - priorize_stored_value=priorize_stored_value, - )) + data.append( + *params.measured_ramp( + value=sweep_range[-1], + num_points=num_points, + start=sweep_range[0], + station=station, + name=name, + metadata=metadata, + backsweep=backsweep, + buffered=buffered, + buffer_settings=buffer_settings, + priorize_stored_value=priorize_stored_value, + ) + ) params(val) return data - + def sweep_2D(): - logger.exception("Deprecation Warning: sweep_2D was renamed to sweep_2d \ - for better naming consistency!") + logger.exception( + "Deprecation Warning: sweep_2D was renamed to sweep_2d \ + for better naming consistency!" + ) def sweep_2d( self, @@ -1256,11 +1261,10 @@ def value(self, value): self._value = value self.instrument_parameter(value) - @value.getter def value(self): return self.instrument_parameter() - + @property def instrument_parameter(self): return self._instrument_parameter diff --git a/src/qumada/measurement/measurement.py b/src/qumada/measurement/measurement.py index 4be66abd..77a966b5 100644 --- a/src/qumada/measurement/measurement.py +++ b/src/qumada/measurement/measurement.py @@ -31,8 +31,8 @@ from contextlib import suppress from datetime import datetime from functools import wraps -from typing import Any, Callable from time import sleep +from typing import Any, Callable import numpy as np import qcodes as qc @@ -44,7 +44,7 @@ from qumada.instrument.buffers import is_bufferable, is_triggerable from qumada.metadata import Metadata from qumada.utils.ramp_parameter import ramp_or_set_parameter, ramp_or_set_parameters -from qumada.utils.utils import flatten_array, _validate_mapping +from qumada.utils.utils import _validate_mapping, flatten_array logger = logging.getLogger(__name__) @@ -449,7 +449,7 @@ def generate_lists(self) -> None: } self.trigger_ins = { param.root_instrument._qumada_mapping for param in self.dynamic_channels if is_triggerable(param) - } #Independent of self.buffered to allow parallel ramping for unbuffered measurement initialization. + } # Independent of self.buffered to allow parallel ramping for unbuffered measurement initialization. self.sort_by_priority() self._lists_created = True self._relabel_instruments() @@ -495,7 +495,7 @@ def initialize(self, dyn_ramp_to_val=False, inactive_dyn_channels: list | None = setpoint_intervall = self.settings.get("setpoint_intervall", 0.1) trigger_start = self.settings.get("trigger_start", "software") # TODO: this should be set elsewhere trigger_reset = self.settings.get("trigger_reset", None) - trigger_type = self.settings.get("trigger_type", None), + trigger_type = (self.settings.get("trigger_type", None),) if not self._lists_created: self.generate_lists() # for item in self.compensated_parameters: @@ -663,10 +663,16 @@ def initialize(self, dyn_ramp_to_val=False, inactive_dyn_channels: list | None = else: raise Exception(f"{gettable_param} is not bufferable.") self.ready_triggers() - ramp_or_set_parameters(ramp_params, ramp_targets, ramp_rate, - ramp_time, setpoint_intervall, trigger_start=trigger_start, - trigger_type=trigger_type, trigger_reset=trigger_reset) - + ramp_or_set_parameters( + ramp_params, + ramp_targets, + ramp_rate, + ramp_time, + setpoint_intervall, + trigger_start=trigger_start, + trigger_type=trigger_type, + trigger_reset=trigger_reset, + ) @abstractmethod def run(self) -> list: @@ -676,7 +682,6 @@ def run(self) -> list: """ return [] - def clean_up(self, additional_actions: list[Callable] | None = None, **kwargs) -> None: """ Things to do after the measurement is complete. Cleans up subscribed paramteres for @@ -704,7 +709,7 @@ def ready_buffers(self, **kwargs) -> None: buffer.setup_buffer(settings=self.buffer_settings) buffer.start() self.ready_triggers() - + def ready_triggers(self, **kwargs): """ Prepare trigger inputs for not buffered instruments. @@ -785,8 +790,8 @@ def _insert_metadata_into_db(self, *args, insert_metadata_into_db: bool = True, metadata.save() except Exception as ex: print(f"Metadata could not inserted into database: {ex}") - - def trigger_measurement(self, parameters, setpoints, method = "ramp" ,sync_trigger=None): + + def trigger_measurement(self, parameters, setpoints, method="ramp", sync_trigger=None): TRIGGER_TYPES = ["software", "hardware", "manual"] trigger_start = self.settings.get("trigger_start", "manual") # TODO: this should be set elsewhere @@ -797,28 +802,28 @@ def trigger_measurement(self, parameters, setpoints, method = "ramp" ,sync_trigg default="software", default_key_error="software", ) - setpoints_mapping = {param : setpoint for param, setpoint in zip(parameters, setpoints)} + setpoints_mapping = {param: setpoint for param, setpoint in zip(parameters, setpoints)} if sync_trigger is None: sync_trigger = () buffer_timeout_multiplier = self.settings.get("buffer_timeout_multiplier", 20) # Some logic to sort instruments. Instruments with sync-triggers have to be added last, # as executing _qumada_pulse/_ramp with them instantly runs the pulse/ramp, before other instruments - # that wait for a trigger signal are added and prepared. + # that wait for a trigger signal are added and prepared. instruments_set = {param.root_instrument for param in parameters} instruments = [instrument for instrument in instruments_set if instrument not in sync_trigger] for instrument in instruments_set: if instrument in sync_trigger: instruments.append(instrument) - + for instr in instruments: instr_params = [param for param in parameters if param.root_instrument is instr] if method == "ramp": try: instr._qumada_ramp( - parameters = instr_params, - end_values = [setpoints_mapping[param][-1] for param in instr_params], - ramp_time = self._burst_duration, - sync_trigger = sync_trigger, + parameters=instr_params, + end_values=[setpoints_mapping[param][-1] for param in instr_params], + ramp_time=self._burst_duration, + sync_trigger=sync_trigger, ) except AttributeError as ex: logger.error( @@ -828,14 +833,14 @@ def trigger_measurement(self, parameters, setpoints, method = "ramp" ,sync_trigg Use the unbuffered script!" ) raise ex - + elif method == "pulse": try: instr._qumada_pulse( - parameters = instr_params, - setpoints = [setpoints_mapping[param] for param in instr_params], - delay = self._burst_duration / self.buffered_num_points, - sync_trigger = sync_trigger, + parameters=instr_params, + setpoints=[setpoints_mapping[param] for param in instr_params], + delay=self._burst_duration / self.buffered_num_points, + sync_trigger=sync_trigger, ) except AttributeError as ex: logger.error( @@ -849,7 +854,7 @@ def trigger_measurement(self, parameters, setpoints, method = "ramp" ,sync_trigg with NameError as ex: logger.error("Argument 'method' has to be eiter 'ramp' or 'pulse'") raise ex - + if trigger_type == "manual": logger.warning( "You are using manual triggering. If you want to pulse parameters on multiple" diff --git a/src/qumada/measurement/scripts/generic_measurement.py b/src/qumada/measurement/scripts/generic_measurement.py index 0156fa50..12e45c0a 100644 --- a/src/qumada/measurement/scripts/generic_measurement.py +++ b/src/qumada/measurement/scripts/generic_measurement.py @@ -558,17 +558,19 @@ def run(self): self.ready_buffers() t = time() - start try: - self.trigger_measurement(parameters = self.dynamic_channels, - setpoints = [sweep.get_setpoints() for sweep in self.dynamic_sweeps], - method="ramp", - sync_trigger=sync_trigger, - ) - + self.trigger_measurement( + parameters=self.dynamic_channels, + setpoints=[sweep.get_setpoints() for sweep in self.dynamic_sweeps], + method="ramp", + sync_trigger=sync_trigger, + ) + results = self.readout_buffers(timestamps=True) dynamic_param_results = [ - (dyn_channel, sweep.get_setpoints()) for dyn_channel, sweep in zip( - self.dynamic_channels, self.dynamic_sweeps)] - results.pop(-1) #removes timestamps from results + (dyn_channel, sweep.get_setpoints()) + for dyn_channel, sweep in zip(self.dynamic_channels, self.dynamic_sweeps) + ] + results.pop(-1) # removes timestamps from results datasaver.add_result( (timer, t), *dynamic_param_results, @@ -585,7 +587,7 @@ def run(self): raise ex except TimeoutError: logger.error(f"A timeout error occured. Skipping line at time {t}.") - #results = self.readout_buffers(timestamps=True) + # results = self.readout_buffers(timestamps=True) self.clean_up() datasets.append(datasaver.dataset) return datasets @@ -736,13 +738,15 @@ def run(self): results = [] self.ready_buffers() self.trigger_measurement( - parameters = [dynamic_param, *self.active_compensating_channels], - setpoints = [dynamic_sweep.get_setpoints(), - *[sweep.get_setpoints for sweep in active_comping_sweeps]], - method = "ramp", - sync_trigger=sync_trigger - ) - + parameters=[dynamic_param, *self.active_compensating_channels], + setpoints=[ + dynamic_sweep.get_setpoints(), + *[sweep.get_setpoints for sweep in active_comping_sweeps], + ], + method="ramp", + sync_trigger=sync_trigger, + ) + results = self.readout_buffers() comp_results = [] for ch, sw in zip(self.active_compensating_channels, active_comping_sweeps): diff --git a/src/qumada/utils/device_server.py b/src/qumada/utils/device_server.py new file mode 100644 index 00000000..2813bbc4 --- /dev/null +++ b/src/qumada/utils/device_server.py @@ -0,0 +1,196 @@ +import asyncio +import copy +import datetime +import json +import logging +import pathlib +import threading +import time + +import websockets +from qcodes.instrument.parameter import Parameter +from websockets.asyncio.server import ServerConnection + +from qumada.measurement.device_object import QumadaDevice +from qumada.utils.geometry import Gate, gate_list_to_string, load_from_file + +logger = logging.getLogger(__name__) + + +def _get_parameter_data(parameter: Parameter, cache_only): + value = parameter.cache.get(get_if_invalid=not cache_only) + timestamp = parameter.cache.timestamp + + return { + "value": value, + "timestamp": timestamp, + "name": parameter.full_name, + "label": parameter.label, + "unit": parameter.unit, + "vals": str(parameter.vals), + "instrument": parameter.instrument.full_name, + "instrument_class": parameter.instrument.__class__.__name__, + "root_instrument": parameter.root_instrument.full_name, + "root_instrument_class": parameter.root_instrument.__class__.__name__, + } + + +def _collect_data(*parameters: Parameter, cache_only): + data = [] + for parameter in parameters: + try: + parameter_data = _get_parameter_data(parameter, cache_only=cache_only) + except Exception as e: + parameter_data = {"exception": str(e)} + data.append(parameter_data) + + return { + "parameters": data, + "timestamp": time.time(), + } + + +class DataCollector: + def __init__(self, *parameters: Parameter, cache_only: bool = False, minimal_update_delta: float = 1 / 30): + self.lock = asyncio.Lock() + self.parameters = parameters + self.cache_only = cache_only + self.minimal_update_delta = minimal_update_delta + self._data = None + + async def get_data(self): + # although it should be fine to use a blocking lock here, + # an async lock is the more robust option in case the surrounding code changes. + async with self.lock: + if self._data is None: + delta = float("inf") + else: + delta = time.time() - self._data["timestamp"] + if delta >= self.minimal_update_delta: + self._data = _collect_data(*self.parameters, cache_only=self.cache_only) + return self._data + + +class DeviceWebSocket(threading.Thread): + def __init__(self, collector: DataCollector, ip="127.200.200.9", port=6789): + super().__init__() + self.ip = ip + self.port = port + + self.collector = collector + self.gate_geometry = None + + self._loop = None + self._loop_control_future = threading.Event() + + #: data is re-sent after this time even if nothing changed + self.maximal_update_interval = 1.0 + + @property + def address(self): + return f"ws://{self.ip}:{self.port}" + + def run(self): + if self._loop is not None: + raise RuntimeError("Loop already set") + + logger.info(f"Starting websocket server on {self.ip}:{self.port}") + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + try: + self._loop.run_until_complete(self._serve()) + finally: + pending = asyncio.all_tasks(self._loop) + for task in pending: + task.cancel() + self._loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + self._loop.close() + + def stop(self) -> None: + """ + May be called from any thread. It: + 1. signals the serving coroutine to exit, + 2. schedules loop.stop() thread-safely, and + 3. leaves it to the caller to `join()` if they need to block. + """ + self._loop_control_future.set() + if self._loop is not None and self._loop.is_running(): + # Wake the event-loop so it notices _stop_event quickly + self._loop.call_soon_threadsafe(self._loop.stop) + + async def _serve(self) -> None: + """ + Start the WebSocket server and block until `stop()` is called. + """ + async with websockets.serve(self._handle_connection, self.ip, self.port) as server: + logger.info("WebSocket server listening on %s:%d", self.ip, self.port) + + # Poll the threading.Event without blocking the loop forever. + while not self._loop_control_future.is_set(): + await asyncio.sleep(0.2) + + logger.info("Shutdown requested – closing listener and connections") + server.close() + await server.wait_closed() + logger.info("WebSocket server closed") + + def join(self, timeout=None): + if self.is_alive(): + self.stop() + super().join(timeout) + + async def _handle_connection(self, connection: ServerConnection): + def default_to_json(o): + if isinstance(o, datetime.datetime): + return o.timestamp() + else: + raise NotImplementedError(f"Cannot serialize {o!r} of type {type(o)}") + + last_update = 0.0 + last_message = None + while True: + data = await self.collector.get_data() + + if self.gate_geometry is not None: + data["gate_geometry"] = gate_list_to_string(self.gate_geometry) + + serialized = json.dumps(data, default=default_to_json) + + if serialized != last_message or time.time() - last_update > self.maximal_update_interval: + try: + await connection.send(serialized) + except websockets.exceptions.ConnectionClosed: + logger.debug("Connection closed") + break + else: + last_message = serialized + last_update = time.time() + + try: + await asyncio.sleep(self.collector.minimal_update_delta) + except asyncio.CancelledError: + logger.debug("Cancelled") + break + logger.info("Connection terminated") + + self.monitor_socket = None + + +def start_monitor_socket(device_object: QumadaDevice, gate_geometry: list[Gate] | str | pathlib.Path = None): + if getattr(device_object, "monitor_socket", None) is not None: + logger.info("Monitor socket already present. Restarting.") + device_object.monitor_socket.join() + parameters = [param for parameters in device_object.terminal_parameters.values() for param in parameters.values()] + + collector = DataCollector(*parameters) + device_object.monitor_socket = DeviceWebSocket(collector) + device_object.monitor_socket.start() + + if gate_geometry is not None: + if not isinstance(gate_geometry, list): + from qumada.utils.dxf import load_convert_and_cache + + gate_geometry = load_convert_and_cache(gate_geometry) + + device_object.monitor_socket.gate_geometry = gate_geometry diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py new file mode 100644 index 00000000..4b31747d --- /dev/null +++ b/src/qumada/utils/dxf.py @@ -0,0 +1,555 @@ +import argparse +import copy +import itertools +import logging +import math +import operator +import pathlib +import re +import subprocess +import sys +import warnings +from copy import deepcopy +from typing import Tuple + +import ezdxf.document +import matplotlib.widgets +import numpy as np +import shapely.affinity +from matplotlib import pyplot as plt +from shapely.geometry import LineString, MultiLineString, Polygon, box +from shapely.plotting import plot_polygon + +from qumada.utils.geometry import Gate, load_from_file, store_to_file + +SELECT_ALPHA = 0.3 +DEFAULT_X_RNG = (-3.0, 3.0) +DEFAULT_Y_RNG = (-1.5, 1.5) +DEFAULT_LAYER_REGEX = r".*BEAM\_L." +DEFAULT_GRID_SIZE = 1e-3 + +logger = logging.getLogger(__name__) + + +def entity_to_geom(e): + """ + Best-effort conversion of an ezdxf entity to a Shapely geometry. + Extend this to cover more exotic entities as needed. + """ + if e.dxftype() == "LINE": + return LineString([e.dxf.start, e.dxf.end]) + + if e.dxftype() == "LWPOLYLINE": + vertices = [(x, y) for x, y, *_ in e.vertices_in_wcs()] + closed = bool(e.closed) if hasattr(e, "closed") else vertices[0] == vertices[-1] + if closed: + return Polygon(vertices) + else: + return LineString(vertices) + + if e.dxftype() == "POLYLINE": + points = [(x, y) for x, y, *_ in e.points_in_wcs()] + closed = bool(e.closed) if hasattr(e, "closed") else points[0] == points[-1] + if closed: + return Polygon(points) + else: + return LineString(points) + + raise NotImplementedError(e.dxftype()) + + +def iterate_all_entities(e, path=None): + if path is None: + path = [] + if e.dxftype() == "INSERT": + path.append(e.dxf.name) + for sub in e.virtual_entities(): + yield from iterate_all_entities(sub, path) + path.pop() + else: + yield e, list(path) + + +def _get_all_entity_bounding_box(doc: ezdxf.document.Drawing) -> tuple[float, float, float, float] | None: + minx = miny = maxx = maxy = float("nan") + + for model in doc.modelspace(): + for e, _ in iterate_all_entities(model): + e_minx, e_miny, e_maxx, e_maxy = entity_to_geom(e).bounds + minx = min(e_minx, minx) + miny = min(e_miny, miny) + maxx = max(e_maxx, maxx) + maxy = max(e_maxy, maxy) + + if math.isnan(minx): + return None + + return minx, miny, maxx, maxy + + +def _auto_cropping_box( + doc: ezdxf.document.Drawing, + keep_layer: dict[str, bool], + feature_size: float, + max_cropping_box_size: float, + grid_size: float, +) -> tuple[float, float, float, float] | None: + edge_set = [] + + for model in doc.modelspace(): + for e, _ in iterate_all_entities(model): + if not keep_layer[e.dxf.layer]: + continue + + raw_geom = entity_to_geom(e) + if raw_geom is None: + continue + geom = Polygon(raw_geom).simplify(grid_size) + + points = np.array(geom.boundary.xy).T.tolist() + + if not points: + continue + + points.append(points[0]) + for (x0, y0), (x1, y1) in itertools.pairwise(points): + if x0 == x1 and x1 == y1: + continue + + if (x0 - x1) ** 2 + (y0 - y1) ** 2 <= feature_size**2: + edge_set.append([[x0, y0], [x1, y1]]) + if not edge_set: + return None + + edge_set = np.array(edge_set) + + edge_center = np.mean(edge_set, axis=(0, 1)) + logger.debug("edge_center: %r", edge_center) + edge_distance = np.min(np.linalg.norm(edge_set - edge_center, axis=2), axis=1) + edge_mask = edge_distance <= max_cropping_box_size * 5 + included_edges = edge_set[edge_mask] + + minx, miny = np.min(included_edges, axis=(0, 1)).tolist() + maxx, maxy = np.max(included_edges, axis=(0, 1)).tolist() + return minx, miny, maxx, maxy + + +def get_gates_from_cropped_region( + doc: ezdxf.document.Drawing, + x_rng: tuple[float, float] = DEFAULT_X_RNG, + y_rng: tuple[float, float] = DEFAULT_Y_RNG, + layer_regex: str = DEFAULT_LAYER_REGEX, + grid_size: float = DEFAULT_GRID_SIZE, + auto_adjust_cropping: bool = False, +) -> list[Gate]: + """Selects all polygons (POLYLINE entities) from the selected region that live in a layer matched by the given + regular expression. The polygons are cropped to the region and returned as :py:`.Gate` objects. + + :py:attr:`.Gate.label_position` is only assigned to gates that touch the boundary. + + In some cases, continuous gates are returned in multiple pieces. Use :py:`.auto_merge` to merge overlapping gates in the same layer. + """ + regex = re.compile(layer_regex) + logger.debug("Using regular expression %r for layer selection", layer_regex) + + keep_layer = {layer.dxf.name: bool(regex.search(layer.dxf.name)) for layer in doc.layers} + + xmin, xmax = x_rng + ymin, ymax = y_rng + + if auto_adjust_cropping: + feature_size = min(xmax - xmin, ymax - ymin) / 3.0 + max_box_size = math.sqrt((xmax - xmin) ** 2 + (ymax - ymin) ** 2) + auto_xmin, auto_ymin, auto_xmax, auto_ymax = _auto_cropping_box( + doc, + keep_layer=keep_layer, + feature_size=feature_size, + max_cropping_box_size=max_box_size, + grid_size=grid_size, + ) + + x_center = (auto_xmax + auto_xmin) / 2.0 + y_center = (auto_ymax + auto_ymin) / 2.0 + offset = (x_center, y_center) + + logger.info("Shifting box by %r", offset) + + # shift initial box + xmin += x_center + xmax += x_center + ymin += y_center + ymax += y_center + + logger.info("Using x cropping range: %r and y cropping range %r", (xmin, xmax), (ymin, ymax)) + + roi_poly = Polygon(box(xmin, ymin, xmax, ymax)) + + chosen = [] + for model in doc.modelspace(): + for e, path in iterate_all_entities(model): + layer = e.dxf.layer + if not keep_layer[layer]: + continue + + raw_geom = entity_to_geom(e) + if raw_geom is None: + continue + + geom = Polygon(raw_geom).simplify(grid_size) + geom_roi = geom.intersection(roi_poly) + if geom_roi == roi_poly: + continue + + if geom_roi.is_empty: + continue + + geom_roi = shapely.set_precision(geom_roi, grid_size=grid_size) + + boundary = geom.intersection(roi_poly.boundary) + + if not boundary.is_empty and isinstance(boundary, (LineString, MultiLineString)): + label_position_point = boundary.line_interpolate_point(0.5, normalized=True) + label_position = label_position_point.x, label_position_point.y + else: + label_position = None + + label = f"G{len(chosen)}" + if label == "G36": + pass + + gate = Gate( + polygon=geom_roi, + label_position=label_position, + label=label, + path=path, + layer=layer, + ) + chosen.append(gate) + + return chosen + + +def _connect_all_touching(gates: list[Gate], grid_size: float) -> list[Gate]: + """Iteratively connects all touching gates "or"-ing their label positions.""" + assert len({gate.layer for gate in gates}) == 1 + + result = [] + not_intersecting = [] + intersecting = [] + for gate in gates: + not_intersecting.clear() + intersecting.clear() + for other in result: + # boundary = gate.polygon.intersection(other.polygon, grid_size=grid_size) + if gate.polygon.intersects(other.polygon): + intersecting.append(other) + else: + not_intersecting.append(other) + result.clear() + + if intersecting: + intersecting.append(gate) + label_position = None + for g in intersecting: + label_position = label_position or g.label_position + new_poly = shapely.union_all([g.polygon for g in intersecting], grid_size=grid_size) + to_append = intersecting[0] + to_append.polygon = new_poly + to_append.label_position = label_position + + result.append(to_append) + else: + result.append(copy.copy(gate)) + + result.extend(not_intersecting) + return result + + +def auto_merge(gates: list[Gate], grid_size: float = 1e-3): + """Merges touching gates in the same layer""" + + by_layer = {} + for gate in gates: + by_layer.setdefault(gate.layer, []).append(gate) + + for layer, layer_gates in by_layer.items(): + connected = _connect_all_touching(layer_gates, grid_size) + by_layer[layer] = connected + + result = sum(by_layer.values(), start=[]) + return result + + +def label_gates(gates: list[Gate]) -> list[Gate]: + if not gates: + return [] + + gates = [copy.deepcopy(gate) for gate in gates] + + gates = sorted(gates, key=lambda gate: 0.0 if gate.label_position is None else math.atan2(*gate.label_position)) + + axd = plt.figure(layout="constrained").subplot_mosaic( + """ + AAAA + BCDE + """, + height_ratios=[1, 0.1], + ) + ax = axd["A"] + fig = ax.get_figure() + prv = matplotlib.widgets.Button(ax=axd["B"], label="Previous") + txt = matplotlib.widgets.TextBox(ax=axd["C"], label="Name") + apply = matplotlib.widgets.Button(ax=axd["D"], label="Apply") + nxt = matplotlib.widgets.Button(ax=axd["E"], label="Next") + + layers = {gate.layer for gate in gates} + color_iter = iter(plt.rcParams["axes.prop_cycle"].by_key()["color"]) + layer_colors = dict(zip(layers, color_iter)) + + plots = [] + for idx, gate in enumerate(gates): + color = layer_colors[gate.layer] + poly_patch = plot_polygon(gate.polygon, facecolor="none", color=color, add_points=False, ax=ax) + + poly_patch.gate_index = idx + poly_patch.set_picker(True) + + label_plot = ax.annotate( + str(gate.label), gate.label_position, bbox=dict(boxstyle="round", fc="0.8"), ha="center", va="center" + ) + plots.append((poly_patch, label_plot)) + + def deselect_gate(idx): + poly_plot, label_plot = plots[idx] + poly_plot.set_facecolor("none") + txt.set_val("") + plt.draw() + + def select_gate(idx): + poly_plot, label_plot = plots[idx] + gate = gates[idx] + color = layer_colors[gate.layer] + poly_plot.set_facecolor((color, SELECT_ALPHA)) + if str(gate.label) != str(None): + txt.set_val(str(gate.label)) + plt.draw() + + current_gate = 0 + select_gate(current_gate) + + def apply_action(*_): + gate = gates[current_gate] + _, label_plot = plots[current_gate] + gate.label = txt.text + label_plot.set_text(txt.text) + plt.draw() + + def prev_action(*_): + nonlocal current_gate + deselect_gate(current_gate) + if current_gate == 0: + current_gate += len(gates) + current_gate -= 1 + select_gate(current_gate) + + def next_action(*_): + nonlocal current_gate + deselect_gate(current_gate) + current_gate += 1 + if current_gate == len(gates): + current_gate -= len(gates) + select_gate(current_gate) + + def pick_handler(event): + nonlocal current_gate + artist = event.artist + if hasattr(artist, "gate_index"): + gate_index = artist.gate_index + if gate_index != current_gate: + deselect_gate(current_gate) + current_gate = gate_index + select_gate(current_gate) + else: + raise NotImplementedError(event) + + prv.on_clicked(prev_action) + apply.on_clicked(apply_action) + nxt.on_clicked(next_action) + fig.canvas.mpl_connect("pick_event", pick_handler) + + widgets = [prv, apply, nxt, txt] + fig.widgets = widgets + + return gates + + +def load_convert_and_cache( + path: pathlib.Path | str, expected_number_of_gates: int | range | slice = slice(1, None) +) -> list[Gate]: + path = pathlib.Path(path) + + if not path.exists(): + raise FileNotFoundError(path) + + if path.suffix == ".dxf": + dxf_path = path + json_path = path.with_suffix(".json") + + if not json_path.exists(): + try: + _ = subprocess.run( + [sys.executable, "-m", "qumada.utils.dxf", dxf_path], check=True, stderr=subprocess.PIPE + ) + except subprocess.CalledProcessError as err: + err_msg = err.stderr.decode(errors="replace") + print("File conversion failed:\n", err_msg, file=sys.stderr) + raise + + elif path.suffix == ".json": + json_path = path + + else: + raise ValueError("Only dxf and json are supported", path) + + return load_from_file(json_path) + + +def get_parser(): + import argparse + + def to_range(s: str): + try: + min_s, max_s = s.split(":") + if not min_s: + min_s = "-inf" + if not max_s: + max_s = "inf" + return float(min_s), float(max_s) + except Exception as err: + raise argparse.ArgumentTypeError( + "Argument must be a 'min:max' pair of python floats or empty strings separated by a colon." + ) from err + + def to_slice(s: str): + try: + start, stop = s.split(":") + if start: + start = int(start) + else: + start = None + if stop: + stop = int(stop) + else: + stop = None + return slice(start, stop) + except Exception as err: + raise argparse.ArgumentTypeError( + "Argument must be a 'start:stop' pair of python integers or empty strings separated by a colon." + ) from err + + parser = argparse.ArgumentParser(description="Qumada dxf labeler") + parser.add_argument("dxf_path", type=pathlib.Path, help="path to dxf file") + parser.add_argument( + "--json-path", + type=pathlib.Path, + help="path to json file. default is the same as dxf with other ending", + default=None, + ) + parser.add_argument( + "--x-rng", + help="The gates are cropped to this range in x coordinates", + type=to_range, + metavar="[X_MIN={}]:[X_MAX={}]".format(*DEFAULT_X_RNG), + default=":".join(map(str, DEFAULT_X_RNG)), + ) + parser.add_argument( + "--y-rng", + help="The gates are cropped to this range in y coordinates", + type=to_range, + metavar="[Y_MIN={}]:[Y_MAX={}]".format(*DEFAULT_Y_RNG), + default=":".join(map(str, DEFAULT_Y_RNG)), + ) + parser.add_argument( + "--layer-regex", + help=f'Only gates from layers where the name matches this regex are considered. (Default: "{DEFAULT_LAYER_REGEX}")', + default=DEFAULT_LAYER_REGEX, + ) + parser.add_argument( + "--log-level", + help="Logging level", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + default="INFO", + ) + parser.add_argument( + "--auto-adjust-cropping", + help="If true(default), the cropping ranges are adjusted based on some unstable feature detection algorithm.", + choices=[True, False], + type=lambda x: x.lower() == "true", + default=True, + ) + + parser.add_argument( + "--expected-gate-number", + help="Expected number of gates in integer slice notation (excluding end). " + "The default will result in an error if there are no gates extracted", + metavar="[START]:[STOP]", + type=to_slice, + default="1:", + ) + + return parser + + +def _main( + dxf_path: str, + json_path: str, + layer_regex: str, + x_rng: tuple[float, float], + y_rng: tuple[float, float], + expected_gate_number: slice, + auto_adjust_cropping: bool, +): + matplotlib.use("qtagg") + doc = ezdxf.readfile(dxf_path) + raw_gates = get_gates_from_cropped_region( + doc, layer_regex=layer_regex, x_rng=x_rng, y_rng=y_rng, auto_adjust_cropping=auto_adjust_cropping + ) + logger.info(f"{len(raw_gates)} raw gates extracted.") + + merged_gates = auto_merge(raw_gates) + logger.info(f"{len(merged_gates)} gates left after merging.") + + if expected_gate_number.start is not None and len(merged_gates) < expected_gate_number.start: + raise ValueError(f"Only {len(merged_gates)} gates extracted but >= {expected_gate_number.start} were expected.") + if expected_gate_number.stop is not None and len(merged_gates) >= expected_gate_number.stop: + raise ValueError(f"Only {len(merged_gates)} gates extracted but < {expected_gate_number.stop} were expected.") + + if not merged_gates: + # this is apparently explicitly allowed by the user cause otherwise the expected_gate_number check should fail + logger.info("No gates extracted. Storing empty gates in json path") + store_to_file([], json_path) + return + + resulting_gates = label_gates(merged_gates) + plt.show(block=True) + store_to_file(resulting_gates, json_path) + + +if __name__ == "__main__": + parser = get_parser() + args = parser.parse_args() + if args.json_path is None: + args.json_path = args.dxf_path.with_suffix(".json") + + logging.basicConfig() + logger.setLevel(args.log_level) + + _main( + args.dxf_path, + args.json_path, + args.layer_regex, + args.x_rng, + args.y_rng, + args.expected_gate_number, + auto_adjust_cropping=args.auto_adjust_cropping, + ) diff --git a/src/qumada/utils/geometry.py b/src/qumada/utils/geometry.py new file mode 100644 index 00000000..9aa49690 --- /dev/null +++ b/src/qumada/utils/geometry.py @@ -0,0 +1,62 @@ +import dataclasses +import json +import pathlib + +from shapely import wkt +from shapely.geometry import LineString, Point, Polygon + + +@dataclasses.dataclass +class Gate: + """Representation of a gate electrode with the geometric properties and annotation information.""" + + polygon: Polygon + path: list[str] + layer: str + label: str | None + label_position: tuple[float, float] + + +def gate_list_to_string(gates: list[Gate]) -> str: + """Serialize a list of gates to a JSON string. + + The geometric information is stored in WKT format (well known text).""" + + def to_json(o): + if isinstance(o, Gate): + return dataclasses.asdict(o) + elif isinstance(o, Polygon): + return o.wkt + else: + return o + + txt = json.dumps(gates, indent=2, default=to_json) + return txt + + +def string_to_gate_list(txt: str) -> list[Gate]: + """Deserialize a JSON string produced by :py:`.gate_list_to_string` to a list of gates.""" + + data = json.loads(txt) + assert isinstance(data, list) + + gates = [] + for d in data: + d["polygon"] = wkt.loads(d["polygon"]) + d["label_position"] = tuple(d["label_position"]) + gates.append(Gate(**d)) + return gates + + +def store_to_file(gates: list[Gate], path: pathlib.Path): + """Store a list of gates in a file in json format.""" + path = pathlib.Path(path) + txt = gate_list_to_string(gates) + path.write_text(txt) + + +def load_from_file(path: pathlib.Path): + """Load a list of gates from a JSON file produced with :py:`.store_to_file`.""" + path = pathlib.Path(path) + txt = path.read_text() + return string_to_gate_list(txt) diff --git a/src/qumada/utils/load_from_sqlite_db.py b/src/qumada/utils/load_from_sqlite_db.py index 74fa6235..2d7d946f 100644 --- a/src/qumada/utils/load_from_sqlite_db.py +++ b/src/qumada/utils/load_from_sqlite_db.py @@ -24,6 +24,7 @@ """ from __future__ import annotations +import re from os import path import numpy as np @@ -32,7 +33,6 @@ from qcodes.dataset.plotting import plot_dataset from qumada.utils.browsefiles import browsefiles -import re # %% @@ -216,7 +216,7 @@ def pick_measurements(sample_name: str = None, preview_dialogue=False, measureme load_db() return pick_measurements(preview_dialogue=preview_dialogue, measurement_list=measurement_list) elif re.match(pattern, chosen): - for i in range(int(match.group(1)), int(match.group(2))+1): + for i in range(int(match.group(1)), int(match.group(2)) + 1): measurement_list.append(measurements[i]) else: chosen = int(chosen) @@ -289,14 +289,17 @@ def get_parameter_data(dataset=None, parameter_name=None, **kwargs): ) return zip(params, data, units, labels) + # %% -def get_parameter_name_by_label(dataset=None, parameter_label=None, appendix = ""): + +def get_parameter_name_by_label(dataset=None, parameter_label=None, appendix=""): for param in dataset.get_parameters(): if param.label == parameter_label + appendix: return param.name return None + # %% def separate_up_down(x_data, y_data): grad = np.gradient(x_data) diff --git a/src/qumada/utils/plotting.py b/src/qumada/utils/plotting.py index fdd7e7b0..9571b1c2 100644 --- a/src/qumada/utils/plotting.py +++ b/src/qumada/utils/plotting.py @@ -171,7 +171,7 @@ def plot_2D( if args: x_data, y_data, z_data = _handle_overload(x_data, y_data, z_data, *args, output_dimension=3) if ax is None or fig is None: - fig, ax = plt.subplots(figsize = kwargs.get("figsize", (10,10))) + fig, ax = plt.subplots(figsize=kwargs.get("figsize", (10, 10))) # Skalierung der Achsendaten und Einheiten x_values, y_values, z_values = x_data[1], y_data[1], z_data[1] @@ -196,7 +196,7 @@ def plot_2D( # Plotten der 2D-Daten im = ax.pcolormesh(x, y, z, shading="auto") cbar = fig.colorbar(im, ax=ax) - + if z_label is None: cbar.set_label(f"{z_data[3]} ({z_unit})") else: @@ -341,12 +341,12 @@ def plot_multiple_datasets( scale_axis=True, legend=True, exclude_string_from_legend: list = ["1D Sweep"], - legend_entries: None|list = None, - save_to_file = None, - close = False, - x_label = None, - y_label = None, - color_map = None, + legend_entries: None | list = None, + save_to_file=None, + close=False, + x_label=None, + y_label=None, + color_map=None, **kwargs, ): """ @@ -442,15 +442,15 @@ def plot_multiple_datasets( if font is not None: matplotlib.rc("font", 30) matplotlib.rc("font", size=40) - default_colors = plt.rcParams['axes.prop_cycle'].by_key()['color'] + default_colors = plt.rcParams["axes.prop_cycle"].by_key()["color"] if ax is None or fig is None: - fig, ax = plt.subplots(figsize = kwargs.get("figsize", (10, 10))) + fig, ax = plt.subplots(figsize=kwargs.get("figsize", (10, 10))) x_labels = [] y_labels = [] for i in range(len(datasets)): if color_map is None: - color = default_colors[i % len(default_colors)] + color = default_colors[i % len(default_colors)] else: color = color_map[i] if legend_entries is None: @@ -464,12 +464,12 @@ def plot_multiple_datasets( x_name = x_axis_parameters_name[i] else: x_name = x_axis_parameters_name - + if isinstance(y_axis_parameters_name, list): y_name = y_axis_parameters_name[i] else: y_name = y_axis_parameters_name - + x, y = _handle_overload( *get_parameter_data(datasets[i], y_axis_parameters_name), x_name=x_name, @@ -497,10 +497,10 @@ def plot_multiple_datasets( if not optimize_hysteresis_legend: f_label += " backsweep" if optimize_hysteresis_legend is True: - # Only one legend entry per dataset (instead of one for each fore-/backsweep) + # Only one legend entry per dataset (instead of one for each fore-/backsweep) if j > 0: f_label = None - if j%2 == False: # ;-) Ensuring the first sweep marker is always filled + if j % 2 == False: # ;-) Ensuring the first sweep marker is always filled fill_style = "full" else: fill_style = "none" @@ -512,8 +512,8 @@ def plot_multiple_datasets( linestyle=kwargs.get("linestyle", ""), label=f_label, markersize=kwargs.get("markersize", 20), - color = color, - fillstyle = fill_style + color=color, + fillstyle=fill_style, ) else: plt.plot( @@ -523,14 +523,14 @@ def plot_multiple_datasets( linestyle=kwargs.get("linestyle", ""), label=label, markersize=kwargs.get("markersize", 20), - color = color, + color=color, ) # Scale axes and update labels if scale_axis is True: x_scaling_factor, x_units[0] = _rescale_axis(ax.xaxis, np.concatenate(x_data), x_units[0], "x") y_scaling_factor, y_units[0] = _rescale_axis(ax.yaxis, np.concatenate(y_data), y_units[0], "y") - + if x_label is None: plt.xlabel(f"{x_labels[0]} ({x_units[0]})") else: @@ -539,7 +539,7 @@ def plot_multiple_datasets( plt.ylabel(f"{y_labels[0]} ({y_units[0]})") else: plt.ylabel(f"{y_label} ({y_units[0]})") - + # Update x and y labels leg_entries = ax.legend().get_texts() if legend is True: @@ -547,9 +547,9 @@ def plot_multiple_datasets( loc=kwargs.get("legend_position", "upper left"), fontsize=kwargs.get("legend_fontsize", 35), markerscale=kwargs.get("legend_markerscale", 1), - ncol = kwargs.get("legend_ncols", int(len(leg_entries)/9)+1), - columnspacing = kwargs.get("legend_columnspacing", 0.2), - handletextpad = kwargs.get("legend_handletextpad", -0.7), + ncol=kwargs.get("legend_ncols", int(len(leg_entries) / 9) + 1), + columnspacing=kwargs.get("legend_columnspacing", 0.2), + handletextpad=kwargs.get("legend_handletextpad", -0.7), ) plt.tight_layout() if save_to_file is not None: diff --git a/src/qumada/utils/ramp_parameter.py b/src/qumada/utils/ramp_parameter.py index 3c93aa72..ae0e7d7a 100644 --- a/src/qumada/utils/ramp_parameter.py +++ b/src/qumada/utils/ramp_parameter.py @@ -23,13 +23,14 @@ import logging import time +from copy import deepcopy from math import isclose from qumada.utils.generate_sweeps import generate_sweep -from copy import deepcopy LOG = logging.getLogger(__name__) + def has_ramp_method(parameter): try: parameter.root_instrument._qumada_ramp @@ -37,13 +38,15 @@ def has_ramp_method(parameter): except AttributeError: return False + def has_pulse_method(parameter): try: parameter.root_instrument._qumada_pulse return True except AttributeError: return False - + + def has_force_trigger_method(parameter): try: parameter.root_instrument._qumada_mapping.force_trigger @@ -51,9 +54,11 @@ def has_force_trigger_method(parameter): except AttributeError: return False + class Unsweepable_parameter(Exception): pass + class Unsettable_parameter(Exception): pass @@ -119,7 +124,7 @@ def ramp_parameter( LOG.debug(f"ramp rate: {ramp_rate}") LOG.debug(f"ramp time: {ramp_time}") - if isinstance(current_value, float|int) and not isinstance(current_value, bool): + if isinstance(current_value, float | int) and not isinstance(current_value, bool): LOG.debug(f"target: {target}") if isclose(current_value, target, rel_tol=tolerance): LOG.debug("Target value is sufficiently close to current_value, no need to ramp") @@ -151,7 +156,7 @@ def ramp_parameter( LOG.debug(f"sweep: {sweep}") for value in sweep: parameter.set(value) - time.sleep(ramp_time/num_points) + time.sleep(ramp_time / num_points) return True else: raise Unsweepable_parameter("Parameter has non-float values") @@ -176,70 +181,78 @@ def ramp_or_set_parameter( parameter.set(target) except Unsettable_parameter: pass - + + def ramp_or_set_parameters( - parameters: list, - targets: list[float], - ramp_rate: float | list[float] = 0.3, - ramp_time: float | list[float] = 5, - setpoint_interval: float |list[float] = 0.1, - tolerance: float = 1e-5, - trigger_start = None, - trigger_type = "software", - trigger_reset = None, - sync_trigger = None): + parameters: list, + targets: list[float], + ramp_rate: float | list[float] = 0.3, + ramp_time: float | list[float] = 5, + setpoint_interval: float | list[float] = 0.1, + tolerance: float = 1e-5, + trigger_start=None, + trigger_type="software", + trigger_reset=None, + sync_trigger=None, +): instruments = {param.root_instrument for param in parameters} - instruments_dict = {} #Will contain instruments as keys and their params with targets as vals. - #Check requirements for parallel ramps. + instruments_dict = {} # Will contain instruments as keys and their params with targets as vals. + # Check requirements for parallel ramps. if trigger_type is not None: for instr in instruments: - if has_ramp_method(instr) and has_force_trigger_method(instr) and hasattr(instr._qumada_mapping, "max_ramp_channels"): - instruments_dict[instr] = [] #Only instruments supporting ramps are added! + if ( + has_ramp_method(instr) + and has_force_trigger_method(instr) + and hasattr(instr._qumada_mapping, "max_ramp_channels") + ): + instruments_dict[instr] = [] # Only instruments supporting ramps are added! # Loop groups params according to their instruments for later execution of ramps. for param, target in zip(parameters, targets): if param._settable is False: LOG.warning(f"{param} is not _settable and cannot be ramped!") continue - - current_value = param.get() - #TODO: Possibly further improvements with cached val or known start. + + current_value = param.get() + # TODO: Possibly further improvements with cached val or known start. # Check if parameter should be ramped or set. - if isinstance(current_value, float|int) and not isinstance(current_value, bool): + if isinstance(current_value, float | int) and not isinstance(current_value, bool): LOG.debug(f"current value: {current_value}, target: {target}") if isclose(current_value, target, rel_tol=tolerance): LOG.debug("Target value is sufficiently close to current_value, no need to ramp") continue - if param.root_instrument in instruments_dict.keys(): #Only instruments supporting ramps - instruments_dict[param.root_instrument].append((param,target)) - else: #Everything that cannot be ramped with instrument ramp is ramped/set here + if param.root_instrument in instruments_dict.keys(): # Only instruments supporting ramps + instruments_dict[param.root_instrument].append((param, target)) + else: # Everything that cannot be ramped with instrument ramp is ramped/set here ramp_or_set_parameter(param, target, ramp_rate, ramp_time, setpoint_interval) # Now go through all instruments supporting ramps and start the ramps for instr, values in instruments_dict.items(): counter = 1 param_helper = [] target_helper = [] - #Params and targets are added until the max number of simultaneously rampable - #channels is reached. Then ramp is started and new params are added. + # Params and targets are added until the max number of simultaneously rampable + # channels is reached. Then ramp is started and new params are added. for param, target in values: param_helper.append(param) target_helper.append(target) - #TODO: Triggering logic won't work if sync trigger is used to trigger another + # TODO: Triggering logic won't work if sync trigger is used to trigger another # DAC (e.g. QDac in combination with Decadac) - if counter%instr._qumada_mapping.max_ramp_channels == 0 or counter == len(values): - LOG.debug(f"Ramping {param_helper} to {target_helper}") + if counter % instr._qumada_mapping.max_ramp_channels == 0 or counter == len(values): + LOG.debug(f"Ramping {param_helper} to {target_helper}") if sync_trigger is not None: - LOG.exception("You are using a sync trigger for ramps outside measurements. \ + LOG.exception( + "You are using a sync trigger for ramps outside measurements. \ If the sync trigger is required to start an another DACs/AWGs ramp \ this will not work. (E.g. QDac and Decadac). If you only need it to \ - start data acquisitions you're fine.") + start data acquisitions you're fine." + ) instr._qumada_ramp( param_helper, - end_values = target_helper, - ramp_time = min(ramp_time, 1/ramp_rate), #TODO: Is that fine/Safe enough? - sync_trigger=None - ) + end_values=target_helper, + ramp_time=min(ramp_time, 1 / ramp_rate), # TODO: Is that fine/Safe enough? + sync_trigger=None, + ) instr._qumada_mapping.force_trigger() - #TODO: Force trigger for AWGs/DACs? + # TODO: Force trigger for AWGs/DACs? time.sleep(ramp_time) try: trigger_reset() @@ -247,11 +260,4 @@ def ramp_or_set_parameters( LOG.info("No method to reset the trigger defined.") param_helper = [] target_helper = [] - counter +=1 - - - - - - - + counter += 1