From 5b4f6fd924ec24e125968ae2e1ec399a980ce4d4 Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Tue, 15 Jul 2025 17:46:25 +0200 Subject: [PATCH 01/19] First version of device monitor --- pyproject.toml | 2 + src/qumada-monitor/README.md | 29 +++ src/qumada-monitor/__init__.py | 0 src/qumada-monitor/__main__.py | 13 ++ src/qumada-monitor/app.py | 227 ++++++++++++++++++++++ src/qumada/utils/device_server.py | 199 +++++++++++++++++++ src/qumada/utils/dxf.py | 309 ++++++++++++++++++++++++++++++ src/qumada/utils/geometry.py | 50 +++++ 8 files changed, 829 insertions(+) create mode 100644 src/qumada-monitor/README.md create mode 100644 src/qumada-monitor/__init__.py create mode 100644 src/qumada-monitor/__main__.py create mode 100644 src/qumada-monitor/app.py create mode 100644 src/qumada/utils/device_server.py create mode 100644 src/qumada/utils/dxf.py create mode 100644 src/qumada/utils/geometry.py diff --git a/pyproject.toml b/pyproject.toml index 780b1ae4..604483f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,12 +24,14 @@ dependencies = [ "jsonschema", "zhinst-toolkit >= 0.3.3", "pyqt5", + "shapely", "versioningit ~= 2.2.0", ] [project.optional-dependencies] gui = ["plottr"] spyder = ["spyder"] +monitor = ["dash", "ezdxf", "dash-extensions"] [project.urls] Repository = "https://github.com/qutech/qumada" diff --git a/src/qumada-monitor/README.md b/src/qumada-monitor/README.md new file mode 100644 index 00000000..f0921013 --- /dev/null +++ b/src/qumada-monitor/README.md @@ -0,0 +1,29 @@ +# QuMADA Device Monitor + +This is a dash app that is intended to monitor a qumada device. + +The feature that separates it from the qcodes monitor is the ability to display a proper layout. + +```python +from qumada.utils.device_server import start_monitor_socket + +device: 'QumadaDevice' = ... +my_dxf_file = ... + +start_monitor_socket(device, my_dxf_file) +``` + +This does the following steps under the hood: + +1. Create a list of `qumada.utils.geometry.Gate` objects with the correct labels + - Load a dxf file with `doc = ezdxf.readfile('my_layout.dxf')` + - Crop it to the desired region with `raw_gates = qumada.utils.dxf.get_gates_from_cropped_region` + - Merge gates from the same layer that overlap (necessary for some reason) with `merged_gates = qumada.utils.dxf.auto_merge` + - Give the gates the proper labels with a helper GUI: `gates = qumada.utils.dxf.label_gates(merged_gates)` + - Save the gate list to json with `qumada.utils.geometry.store_to_file(gates, 'my_gates.json')` +2. Start the device monitor server and set its gate_geometry + +To view the monitor you need to start the webserver by running `python -m qumada-monitor` in a shell + +The webserver will try to match parameters to gates based on the gate labels being present in the parameter labels. +Un-matched gates are nan. diff --git a/src/qumada-monitor/__init__.py b/src/qumada-monitor/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/qumada-monitor/__main__.py b/src/qumada-monitor/__main__.py new file mode 100644 index 00000000..123a86e4 --- /dev/null +++ b/src/qumada-monitor/__main__.py @@ -0,0 +1,13 @@ +import argparse + +from .app import make_app + +p = argparse.ArgumentParser(description="Dash front-end for your measurement WebSocket") +p.add_argument("--ws", default="ws://127.200.200.9:6789", help="WebSocket URL, e.g. ws://localhost:8765") +p.add_argument("--host", default="127.0.0.1", help="Dash host to bind") +p.add_argument("--port", default=8050, type=int, help="Dash port (default: 8050)") +p.add_argument("--debug", default=False, action="store_true", help="Enable debug mode") +args = p.parse_args() + +app = make_app(args.ws) +app.run(host=args.host, port=args.port, debug=args.debug) diff --git a/src/qumada-monitor/app.py b/src/qumada-monitor/app.py new file mode 100644 index 00000000..0cf32bc6 --- /dev/null +++ b/src/qumada-monitor/app.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python3 +""" +Live quantum‑device monitor: geometry + voltages ➜ interactive SVG plot. + +Start with: + python dash_device_monitor.py --ws ws://localhost:8765 --colorscale Cividis +""" +import argparse, json, itertools, os, warnings +from collections import defaultdict + +import plotly.graph_objects as go +import plotly.express as px +from plotly.colors import sample_colorscale +from shapely.geometry import Polygon + +import dash.exceptions +from dash import Dash, html, dcc, dash_table, Input, Output, State, no_update +from dash_extensions import WebSocket # pip install dash-extensions + +# ---------------- helpers -------------------------------------------------- # +from qumada.utils.geometry import string_to_gate_list, Gate + + +def layer_palette(layers): + """Return dict layer -> rgba border colour (qualitative palette, repeats if needed).""" + palette = itertools.cycle(px.colors.qualitative.Plotly) + return {layer: next(palette) for layer in layers} + + +def voltage_colour(v, vabs_max, colorscale): + """Map voltage to rgba string using the chosen Plotly colourscale.""" # flat value + ratio = 0.5 + v / vabs_max / 2 + return sample_colorscale(colorscale, [ratio])[0] # :contentReference[oaicite:9]{index=9} + + +def _set_alpha(color: str, value) -> str: + if color.startswith("rgb("): + result = color.replace("rgb(", "rgba(") + result = result.replace(")", f",{value})") + elif color.startswith("rgba("): + result = color.rsplit(",", 1)[0] + result = f"{result},{value})" + else: + raise NotImplementedError(color) + return result + + +def gates_to_figure(gates: list[Gate], voltages: list[dict], colorscale): + """Build a Plotly figure from gate list + latest voltages dict.""" + if not gates: + return go.Figure() + + gate_runtime_info = [] + for gate in gates: + candidates = [] + for data in voltages: + if gate.label in data["label"]: + candidates.append(data) + if not candidates: + gate_runtime_info.append(None) + else: + if len(candidates) > 1: + data = min(candidates, key=lambda d: d["label"]) + else: + data, = candidates + gate_runtime_info.append(data) + + # 1) derive colour mapping + v_values = [info["value"] + for info in gate_runtime_info + if info is not None] + vabs_max = max(map(abs, v_values + [1.0])) + layers = sorted({g.layer for g in gates}) + border_col = layer_palette(layers) # :contentReference[oaicite:10]{index=10} + + fig = go.Figure() + for gate, info in zip(gates, gate_runtime_info): + poly: Polygon = gate.polygon + x, y = poly.exterior.xy + x = list(x) + y = list(y) + if info: + v = info["value"] + unit = info["unit"] + text = f"{gate.label}:
{v:.3f} {unit}" + fill_col = voltage_colour(v, vabs_max, colorscale) + else: + text = f"{gate.label}:
NaN" + fill_col = None + line_col = border_col[gate.layer] + + if fill_col is None: + fill_col = "rgba(0,0,0,0)" + else: + fill_col = _set_alpha(fill_col, 0.3) + + fig.add_trace( + go.Scatter( + x=x, + y=y, + fill="toself", # polygon fill trick :contentReference[oaicite:11]{index=11} + fillcolor=fill_col, + line=dict(color=line_col, width=1), + hoverinfo="text", + text=text, + showlegend=False, + mode='lines', + ) + ) + # add static label at predefined position + if getattr(gate, "label_position", None): + lx, ly = gate.label_position + fig.add_annotation(x=lx, y=ly, + text=text, + showarrow=False, + font=dict(size=10, color="black")) + + fig.update_layout( + xaxis=dict(scaleanchor="y", visible=False), + yaxis=dict(visible=False), + margin=dict(l=0, r=0, t=0, b=0), + plot_bgcolor="white", + uirevision=gates, + ) + return fig + + +# ---------------- Dash app -------------------------------------------------- # +def make_app(ws_url: str) -> Dash: + app = Dash(__name__, title="Quantum‑dot voltage monitor") + + app.layout = html.Div( + [ + html.H3("Live device layout"), + WebSocket(id="ws", url=ws_url), + dcc.Store(id="gate-geometry"), + dcc.Store(id="voltages"), + dcc.Store(id="parameters"), + dcc.Graph(id="layout-graph", style={"height": "700px"}), + dcc.Dropdown( + id="colorscale-dropdown", + value="rdbu", + options=[{"label": cs, "value": cs} for cs in px.colors.named_colorscales()], + clearable=False, + style={"width": "250px"}, + ), + html.Hr(), + dash_table.DataTable( + id="parameter-table", + columns=[{"name": "Label", "id": "label"}, + {"name": "Value", "id": "value"}, + {"name": "Unit", "id": "unit"}, + {"name": "Timestamp", "id": "timestamp"}, + {"name": "Name", "id": "name"}, + ], + style_cell={"fontFamily": "monospace", "padding": "2px 6px"}, + style_table={"max-height": "400px", "overflowY": "auto"}, + ), + ], + style={"font-family": "Source Sans Pro, sans-serif", "margin": "0 20px"}, + ) + + # ---------- 1) unpack every WebSocket message -------------------------- # + @app.callback( + Output("gate-geometry", "data"), + Output("parameters", "data"), + Input("ws", "message"), + prevent_initial_call=True, + ) + def _unpack_ws(msg): + if msg is None: + return no_update, no_update + + data = json.loads(msg["data"]) + + parameters = gate_geometry = no_update + if "parameters" in data: + parameters = data["parameters"] + if "gate_geometry" in data: + gate_geometry = data["gate_geometry"] + return gate_geometry, parameters + + @app.callback( + Output("voltages", "data"), + Input("parameters", "data"), + prevent_initial_call=True, + ) + def _select_voltages(parameters): + voltages = [] + for parameter in parameters: + assert "unit" in parameter, f"{parameter!r}" + if "V" in parameter["unit"]: + value = {attr: parameter[attr] for attr in ["name", "value", "label", "unit"]} + voltages.append(value) + return voltages + + @app.callback( + Output("parameter-table", "data"), + Input("parameters", "data"), + ) + def _parameter_table(parameters): + if not parameters: + return [] + + cols = ["label", "value", "unit", "timestamp", "name"] + return [ + {col: parameter[col] for col in cols} + for parameter in parameters + ] + + @app.callback( + Output("layout-graph", "figure"), + Input("gate-geometry", "data"), + Input("voltages", "data"), + Input("colorscale-dropdown", "value"), + prevent_initial_call=True, + ) + def _draw_layout(geom_str, volts, colorscale_selected): + if geom_str is None or volts is None: + raise dash.exceptions.PreventUpdate + + gates = string_to_gate_list(geom_str) + fig = gates_to_figure(gates, volts, colorscale_selected) + return fig + + return app + diff --git a/src/qumada/utils/device_server.py b/src/qumada/utils/device_server.py new file mode 100644 index 00000000..e6e763ca --- /dev/null +++ b/src/qumada/utils/device_server.py @@ -0,0 +1,199 @@ +import asyncio +import copy +import datetime +import pathlib +import threading +import time +import logging +import json + +import websockets +from websockets.asyncio.server import ServerConnection + +from qcodes.instrument.parameter import Parameter + +from qumada.measurement.device_object import QumadaDevice +from qumada.utils.geometry import gate_list_to_string, Gate, load_from_file + +logger = logging.getLogger(__name__) + + +def _get_parameter_data(parameter: Parameter, cache_only: bool = True): + value = parameter.cache.get(get_if_invalid=not cache_only) + timestamp = parameter.cache.timestamp + + return { + "value": value, + "timestamp": timestamp, + "name": parameter.full_name, + "label": parameter.label, + "unit": parameter.unit, + "vals": str(parameter.vals), + "instrument": parameter.instrument.full_name, + "instrument_class": parameter.instrument.__class__.__name__, + "root_instrument": parameter.root_instrument.full_name, + "root_instrument_class": parameter.root_instrument.__class__.__name__, + } + + +def _collect_data(*parameters: Parameter, cache_only: bool = True): + data = [] + for parameter in parameters: + try: + parameter_data = _get_parameter_data(parameter, cache_only=cache_only) + except Exception as e: + parameter_data = { + "exception": str(e) + } + data.append(parameter_data) + + return { + "parameters": data, + "timestamp": time.time(), + } + + +class DataCollector: + def __init__(self, *parameters: Parameter, cache_only: bool = True, minimal_update_delta: float = 1 / 30): + self.lock = asyncio.Lock() + self.parameters = parameters + self.cache_only = cache_only + self.minimal_update_delta = minimal_update_delta + self._data = None + + async def get_data(self): + # although it should be fine to use a blocking lock here, + # an async lock is the more robust option in case the surrounding code changes. + async with self.lock: + if self._data is None: + delta = float('inf') + else: + delta = time.time() - self._data['timestamp'] + if delta >= self.minimal_update_delta: + self._data = _collect_data(*self.parameters, cache_only=self.cache_only) + return self._data + + +class DeviceWebSocket(threading.Thread): + def __init__(self, collector: DataCollector, ip="127.200.200.9", port=6789): + super().__init__() + self.ip = ip + self.port = port + + self.collector = collector + self.gate_geometry = None + + self._loop = None + self._loop_control_future = threading.Event() + + #: data is re-sent after this time even if nothing changed + self.maximal_update_interval = 1. + + @property + def address(self): + return f"ws://{self.ip}:{self.port}" + + def run(self): + if self._loop is not None: + raise RuntimeError("Loop already set") + + logger.info(f"Starting websocket server on {self.ip}:{self.port}") + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + try: + self._loop.run_until_complete(self._serve()) + finally: + pending = asyncio.all_tasks(self._loop) + for task in pending: + task.cancel() + self._loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + self._loop.close() + + def stop(self) -> None: + """ + May be called from any thread. It: + 1. signals the serving coroutine to exit, + 2. schedules loop.stop() thread-safely, and + 3. leaves it to the caller to `join()` if they need to block. + """ + self._loop_control_future.set() + if self._loop is not None and self._loop.is_running(): + # Wake the event-loop so it notices _stop_event quickly + self._loop.call_soon_threadsafe(self._loop.stop) + + async def _serve(self) -> None: + """ + Start the WebSocket server and block until `stop()` is called. + """ + async with websockets.serve(self._handle_connection, self.ip, self.port) as server: + logger.info("WebSocket server listening on %s:%d", self.ip, self.port) + + # Poll the threading.Event without blocking the loop forever. + while not self._loop_control_future.is_set(): + await asyncio.sleep(0.2) + + logger.info("Shutdown requested – closing listener and connections") + server.close() + await server.wait_closed() + logger.info("WebSocket server closed") + + def join(self, timeout = None): + if self.is_alive(): + self.stop() + super().join(timeout) + + async def _handle_connection(self, connection: ServerConnection): + def default_to_json(o): + if isinstance(o, datetime.datetime): + return o.timestamp() + else: + raise NotImplementedError(f"Cannot serialize {o!r} of type {type(o)}") + + last_update = 0.0 + last_message = None + while True: + data = await self.collector.get_data() + + if self.gate_geometry is not None: + data["gate_geometry"] = gate_list_to_string(self.gate_geometry) + + serialized = json.dumps(data, default=default_to_json) + + if serialized != last_message or time.time() - last_update > self.maximal_update_interval: + try: + await connection.send(serialized) + except websockets.exceptions.ConnectionClosed: + logger.debug("Connection closed") + break + else: + last_message = serialized + last_update = time.time() + + try: + await asyncio.sleep(self.collector.minimal_update_delta) + except asyncio.CancelledError: + logger.debug("Cancelled") + break + logger.info("Connection terminated") + + + self.monitor_socket = None + + +def start_monitor_socket(device_object: QumadaDevice, gate_geometry: list[Gate] | str | pathlib.Path = None): + if getattr(device_object, 'monitor_socket', None) is not None: + logger.info("Monitor socket already present. Restarting.") + device_object.monitor_socket.join() + parameters = [param for parameters in device_object.terminal_parameters.values() for param in parameters.values()] + + collector = DataCollector(*parameters) + device_object.monitor_socket = DeviceWebSocket(collector) + device_object.monitor_socket.start() + + if gate_geometry is not None: + if not isinstance(gate_geometry, list): + from qumada.utils.dxf import load_convert_and_cache + gate_geometry = load_convert_and_cache(gate_geometry) + + device_object.monitor_socket.gate_geometry = gate_geometry diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py new file mode 100644 index 00000000..47ebecd0 --- /dev/null +++ b/src/qumada/utils/dxf.py @@ -0,0 +1,309 @@ +import argparse +import copy +import pathlib +import re +import logging +import subprocess +import sys +from typing import Tuple + +import ezdxf.document +import matplotlib.widgets +from matplotlib import pyplot as plt +from shapely.geometry import ( + MultiLineString +) +from shapely.geometry import box, Polygon, LineString +from shapely.plotting import plot_polygon + +from qumada.utils.geometry import Gate, store_to_file, load_from_file + +SELECT_ALPHA = 0.3 + +logger = logging.getLogger(__name__) + + +def entity_to_geom(e): + """ + Best-effort conversion of an ezdxf entity to a Shapely geometry. + Extend this to cover more exotic entities as needed. + """ + if e.dxftype() == "LINE": + return LineString([e.dxf.start, e.dxf.end]) + + if e.dxftype() in {"LWPOLYLINE", "POLYLINE"}: + if hasattr(e, "get_points"): + points = e.get_points() + else: + points = e.points_in_wcs() + pts = [tuple(p)[:2] for p in points] # ignore bulge for now + closed = bool(e.closed) if hasattr(e, "closed") else pts[0] == pts[-1] + return Polygon(pts) if closed else LineString(pts) + + raise NotImplementedError(e.dxftype()) + + + +def iterate_all_entities(e, path = None): + if path is None: + path = [] + if e.dxftype() == "INSERT": + path.append(e.dxf.name) + for sub in e.virtual_entities(): + yield from iterate_all_entities(sub, path) + path.pop() + else: + yield e, list(path) + + + +def get_gates_from_cropped_region( + doc: ezdxf.document.Drawing, + x_rng: Tuple[float, float] = (-3., 3.), + y_rng: Tuple[float, float] = (-1.5, 1.5), + layer_regex: str = r".*BEAM\_L.", +) -> list[Gate]: + regex = re.compile(layer_regex) + + keep_layer = { + layer.dxf.name: bool(regex.search(layer.dxf.name)) + for layer in doc.layers + } + + xmin, xmax = x_rng + ymin, ymax = y_rng + roi_poly = Polygon(box(xmin, ymin, xmax, ymax)) + + chosen = [] + for model in doc.modelspace(): + for e, path in iterate_all_entities(model): + layer = e.dxf.layer + if not keep_layer[layer]: + continue + + raw_geom = entity_to_geom(e) + if raw_geom is None: + continue + + geom = Polygon(raw_geom).simplify(1e-3) + geom_roi = geom.intersection(roi_poly) + if geom_roi == roi_poly: + continue + + if geom_roi.is_empty: + continue + + boundary = geom.intersection(roi_poly.boundary) + + if not boundary.is_empty and isinstance(boundary, (LineString, MultiLineString)): + label_position_point = boundary.line_interpolate_point(0.5, normalized=True) + label_position = label_position_point.x, label_position_point.y + else: + label_position = None + + label = f"G{len(chosen)}" + if label == "G36": + pass + + gate = Gate( + polygon=geom_roi, + label_position=label_position, + label=label, + path=path, + layer=layer, + ) + chosen.append(gate) + + return chosen + + +def auto_merge(gates: list[Gate]): + # merge touching gates that do not touch the boundary + connected = [] + unconnected = [] + for gate in gates: + if gate.label_position: + connected.append(gate) + else: + unconnected.append(gate) + + logger.info("Connected: %d", len(connected)) + logger.info("Unconnected: %d", len(unconnected)) + + while unconnected: + temp = [] + for gate in unconnected: + for con_gate in connected: + if gate.layer != con_gate.layer: + continue + boundary = gate.polygon.intersection(con_gate.polygon, grid_size=1e-3) + if boundary.is_empty: + continue + + new_geom = gate.polygon.union(con_gate.polygon) + con_gate.polygon = new_geom + break + else: + temp.append(gate) + if len(temp) == len(unconnected): + break + unconnected = temp + + logger.info("Unconnected after auto-merge: %d", len(unconnected)) + for u in unconnected: + logger.debug("Unconnected %s in layer %r: %r", u.label, u.layer, u.polygon) + u.label_position = u.polygon.centroid.x, u.polygon.centroid.y + + return connected + unconnected + + +def label_gates(gates: list[Gate]) -> list[Gate]: + gates = [copy.deepcopy(gate) for gate in gates] + + axd = plt.figure(layout="constrained").subplot_mosaic( + """ + AAAA + BCDE + """, + height_ratios=[1, 0.1] + ) + ax = axd["A"] + fig = ax.get_figure() + prv = matplotlib.widgets.Button(ax=axd["B"], label="Previous") + txt = matplotlib.widgets.TextBox(ax=axd["C"], label="Name") + apply = matplotlib.widgets.Button(ax=axd["D"], label="Apply") + nxt = matplotlib.widgets.Button(ax=axd["E"], label="Next") + + layers = {gate.layer for gate in gates} + color_iter = iter(plt.rcParams['axes.prop_cycle'].by_key()['color']) + layer_colors = dict(zip(layers, color_iter)) + + + plots = [] + for idx, gate in enumerate(gates): + color = layer_colors[gate.layer] + poly_patch = plot_polygon( + gate.polygon, + facecolor="none", + color=color, + add_points=False, + ax=ax) + + poly_patch.gate_index = idx + poly_patch.set_picker(True) + + label_plot = ax.annotate(str(gate.label), + gate.label_position, + bbox=dict(boxstyle="round", fc="0.8"), + ha="center", va="center") + plots.append((poly_patch, label_plot)) + + def deselect_gate(idx): + poly_plot, label_plot = plots[idx] + poly_plot.set_facecolor("none") + txt.set_val("") + plt.draw() + + def select_gate(idx): + poly_plot, label_plot = plots[idx] + gate = gates[idx] + color = layer_colors[gate.layer] + poly_plot.set_facecolor((color, SELECT_ALPHA)) + if str(gate.label) != str(None): + txt.set_val(str(gate.label)) + plt.draw() + + + current_gate = 0 + select_gate(current_gate) + def apply_action(*_): + gate = gates[current_gate] + _, label_plot = plots[current_gate] + gate.label = txt.text + label_plot.set_text(txt.text) + plt.draw() + + def prev_action(*_): + nonlocal current_gate + deselect_gate(current_gate) + if current_gate == 0: + current_gate += len(gates) + current_gate -= 1 + select_gate(current_gate) + + def next_action(*_): + nonlocal current_gate + deselect_gate(current_gate) + current_gate += 1 + if current_gate == len(gates): + current_gate -= len(gates) + select_gate(current_gate) + + def pick_handler(event): + nonlocal current_gate + artist = event.artist + if hasattr(artist, "gate_index"): + gate_index = artist.gate_index + if gate_index != current_gate: + deselect_gate(current_gate) + current_gate = gate_index + select_gate(current_gate) + else: + raise NotImplementedError(event) + + prv.on_clicked(prev_action) + apply.on_clicked(apply_action) + nxt.on_clicked(next_action) + fig.canvas.mpl_connect('pick_event', pick_handler) + + widgets = [prv, apply, nxt, txt] + fig.widgets = widgets + + return gates + + +def load_convert_and_cache(path: pathlib.Path | str) -> list[Gate]: + path = pathlib.Path(path) + + if not path.exists(): + raise FileNotFoundError(path) + + if path.suffix == ".dxf": + dxf_path = path + json_path = path.with_suffix(".json") + + if not json_path.exists(): + subprocess.check_call([ + sys.executable, "-m", "qumada.utils.dxf", dxf_path]) + + + elif path.suffix == ".json": + json_path = path + + else: + raise ValueError("Only dxf and json are supported", path) + + return load_from_file(json_path) + + +def get_parser(): + import argparse + parser = argparse.ArgumentParser(description="Qumada dxf labeler") + parser.add_argument("dxf_path", type=pathlib.Path, help="path to dxf file") + parser.add_argument("--json-path", type=pathlib.Path, help="path to json file. default is the same as dxf with other ending", default=None) + return parser + + +if __name__ == "__main__": + matplotlib.use("Qt5Agg") + parser = get_parser() + args = parser.parse_args() + if args.json_path is None: + args.json_path = args.dxf_path.with_suffix(".json") + + doc = ezdxf.readfile(args.dxf_path) + raw_gates = get_gates_from_cropped_region(doc) + merged_gates = auto_merge(raw_gates) + gates = label_gates(merged_gates) + plt.show(block=True) + store_to_file(gates, args.json_path) diff --git a/src/qumada/utils/geometry.py b/src/qumada/utils/geometry.py new file mode 100644 index 00000000..4a44e25e --- /dev/null +++ b/src/qumada/utils/geometry.py @@ -0,0 +1,50 @@ +import dataclasses +import pathlib +import json + +from shapely.geometry import Point, LineString, Polygon +from shapely import wkt + +@dataclasses.dataclass +class Gate: + polygon: Polygon + path: list[str] + layer: str + label: str | None + label_position: tuple[float, float] + +def gate_list_to_string(gates: list[Gate]) -> str: + def to_json(o): + if isinstance(o, Gate): + return dataclasses.asdict(o) + elif isinstance(o, Polygon): + return o.wkt + else: + return o + + txt = json.dumps(gates, indent=2, default=to_json) + return txt + + +def string_to_gate_list(txt: str) -> list[Gate]: + data = json.loads(txt) + assert isinstance(data, list) + + gates = [] + for d in data: + d["polygon"] = wkt.loads(d["polygon"]) + d["label_position"] = tuple(d["label_position"]) + gates.append(Gate(**d)) + return gates + + +def store_to_file(gates: list[Gate], path: pathlib.Path): + path = pathlib.Path(path) + txt = gate_list_to_string(gates) + path.write_text(txt) + + +def load_from_file(path: pathlib.Path): + path = pathlib.Path(path) + txt = path.read_text() + return string_to_gate_list(txt) From 8d1b88fe1ec78e326c2aa5b8e445b567e7e678f3 Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Wed, 16 Jul 2025 10:26:44 +0200 Subject: [PATCH 02/19] Add geometry documentation --- src/qumada/utils/geometry.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/qumada/utils/geometry.py b/src/qumada/utils/geometry.py index 4a44e25e..6695ff12 100644 --- a/src/qumada/utils/geometry.py +++ b/src/qumada/utils/geometry.py @@ -7,13 +7,18 @@ @dataclasses.dataclass class Gate: + """Representation of a gate electrode with the geometric properties and annotation information.""" polygon: Polygon path: list[str] layer: str label: str | None label_position: tuple[float, float] + def gate_list_to_string(gates: list[Gate]) -> str: + """Serialize a list of gates to a JSON string. + + The geometric information is stored in WKT format (well known text).""" def to_json(o): if isinstance(o, Gate): return dataclasses.asdict(o) @@ -27,6 +32,9 @@ def to_json(o): def string_to_gate_list(txt: str) -> list[Gate]: + """Deserialize a JSON string produced by :py:`.gate_list_to_string` to a list of gates. + """ + data = json.loads(txt) assert isinstance(data, list) @@ -39,12 +47,14 @@ def string_to_gate_list(txt: str) -> list[Gate]: def store_to_file(gates: list[Gate], path: pathlib.Path): + """Store a list of gates in a file in json format.""" path = pathlib.Path(path) txt = gate_list_to_string(gates) path.write_text(txt) def load_from_file(path: pathlib.Path): + """Load a list of gates from a JSON file produced with :py:`.store_to_file`.""" path = pathlib.Path(path) txt = path.read_text() return string_to_gate_list(txt) From 391b007b78dbedfcbd035b0570d6e26db6fde629 Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Wed, 16 Jul 2025 10:27:19 +0200 Subject: [PATCH 03/19] Add dxf documentation and make code adhere to it --- src/qumada/utils/dxf.py | 97 ++++++++++++++++++++++++++--------------- 1 file changed, 62 insertions(+), 35 deletions(-) diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py index 47ebecd0..555525a3 100644 --- a/src/qumada/utils/dxf.py +++ b/src/qumada/utils/dxf.py @@ -1,14 +1,19 @@ import argparse import copy +import math +import operator import pathlib import re import logging import subprocess import sys +import warnings +from copy import deepcopy from typing import Tuple import ezdxf.document import matplotlib.widgets +import shapely from matplotlib import pyplot as plt from shapely.geometry import ( MultiLineString @@ -62,7 +67,15 @@ def get_gates_from_cropped_region( x_rng: Tuple[float, float] = (-3., 3.), y_rng: Tuple[float, float] = (-1.5, 1.5), layer_regex: str = r".*BEAM\_L.", + grid_size: float = 1e-3, ) -> list[Gate]: + """Selects all polygons (POLYLINE entities) from the selected region that live in a layer matched by the given + regular expression. The polygons are cropped to the region and returned as :py:`.Gate` objects. + + :py:attr:`.Gate.label_position` is only assigned to gates that touch the boundary. + + In some cases, continuous gates are returned in multiple pieces. Use :py:`.auto_merge` to merge overlapping gates in the same layer. + """ regex = re.compile(layer_regex) keep_layer = { @@ -85,7 +98,7 @@ def get_gates_from_cropped_region( if raw_geom is None: continue - geom = Polygon(raw_geom).simplify(1e-3) + geom = Polygon(raw_geom).simplify(grid_size) geom_roi = geom.intersection(roi_poly) if geom_roi == roi_poly: continue @@ -93,6 +106,8 @@ def get_gates_from_cropped_region( if geom_roi.is_empty: continue + geom_roi = shapely.set_precision(geom_roi, grid_size=grid_size) + boundary = geom.intersection(roi_poly.boundary) if not boundary.is_empty and isinstance(boundary, (LineString, MultiLineString)): @@ -117,49 +132,61 @@ def get_gates_from_cropped_region( return chosen -def auto_merge(gates: list[Gate]): - # merge touching gates that do not touch the boundary - connected = [] - unconnected = [] +def _connect_all_touching(gates: list[Gate], grid_size: float): + assert len({gate.layer for gate in gates}) == 1 + + result = [] + not_intersecting = [] + intersecting = [] for gate in gates: - if gate.label_position: - connected.append(gate) - else: - unconnected.append(gate) - - logger.info("Connected: %d", len(connected)) - logger.info("Unconnected: %d", len(unconnected)) - - while unconnected: - temp = [] - for gate in unconnected: - for con_gate in connected: - if gate.layer != con_gate.layer: - continue - boundary = gate.polygon.intersection(con_gate.polygon, grid_size=1e-3) - if boundary.is_empty: - continue - - new_geom = gate.polygon.union(con_gate.polygon) - con_gate.polygon = new_geom - break + not_intersecting.clear() + intersecting.clear() + for other in result: + # boundary = gate.polygon.intersection(other.polygon, grid_size=grid_size) + if gate.polygon.intersects(other.polygon): + intersecting.append(other) else: - temp.append(gate) - if len(temp) == len(unconnected): - break - unconnected = temp + not_intersecting.append(other) + result.clear() + + if intersecting: + intersecting.append(gate) + label_position = None + for g in intersecting: + label_position = label_position or g.label_position + new_poly = shapely.union_all([g.polygon for g in intersecting], grid_size=grid_size) + to_append = intersecting[0] + to_append.polygon = new_poly + to_append.label_position = label_position + + result.append(to_append) + else: + result.append(copy.copy(gate)) - logger.info("Unconnected after auto-merge: %d", len(unconnected)) - for u in unconnected: - logger.debug("Unconnected %s in layer %r: %r", u.label, u.layer, u.polygon) - u.label_position = u.polygon.centroid.x, u.polygon.centroid.y + result.extend(not_intersecting) + return result - return connected + unconnected + +def auto_merge(gates: list[Gate], grid_size: float = 1e-3): + """Merges touching gates in the same layer""" + + by_layer = {} + for gate in gates: + by_layer.setdefault(gate.layer, []).append(gate) + + for layer, layer_gates in by_layer.items(): + connected = _connect_all_touching(layer_gates, grid_size) + by_layer[layer] = connected + + result = sum(by_layer.values(), start=[]) + return result def label_gates(gates: list[Gate]) -> list[Gate]: gates = [copy.deepcopy(gate) for gate in gates] + gates = sorted(gates, key=lambda gate: 0.0 if gate.label_position is None else math.atan2(*gate.label_position)) + axd = plt.figure(layout="constrained").subplot_mosaic( """ AAAA From 0d8f9ca8df3fef3fc603c96f0d1b64a1adfdd190 Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Wed, 16 Jul 2025 10:28:38 +0200 Subject: [PATCH 04/19] More doc and fix warning --- src/qumada/utils/dxf.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py index 555525a3..2d2a8901 100644 --- a/src/qumada/utils/dxf.py +++ b/src/qumada/utils/dxf.py @@ -132,7 +132,8 @@ def get_gates_from_cropped_region( return chosen -def _connect_all_touching(gates: list[Gate], grid_size: float): +def _connect_all_touching(gates: list[Gate], grid_size: float) -> list[Gate]: + """Iteratively connects all touching gates "or"-ing their label positions.""" assert len({gate.layer for gate in gates}) == 1 result = [] @@ -331,6 +332,6 @@ def get_parser(): doc = ezdxf.readfile(args.dxf_path) raw_gates = get_gates_from_cropped_region(doc) merged_gates = auto_merge(raw_gates) - gates = label_gates(merged_gates) + resulting_gates = label_gates(merged_gates) plt.show(block=True) - store_to_file(gates, args.json_path) + store_to_file(resulting_gates, args.json_path) From 16b7de3d8936d3f78a08c8f836bfa26a783e6ce8 Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Wed, 16 Jul 2025 13:07:21 +0200 Subject: [PATCH 05/19] Improve update logic --- src/qumada-monitor/app.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/qumada-monitor/app.py b/src/qumada-monitor/app.py index 0cf32bc6..cd89472a 100644 --- a/src/qumada-monitor/app.py +++ b/src/qumada-monitor/app.py @@ -120,7 +120,6 @@ def gates_to_figure(gates: list[Gate], voltages: list[dict], colorscale): yaxis=dict(visible=False), margin=dict(l=0, r=0, t=0, b=0), plot_bgcolor="white", - uirevision=gates, ) return fig @@ -213,14 +212,21 @@ def _parameter_table(parameters): Input("gate-geometry", "data"), Input("voltages", "data"), Input("colorscale-dropdown", "value"), - prevent_initial_call=True, ) def _draw_layout(geom_str, volts, colorscale_selected): if geom_str is None or volts is None: - raise dash.exceptions.PreventUpdate - - gates = string_to_gate_list(geom_str) - fig = gates_to_figure(gates, volts, colorscale_selected) + fig = go.Figure() + else: + gates = string_to_gate_list(geom_str) + fig = gates_to_figure(gates, volts, colorscale_selected) + + fig.update_layout( + xaxis=dict(scaleanchor="y", visible=False), + yaxis=dict(visible=False), + margin=dict(l=0, r=0, t=0, b=0), + plot_bgcolor="white", + uirevision=hash(geom_str), + ) return fig return app From ec78aefdf6ce7ab561b73f62ef6d914b5a1d284e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 1 Aug 2025 12:25:56 +0000 Subject: [PATCH 06/19] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- docs/device_object.rst | 2 +- src/qumada-monitor/app.py | 57 +++-- .../instrument/custom_drivers/ZI/MFLI.py | 10 +- .../instrument/mapping/Dummies/DummyDac.py | 8 +- .../instrument/mapping/Harvard/Decadac.py | 10 +- src/qumada/instrument/mapping/QDevil/qdac.py | 3 +- src/qumada/instrument/mapping/QDevil/qdac2.py | 2 +- src/qumada/instrument/mapping/base.py | 6 +- src/qumada/measurement/device_object.py | 210 +++++++++--------- src/qumada/measurement/measurement.py | 55 ++--- .../scripts/generic_measurement.py | 38 ++-- src/qumada/utils/device_server.py | 25 +-- src/qumada/utils/dxf.py | 81 +++---- src/qumada/utils/geometry.py | 10 +- src/qumada/utils/load_from_sqlite_db.py | 9 +- src/qumada/utils/plotting.py | 46 ++-- src/qumada/utils/ramp_parameter.py | 98 ++++---- 17 files changed, 341 insertions(+), 329 deletions(-) diff --git a/docs/device_object.rst b/docs/device_object.rst index 9b1116c4..3bbb8b8b 100644 --- a/docs/device_object.rst +++ b/docs/device_object.rst @@ -194,7 +194,7 @@ This stores all parameter values (of parameters that can be set). With you can reset it to the stored configuration. Alternatively you can use "device.save_state(name)" and "device.set_state(name)" to store and set multiple working points with -custom names. They can also be accessed via "device.states" in case you forgot the name. To store a state in json-file, use device.save_to_file(name, path), you can load it +custom names. They can also be accessed via "device.states" in case you forgot the name. To store a state in json-file, use device.save_to_file(name, path), you can load it via "device.load_state_from_file(name, path)" to load it (use "device.set_state(name)" afterwards to set it) or with "device.set_state_from_file(name, path)" to directly apply the configuration to the device. For all of those methods the parameters are ramped to the final state by default (with the default QuMada ramp rate), again depending on the "device.ramp" setting. diff --git a/src/qumada-monitor/app.py b/src/qumada-monitor/app.py index cd89472a..3bc52dde 100644 --- a/src/qumada-monitor/app.py +++ b/src/qumada-monitor/app.py @@ -5,20 +5,23 @@ Start with: python dash_device_monitor.py --ws ws://localhost:8765 --colorscale Cividis """ -import argparse, json, itertools, os, warnings +import argparse +import itertools +import json +import os +import warnings from collections import defaultdict -import plotly.graph_objects as go +import dash.exceptions import plotly.express as px +import plotly.graph_objects as go +from dash import Dash, Input, Output, State, dash_table, dcc, html, no_update +from dash_extensions import WebSocket # pip install dash-extensions from plotly.colors import sample_colorscale from shapely.geometry import Polygon -import dash.exceptions -from dash import Dash, html, dcc, dash_table, Input, Output, State, no_update -from dash_extensions import WebSocket # pip install dash-extensions - # ---------------- helpers -------------------------------------------------- # -from qumada.utils.geometry import string_to_gate_list, Gate +from qumada.utils.geometry import Gate, string_to_gate_list def layer_palette(layers): @@ -28,9 +31,9 @@ def layer_palette(layers): def voltage_colour(v, vabs_max, colorscale): - """Map voltage to rgba string using the chosen Plotly colourscale.""" # flat value + """Map voltage to rgba string using the chosen Plotly colourscale.""" # flat value ratio = 0.5 + v / vabs_max / 2 - return sample_colorscale(colorscale, [ratio])[0] # :contentReference[oaicite:9]{index=9} + return sample_colorscale(colorscale, [ratio])[0] # :contentReference[oaicite:9]{index=9} def _set_alpha(color: str, value) -> str: @@ -62,16 +65,14 @@ def gates_to_figure(gates: list[Gate], voltages: list[dict], colorscale): if len(candidates) > 1: data = min(candidates, key=lambda d: d["label"]) else: - data, = candidates + (data,) = candidates gate_runtime_info.append(data) # 1) derive colour mapping - v_values = [info["value"] - for info in gate_runtime_info - if info is not None] + v_values = [info["value"] for info in gate_runtime_info if info is not None] vabs_max = max(map(abs, v_values + [1.0])) layers = sorted({g.layer for g in gates}) - border_col = layer_palette(layers) # :contentReference[oaicite:10]{index=10} + border_col = layer_palette(layers) # :contentReference[oaicite:10]{index=10} fig = go.Figure() for gate, info in zip(gates, gate_runtime_info): @@ -98,22 +99,19 @@ def gates_to_figure(gates: list[Gate], voltages: list[dict], colorscale): go.Scatter( x=x, y=y, - fill="toself", # polygon fill trick :contentReference[oaicite:11]{index=11} + fill="toself", # polygon fill trick :contentReference[oaicite:11]{index=11} fillcolor=fill_col, line=dict(color=line_col, width=1), hoverinfo="text", text=text, showlegend=False, - mode='lines', + mode="lines", ) ) # add static label at predefined position if getattr(gate, "label_position", None): lx, ly = gate.label_position - fig.add_annotation(x=lx, y=ly, - text=text, - showarrow=False, - font=dict(size=10, color="black")) + fig.add_annotation(x=lx, y=ly, text=text, showarrow=False, font=dict(size=10, color="black")) fig.update_layout( xaxis=dict(scaleanchor="y", visible=False), @@ -146,12 +144,13 @@ def make_app(ws_url: str) -> Dash: html.Hr(), dash_table.DataTable( id="parameter-table", - columns=[{"name": "Label", "id": "label"}, - {"name": "Value", "id": "value"}, - {"name": "Unit", "id": "unit"}, - {"name": "Timestamp", "id": "timestamp"}, - {"name": "Name", "id": "name"}, - ], + columns=[ + {"name": "Label", "id": "label"}, + {"name": "Value", "id": "value"}, + {"name": "Unit", "id": "unit"}, + {"name": "Timestamp", "id": "timestamp"}, + {"name": "Name", "id": "name"}, + ], style_cell={"fontFamily": "monospace", "padding": "2px 6px"}, style_table={"max-height": "400px", "overflowY": "auto"}, ), @@ -202,10 +201,7 @@ def _parameter_table(parameters): return [] cols = ["label", "value", "unit", "timestamp", "name"] - return [ - {col: parameter[col] for col in cols} - for parameter in parameters - ] + return [{col: parameter[col] for col in cols} for parameter in parameters] @app.callback( Output("layout-graph", "figure"), @@ -230,4 +226,3 @@ def _draw_layout(geom_str, volts, colorscale_selected): return fig return app - diff --git a/src/qumada/instrument/custom_drivers/ZI/MFLI.py b/src/qumada/instrument/custom_drivers/ZI/MFLI.py index 3294c0e1..ca37f189 100644 --- a/src/qumada/instrument/custom_drivers/ZI/MFLI.py +++ b/src/qumada/instrument/custom_drivers/ZI/MFLI.py @@ -41,11 +41,13 @@ class MFLI(Instrument): """ def __init__( - self, name: str, device: str, + self, + name: str, + device: str, serverhost: str = "localhost", - existing_session: Session = None, - allow_version_mismatch = False, - **kwargs + existing_session: Session = None, + allow_version_mismatch=False, + **kwargs, ): super().__init__(name, **kwargs) if isinstance(existing_session, Session): diff --git a/src/qumada/instrument/mapping/Dummies/DummyDac.py b/src/qumada/instrument/mapping/Dummies/DummyDac.py index 3c8c97fe..ffe04a1c 100644 --- a/src/qumada/instrument/mapping/Dummies/DummyDac.py +++ b/src/qumada/instrument/mapping/Dummies/DummyDac.py @@ -60,8 +60,8 @@ def ramp( if not start_values: start_values = [param.get() for param in parameters] - - print("Ramping...") + + print("Ramping...") instrument._triggered_ramp_channels( [param._instrument for param in parameters], start_values, end_values, ramp_time, num_points ) @@ -96,7 +96,7 @@ def pulse( def setup_trigger_in(): pass - + def force_trigger(self): self._instrument.force_trigger() - self._instrument._is_triggered.clear() \ No newline at end of file + self._instrument._is_triggered.clear() diff --git a/src/qumada/instrument/mapping/Harvard/Decadac.py b/src/qumada/instrument/mapping/Harvard/Decadac.py index 28430dca..ed3dc20f 100644 --- a/src/qumada/instrument/mapping/Harvard/Decadac.py +++ b/src/qumada/instrument/mapping/Harvard/Decadac.py @@ -19,12 +19,14 @@ # - Till Huckeman +import logging + from qcodes.instrument.parameter import Parameter from qumada.instrument.custom_drivers.Harvard.Decadac import Decadac from qumada.instrument.mapping import DECADAC_MAPPING from qumada.instrument.mapping.base import InstrumentMapping -import logging + logger = logging.getLogger(__name__) @@ -159,6 +161,6 @@ def trigger_in(self, trigger: str | None) -> None: def force_trigger(self): string = "" for b in range(0, 6): - for c in range(0,6): - string+=f"B{b};C{c};G0;" - self._instrument.write(string) \ No newline at end of file + for c in range(0, 6): + string += f"B{b};C{c};G0;" + self._instrument.write(string) diff --git a/src/qumada/instrument/mapping/QDevil/qdac.py b/src/qumada/instrument/mapping/QDevil/qdac.py index 7c65d786..a96da913 100644 --- a/src/qumada/instrument/mapping/QDevil/qdac.py +++ b/src/qumada/instrument/mapping/QDevil/qdac.py @@ -30,6 +30,7 @@ class QDacMapping(InstrumentMapping): def __init__(self): super().__init__(QDAC_MAPPING) self.max_ramp_channels = 8 + def ramp( self, parameters: list[Parameter], @@ -72,7 +73,7 @@ def ramp( def setup_trigger_in(self): raise Exception("QDac does not have a trigger input!") - + def force_trigger(self): pass # Not required as QDac has no trigger input and starts ramps instantly. diff --git a/src/qumada/instrument/mapping/QDevil/qdac2.py b/src/qumada/instrument/mapping/QDevil/qdac2.py index ace4d7e2..65f30136 100644 --- a/src/qumada/instrument/mapping/QDevil/qdac2.py +++ b/src/qumada/instrument/mapping/QDevil/qdac2.py @@ -182,7 +182,7 @@ def setup_trigger_in(self): "QDac2 does not have a trigger input \ not yet supported!" ) - + def force_trigger(self): pass # Currently no trigger inputs are supported, thus all ramps are started diff --git a/src/qumada/instrument/mapping/base.py b/src/qumada/instrument/mapping/base.py index 99dd7fe9..7d41a2cd 100644 --- a/src/qumada/instrument/mapping/base.py +++ b/src/qumada/instrument/mapping/base.py @@ -76,8 +76,10 @@ def pulse( **kwargs, ) -> None: """Defining qumada pulse. Requires proper implementation for each instrument""" - raise Exception("Pulse not properly implemented for this instrument!\ - You cannot use pulsed measurements with this instrument.") + raise Exception( + "Pulse not properly implemented for this instrument!\ + You cannot use pulsed measurements with this instrument." + ) @abstractmethod def setup_trigger_in(self, trigger_settings: dict) -> None: diff --git a/src/qumada/measurement/device_object.py b/src/qumada/measurement/device_object.py index 4c5a410b..7e785232 100644 --- a/src/qumada/measurement/device_object.py +++ b/src/qumada/measurement/device_object.py @@ -1,12 +1,11 @@ from __future__ import annotations +import json import logging from abc import ABC from copy import deepcopy -from typing import Any from time import sleep -import json - +from typing import Any import numpy as np from qcodes import Station @@ -15,7 +14,10 @@ from qumada.instrument.buffers.buffer import map_triggers, save_trigger_mapping from qumada.instrument.mapping import map_terminals_gui -from qumada.instrument.mapping.base import load_mapped_terminal_parameters, save_mapped_terminal_parameters +from qumada.instrument.mapping.base import ( + load_mapped_terminal_parameters, + save_mapped_terminal_parameters, +) from qumada.measurement.measurement import MeasurementScript, load_param_whitelist from qumada.measurement.scripts import ( Generic_1D_Hysteresis_buffered, @@ -120,40 +122,40 @@ def set_state(self, name: str, ramp=None, **kwargs): Returns ------- None - """ + """ self.update_terminal_parameters() if ramp is None: ramp = self.ramp self.load_from_dict(self.states[name]) self.set_stored_values(ramp=ramp, **kwargs) - + def save_state_to_file(self, name: str, path: str): """ - Saves the specified state to a json file. - - Parameters - ---------- - name : str - The name of the state to save. - path : str - The file path where the state will be saved. Has to be a json file! - - Returns - ------- - None - - Notes - ----- - Before saving, any setpoints in terminal parameters are cleared by setting them to `None` - to avoid problems with json.dump. - """ + Saves the specified state to a json file. + + Parameters + ---------- + name : str + The name of the state to save. + path : str + The file path where the state will be saved. Has to be a json file! + + Returns + ------- + None + + Notes + ----- + Before saving, any setpoints in terminal parameters are cleared by setting them to `None` + to avoid problems with json.dump. + """ for t in self.terminals: for param in self.terminals[t].terminal_parameters: self.terminals[t].terminal_parameters[param].properties["setpoints"] = None state = self.states[name] with open(file=path, mode="w") as f: - json.dump(state, f) - + json.dump(state, f) + def load_state_from_file(self, name: str, path: str): """ Loads a state from a json file and stores it in the device object. @@ -169,10 +171,10 @@ def load_state_from_file(self, name: str, path: str): ------- None """ - with open(file=path, mode="r") as f: + with open(file=path) as f: state = json.load(f) self.states[name] = state - + def set_state_from_file(self, name: str, path: str): """ Sets the devices state by loading it from a json file. @@ -319,17 +321,17 @@ def save_to_dict(self, priorize_stored_value=False): logger.warning(f"Couldn't find value for {terminal_name} {param_name}") return return_dict - - def map_terminals(self, - terminal_parameters: None | dict = None, - path: None | str = None, - skip_gui_if_mapped: bool = True, - ): + def map_terminals( + self, + terminal_parameters: None | dict = None, + path: None | str = None, + skip_gui_if_mapped: bool = True, + ): """ Maps devices terminal parameters using map_terminals_gui. You can pass an existing mapping as terminal_parameters. If a path is provided it first tries to use the provided mapping file. - + Parameters ---------- terminal_parameters : None | dict, optional @@ -355,33 +357,35 @@ def map_terminals(self, if not isinstance(self.station, Station): raise TypeError("No valid qcodes station found. Make sure you have set the station attribute correctly!") if path is not None: - load_mapped_terminal_parameters(terminal_parameters, self.station, path) - map_terminals_gui(self.station.components, - self.terminal_parameters, - terminal_parameters, - skip_gui_if_mapped = skip_gui_if_mapped) + load_mapped_terminal_parameters(terminal_parameters, self.station, path) + map_terminals_gui( + self.station.components, + self.terminal_parameters, + terminal_parameters, + skip_gui_if_mapped=skip_gui_if_mapped, + ) self.update_terminal_parameters() - - def mapping(self, - terminal_parameters: None | dict = None, - path: None | str = None, - skip_gui_if_mapped = True, - ): - #TODO: Remove! - logger.warning("Deprecation Warning: device.mapping was renamed to \ + def mapping( + self, + terminal_parameters: None | dict = None, + path: None | str = None, + skip_gui_if_mapped=True, + ): + # TODO: Remove! + logger.warning( + "Deprecation Warning: device.mapping was renamed to \ device.map_terminals. Device.mapping will be removed \ - in a future release!") + in a future release!" + ) self.map_terminals(terminal_parameters, path, skip_gui_if_mapped) - - - def save_terminal_mapping(self, - path: str): + + def save_terminal_mapping(self, path: str): """ Save terminal mapping to specified file (json). """ save_mapped_terminal_parameters(self.terminal_parameters, path) - + def map_triggers( self, components: None | dict = None, @@ -394,7 +398,7 @@ def map_triggers( Uses components of assigned station by default. Ignores already mapped triggers by default. You can provide a path in order to load and existing mapping. - + Parameters ---------- components : None|dict, optional @@ -410,14 +414,9 @@ def map_triggers( """ if components is None: components = self.station.components - map_triggers(components=components, - skip_mapped=skip_mapped, - path=path, - kwargs=kwargs) - - def save_trigger_mapping( - self, - path: str): + map_triggers(components=components, skip_mapped=skip_mapped, path=path, kwargs=kwargs) + + def save_trigger_mapping(self, path: str): """ Save trigger mapping to json file. @@ -518,7 +517,7 @@ def timetrace( map_triggers(station.components) data = script.run() return data - + def sweep_1d( self, params: Parameter | list[Parameter], @@ -526,12 +525,12 @@ def sweep_1d( num_points: int = 100, dynamic_values: None | list[float] = None, backsweep: bool = False, - name = None, - metadata = None, - station = None, - buffered = False, + name=None, + metadata=None, + station=None, + buffered=False, buffer_settings: dict | None = None, - priorize_stored_value = False, + priorize_stored_value=False, **kwargs, ): """ @@ -552,8 +551,8 @@ def sweep_1d( Number of points measured in each sweep. Doubled if backsweep is True. dynamic_values : None | list[float], optional List of values for the dynamic parameters (only if a list of params is provided). - Ignored if only one parameter is passed. - Parameters are kept at this value during the ramps of the other parameters. + Ignored if only one parameter is passed. + Parameters are kept at this value during the ramps of the other parameters. Current values of the parameters are used if it is None. Default is None. backsweep : bool, optional @@ -588,7 +587,7 @@ def sweep_1d( Notes ----- - Again: This measurement can only do linear ramps! - - Does one measurement for each param provided. + - Does one measurement for each param provided. - Ignores other dynamic parameters that are not in params - Records only gettable parameters. """ @@ -603,49 +602,55 @@ def sweep_1d( if dynamic_values is None: dynamic_values = [param() for param in params] assert len(params) == len(dynamic_values) - else: + else: assert len(params) == len(dynamic_values) for param, val in zip(params, dynamic_values): param(val) for param, setpoint, val in zip(params, sweep_range, dynamic_values): - data.append(*param.measured_ramp( - value = setpoint[-1], - num_points=len(setpoint), - start=setpoint[0], - station=station, - name=name, - metadata=metadata, - backsweep=backsweep, - buffered=buffered, - buffer_settings=buffer_settings, - priorize_stored_value=priorize_stored_value, - )) + data.append( + *param.measured_ramp( + value=setpoint[-1], + num_points=len(setpoint), + start=setpoint[0], + station=station, + name=name, + metadata=metadata, + backsweep=backsweep, + buffered=buffered, + buffer_settings=buffer_settings, + priorize_stored_value=priorize_stored_value, + ) + ) param(val) - + elif isinstance(params, Terminal_Parameter): assert dynamic_values != list if dynamic_values is not None: val = dynamic_values else: val = params() - data.append(*params.measured_ramp( - value=sweep_range[-1], - num_points=num_points, - start=sweep_range[0], - station=station, - name=name, - metadata=metadata, - backsweep=backsweep, - buffered=buffered, - buffer_settings=buffer_settings, - priorize_stored_value=priorize_stored_value, - )) + data.append( + *params.measured_ramp( + value=sweep_range[-1], + num_points=num_points, + start=sweep_range[0], + station=station, + name=name, + metadata=metadata, + backsweep=backsweep, + buffered=buffered, + buffer_settings=buffer_settings, + priorize_stored_value=priorize_stored_value, + ) + ) params(val) return data - + def sweep_2D(): - logger.exception("Deprecation Warning: sweep_2D was renamed to sweep_2d \ - for better naming consistency!") + logger.exception( + "Deprecation Warning: sweep_2D was renamed to sweep_2d \ + for better naming consistency!" + ) def sweep_2d( self, @@ -1256,11 +1261,10 @@ def value(self, value): self._value = value self.instrument_parameter(value) - @value.getter def value(self): return self.instrument_parameter() - + @property def instrument_parameter(self): return self._instrument_parameter diff --git a/src/qumada/measurement/measurement.py b/src/qumada/measurement/measurement.py index 4be66abd..77a966b5 100644 --- a/src/qumada/measurement/measurement.py +++ b/src/qumada/measurement/measurement.py @@ -31,8 +31,8 @@ from contextlib import suppress from datetime import datetime from functools import wraps -from typing import Any, Callable from time import sleep +from typing import Any, Callable import numpy as np import qcodes as qc @@ -44,7 +44,7 @@ from qumada.instrument.buffers import is_bufferable, is_triggerable from qumada.metadata import Metadata from qumada.utils.ramp_parameter import ramp_or_set_parameter, ramp_or_set_parameters -from qumada.utils.utils import flatten_array, _validate_mapping +from qumada.utils.utils import _validate_mapping, flatten_array logger = logging.getLogger(__name__) @@ -449,7 +449,7 @@ def generate_lists(self) -> None: } self.trigger_ins = { param.root_instrument._qumada_mapping for param in self.dynamic_channels if is_triggerable(param) - } #Independent of self.buffered to allow parallel ramping for unbuffered measurement initialization. + } # Independent of self.buffered to allow parallel ramping for unbuffered measurement initialization. self.sort_by_priority() self._lists_created = True self._relabel_instruments() @@ -495,7 +495,7 @@ def initialize(self, dyn_ramp_to_val=False, inactive_dyn_channels: list | None = setpoint_intervall = self.settings.get("setpoint_intervall", 0.1) trigger_start = self.settings.get("trigger_start", "software") # TODO: this should be set elsewhere trigger_reset = self.settings.get("trigger_reset", None) - trigger_type = self.settings.get("trigger_type", None), + trigger_type = (self.settings.get("trigger_type", None),) if not self._lists_created: self.generate_lists() # for item in self.compensated_parameters: @@ -663,10 +663,16 @@ def initialize(self, dyn_ramp_to_val=False, inactive_dyn_channels: list | None = else: raise Exception(f"{gettable_param} is not bufferable.") self.ready_triggers() - ramp_or_set_parameters(ramp_params, ramp_targets, ramp_rate, - ramp_time, setpoint_intervall, trigger_start=trigger_start, - trigger_type=trigger_type, trigger_reset=trigger_reset) - + ramp_or_set_parameters( + ramp_params, + ramp_targets, + ramp_rate, + ramp_time, + setpoint_intervall, + trigger_start=trigger_start, + trigger_type=trigger_type, + trigger_reset=trigger_reset, + ) @abstractmethod def run(self) -> list: @@ -676,7 +682,6 @@ def run(self) -> list: """ return [] - def clean_up(self, additional_actions: list[Callable] | None = None, **kwargs) -> None: """ Things to do after the measurement is complete. Cleans up subscribed paramteres for @@ -704,7 +709,7 @@ def ready_buffers(self, **kwargs) -> None: buffer.setup_buffer(settings=self.buffer_settings) buffer.start() self.ready_triggers() - + def ready_triggers(self, **kwargs): """ Prepare trigger inputs for not buffered instruments. @@ -785,8 +790,8 @@ def _insert_metadata_into_db(self, *args, insert_metadata_into_db: bool = True, metadata.save() except Exception as ex: print(f"Metadata could not inserted into database: {ex}") - - def trigger_measurement(self, parameters, setpoints, method = "ramp" ,sync_trigger=None): + + def trigger_measurement(self, parameters, setpoints, method="ramp", sync_trigger=None): TRIGGER_TYPES = ["software", "hardware", "manual"] trigger_start = self.settings.get("trigger_start", "manual") # TODO: this should be set elsewhere @@ -797,28 +802,28 @@ def trigger_measurement(self, parameters, setpoints, method = "ramp" ,sync_trigg default="software", default_key_error="software", ) - setpoints_mapping = {param : setpoint for param, setpoint in zip(parameters, setpoints)} + setpoints_mapping = {param: setpoint for param, setpoint in zip(parameters, setpoints)} if sync_trigger is None: sync_trigger = () buffer_timeout_multiplier = self.settings.get("buffer_timeout_multiplier", 20) # Some logic to sort instruments. Instruments with sync-triggers have to be added last, # as executing _qumada_pulse/_ramp with them instantly runs the pulse/ramp, before other instruments - # that wait for a trigger signal are added and prepared. + # that wait for a trigger signal are added and prepared. instruments_set = {param.root_instrument for param in parameters} instruments = [instrument for instrument in instruments_set if instrument not in sync_trigger] for instrument in instruments_set: if instrument in sync_trigger: instruments.append(instrument) - + for instr in instruments: instr_params = [param for param in parameters if param.root_instrument is instr] if method == "ramp": try: instr._qumada_ramp( - parameters = instr_params, - end_values = [setpoints_mapping[param][-1] for param in instr_params], - ramp_time = self._burst_duration, - sync_trigger = sync_trigger, + parameters=instr_params, + end_values=[setpoints_mapping[param][-1] for param in instr_params], + ramp_time=self._burst_duration, + sync_trigger=sync_trigger, ) except AttributeError as ex: logger.error( @@ -828,14 +833,14 @@ def trigger_measurement(self, parameters, setpoints, method = "ramp" ,sync_trigg Use the unbuffered script!" ) raise ex - + elif method == "pulse": try: instr._qumada_pulse( - parameters = instr_params, - setpoints = [setpoints_mapping[param] for param in instr_params], - delay = self._burst_duration / self.buffered_num_points, - sync_trigger = sync_trigger, + parameters=instr_params, + setpoints=[setpoints_mapping[param] for param in instr_params], + delay=self._burst_duration / self.buffered_num_points, + sync_trigger=sync_trigger, ) except AttributeError as ex: logger.error( @@ -849,7 +854,7 @@ def trigger_measurement(self, parameters, setpoints, method = "ramp" ,sync_trigg with NameError as ex: logger.error("Argument 'method' has to be eiter 'ramp' or 'pulse'") raise ex - + if trigger_type == "manual": logger.warning( "You are using manual triggering. If you want to pulse parameters on multiple" diff --git a/src/qumada/measurement/scripts/generic_measurement.py b/src/qumada/measurement/scripts/generic_measurement.py index 0156fa50..12e45c0a 100644 --- a/src/qumada/measurement/scripts/generic_measurement.py +++ b/src/qumada/measurement/scripts/generic_measurement.py @@ -558,17 +558,19 @@ def run(self): self.ready_buffers() t = time() - start try: - self.trigger_measurement(parameters = self.dynamic_channels, - setpoints = [sweep.get_setpoints() for sweep in self.dynamic_sweeps], - method="ramp", - sync_trigger=sync_trigger, - ) - + self.trigger_measurement( + parameters=self.dynamic_channels, + setpoints=[sweep.get_setpoints() for sweep in self.dynamic_sweeps], + method="ramp", + sync_trigger=sync_trigger, + ) + results = self.readout_buffers(timestamps=True) dynamic_param_results = [ - (dyn_channel, sweep.get_setpoints()) for dyn_channel, sweep in zip( - self.dynamic_channels, self.dynamic_sweeps)] - results.pop(-1) #removes timestamps from results + (dyn_channel, sweep.get_setpoints()) + for dyn_channel, sweep in zip(self.dynamic_channels, self.dynamic_sweeps) + ] + results.pop(-1) # removes timestamps from results datasaver.add_result( (timer, t), *dynamic_param_results, @@ -585,7 +587,7 @@ def run(self): raise ex except TimeoutError: logger.error(f"A timeout error occured. Skipping line at time {t}.") - #results = self.readout_buffers(timestamps=True) + # results = self.readout_buffers(timestamps=True) self.clean_up() datasets.append(datasaver.dataset) return datasets @@ -736,13 +738,15 @@ def run(self): results = [] self.ready_buffers() self.trigger_measurement( - parameters = [dynamic_param, *self.active_compensating_channels], - setpoints = [dynamic_sweep.get_setpoints(), - *[sweep.get_setpoints for sweep in active_comping_sweeps]], - method = "ramp", - sync_trigger=sync_trigger - ) - + parameters=[dynamic_param, *self.active_compensating_channels], + setpoints=[ + dynamic_sweep.get_setpoints(), + *[sweep.get_setpoints for sweep in active_comping_sweeps], + ], + method="ramp", + sync_trigger=sync_trigger, + ) + results = self.readout_buffers() comp_results = [] for ch, sw in zip(self.active_compensating_channels, active_comping_sweeps): diff --git a/src/qumada/utils/device_server.py b/src/qumada/utils/device_server.py index e6e763ca..174bf601 100644 --- a/src/qumada/utils/device_server.py +++ b/src/qumada/utils/device_server.py @@ -1,19 +1,18 @@ import asyncio import copy import datetime +import json +import logging import pathlib import threading import time -import logging -import json import websockets -from websockets.asyncio.server import ServerConnection - from qcodes.instrument.parameter import Parameter +from websockets.asyncio.server import ServerConnection from qumada.measurement.device_object import QumadaDevice -from qumada.utils.geometry import gate_list_to_string, Gate, load_from_file +from qumada.utils.geometry import Gate, gate_list_to_string, load_from_file logger = logging.getLogger(__name__) @@ -42,9 +41,7 @@ def _collect_data(*parameters: Parameter, cache_only: bool = True): try: parameter_data = _get_parameter_data(parameter, cache_only=cache_only) except Exception as e: - parameter_data = { - "exception": str(e) - } + parameter_data = {"exception": str(e)} data.append(parameter_data) return { @@ -66,9 +63,9 @@ async def get_data(self): # an async lock is the more robust option in case the surrounding code changes. async with self.lock: if self._data is None: - delta = float('inf') + delta = float("inf") else: - delta = time.time() - self._data['timestamp'] + delta = time.time() - self._data["timestamp"] if delta >= self.minimal_update_delta: self._data = _collect_data(*self.parameters, cache_only=self.cache_only) return self._data @@ -87,7 +84,7 @@ def __init__(self, collector: DataCollector, ip="127.200.200.9", port=6789): self._loop_control_future = threading.Event() #: data is re-sent after this time even if nothing changed - self.maximal_update_interval = 1. + self.maximal_update_interval = 1.0 @property def address(self): @@ -138,7 +135,7 @@ async def _serve(self) -> None: await server.wait_closed() logger.info("WebSocket server closed") - def join(self, timeout = None): + def join(self, timeout=None): if self.is_alive(): self.stop() super().join(timeout) @@ -177,12 +174,11 @@ def default_to_json(o): break logger.info("Connection terminated") - self.monitor_socket = None def start_monitor_socket(device_object: QumadaDevice, gate_geometry: list[Gate] | str | pathlib.Path = None): - if getattr(device_object, 'monitor_socket', None) is not None: + if getattr(device_object, "monitor_socket", None) is not None: logger.info("Monitor socket already present. Restarting.") device_object.monitor_socket.join() parameters = [param for parameters in device_object.terminal_parameters.values() for param in parameters.values()] @@ -194,6 +190,7 @@ def start_monitor_socket(device_object: QumadaDevice, gate_geometry: list[Gate] if gate_geometry is not None: if not isinstance(gate_geometry, list): from qumada.utils.dxf import load_convert_and_cache + gate_geometry = load_convert_and_cache(gate_geometry) device_object.monitor_socket.gate_geometry = gate_geometry diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py index 2d2a8901..87d9621c 100644 --- a/src/qumada/utils/dxf.py +++ b/src/qumada/utils/dxf.py @@ -1,10 +1,10 @@ import argparse import copy +import logging import math import operator import pathlib import re -import logging import subprocess import sys import warnings @@ -15,13 +15,10 @@ import matplotlib.widgets import shapely from matplotlib import pyplot as plt -from shapely.geometry import ( - MultiLineString -) -from shapely.geometry import box, Polygon, LineString +from shapely.geometry import LineString, MultiLineString, Polygon, box from shapely.plotting import plot_polygon -from qumada.utils.geometry import Gate, store_to_file, load_from_file +from qumada.utils.geometry import Gate, load_from_file, store_to_file SELECT_ALPHA = 0.3 @@ -48,8 +45,7 @@ def entity_to_geom(e): raise NotImplementedError(e.dxftype()) - -def iterate_all_entities(e, path = None): +def iterate_all_entities(e, path=None): if path is None: path = [] if e.dxftype() == "INSERT": @@ -61,11 +57,10 @@ def iterate_all_entities(e, path = None): yield e, list(path) - def get_gates_from_cropped_region( doc: ezdxf.document.Drawing, - x_rng: Tuple[float, float] = (-3., 3.), - y_rng: Tuple[float, float] = (-1.5, 1.5), + x_rng: tuple[float, float] = (-3.0, 3.0), + y_rng: tuple[float, float] = (-1.5, 1.5), layer_regex: str = r".*BEAM\_L.", grid_size: float = 1e-3, ) -> list[Gate]: @@ -78,13 +73,10 @@ def get_gates_from_cropped_region( """ regex = re.compile(layer_regex) - keep_layer = { - layer.dxf.name: bool(regex.search(layer.dxf.name)) - for layer in doc.layers - } + keep_layer = {layer.dxf.name: bool(regex.search(layer.dxf.name)) for layer in doc.layers} - xmin, xmax = x_rng - ymin, ymax = y_rng + xmin, xmax = x_rng + ymin, ymax = y_rng roi_poly = Polygon(box(xmin, ymin, xmax, ymax)) chosen = [] @@ -115,11 +107,11 @@ def get_gates_from_cropped_region( label_position = label_position_point.x, label_position_point.y else: label_position = None - + label = f"G{len(chosen)}" if label == "G36": pass - + gate = Gate( polygon=geom_roi, label_position=label_position, @@ -128,7 +120,7 @@ def get_gates_from_cropped_region( layer=layer, ) chosen.append(gate) - + return chosen @@ -154,7 +146,7 @@ def _connect_all_touching(gates: list[Gate], grid_size: float) -> list[Gate]: intersecting.append(gate) label_position = None for g in intersecting: - label_position = label_position or g.label_position + label_position = label_position or g.label_position new_poly = shapely.union_all([g.polygon for g in intersecting], grid_size=grid_size) to_append = intersecting[0] to_append.polygon = new_poly @@ -193,7 +185,7 @@ def label_gates(gates: list[Gate]) -> list[Gate]: AAAA BCDE """, - height_ratios=[1, 0.1] + height_ratios=[1, 0.1], ) ax = axd["A"] fig = ax.get_figure() @@ -201,29 +193,22 @@ def label_gates(gates: list[Gate]) -> list[Gate]: txt = matplotlib.widgets.TextBox(ax=axd["C"], label="Name") apply = matplotlib.widgets.Button(ax=axd["D"], label="Apply") nxt = matplotlib.widgets.Button(ax=axd["E"], label="Next") - + layers = {gate.layer for gate in gates} - color_iter = iter(plt.rcParams['axes.prop_cycle'].by_key()['color']) + color_iter = iter(plt.rcParams["axes.prop_cycle"].by_key()["color"]) layer_colors = dict(zip(layers, color_iter)) - plots = [] for idx, gate in enumerate(gates): color = layer_colors[gate.layer] - poly_patch = plot_polygon( - gate.polygon, - facecolor="none", - color=color, - add_points=False, - ax=ax) - + poly_patch = plot_polygon(gate.polygon, facecolor="none", color=color, add_points=False, ax=ax) + poly_patch.gate_index = idx poly_patch.set_picker(True) - - label_plot = ax.annotate(str(gate.label), - gate.label_position, - bbox=dict(boxstyle="round", fc="0.8"), - ha="center", va="center") + + label_plot = ax.annotate( + str(gate.label), gate.label_position, bbox=dict(boxstyle="round", fc="0.8"), ha="center", va="center" + ) plots.append((poly_patch, label_plot)) def deselect_gate(idx): @@ -241,9 +226,9 @@ def select_gate(idx): txt.set_val(str(gate.label)) plt.draw() - current_gate = 0 select_gate(current_gate) + def apply_action(*_): gate = gates[current_gate] _, label_plot = plots[current_gate] @@ -266,7 +251,7 @@ def next_action(*_): if current_gate == len(gates): current_gate -= len(gates) select_gate(current_gate) - + def pick_handler(event): nonlocal current_gate artist = event.artist @@ -278,12 +263,12 @@ def pick_handler(event): select_gate(current_gate) else: raise NotImplementedError(event) - + prv.on_clicked(prev_action) apply.on_clicked(apply_action) nxt.on_clicked(next_action) - fig.canvas.mpl_connect('pick_event', pick_handler) - + fig.canvas.mpl_connect("pick_event", pick_handler) + widgets = [prv, apply, nxt, txt] fig.widgets = widgets @@ -301,9 +286,7 @@ def load_convert_and_cache(path: pathlib.Path | str) -> list[Gate]: json_path = path.with_suffix(".json") if not json_path.exists(): - subprocess.check_call([ - sys.executable, "-m", "qumada.utils.dxf", dxf_path]) - + subprocess.check_call([sys.executable, "-m", "qumada.utils.dxf", dxf_path]) elif path.suffix == ".json": json_path = path @@ -316,9 +299,15 @@ def load_convert_and_cache(path: pathlib.Path | str) -> list[Gate]: def get_parser(): import argparse + parser = argparse.ArgumentParser(description="Qumada dxf labeler") parser.add_argument("dxf_path", type=pathlib.Path, help="path to dxf file") - parser.add_argument("--json-path", type=pathlib.Path, help="path to json file. default is the same as dxf with other ending", default=None) + parser.add_argument( + "--json-path", + type=pathlib.Path, + help="path to json file. default is the same as dxf with other ending", + default=None, + ) return parser diff --git a/src/qumada/utils/geometry.py b/src/qumada/utils/geometry.py index 6695ff12..9aa49690 100644 --- a/src/qumada/utils/geometry.py +++ b/src/qumada/utils/geometry.py @@ -1,13 +1,15 @@ import dataclasses -import pathlib import json +import pathlib -from shapely.geometry import Point, LineString, Polygon from shapely import wkt +from shapely.geometry import LineString, Point, Polygon + @dataclasses.dataclass class Gate: """Representation of a gate electrode with the geometric properties and annotation information.""" + polygon: Polygon path: list[str] layer: str @@ -19,6 +21,7 @@ def gate_list_to_string(gates: list[Gate]) -> str: """Serialize a list of gates to a JSON string. The geometric information is stored in WKT format (well known text).""" + def to_json(o): if isinstance(o, Gate): return dataclasses.asdict(o) @@ -32,8 +35,7 @@ def to_json(o): def string_to_gate_list(txt: str) -> list[Gate]: - """Deserialize a JSON string produced by :py:`.gate_list_to_string` to a list of gates. - """ + """Deserialize a JSON string produced by :py:`.gate_list_to_string` to a list of gates.""" data = json.loads(txt) assert isinstance(data, list) diff --git a/src/qumada/utils/load_from_sqlite_db.py b/src/qumada/utils/load_from_sqlite_db.py index 74fa6235..2d7d946f 100644 --- a/src/qumada/utils/load_from_sqlite_db.py +++ b/src/qumada/utils/load_from_sqlite_db.py @@ -24,6 +24,7 @@ """ from __future__ import annotations +import re from os import path import numpy as np @@ -32,7 +33,6 @@ from qcodes.dataset.plotting import plot_dataset from qumada.utils.browsefiles import browsefiles -import re # %% @@ -216,7 +216,7 @@ def pick_measurements(sample_name: str = None, preview_dialogue=False, measureme load_db() return pick_measurements(preview_dialogue=preview_dialogue, measurement_list=measurement_list) elif re.match(pattern, chosen): - for i in range(int(match.group(1)), int(match.group(2))+1): + for i in range(int(match.group(1)), int(match.group(2)) + 1): measurement_list.append(measurements[i]) else: chosen = int(chosen) @@ -289,14 +289,17 @@ def get_parameter_data(dataset=None, parameter_name=None, **kwargs): ) return zip(params, data, units, labels) + # %% -def get_parameter_name_by_label(dataset=None, parameter_label=None, appendix = ""): + +def get_parameter_name_by_label(dataset=None, parameter_label=None, appendix=""): for param in dataset.get_parameters(): if param.label == parameter_label + appendix: return param.name return None + # %% def separate_up_down(x_data, y_data): grad = np.gradient(x_data) diff --git a/src/qumada/utils/plotting.py b/src/qumada/utils/plotting.py index fdd7e7b0..9571b1c2 100644 --- a/src/qumada/utils/plotting.py +++ b/src/qumada/utils/plotting.py @@ -171,7 +171,7 @@ def plot_2D( if args: x_data, y_data, z_data = _handle_overload(x_data, y_data, z_data, *args, output_dimension=3) if ax is None or fig is None: - fig, ax = plt.subplots(figsize = kwargs.get("figsize", (10,10))) + fig, ax = plt.subplots(figsize=kwargs.get("figsize", (10, 10))) # Skalierung der Achsendaten und Einheiten x_values, y_values, z_values = x_data[1], y_data[1], z_data[1] @@ -196,7 +196,7 @@ def plot_2D( # Plotten der 2D-Daten im = ax.pcolormesh(x, y, z, shading="auto") cbar = fig.colorbar(im, ax=ax) - + if z_label is None: cbar.set_label(f"{z_data[3]} ({z_unit})") else: @@ -341,12 +341,12 @@ def plot_multiple_datasets( scale_axis=True, legend=True, exclude_string_from_legend: list = ["1D Sweep"], - legend_entries: None|list = None, - save_to_file = None, - close = False, - x_label = None, - y_label = None, - color_map = None, + legend_entries: None | list = None, + save_to_file=None, + close=False, + x_label=None, + y_label=None, + color_map=None, **kwargs, ): """ @@ -442,15 +442,15 @@ def plot_multiple_datasets( if font is not None: matplotlib.rc("font", 30) matplotlib.rc("font", size=40) - default_colors = plt.rcParams['axes.prop_cycle'].by_key()['color'] + default_colors = plt.rcParams["axes.prop_cycle"].by_key()["color"] if ax is None or fig is None: - fig, ax = plt.subplots(figsize = kwargs.get("figsize", (10, 10))) + fig, ax = plt.subplots(figsize=kwargs.get("figsize", (10, 10))) x_labels = [] y_labels = [] for i in range(len(datasets)): if color_map is None: - color = default_colors[i % len(default_colors)] + color = default_colors[i % len(default_colors)] else: color = color_map[i] if legend_entries is None: @@ -464,12 +464,12 @@ def plot_multiple_datasets( x_name = x_axis_parameters_name[i] else: x_name = x_axis_parameters_name - + if isinstance(y_axis_parameters_name, list): y_name = y_axis_parameters_name[i] else: y_name = y_axis_parameters_name - + x, y = _handle_overload( *get_parameter_data(datasets[i], y_axis_parameters_name), x_name=x_name, @@ -497,10 +497,10 @@ def plot_multiple_datasets( if not optimize_hysteresis_legend: f_label += " backsweep" if optimize_hysteresis_legend is True: - # Only one legend entry per dataset (instead of one for each fore-/backsweep) + # Only one legend entry per dataset (instead of one for each fore-/backsweep) if j > 0: f_label = None - if j%2 == False: # ;-) Ensuring the first sweep marker is always filled + if j % 2 == False: # ;-) Ensuring the first sweep marker is always filled fill_style = "full" else: fill_style = "none" @@ -512,8 +512,8 @@ def plot_multiple_datasets( linestyle=kwargs.get("linestyle", ""), label=f_label, markersize=kwargs.get("markersize", 20), - color = color, - fillstyle = fill_style + color=color, + fillstyle=fill_style, ) else: plt.plot( @@ -523,14 +523,14 @@ def plot_multiple_datasets( linestyle=kwargs.get("linestyle", ""), label=label, markersize=kwargs.get("markersize", 20), - color = color, + color=color, ) # Scale axes and update labels if scale_axis is True: x_scaling_factor, x_units[0] = _rescale_axis(ax.xaxis, np.concatenate(x_data), x_units[0], "x") y_scaling_factor, y_units[0] = _rescale_axis(ax.yaxis, np.concatenate(y_data), y_units[0], "y") - + if x_label is None: plt.xlabel(f"{x_labels[0]} ({x_units[0]})") else: @@ -539,7 +539,7 @@ def plot_multiple_datasets( plt.ylabel(f"{y_labels[0]} ({y_units[0]})") else: plt.ylabel(f"{y_label} ({y_units[0]})") - + # Update x and y labels leg_entries = ax.legend().get_texts() if legend is True: @@ -547,9 +547,9 @@ def plot_multiple_datasets( loc=kwargs.get("legend_position", "upper left"), fontsize=kwargs.get("legend_fontsize", 35), markerscale=kwargs.get("legend_markerscale", 1), - ncol = kwargs.get("legend_ncols", int(len(leg_entries)/9)+1), - columnspacing = kwargs.get("legend_columnspacing", 0.2), - handletextpad = kwargs.get("legend_handletextpad", -0.7), + ncol=kwargs.get("legend_ncols", int(len(leg_entries) / 9) + 1), + columnspacing=kwargs.get("legend_columnspacing", 0.2), + handletextpad=kwargs.get("legend_handletextpad", -0.7), ) plt.tight_layout() if save_to_file is not None: diff --git a/src/qumada/utils/ramp_parameter.py b/src/qumada/utils/ramp_parameter.py index 3c93aa72..ae0e7d7a 100644 --- a/src/qumada/utils/ramp_parameter.py +++ b/src/qumada/utils/ramp_parameter.py @@ -23,13 +23,14 @@ import logging import time +from copy import deepcopy from math import isclose from qumada.utils.generate_sweeps import generate_sweep -from copy import deepcopy LOG = logging.getLogger(__name__) + def has_ramp_method(parameter): try: parameter.root_instrument._qumada_ramp @@ -37,13 +38,15 @@ def has_ramp_method(parameter): except AttributeError: return False + def has_pulse_method(parameter): try: parameter.root_instrument._qumada_pulse return True except AttributeError: return False - + + def has_force_trigger_method(parameter): try: parameter.root_instrument._qumada_mapping.force_trigger @@ -51,9 +54,11 @@ def has_force_trigger_method(parameter): except AttributeError: return False + class Unsweepable_parameter(Exception): pass + class Unsettable_parameter(Exception): pass @@ -119,7 +124,7 @@ def ramp_parameter( LOG.debug(f"ramp rate: {ramp_rate}") LOG.debug(f"ramp time: {ramp_time}") - if isinstance(current_value, float|int) and not isinstance(current_value, bool): + if isinstance(current_value, float | int) and not isinstance(current_value, bool): LOG.debug(f"target: {target}") if isclose(current_value, target, rel_tol=tolerance): LOG.debug("Target value is sufficiently close to current_value, no need to ramp") @@ -151,7 +156,7 @@ def ramp_parameter( LOG.debug(f"sweep: {sweep}") for value in sweep: parameter.set(value) - time.sleep(ramp_time/num_points) + time.sleep(ramp_time / num_points) return True else: raise Unsweepable_parameter("Parameter has non-float values") @@ -176,70 +181,78 @@ def ramp_or_set_parameter( parameter.set(target) except Unsettable_parameter: pass - + + def ramp_or_set_parameters( - parameters: list, - targets: list[float], - ramp_rate: float | list[float] = 0.3, - ramp_time: float | list[float] = 5, - setpoint_interval: float |list[float] = 0.1, - tolerance: float = 1e-5, - trigger_start = None, - trigger_type = "software", - trigger_reset = None, - sync_trigger = None): + parameters: list, + targets: list[float], + ramp_rate: float | list[float] = 0.3, + ramp_time: float | list[float] = 5, + setpoint_interval: float | list[float] = 0.1, + tolerance: float = 1e-5, + trigger_start=None, + trigger_type="software", + trigger_reset=None, + sync_trigger=None, +): instruments = {param.root_instrument for param in parameters} - instruments_dict = {} #Will contain instruments as keys and their params with targets as vals. - #Check requirements for parallel ramps. + instruments_dict = {} # Will contain instruments as keys and their params with targets as vals. + # Check requirements for parallel ramps. if trigger_type is not None: for instr in instruments: - if has_ramp_method(instr) and has_force_trigger_method(instr) and hasattr(instr._qumada_mapping, "max_ramp_channels"): - instruments_dict[instr] = [] #Only instruments supporting ramps are added! + if ( + has_ramp_method(instr) + and has_force_trigger_method(instr) + and hasattr(instr._qumada_mapping, "max_ramp_channels") + ): + instruments_dict[instr] = [] # Only instruments supporting ramps are added! # Loop groups params according to their instruments for later execution of ramps. for param, target in zip(parameters, targets): if param._settable is False: LOG.warning(f"{param} is not _settable and cannot be ramped!") continue - - current_value = param.get() - #TODO: Possibly further improvements with cached val or known start. + + current_value = param.get() + # TODO: Possibly further improvements with cached val or known start. # Check if parameter should be ramped or set. - if isinstance(current_value, float|int) and not isinstance(current_value, bool): + if isinstance(current_value, float | int) and not isinstance(current_value, bool): LOG.debug(f"current value: {current_value}, target: {target}") if isclose(current_value, target, rel_tol=tolerance): LOG.debug("Target value is sufficiently close to current_value, no need to ramp") continue - if param.root_instrument in instruments_dict.keys(): #Only instruments supporting ramps - instruments_dict[param.root_instrument].append((param,target)) - else: #Everything that cannot be ramped with instrument ramp is ramped/set here + if param.root_instrument in instruments_dict.keys(): # Only instruments supporting ramps + instruments_dict[param.root_instrument].append((param, target)) + else: # Everything that cannot be ramped with instrument ramp is ramped/set here ramp_or_set_parameter(param, target, ramp_rate, ramp_time, setpoint_interval) # Now go through all instruments supporting ramps and start the ramps for instr, values in instruments_dict.items(): counter = 1 param_helper = [] target_helper = [] - #Params and targets are added until the max number of simultaneously rampable - #channels is reached. Then ramp is started and new params are added. + # Params and targets are added until the max number of simultaneously rampable + # channels is reached. Then ramp is started and new params are added. for param, target in values: param_helper.append(param) target_helper.append(target) - #TODO: Triggering logic won't work if sync trigger is used to trigger another + # TODO: Triggering logic won't work if sync trigger is used to trigger another # DAC (e.g. QDac in combination with Decadac) - if counter%instr._qumada_mapping.max_ramp_channels == 0 or counter == len(values): - LOG.debug(f"Ramping {param_helper} to {target_helper}") + if counter % instr._qumada_mapping.max_ramp_channels == 0 or counter == len(values): + LOG.debug(f"Ramping {param_helper} to {target_helper}") if sync_trigger is not None: - LOG.exception("You are using a sync trigger for ramps outside measurements. \ + LOG.exception( + "You are using a sync trigger for ramps outside measurements. \ If the sync trigger is required to start an another DACs/AWGs ramp \ this will not work. (E.g. QDac and Decadac). If you only need it to \ - start data acquisitions you're fine.") + start data acquisitions you're fine." + ) instr._qumada_ramp( param_helper, - end_values = target_helper, - ramp_time = min(ramp_time, 1/ramp_rate), #TODO: Is that fine/Safe enough? - sync_trigger=None - ) + end_values=target_helper, + ramp_time=min(ramp_time, 1 / ramp_rate), # TODO: Is that fine/Safe enough? + sync_trigger=None, + ) instr._qumada_mapping.force_trigger() - #TODO: Force trigger for AWGs/DACs? + # TODO: Force trigger for AWGs/DACs? time.sleep(ramp_time) try: trigger_reset() @@ -247,11 +260,4 @@ def ramp_or_set_parameters( LOG.info("No method to reset the trigger defined.") param_helper = [] target_helper = [] - counter +=1 - - - - - - - + counter += 1 From 05f774d072681559c8ab0f08c5ea68438141a0bb Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Fri, 1 Aug 2025 14:27:47 +0200 Subject: [PATCH 07/19] Add dependency installation instruction --- src/qumada-monitor/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/qumada-monitor/README.md b/src/qumada-monitor/README.md index f0921013..4e3af727 100644 --- a/src/qumada-monitor/README.md +++ b/src/qumada-monitor/README.md @@ -1,6 +1,7 @@ # QuMADA Device Monitor -This is a dash app that is intended to monitor a qumada device. +This is a dash app that is intended to monitor a qumada device. To use it you need to install the `monitor` dependencies for example via +`pip install qumada[monitor]` or with a local editable install `pip install -e .[monitor]`. The feature that separates it from the qcodes monitor is the ability to display a proper layout. From 36d15f7604b01f2c4f2fc78e70e8fc99b90e796b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 1 Aug 2025 12:45:12 +0000 Subject: [PATCH 08/19] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/qumada-monitor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/qumada-monitor/README.md b/src/qumada-monitor/README.md index 4e3af727..88486418 100644 --- a/src/qumada-monitor/README.md +++ b/src/qumada-monitor/README.md @@ -1,6 +1,6 @@ # QuMADA Device Monitor -This is a dash app that is intended to monitor a qumada device. To use it you need to install the `monitor` dependencies for example via +This is a dash app that is intended to monitor a qumada device. To use it you need to install the `monitor` dependencies for example via `pip install qumada[monitor]` or with a local editable install `pip install -e .[monitor]`. The feature that separates it from the qcodes monitor is the ability to display a proper layout. From 832bfeead9e50e0c513bb21a3b3874ac41a6951e Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Tue, 26 Aug 2025 13:01:57 +0200 Subject: [PATCH 09/19] Improve error message --- src/qumada/utils/dxf.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py index 87d9621c..9594dbab 100644 --- a/src/qumada/utils/dxf.py +++ b/src/qumada/utils/dxf.py @@ -286,7 +286,12 @@ def load_convert_and_cache(path: pathlib.Path | str) -> list[Gate]: json_path = path.with_suffix(".json") if not json_path.exists(): - subprocess.check_call([sys.executable, "-m", "qumada.utils.dxf", dxf_path]) + try: + _ = subprocess.run([sys.executable, "-m", "qumada.utils.dxf", dxf_path], check=True, stderr=subprocess.PIPE) + except subprocess.CalledProcessError as err: + err_msg = err.stderr.decode(errors='replace') + print("File conversion failed:\n", err_msg, file=sys.stderr) + raise elif path.suffix == ".json": json_path = path From f6aea4d5fb708e932bfbcd20e5c1d1cdde29a068 Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Tue, 26 Aug 2025 13:02:11 +0200 Subject: [PATCH 10/19] Use any qt backend --- src/qumada/utils/dxf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py index 9594dbab..1823d7fc 100644 --- a/src/qumada/utils/dxf.py +++ b/src/qumada/utils/dxf.py @@ -317,7 +317,7 @@ def get_parser(): if __name__ == "__main__": - matplotlib.use("Qt5Agg") + matplotlib.use("qtagg") parser = get_parser() args = parser.parse_args() if args.json_path is None: From e3b1c452b227506fd14f7ecb703c4872c43a546b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 26 Aug 2025 11:03:13 +0000 Subject: [PATCH 11/19] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/qumada/utils/dxf.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py index 1823d7fc..3e95414e 100644 --- a/src/qumada/utils/dxf.py +++ b/src/qumada/utils/dxf.py @@ -287,9 +287,11 @@ def load_convert_and_cache(path: pathlib.Path | str) -> list[Gate]: if not json_path.exists(): try: - _ = subprocess.run([sys.executable, "-m", "qumada.utils.dxf", dxf_path], check=True, stderr=subprocess.PIPE) + _ = subprocess.run( + [sys.executable, "-m", "qumada.utils.dxf", dxf_path], check=True, stderr=subprocess.PIPE + ) except subprocess.CalledProcessError as err: - err_msg = err.stderr.decode(errors='replace') + err_msg = err.stderr.decode(errors="replace") print("File conversion failed:\n", err_msg, file=sys.stderr) raise From c11f490352901629cc53b2c38e5a2bca8b01fd34 Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Thu, 28 Aug 2025 14:08:39 +0200 Subject: [PATCH 12/19] Improve gate extraction documentation and error messages --- src/qumada/utils/dxf.py | 97 +++++++++++++++++++++++++++++++++++------ 1 file changed, 84 insertions(+), 13 deletions(-) diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py index 1823d7fc..7099b063 100644 --- a/src/qumada/utils/dxf.py +++ b/src/qumada/utils/dxf.py @@ -21,6 +21,10 @@ from qumada.utils.geometry import Gate, load_from_file, store_to_file SELECT_ALPHA = 0.3 +DEFAULT_X_RNG = (-3.0, 3.0) +DEFAULT_Y_RNG = (-1.5, 1.5) +DEFAULT_LAYER_REGEX = r".*BEAM\_L." +DEFAULT_GRID_SIZE = 1e-3 logger = logging.getLogger(__name__) @@ -59,10 +63,10 @@ def iterate_all_entities(e, path=None): def get_gates_from_cropped_region( doc: ezdxf.document.Drawing, - x_rng: tuple[float, float] = (-3.0, 3.0), - y_rng: tuple[float, float] = (-1.5, 1.5), - layer_regex: str = r".*BEAM\_L.", - grid_size: float = 1e-3, + x_rng: tuple[float, float] = DEFAULT_X_RNG, + y_rng: tuple[float, float] = DEFAULT_Y_RNG, + layer_regex: str = DEFAULT_LAYER_REGEX, + grid_size: float = DEFAULT_GRID_SIZE, ) -> list[Gate]: """Selects all polygons (POLYLINE entities) from the selected region that live in a layer matched by the given regular expression. The polygons are cropped to the region and returned as :py:`.Gate` objects. @@ -275,7 +279,7 @@ def pick_handler(event): return gates -def load_convert_and_cache(path: pathlib.Path | str) -> list[Gate]: +def load_convert_and_cache(path: pathlib.Path | str, expected_number_of_gates: int | range | slice = slice(1, None)) -> list[Gate]: path = pathlib.Path(path) if not path.exists(): @@ -302,9 +306,40 @@ def load_convert_and_cache(path: pathlib.Path | str) -> list[Gate]: return load_from_file(json_path) + + + def get_parser(): import argparse + def to_range(s: str): + try: + min_s, max_s = s.split(":") + if not min_s: + min_s = "-inf" + if not max_s: + max_s = "inf" + return float(min_s), float(max_s) + except Exception as err: + raise argparse.ArgumentTypeError( + "Argument must be a 'min:max' pair of python floats or empty strings separated by a colon.") from err + + def to_slice(s: str): + try: + start, stop = s.split(":") + if start: + start = int(start) + else: + start = None + if stop: + stop = int(stop) + else: + stop = None + return slice(start, stop) + except Exception as err: + raise argparse.ArgumentTypeError( + "Argument must be a 'start:stop' pair of python integers or empty strings separated by a colon.") from err + parser = argparse.ArgumentParser(description="Qumada dxf labeler") parser.add_argument("dxf_path", type=pathlib.Path, help="path to dxf file") parser.add_argument( @@ -313,19 +348,55 @@ def get_parser(): help="path to json file. default is the same as dxf with other ending", default=None, ) + parser.add_argument("--x-rng", + help="The gates are cropped to this range in x coordinates", + type=to_range, + metavar="[X_MIN]:[X_MAX]", + default=':'.join(map(str, DEFAULT_X_RNG)), + ) + parser.add_argument("--y-rng", + help="The gates are cropped to this range in y coordinates", + type=to_range, + metavar="[Y_MIN]:[Y_MAX]", + default=':'.join(map(str, DEFAULT_X_RNG)), + ) + parser.add_argument("--layer-regex", + help="Only gates from layers where the name matches this regex are considered", + default=DEFAULT_LAYER_REGEX) + + parser.add_argument("--expected-gate-number", + help="Expected number of gates in integer slice notation (excluding end). " + "The default will result in an error if there are no gates extracted", + metavar="[START]:[STOP]", + type=to_slice, + default="1:") + return parser -if __name__ == "__main__": +def _main(dxf_path: str, json_path: str, layer_regex: str, x_rng: tuple[float, float], y_rng: tuple[float, float], expected_gate_number: slice): matplotlib.use("qtagg") - parser = get_parser() - args = parser.parse_args() - if args.json_path is None: - args.json_path = args.dxf_path.with_suffix(".json") + doc = ezdxf.readfile(dxf_path) + raw_gates = get_gates_from_cropped_region(doc, layer_regex=layer_regex, x_rng=x_rng, y_rng=y_rng) + print(f"{len(raw_gates)} raw gates extracted.") - doc = ezdxf.readfile(args.dxf_path) - raw_gates = get_gates_from_cropped_region(doc) merged_gates = auto_merge(raw_gates) + print(f"{len(merged_gates)} gates left after merging.") + + if expected_gate_number.start is not None and len(merged_gates) < expected_gate_number.start: + raise ValueError(f"Only {len(merged_gates)} gates extracted but >= {expected_gate_number.start} were expected.") + if expected_gate_number.stop is not None and len(merged_gates) >= expected_gate_number.stop: + raise ValueError(f"Only {len(merged_gates)} gates extracted but < {expected_gate_number.stop} were expected.") + resulting_gates = label_gates(merged_gates) plt.show(block=True) - store_to_file(resulting_gates, args.json_path) + store_to_file(resulting_gates, json_path) + + +if __name__ == "__main__": + parser = get_parser() + args = parser.parse_args() + if args.json_path is None: + args.json_path = args.dxf_path.with_suffix(".json") + _main(args.dxf_path, args.json_path, args.layer_regex, args.x_rng, args.y_rng, + args.expected_gate_number) From 1485c156c342991ebee5bc706f0ff128dabf45da Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Thu, 28 Aug 2025 14:11:53 +0200 Subject: [PATCH 13/19] Properly handle an empty gate set --- src/qumada/utils/dxf.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py index 7099b063..d425b72b 100644 --- a/src/qumada/utils/dxf.py +++ b/src/qumada/utils/dxf.py @@ -180,6 +180,9 @@ def auto_merge(gates: list[Gate], grid_size: float = 1e-3): def label_gates(gates: list[Gate]) -> list[Gate]: + if not gates: + return [] + gates = [copy.deepcopy(gate) for gate in gates] gates = sorted(gates, key=lambda gate: 0.0 if gate.label_position is None else math.atan2(*gate.label_position)) @@ -388,6 +391,12 @@ def _main(dxf_path: str, json_path: str, layer_regex: str, x_rng: tuple[float, f if expected_gate_number.stop is not None and len(merged_gates) >= expected_gate_number.stop: raise ValueError(f"Only {len(merged_gates)} gates extracted but < {expected_gate_number.stop} were expected.") + if not merged_gates: + # this is apparently explicitly allowed by the user cause otherwise the expected_gate_number check should fail + print("No gates extracted. Storing empty gates in json path") + store_to_file([], json_path) + return + resulting_gates = label_gates(merged_gates) plt.show(block=True) store_to_file(resulting_gates, json_path) From c728ccf081285449a6ce38107468c0c9bc63e908 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 28 Aug 2025 12:13:45 +0000 Subject: [PATCH 14/19] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/qumada/utils/dxf.py | 75 ++++++++++++++++++++++++----------------- 1 file changed, 44 insertions(+), 31 deletions(-) diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py index 40862f16..45463d11 100644 --- a/src/qumada/utils/dxf.py +++ b/src/qumada/utils/dxf.py @@ -282,7 +282,9 @@ def pick_handler(event): return gates -def load_convert_and_cache(path: pathlib.Path | str, expected_number_of_gates: int | range | slice = slice(1, None)) -> list[Gate]: +def load_convert_and_cache( + path: pathlib.Path | str, expected_number_of_gates: int | range | slice = slice(1, None) +) -> list[Gate]: path = pathlib.Path(path) if not path.exists(): @@ -311,9 +313,6 @@ def load_convert_and_cache(path: pathlib.Path | str, expected_number_of_gates: i return load_from_file(json_path) - - - def get_parser(): import argparse @@ -327,7 +326,8 @@ def to_range(s: str): return float(min_s), float(max_s) except Exception as err: raise argparse.ArgumentTypeError( - "Argument must be a 'min:max' pair of python floats or empty strings separated by a colon.") from err + "Argument must be a 'min:max' pair of python floats or empty strings separated by a colon." + ) from err def to_slice(s: str): try: @@ -343,7 +343,8 @@ def to_slice(s: str): return slice(start, stop) except Exception as err: raise argparse.ArgumentTypeError( - "Argument must be a 'start:stop' pair of python integers or empty strings separated by a colon.") from err + "Argument must be a 'start:stop' pair of python integers or empty strings separated by a colon." + ) from err parser = argparse.ArgumentParser(description="Qumada dxf labeler") parser.add_argument("dxf_path", type=pathlib.Path, help="path to dxf file") @@ -353,33 +354,46 @@ def to_slice(s: str): help="path to json file. default is the same as dxf with other ending", default=None, ) - parser.add_argument("--x-rng", - help="The gates are cropped to this range in x coordinates", - type=to_range, - metavar="[X_MIN]:[X_MAX]", - default=':'.join(map(str, DEFAULT_X_RNG)), - ) - parser.add_argument("--y-rng", - help="The gates are cropped to this range in y coordinates", - type=to_range, - metavar="[Y_MIN]:[Y_MAX]", - default=':'.join(map(str, DEFAULT_X_RNG)), - ) - parser.add_argument("--layer-regex", - help="Only gates from layers where the name matches this regex are considered", - default=DEFAULT_LAYER_REGEX) - - parser.add_argument("--expected-gate-number", - help="Expected number of gates in integer slice notation (excluding end). " - "The default will result in an error if there are no gates extracted", - metavar="[START]:[STOP]", - type=to_slice, - default="1:") + parser.add_argument( + "--x-rng", + help="The gates are cropped to this range in x coordinates", + type=to_range, + metavar="[X_MIN]:[X_MAX]", + default=":".join(map(str, DEFAULT_X_RNG)), + ) + parser.add_argument( + "--y-rng", + help="The gates are cropped to this range in y coordinates", + type=to_range, + metavar="[Y_MIN]:[Y_MAX]", + default=":".join(map(str, DEFAULT_X_RNG)), + ) + parser.add_argument( + "--layer-regex", + help="Only gates from layers where the name matches this regex are considered", + default=DEFAULT_LAYER_REGEX, + ) + + parser.add_argument( + "--expected-gate-number", + help="Expected number of gates in integer slice notation (excluding end). " + "The default will result in an error if there are no gates extracted", + metavar="[START]:[STOP]", + type=to_slice, + default="1:", + ) return parser -def _main(dxf_path: str, json_path: str, layer_regex: str, x_rng: tuple[float, float], y_rng: tuple[float, float], expected_gate_number: slice): +def _main( + dxf_path: str, + json_path: str, + layer_regex: str, + x_rng: tuple[float, float], + y_rng: tuple[float, float], + expected_gate_number: slice, +): matplotlib.use("qtagg") doc = ezdxf.readfile(dxf_path) raw_gates = get_gates_from_cropped_region(doc, layer_regex=layer_regex, x_rng=x_rng, y_rng=y_rng) @@ -409,5 +423,4 @@ def _main(dxf_path: str, json_path: str, layer_regex: str, x_rng: tuple[float, f args = parser.parse_args() if args.json_path is None: args.json_path = args.dxf_path.with_suffix(".json") - _main(args.dxf_path, args.json_path, args.layer_regex, args.x_rng, args.y_rng, - args.expected_gate_number) + _main(args.dxf_path, args.json_path, args.layer_regex, args.x_rng, args.y_rng, args.expected_gate_number) From c9eb98893f3c817fcb2ab9201ef81be61a34c1de Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Thu, 28 Aug 2025 15:08:28 +0200 Subject: [PATCH 15/19] Include automatic centering and better logging control --- src/qumada/utils/dxf.py | 85 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 79 insertions(+), 6 deletions(-) diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py index 45463d11..5dcb44a7 100644 --- a/src/qumada/utils/dxf.py +++ b/src/qumada/utils/dxf.py @@ -13,7 +13,7 @@ import ezdxf.document import matplotlib.widgets -import shapely +import shapely.affinity from matplotlib import pyplot as plt from shapely.geometry import LineString, MultiLineString, Polygon, box from shapely.plotting import plot_polygon @@ -61,12 +61,30 @@ def iterate_all_entities(e, path=None): yield e, list(path) +def _get_all_entity_bounding_box(doc: ezdxf.document.Drawing) -> tuple[float, float, float, float] | None: + minx = miny = maxx = maxy = float('nan') + + for model in doc.modelspace(): + for e, _ in iterate_all_entities(model): + e_minx, e_miny, e_maxx, e_maxy = entity_to_geom(e).bounds + minx = min(e_minx, minx) + miny = min(e_miny, miny) + maxx = max(e_maxx, maxx) + maxy = max(e_maxy, maxy) + + if math.isnan(minx): + return None + + return minx, miny, maxx, maxy + + def get_gates_from_cropped_region( doc: ezdxf.document.Drawing, x_rng: tuple[float, float] = DEFAULT_X_RNG, y_rng: tuple[float, float] = DEFAULT_Y_RNG, layer_regex: str = DEFAULT_LAYER_REGEX, grid_size: float = DEFAULT_GRID_SIZE, + recenter_coordinates: bool = False, ) -> list[Gate]: """Selects all polygons (POLYLINE entities) from the selected region that live in a layer matched by the given regular expression. The polygons are cropped to the region and returned as :py:`.Gate` objects. @@ -76,11 +94,35 @@ def get_gates_from_cropped_region( In some cases, continuous gates are returned in multiple pieces. Use :py:`.auto_merge` to merge overlapping gates in the same layer. """ regex = re.compile(layer_regex) + logger.debug("Using regular expression %r for layer selection", layer_regex) keep_layer = {layer.dxf.name: bool(regex.search(layer.dxf.name)) for layer in doc.layers} xmin, xmax = x_rng ymin, ymax = y_rng + + if recenter_coordinates: + doc_bounds = _get_all_entity_bounding_box(doc) + if not doc_bounds: + logger.warning("No entities in document") + return [] + + doc_minx, doc_miny, doc_maxx, doc_maxy = doc_bounds + x_offset = (doc_miny + doc_maxy) / 2 + y_offset = (doc_minx + doc_maxx) / 2 + + xmin += x_offset + xmax += x_offset + ymin += y_offset + ymax += y_offset + + offset = (x_offset, y_offset) + logger.info("Added offset of %r to the cropping ranges", offset) + else: + offset = None + + logger.info("Using x cropping range: %r and y cropping range %r", (xmin, xmax), (ymin, ymax)) + roi_poly = Polygon(box(xmin, ymin, xmax, ymax)) chosen = [] @@ -125,6 +167,13 @@ def get_gates_from_cropped_region( ) chosen.append(gate) + if offset is not None and offset != (0.0, 0.0): + xoff = -offset[0] + yoff = -offset[1] + + for gate in chosen: + gate.polygon = shapely.affinity.translate(gate.polygon, xoff, yoff) + return chosen @@ -373,6 +422,19 @@ def to_slice(s: str): help="Only gates from layers where the name matches this regex are considered", default=DEFAULT_LAYER_REGEX, ) + parser.add_argument( + "--log-level", + help="Logging level", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + default="INFO", + ) + parser.add_argument( + "--recenter-coordinates", + help="If true(default), the cropping ranges are relative to the center of the bounding box of all objects in the file.", + choices=[True, False], + type=bool, + default=True, + ) parser.add_argument( "--expected-gate-number", @@ -393,14 +455,21 @@ def _main( x_rng: tuple[float, float], y_rng: tuple[float, float], expected_gate_number: slice, + recenter_coordinates: bool, ): matplotlib.use("qtagg") doc = ezdxf.readfile(dxf_path) - raw_gates = get_gates_from_cropped_region(doc, layer_regex=layer_regex, x_rng=x_rng, y_rng=y_rng) - print(f"{len(raw_gates)} raw gates extracted.") + raw_gates = get_gates_from_cropped_region( + doc, + layer_regex=layer_regex, + x_rng=x_rng, + y_rng=y_rng, + recenter_coordinates=recenter_coordinates + ) + logger.info(f"{len(raw_gates)} raw gates extracted.") merged_gates = auto_merge(raw_gates) - print(f"{len(merged_gates)} gates left after merging.") + logger.info(f"{len(merged_gates)} gates left after merging.") if expected_gate_number.start is not None and len(merged_gates) < expected_gate_number.start: raise ValueError(f"Only {len(merged_gates)} gates extracted but >= {expected_gate_number.start} were expected.") @@ -409,7 +478,7 @@ def _main( if not merged_gates: # this is apparently explicitly allowed by the user cause otherwise the expected_gate_number check should fail - print("No gates extracted. Storing empty gates in json path") + logger.info("No gates extracted. Storing empty gates in json path") store_to_file([], json_path) return @@ -423,4 +492,8 @@ def _main( args = parser.parse_args() if args.json_path is None: args.json_path = args.dxf_path.with_suffix(".json") - _main(args.dxf_path, args.json_path, args.layer_regex, args.x_rng, args.y_rng, args.expected_gate_number) + + logging.basicConfig(level=args.log_level) + + _main(args.dxf_path, args.json_path, args.layer_regex, args.x_rng, args.y_rng, + args.expected_gate_number, args.recenter_coordinates) From 825c22dafd9cff761966f2981c85d9a17dcb66c7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 28 Aug 2025 13:09:48 +0000 Subject: [PATCH 16/19] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/qumada/utils/dxf.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py index 5dcb44a7..0316d9b1 100644 --- a/src/qumada/utils/dxf.py +++ b/src/qumada/utils/dxf.py @@ -62,7 +62,7 @@ def iterate_all_entities(e, path=None): def _get_all_entity_bounding_box(doc: ezdxf.document.Drawing) -> tuple[float, float, float, float] | None: - minx = miny = maxx = maxy = float('nan') + minx = miny = maxx = maxy = float("nan") for model in doc.modelspace(): for e, _ in iterate_all_entities(model): @@ -460,11 +460,7 @@ def _main( matplotlib.use("qtagg") doc = ezdxf.readfile(dxf_path) raw_gates = get_gates_from_cropped_region( - doc, - layer_regex=layer_regex, - x_rng=x_rng, - y_rng=y_rng, - recenter_coordinates=recenter_coordinates + doc, layer_regex=layer_regex, x_rng=x_rng, y_rng=y_rng, recenter_coordinates=recenter_coordinates ) logger.info(f"{len(raw_gates)} raw gates extracted.") @@ -495,5 +491,12 @@ def _main( logging.basicConfig(level=args.log_level) - _main(args.dxf_path, args.json_path, args.layer_regex, args.x_rng, args.y_rng, - args.expected_gate_number, args.recenter_coordinates) + _main( + args.dxf_path, + args.json_path, + args.layer_regex, + args.x_rng, + args.y_rng, + args.expected_gate_number, + args.recenter_coordinates, + ) From 0b14aaf98db109d0db29d9347a8ef638d4d7493a Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Thu, 28 Aug 2025 17:20:14 +0200 Subject: [PATCH 17/19] Adjust cropping area based on feature sizes --- src/qumada/utils/dxf.py | 145 ++++++++++++++++++++++++++++------------ 1 file changed, 103 insertions(+), 42 deletions(-) diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py index 5dcb44a7..4b9d2093 100644 --- a/src/qumada/utils/dxf.py +++ b/src/qumada/utils/dxf.py @@ -1,5 +1,6 @@ import argparse import copy +import itertools import logging import math import operator @@ -13,6 +14,8 @@ import ezdxf.document import matplotlib.widgets +import numpy as np + import shapely.affinity from matplotlib import pyplot as plt from shapely.geometry import LineString, MultiLineString, Polygon, box @@ -37,14 +40,25 @@ def entity_to_geom(e): if e.dxftype() == "LINE": return LineString([e.dxf.start, e.dxf.end]) - if e.dxftype() in {"LWPOLYLINE", "POLYLINE"}: - if hasattr(e, "get_points"): - points = e.get_points() + if e.dxftype() == "LWPOLYLINE": + vertices = [(x, y) + for x, y, *_ in e.vertices_in_wcs()] + closed = bool(e.closed) if hasattr(e, "closed") else vertices[0] == vertices[-1] + if closed: + return Polygon(vertices) + else: + return LineString(vertices) + + if e.dxftype() == "POLYLINE": + points = [ + (x, y) + for x, y, *_ in e.points_in_wcs() + ] + closed = bool(e.closed) if hasattr(e, "closed") else points[0] == points[-1] + if closed: + return Polygon(points) else: - points = e.points_in_wcs() - pts = [tuple(p)[:2] for p in points] # ignore bulge for now - closed = bool(e.closed) if hasattr(e, "closed") else pts[0] == pts[-1] - return Polygon(pts) if closed else LineString(pts) + return LineString(points) raise NotImplementedError(e.dxftype()) @@ -78,13 +92,64 @@ def _get_all_entity_bounding_box(doc: ezdxf.document.Drawing) -> tuple[float, fl return minx, miny, maxx, maxy +def _auto_cropping_box(doc: ezdxf.document.Drawing, + keep_layer: dict[str, bool], + feature_size: float, + max_cropping_box_size: float, + grid_size: float, + ) -> tuple[float, float, float, float] | None: + edge_set = [] + + for model in doc.modelspace(): + for e, _ in iterate_all_entities(model): + if not keep_layer[e.dxf.layer]: + continue + + raw_geom = entity_to_geom(e) + if raw_geom is None: + continue + geom = Polygon(raw_geom).simplify(grid_size) + + points = np.array(geom.boundary.xy).T.tolist() + + if not points: + continue + + points.append(points[0]) + for (x0, y0), (x1, y1) in itertools.pairwise(points): + if x0 == x1 and x1 == y1: + continue + + if (x0 - x1)**2 + (y0 - y1)**2 <= feature_size**2: + edge_set.append( + [ + [x0, y0], + [x1, y1] + ] + ) + if not edge_set: + return None + + edge_set = np.array(edge_set) + + edge_center = np.mean(edge_set, axis=(0, 1)) + logger.debug("edge_center: %r", edge_center) + edge_distance = np.min(np.linalg.norm(edge_set - edge_center, axis=2), axis=1) + edge_mask = edge_distance <= max_cropping_box_size * 5 + included_edges = edge_set[edge_mask] + + minx, miny = np.min(included_edges, axis=(0, 1)).tolist() + maxx, maxy = np.max(included_edges, axis=(0, 1)).tolist() + return minx, miny, maxx, maxy + + def get_gates_from_cropped_region( doc: ezdxf.document.Drawing, x_rng: tuple[float, float] = DEFAULT_X_RNG, y_rng: tuple[float, float] = DEFAULT_Y_RNG, layer_regex: str = DEFAULT_LAYER_REGEX, grid_size: float = DEFAULT_GRID_SIZE, - recenter_coordinates: bool = False, + auto_adjust_cropping: bool = False, ) -> list[Gate]: """Selects all polygons (POLYLINE entities) from the selected region that live in a layer matched by the given regular expression. The polygons are cropped to the region and returned as :py:`.Gate` objects. @@ -101,25 +166,27 @@ def get_gates_from_cropped_region( xmin, xmax = x_rng ymin, ymax = y_rng - if recenter_coordinates: - doc_bounds = _get_all_entity_bounding_box(doc) - if not doc_bounds: - logger.warning("No entities in document") - return [] + if auto_adjust_cropping: + feature_size = min(xmax - xmin, ymax - ymin) / 3. + max_box_size = math.sqrt((xmax - xmin)**2 + (ymax - ymin)**2) + auto_xmin, auto_ymin, auto_xmax, auto_ymax = _auto_cropping_box(doc, + keep_layer=keep_layer, + feature_size=feature_size, + max_cropping_box_size=max_box_size, + grid_size=grid_size, + ) - doc_minx, doc_miny, doc_maxx, doc_maxy = doc_bounds - x_offset = (doc_miny + doc_maxy) / 2 - y_offset = (doc_minx + doc_maxx) / 2 + x_center = (auto_xmax + auto_xmin) / 2.0 + y_center = (auto_ymax + auto_ymin) / 2.0 + offset = (x_center, y_center) - xmin += x_offset - xmax += x_offset - ymin += y_offset - ymax += y_offset + logger.info("Shifting box by %r", offset) - offset = (x_offset, y_offset) - logger.info("Added offset of %r to the cropping ranges", offset) - else: - offset = None + # shift initial box + xmin += x_center + xmax += x_center + ymin += y_center + ymax += y_center logger.info("Using x cropping range: %r and y cropping range %r", (xmin, xmax), (ymin, ymax)) @@ -167,13 +234,6 @@ def get_gates_from_cropped_region( ) chosen.append(gate) - if offset is not None and offset != (0.0, 0.0): - xoff = -offset[0] - yoff = -offset[1] - - for gate in chosen: - gate.polygon = shapely.affinity.translate(gate.polygon, xoff, yoff) - return chosen @@ -407,19 +467,19 @@ def to_slice(s: str): "--x-rng", help="The gates are cropped to this range in x coordinates", type=to_range, - metavar="[X_MIN]:[X_MAX]", + metavar="[X_MIN={}]:[X_MAX={}]".format(*DEFAULT_X_RNG), default=":".join(map(str, DEFAULT_X_RNG)), ) parser.add_argument( "--y-rng", help="The gates are cropped to this range in y coordinates", type=to_range, - metavar="[Y_MIN]:[Y_MAX]", - default=":".join(map(str, DEFAULT_X_RNG)), + metavar="[Y_MIN={}]:[Y_MAX={}]".format(*DEFAULT_Y_RNG), + default=":".join(map(str, DEFAULT_Y_RNG)), ) parser.add_argument( "--layer-regex", - help="Only gates from layers where the name matches this regex are considered", + help=f"Only gates from layers where the name matches this regex are considered. (Default: \"{DEFAULT_LAYER_REGEX}\")", default=DEFAULT_LAYER_REGEX, ) parser.add_argument( @@ -429,10 +489,10 @@ def to_slice(s: str): default="INFO", ) parser.add_argument( - "--recenter-coordinates", - help="If true(default), the cropping ranges are relative to the center of the bounding box of all objects in the file.", + "--auto-adjust-cropping", + help="If true(default), the cropping ranges are adjusted based on some unstable feature detection algorithm.", choices=[True, False], - type=bool, + type=lambda x: x.lower() == "true", default=True, ) @@ -455,7 +515,7 @@ def _main( x_rng: tuple[float, float], y_rng: tuple[float, float], expected_gate_number: slice, - recenter_coordinates: bool, + auto_adjust_cropping: bool, ): matplotlib.use("qtagg") doc = ezdxf.readfile(dxf_path) @@ -464,7 +524,7 @@ def _main( layer_regex=layer_regex, x_rng=x_rng, y_rng=y_rng, - recenter_coordinates=recenter_coordinates + auto_adjust_cropping=auto_adjust_cropping ) logger.info(f"{len(raw_gates)} raw gates extracted.") @@ -493,7 +553,8 @@ def _main( if args.json_path is None: args.json_path = args.dxf_path.with_suffix(".json") - logging.basicConfig(level=args.log_level) + logging.basicConfig() + logger.setLevel(args.log_level) _main(args.dxf_path, args.json_path, args.layer_regex, args.x_rng, args.y_rng, - args.expected_gate_number, args.recenter_coordinates) + args.expected_gate_number, auto_adjust_cropping=args.auto_adjust_cropping) From 258a8acc3bb0d9ffebb8f0738b895ba2df30b53d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 28 Aug 2025 15:24:31 +0000 Subject: [PATCH 18/19] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/qumada/utils/dxf.py | 61 ++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/src/qumada/utils/dxf.py b/src/qumada/utils/dxf.py index dc8d3a14..4b31747d 100644 --- a/src/qumada/utils/dxf.py +++ b/src/qumada/utils/dxf.py @@ -15,7 +15,6 @@ import ezdxf.document import matplotlib.widgets import numpy as np - import shapely.affinity from matplotlib import pyplot as plt from shapely.geometry import LineString, MultiLineString, Polygon, box @@ -41,8 +40,7 @@ def entity_to_geom(e): return LineString([e.dxf.start, e.dxf.end]) if e.dxftype() == "LWPOLYLINE": - vertices = [(x, y) - for x, y, *_ in e.vertices_in_wcs()] + vertices = [(x, y) for x, y, *_ in e.vertices_in_wcs()] closed = bool(e.closed) if hasattr(e, "closed") else vertices[0] == vertices[-1] if closed: return Polygon(vertices) @@ -50,10 +48,7 @@ def entity_to_geom(e): return LineString(vertices) if e.dxftype() == "POLYLINE": - points = [ - (x, y) - for x, y, *_ in e.points_in_wcs() - ] + points = [(x, y) for x, y, *_ in e.points_in_wcs()] closed = bool(e.closed) if hasattr(e, "closed") else points[0] == points[-1] if closed: return Polygon(points) @@ -92,12 +87,13 @@ def _get_all_entity_bounding_box(doc: ezdxf.document.Drawing) -> tuple[float, fl return minx, miny, maxx, maxy -def _auto_cropping_box(doc: ezdxf.document.Drawing, - keep_layer: dict[str, bool], - feature_size: float, - max_cropping_box_size: float, - grid_size: float, - ) -> tuple[float, float, float, float] | None: +def _auto_cropping_box( + doc: ezdxf.document.Drawing, + keep_layer: dict[str, bool], + feature_size: float, + max_cropping_box_size: float, + grid_size: float, +) -> tuple[float, float, float, float] | None: edge_set = [] for model in doc.modelspace(): @@ -120,13 +116,8 @@ def _auto_cropping_box(doc: ezdxf.document.Drawing, if x0 == x1 and x1 == y1: continue - if (x0 - x1)**2 + (y0 - y1)**2 <= feature_size**2: - edge_set.append( - [ - [x0, y0], - [x1, y1] - ] - ) + if (x0 - x1) ** 2 + (y0 - y1) ** 2 <= feature_size**2: + edge_set.append([[x0, y0], [x1, y1]]) if not edge_set: return None @@ -167,14 +158,15 @@ def get_gates_from_cropped_region( ymin, ymax = y_rng if auto_adjust_cropping: - feature_size = min(xmax - xmin, ymax - ymin) / 3. - max_box_size = math.sqrt((xmax - xmin)**2 + (ymax - ymin)**2) - auto_xmin, auto_ymin, auto_xmax, auto_ymax = _auto_cropping_box(doc, - keep_layer=keep_layer, - feature_size=feature_size, - max_cropping_box_size=max_box_size, - grid_size=grid_size, - ) + feature_size = min(xmax - xmin, ymax - ymin) / 3.0 + max_box_size = math.sqrt((xmax - xmin) ** 2 + (ymax - ymin) ** 2) + auto_xmin, auto_ymin, auto_xmax, auto_ymax = _auto_cropping_box( + doc, + keep_layer=keep_layer, + feature_size=feature_size, + max_cropping_box_size=max_box_size, + grid_size=grid_size, + ) x_center = (auto_xmax + auto_xmin) / 2.0 y_center = (auto_ymax + auto_ymin) / 2.0 @@ -479,7 +471,7 @@ def to_slice(s: str): ) parser.add_argument( "--layer-regex", - help=f"Only gates from layers where the name matches this regex are considered. (Default: \"{DEFAULT_LAYER_REGEX}\")", + help=f'Only gates from layers where the name matches this regex are considered. (Default: "{DEFAULT_LAYER_REGEX}")', default=DEFAULT_LAYER_REGEX, ) parser.add_argument( @@ -552,5 +544,12 @@ def _main( logging.basicConfig() logger.setLevel(args.log_level) - _main(args.dxf_path, args.json_path, args.layer_regex, args.x_rng, args.y_rng, - args.expected_gate_number, auto_adjust_cropping=args.auto_adjust_cropping) + _main( + args.dxf_path, + args.json_path, + args.layer_regex, + args.x_rng, + args.y_rng, + args.expected_gate_number, + auto_adjust_cropping=args.auto_adjust_cropping, + ) From 965df5c38c0a4531495e4feb770877dbbeb3a040 Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Tue, 9 Sep 2025 11:51:29 +0200 Subject: [PATCH 19/19] Do not bypass caching configuration by default. --- src/qumada/utils/device_server.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/qumada/utils/device_server.py b/src/qumada/utils/device_server.py index 174bf601..2813bbc4 100644 --- a/src/qumada/utils/device_server.py +++ b/src/qumada/utils/device_server.py @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) -def _get_parameter_data(parameter: Parameter, cache_only: bool = True): +def _get_parameter_data(parameter: Parameter, cache_only): value = parameter.cache.get(get_if_invalid=not cache_only) timestamp = parameter.cache.timestamp @@ -35,7 +35,7 @@ def _get_parameter_data(parameter: Parameter, cache_only: bool = True): } -def _collect_data(*parameters: Parameter, cache_only: bool = True): +def _collect_data(*parameters: Parameter, cache_only): data = [] for parameter in parameters: try: @@ -51,7 +51,7 @@ def _collect_data(*parameters: Parameter, cache_only: bool = True): class DataCollector: - def __init__(self, *parameters: Parameter, cache_only: bool = True, minimal_update_delta: float = 1 / 30): + def __init__(self, *parameters: Parameter, cache_only: bool = False, minimal_update_delta: float = 1 / 30): self.lock = asyncio.Lock() self.parameters = parameters self.cache_only = cache_only