diff --git a/CHANGELOG.md b/CHANGELOG.md index e87501c6..7060da36 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - #402: Support for Python 3.14. +- #407: Introduced new method `graphix.optimization.StandardizedPattern.extract_xzcorrections` and its wrapper `graphix.pattern.Pattern.extract_xzcorrections` which extract an `XZCorrections` instance from a pattern. + + ### Fixed - #392: `Pattern.remove_input_nodes` is required before the `Pattern.perform_pauli_measurements` method to ensure input nodes are removed and fixed in the |+> state. @@ -57,6 +60,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - #409: Axis labels are shown when visualizing a pattern. Legend is placed outside the plot so that the graph remains visible. +- #407: Fixed an unreported bug in `OpenGraph.is_equal_structurally` which failed to compare open graphs differing on the output nodes only. + ### Changed - #374: Adapted existing method `graphix.opengraph.OpenGraph.isclose` to the new API introduced in #358. @@ -70,6 +75,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 π. In particular, angles that appear in parameters of circuit instructions are now expressed in units of π. +- #407: + - Modified the constructor `XZCorrections.from_measured_nodes_mapping` so that it doesn't need to create an `nx.DiGraph` instance. This fixes an unreported bug in the method. + - Removed modules `graphix.gflow` and `graphix.find_pflow`. + ## [0.3.3] - 2025-10-23 ### Added diff --git a/docs/source/flow.rst b/docs/source/flow.rst deleted file mode 100644 index c556400f..00000000 --- a/docs/source/flow.rst +++ /dev/null @@ -1,30 +0,0 @@ -flow and gflow -============== - -:mod:`graphix.gflow` module -+++++++++++++++++++++++++++++++++++ - -This provides functions to find flow structures (causal flow, gflow, Pauli flow) in a graph, -to verify if a given flow structure is valid, and to extract flow structures from a given pattern. - -.. automodule:: graphix.gflow - -.. currentmodule:: graphix.gflow - -.. autofunction:: find_flow - -.. autofunction:: find_gflow - -.. autofunction:: find_pauliflow - -.. autofunction:: verify_flow - -.. autofunction:: verify_gflow - -.. autofunction:: verify_pauliflow - -.. autofunction:: flow_from_pattern - -.. autofunction:: gflow_from_pattern - -.. autofunction:: pauliflow_from_pattern diff --git a/docs/source/modifier.rst b/docs/source/modifier.rst index ff82ba2d..1c74ee3a 100644 --- a/docs/source/modifier.rst +++ b/docs/source/modifier.rst @@ -58,8 +58,6 @@ Pattern Manipulation .. automethod:: max_space - .. automethod:: get_layers - .. automethod:: to_qasm3 diff --git a/docs/source/references.rst b/docs/source/references.rst index dfc818d5..ac670e0b 100644 --- a/docs/source/references.rst +++ b/docs/source/references.rst @@ -10,7 +10,6 @@ Module reference simulator graphsim extraction - flow clifford visualization channels diff --git a/examples/rotation.py b/examples/rotation.py index 07b127f1..e72ebc12 100644 --- a/examples/rotation.py +++ b/examples/rotation.py @@ -53,7 +53,7 @@ # Since there's no two-qubit gates applied to the two qubits in the original gate sequence, # we see decoupled 1D graphs representing the evolution of single qubits. # The arrows are the ``information flow `` -# of the MBQC pattern, obtained using the flow-finding algorithm implemented in :class:`graphix.gflow.flow`. 
+# of the MBQC pattern, obtained using the flow-finding algorithm implemented in :class:`graphix.flow`. # Below we list the meaning of the node boundary and face colors. # # - Nodes with red boundaries are the *input nodes* where the computation starts. diff --git a/graphix/find_pflow.py b/graphix/find_pflow.py deleted file mode 100644 index 609c00cd..00000000 --- a/graphix/find_pflow.py +++ /dev/null @@ -1,613 +0,0 @@ -"""Pauli flow finding algorithm. - -This module implements the algorithm presented in [1]. For a given labelled open graph (G, I, O, meas_plane), this algorithm finds a maximally delayed Pauli flow [2] in polynomial time with the number of nodes, :math:`O(N^3)`. -If the input graph does not have Pauli measurements, the algorithm returns a general flow (gflow) if it exists by definition. - -References ----------- -[1] Mitosek and Backens, 2024 (arXiv:2410.23439). -[2] Browne et al., 2007 New J. Phys. 9 250 (arXiv:quant-ph/0702212) -""" - -from __future__ import annotations - -from copy import deepcopy -from typing import TYPE_CHECKING - -import numpy as np - -from graphix._linalg import MatGF2, solve_f2_linear_system -from graphix.fundamentals import Axis, Plane -from graphix.measurements import PauliMeasurement -from graphix.sim.base_backend import NodeIndex - -if TYPE_CHECKING: - from collections.abc import Set as AbstractSet - - from graphix.measurements import Measurement - from graphix.opengraph import OpenGraph - - -class OpenGraphIndex: - """A class for managing the mapping between node numbers of a given open graph and matrix indices in the Pauli flow finding algorithm. - - It reuses the class `:class: graphix.sim.base_backend.NodeIndex` introduced for managing the mapping between node numbers and qubit indices in the internal state of the backend. - - Attributes - ---------- - og (OpenGraph) - non_inputs (NodeIndex) : Mapping between matrix indices and non-input nodes (labelled with integers). - non_outputs (NodeIndex) : Mapping between matrix indices and non-output nodes (labelled with integers). - non_outputs_optim (NodeIndex) : Mapping between matrix indices and a subset of non-output nodes (labelled with integers). - - Notes - ----- - At initialization, `non_outputs_optim` is a copy of `non_outputs`. The nodes corresponding to zero-rows of the order-demand matrix are removed for calculating the P matrix more efficiently in the `:func: _find_pflow_general` routine. - """ - - def __init__(self, og: OpenGraph[Measurement]) -> None: - self.og = og - nodes = set(og.graph.nodes) - - # Nodes don't need to be sorted. We do it for debugging purposes, so we can check the matrices in intermediate steps of the algorithm. - - nodes_non_input = sorted(nodes - set(og.input_nodes)) - nodes_non_output = sorted(nodes - set(og.output_nodes)) - - self.non_inputs = NodeIndex() - self.non_inputs.extend(nodes_non_input) - - self.non_outputs = NodeIndex() - self.non_outputs.extend(nodes_non_output) - - # Needs to be a deep copy because it may be modified during runtime. - self.non_outputs_optim = deepcopy(self.non_outputs) - - -def _compute_reduced_adj(ogi: OpenGraphIndex) -> MatGF2: - r"""Return the reduced adjacency matrix (RAdj) of the input open graph. - - Parameters - ---------- - ogi : OpenGraphIndex - Open graph whose RAdj is computed. - - Returns - ------- - adj_red : MatGF2 - Reduced adjacency matrix. - - Notes - ----- - The adjacency matrix of a graph :math:`Adj_G` is an :math:`n \times n` matrix. 
- - The RAdj matrix of an open graph OG is an :math:`(n - n_O) \times (n - n_I)` submatrix of :math:`Adj_G` constructed by removing the output rows and input columns of :math:`Adj_G`. - - See Definition 3.3 in Mitosek and Backens, 2024 (arXiv:2410.23439). - """ - graph = ogi.og.graph - row_tags = ogi.non_outputs - col_tags = ogi.non_inputs - - adj_red = np.zeros((len(row_tags), len(col_tags)), dtype=np.uint8).view(MatGF2) - - for n1, n2 in graph.edges: - for u, v in ((n1, n2), (n2, n1)): - if u in row_tags and v in col_tags: - i, j = row_tags.index(u), col_tags.index(v) - adj_red[i, j] = 1 - - return adj_red - - -def _compute_pflow_matrices(ogi: OpenGraphIndex) -> tuple[MatGF2, MatGF2]: - r"""Construct flow-demand and order-demand matrices. - - Parameters - ---------- - ogi : OpenGraphIndex - Open graph whose flow-demand and order-demand matrices are computed. - - Returns - ------- - flow_demand_matrix : MatGF2 - order_demand_matrix : MatGF2 - - Notes - ----- - See Definitions 3.4 and 3.5, and Algorithm 1 in Mitosek and Backens, 2024 (arXiv:2410.23439). - """ - flow_demand_matrix = _compute_reduced_adj(ogi) - order_demand_matrix = flow_demand_matrix.copy() - - inputs_set = set(ogi.og.input_nodes) - meas = ogi.og.measurements - - row_tags = ogi.non_outputs - col_tags = ogi.non_inputs - - # TODO: integrate pauli measurements in open graphs - meas_planes = {i: m.plane for i, m in meas.items()} - meas_angles = {i: m.angle for i, m in meas.items()} - meas_plane_axis = { - node: pm.axis if (pm := PauliMeasurement.try_from(plane, meas_angles[node])) else plane - for node, plane in meas_planes.items() - } - - for v in row_tags: # v is a node tag - i = row_tags.index(v) - plane_axis_v = meas_plane_axis[v] - - if plane_axis_v in {Plane.YZ, Plane.XZ, Axis.Z}: - flow_demand_matrix[i, :] = 0 # Set row corresponding to node v to 0 - if plane_axis_v in {Plane.YZ, Plane.XZ, Axis.Y, Axis.Z} and v not in inputs_set: - j = col_tags.index(v) - flow_demand_matrix[i, j] = 1 # Set element (v, v) = 0 - if plane_axis_v in {Plane.XY, Axis.X, Axis.Y, Axis.Z}: - order_demand_matrix[i, :] = 0 # Set row corresponding to node v to 0 - if plane_axis_v in {Plane.XY, Plane.XZ} and v not in inputs_set: - j = col_tags.index(v) - order_demand_matrix[i, j] = 1 # Set element (v, v) = 1 - - return flow_demand_matrix, order_demand_matrix - - -def _find_pflow_simple(ogi: OpenGraphIndex) -> tuple[MatGF2, MatGF2] | None: - r"""Construct the correction matrix :math:`C` and the ordering matrix, :math:`NC` for an open graph with equal number of inputs and outputs. - - Parameters - ---------- - ogi : OpenGraphIndex - Open graph for which :math:`C` and :math:`NC` are computed. - - Returns - ------- - correction_matrix : MatGF2 - Matrix encoding the correction function. - ordering_matrix : MatGF2 - Matrix encoding the partial ordering between nodes. - - or `None` - if the input open graph does not have Pauli flow. - - Notes - ----- - - The ordering matrix is defined as the product of the order-demand matrix :math:`N` and the correction matrix. - - - The function only returns `None` when the flow-demand matrix is not invertible (meaning that `ogi` does not have Pauli flow). The condition that the ordering matrix :math:`NC` must encode a directed acyclic graph (DAG) is verified in a subsequent step by `:func: _compute_topological_generations`. - - See Definitions 3.4, 3.5 and 3.6, Theorems 3.1 and 4.1, and Algorithm 2 in Mitosek and Backens, 2024 (arXiv:2410.23439). 
- """ - flow_demand_matrix, order_demand_matrix = _compute_pflow_matrices(ogi) - - correction_matrix = flow_demand_matrix.right_inverse() # C matrix - - if correction_matrix is None: - return None # The flow-demand matrix is not invertible, therefore there's no flow. - - ordering_matrix = order_demand_matrix.mat_mul(correction_matrix) # NC matrix - - return correction_matrix, ordering_matrix - - -def _compute_p_matrix(ogi: OpenGraphIndex, nb_matrix: MatGF2) -> MatGF2 | None: - r"""Perform the steps 8 - 12 of the general case (larger number of outputs than inputs) algorithm. - - Parameters - ---------- - ogi : OpenGraphIndex - Open graph for which the matrix :math:`P` is computed. - nb_matrix : MatGF2 - Matrix :math:`N_B` - - Returns - ------- - p_matrix : MatGF2 - Matrix encoding the correction function. - - or `None` - if the input open graph does not have Pauli flow. - - Notes - ----- - See Theorem 4.4, steps 8 - 12 in Mitosek and Backens, 2024 (arXiv:2410.23439). - """ - n_no = len(ogi.non_outputs) # number of columns of P matrix. - n_oi_diff = len(ogi.og.output_nodes) - len(ogi.og.input_nodes) # number of rows of P matrix. - n_no_optim = len(ogi.non_outputs_optim) # number of rows and columns of the third block of the K_{LS} matrix. - - # Steps 8, 9 and 10 - kils_matrix = np.concatenate( - (nb_matrix[:, n_no:], nb_matrix[:, :n_no], np.eye(n_no_optim, dtype=np.uint8)), axis=1 - ).view(MatGF2) # N_R | N_L | 1 matrix. - kls_matrix = kils_matrix.gauss_elimination(ncols=n_oi_diff, copy=True) # RREF form is not needed, only REF. - - # Step 11 - p_matrix = np.zeros((n_oi_diff, n_no), dtype=np.uint8).view(MatGF2) - solved_nodes: set[int] = set() - non_outputs_set = set(ogi.non_outputs) - - # Step 12 - while solved_nodes != non_outputs_set: - solvable_nodes = _find_solvable_nodes(ogi, kls_matrix, non_outputs_set, solved_nodes, n_oi_diff) # Step 12.a - if not solvable_nodes: - return None - - _update_p_matrix(ogi, kls_matrix, p_matrix, solvable_nodes, n_oi_diff) # Steps 12.b, 12.c - _update_kls_matrix(ogi, kls_matrix, kils_matrix, solvable_nodes, n_oi_diff, n_no, n_no_optim) # Step 12.d - solved_nodes.update(solvable_nodes) - - return p_matrix - - -def _find_solvable_nodes( - ogi: OpenGraphIndex, - kls_matrix: MatGF2, - non_outputs_set: AbstractSet[int], - solved_nodes: AbstractSet[int], - n_oi_diff: int, -) -> set[int]: - """Return the set nodes whose associated linear system is solvable. - - A node is solvable if: - - It has not been solved yet. - - Its column in the second block of :math:`K_{LS}` (which determines the constants in each equation) has only zeros where it intersects rows for which all the coefficients in the first block are 0s. - - See Theorem 4.4, step 12.a in Mitosek and Backens, 2024 (arXiv:2410.23439). - """ - solvable_nodes: set[int] = set() - - row_idxs = np.flatnonzero( - ~kls_matrix[:, :n_oi_diff].any(axis=1) - ) # Row indices of the 0-rows in the first block of K_{LS}. - if row_idxs.size: - for v in non_outputs_set - solved_nodes: - j = n_oi_diff + ogi.non_outputs.index(v) # `n_oi_diff` is the column offset from the first block of K_{LS}. - if not kls_matrix[row_idxs, j].any(): - solvable_nodes.add(v) - else: - # If the first block of K_{LS} does not have 0-rows, all non-solved nodes are solvable. - solvable_nodes = set(non_outputs_set - solved_nodes) - - return solvable_nodes - - -def _update_p_matrix( - ogi: OpenGraphIndex, kls_matrix: MatGF2, p_matrix: MatGF2, solvable_nodes: AbstractSet[int], n_oi_diff: int -) -> None: - """Update `p_matrix`. 
- - The solution of the linear system associated with node :math:`v` in `solvable_nodes` corresponds to the column of `p_matrix` associated with node :math:`v`. - - See Theorem 4.4, steps 12.b and 12.c in Mitosek and Backens, 2024 (arXiv:2410.23439). - """ - for v in solvable_nodes: - j = ogi.non_outputs.index(v) - j_shift = n_oi_diff + j # `n_oi_diff` is the column offset from the first block of K_{LS}. - mat = MatGF2(kls_matrix[:, :n_oi_diff]) # First block of K_{LS}, in row echelon form. - b = MatGF2(kls_matrix[:, j_shift]) - x = solve_f2_linear_system(mat, b) - p_matrix[:, j] = x - - -def _update_kls_matrix( - ogi: OpenGraphIndex, - kls_matrix: MatGF2, - kils_matrix: MatGF2, - solvable_nodes: AbstractSet[int], - n_oi_diff: int, - n_no: int, - n_no_optim: int, -) -> None: - """Update `kls_matrix`. - - Bring the linear system encoded in :math:`K_{LS}` to the row-echelon form (REF) that would be achieved by Gaussian elimination if the row and column vectors corresponding to vertices in `solvable_nodes` where not included in the starting matrix. - - See Theorem 4.4, step 12.d in Mitosek and Backens, 2024 (arXiv:2410.23439). - """ - shift = n_oi_diff + n_no # `n_oi_diff` + `n_no` is the column offset from the first two blocks of K_{LS}. - row_permutation: list[int] - - def reorder(old_pos: int, new_pos: int) -> None: # Used in step 12.d.vi - """Reorder the elements of `row_permutation`. - - The element at `old_pos` is placed on the right of the element at `new_pos`. - Example: - ``` - row_permutation = [0, 1, 2, 3, 4] - reorder(1, 3) -> [0, 2, 3, 1, 4] - reorder(2, -1) -> [2, 0, 1, 3, 4] - ``` - """ - val = row_permutation.pop(old_pos) - row_permutation.insert(new_pos + (new_pos < old_pos), val) - - for v in solvable_nodes: - if ( - v in ogi.non_outputs_optim - ): # if `v` corresponded to a zero row in N_B, it was not present in `kls_matrix` because we removed it in the optimization process, so there's no need to do Gaussian elimination for that vertex. - # Step 12.d.ii - j = ogi.non_outputs_optim.index(v) - j_shift = shift + j - row_idxs = np.flatnonzero( - kls_matrix[:, j_shift] - ).tolist() # Row indices with 1s in column of node `v` in third block. - - # `row_idxs` can't be empty: - # The third block of K_{LS} is initially the identity matrix, so all columns have initially a 1. Row permutations and row additions in the Gaussian elimination routine can't remove all 1s from a given column. - k = row_idxs.pop() - - # Step 12.d.iii - kls_matrix[row_idxs] ^= kls_matrix[k] # Adding a row to previous rows preserves REF. - - # Step 12.d.iv - kls_matrix[k] ^= kils_matrix[j] # Row `k` may now break REF. - - # Step 12.d.v - pivots: list[np.int_] = [] # Store pivots for next step. - for i, row in enumerate(kls_matrix): - if i != k: - col_idxs = np.flatnonzero(row[:n_oi_diff]) # Column indices with 1s in first block. - if col_idxs.size == 0: - # Row `i` has all zeros in the first block. Only row `k` can break REF, so rows below have all zeros in the first block too. - break - pivots.append(p := col_idxs[0]) - if kls_matrix[k, p]: # Row `k` has a 1 in the column corresponding to the leading 1 of row `i`. - kls_matrix[k] ^= row - - row_permutation = list(range(n_no_optim)) # Row indices of `kls_matrix`. - n_pivots = len(pivots) - - col_idxs = np.flatnonzero(kls_matrix[k, :n_oi_diff]) - pk = col_idxs[0] if col_idxs.size else None # Pivot of row `k`. - - if pk and k >= n_pivots: # Row `k` is non-zero in the FB (first block) and it's among zero rows. - # Find row `new_pos` s.t. 
`pivots[new_pos] <= pk < pivots[new_pos+1]`. - new_pos = ( - int(np.argmax(np.array(pivots) > pk) - 1) if pivots else -1 - ) # `pivots` can be empty. If so, we bring row `k` to the top since it's non-zero. - elif pk: # Row `k` is non-zero in the FB and it's among non-zero rows. - # Find row `new_pos` s.t. `pivots[new_pos] <= pk < pivots[new_pos+1]` - new_pos = int(np.argmax(np.array(pivots) > pk) - 1) - # We skipped row `k` in loop of step 12.d.v, so `pivots[j]` can be the pivot of row `j` or `j+1`. - if new_pos >= k: - new_pos += 1 - elif k < n_pivots: # Row `k` is zero in the first block and it's among non-zero rows. - new_pos = ( - n_pivots # Move row `k` to the top of the zeros block (i.e., below the row of the last pivot). - ) - else: # Row `k` is zero in the first block and it's among zero rows. - new_pos = k # Do nothing. - - if new_pos != k: - reorder(k, new_pos) # Modify `row_permutation` in-place. - kls_matrix[:] = kls_matrix[ - row_permutation - ] # `[:]` is crucial to modify the data pointed by `kls_matrix`. - - -def _find_pflow_general(ogi: OpenGraphIndex) -> tuple[MatGF2, MatGF2] | None: - r"""Construct the generalized correction matrix :math:`C'C^B` and the generalized ordering matrix, :math:`NC'C^B` for an open graph with larger number of outputs than inputs. - - Parameters - ---------- - ogi : OpenGraphIndex - Open graph for which :math:`C'C^B` and :math:`NC'C^B` are computed. - - Returns - ------- - correction_matrix : MatGF2 - Matrix encoding the correction function. - ordering_matrix : MatGF2 - Matrix encoding the partial ordering between nodes. - - or `None` - if the input open graph does not have Pauli flow. - - Notes - ----- - - The function returns `None` if - a) The flow-demand matrix is not invertible, or - b) Not all linear systems of equations associated to the non-output nodes are solvable, - meaning that `ogi` does not have Pauli flow. - Condition (b) is satisfied when the flow-demand matrix :math:`M` does not have a right inverse :math:`C` such that :math:`NC` represents a directed acyclical graph (DAG). - - See Theorem 4.4 and Algorithm 3 in Mitosek and Backens, 2024 (arXiv:2410.23439). - """ - n_no = len(ogi.non_outputs) - n_oi_diff = len(ogi.og.output_nodes) - len(ogi.og.input_nodes) - - # Steps 1 and 2 - flow_demand_matrix, order_demand_matrix = _compute_pflow_matrices(ogi) - - # Steps 3 and 4 - correction_matrix_0 = flow_demand_matrix.right_inverse() # C0 matrix. - if correction_matrix_0 is None: - return None # The flow-demand matrix is not invertible, therefore there's no flow. - - # Steps 5, 6 and 7 - ker_flow_demand_matrix = flow_demand_matrix.null_space().transpose() # F matrix. - c_prime_matrix = np.concatenate((correction_matrix_0, ker_flow_demand_matrix), axis=1).view(MatGF2) - - row_idxs = np.flatnonzero(order_demand_matrix.any(axis=1)) # Row indices of the non-zero rows. - - if row_idxs.size: - # The p-matrix finding algorithm runs on the `order_demand_matrix` without the zero rows. - # This optimization is allowed because: - # - The zero rows remain zero after the change of basis (multiplication by `c_prime_matrix`). - # - The zero rows remain zero after gaussian elimination. - # - Removing the zero rows does not change the solvability condition of the open graph nodes. - nb_matrix_optim = ( - order_demand_matrix[row_idxs].view(MatGF2).mat_mul(c_prime_matrix) - ) # `view` is used to keep mypy happy without copying data. 
- for i in set(range(order_demand_matrix.shape[0])).difference(row_idxs): - ogi.non_outputs_optim.remove(ogi.non_outputs[i]) # Update the node-index mapping. - - # Steps 8 - 12 - if (p_matrix := _compute_p_matrix(ogi, nb_matrix_optim)) is None: - return None - else: - # If all rows of `order_demand_matrix` are zero, any matrix will solve the associated linear system of equations. - p_matrix = np.zeros((n_oi_diff, n_no), dtype=np.uint8).view(MatGF2) - - # Step 13 - cb_matrix = np.concatenate((np.eye(n_no, dtype=np.uint8), p_matrix), axis=0).view(MatGF2) - - correction_matrix = c_prime_matrix.mat_mul(cb_matrix) - ordering_matrix = order_demand_matrix.mat_mul(correction_matrix) - - return correction_matrix, ordering_matrix - - -def _compute_topological_generations(ordering_matrix: MatGF2) -> list[list[int]] | None: - """Stratify the directed acyclic graph (DAG) represented by the ordering matrix into generations. - - Parameters - ---------- - ordering_matrix : MatGF2 - Matrix encoding the partial ordering between nodes interpreted as the adjacency matrix of a directed graph. - - Returns - ------- - list[list[int]] - Topological generations. Integers represent the indices of the matrix `ordering_matrix`, not the labelling of the nodes. - - or `None` - if `ordering_matrix` is not a DAG. - - Notes - ----- - This function is adapted from `:func: networkx.algorithms.dag.topological_generations` so that it works directly on the adjacency matrix (which is the output of the Pauli-flow finding algorithm) instead of a `:class: nx.DiGraph` object. This avoids calling the function `nx.from_numpy_array` which can be expensive for certain graph instances. - - Here we use the convention that the element `ordering_matrix[i,j]` represents a link `j -> i`. NetworkX uses the opposite convention. - """ - adj_mat = ordering_matrix - - indegree_map: dict[int, int] = {} - zero_indegree: list[int] = [] - neighbors = {node: set(np.flatnonzero(row).astype(int)) for node, row in enumerate(adj_mat.T)} - for node, col in enumerate(adj_mat): - parents = np.flatnonzero(col) - if parents.size: - indegree_map[node] = parents.size - else: - zero_indegree.append(node) - - generations: list[list[int]] = [] - - while zero_indegree: - this_generation = zero_indegree - zero_indegree = [] - for node in this_generation: - for child in neighbors[node]: - indegree_map[child] -= 1 - if indegree_map[child] == 0: - zero_indegree.append(child) - del indegree_map[child] - generations.append(this_generation) - - if indegree_map: - return None - return generations - - -def _cnc_matrices2pflow( - ogi: OpenGraphIndex, - correction_matrix: MatGF2, - ordering_matrix: MatGF2, -) -> tuple[dict[int, set[int]], dict[int, int]] | None: - r"""Transform the correction and ordering matrices into a Pauli flow in its standard form (correction function and partial order). - - Parameters - ---------- - ogi : OpenGraphIndex - Open graph whose Pauli flow is calculated. - correction_matrix : MatGF2 - Matrix encoding the correction function. - ordering_matrix : MatGF2 - Matrix encoding the partial ordering between nodes (DAG). - - Returns - ------- - pf : dict[int, set[int]] - Pauli flow correction function. pf[i] is the set of qubits to be corrected for the measurement of qubit i. - l_k : dict[int, int] - Partial order between corrected qubits, such that the pair (`key`, `value`) corresponds to (node, depth). - - or `None` - if the ordering matrix is not a DAG, in which case the input open graph does not have Pauli flow. 
- - Notes - ----- - - The correction matrix :math:`C` is an :math:`(n - n_I) \times (n - n_O)` matrix related to the correction function :math:`c(v) = \{u \in I^c|C_{u,v} = 1\}`, where :math:`I^c` are the non-input nodes of `ogi`. In other words, the column :math:`v` of :math:`C` encodes the correction set of :math:`v`, :math:`c(v)`. - - - The Pauli flow's ordering :math:`<_c` is the transitive closure of :math:`\lhd_c`, where the latter is related to the ordering matrix :math:`NC` as :math:`v \lhd_c w \Leftrightarrow (NC)_{w,v} = 1`, for :math:`v, w, \in O^c` two non-output nodes of `ogi`. - - See Definition 3.6, Lemma 3.12, and Theorem 3.1 in Mitosek and Backens, 2024 (arXiv:2410.23439). - """ - row_tags = ogi.non_inputs - col_tags = ogi.non_outputs - - # Calculation of the partial ordering - - if (topo_gen := _compute_topological_generations(ordering_matrix)) is None: - return None # The NC matrix is not a DAG, therefore there's no flow. - - l_k = dict.fromkeys(ogi.og.output_nodes, 0) # Output nodes are always in layer 0. - - # If m >_c n, with >_c the flow order for two nodes m, n, then layer(n) > layer(m). - # Therefore, we iterate the topological sort of the graph in _reverse_ order to obtain the order of measurements. - for layer, idx in enumerate(reversed(topo_gen), start=1): - l_k.update({col_tags[i]: layer for i in idx}) - - # Calculation of the correction function - - pf: dict[int, set[int]] = {} - for node in col_tags: - i = col_tags.index(node) - correction_set = {row_tags[j] for j in np.flatnonzero(correction_matrix[:, i])} - pf[node] = correction_set - - return pf, l_k - - -def find_pflow(og: OpenGraph[Measurement]) -> tuple[dict[int, set[int]], dict[int, int]] | None: - """Return a Pauli flow of the input open graph if it exists. - - Parameters - ---------- - og : OpenGraph - Open graph whose Pauli flow is calculated. - - Returns - ------- - pf : dict[int, set[int]] - Pauli flow correction function. `pf[i]` is the set of qubits to be corrected for the measurement of qubit `i`. - l_k : dict[int, int] - Partial order between corrected qubits, such that the pair (`key`, `value`) corresponds to (node, depth). - - or `None` - if the input open graph does not have Pauli flow. - - Notes - ----- - See Theorems 3.1, 4.2 and 4.4, and Algorithms 2 and 3 in Mitosek and Backens, 2024 (arXiv:2410.23439). 
- """ - ni = len(og.input_nodes) - no = len(og.output_nodes) - - if ni > no: - return None - - ogi = OpenGraphIndex(og) - - cnc_matrices = _find_pflow_simple(ogi) if ni == no else _find_pflow_general(ogi) - if cnc_matrices is None: - return None - pflow = _cnc_matrices2pflow(ogi, *cnc_matrices) - if pflow is None: - return None - - pf, l_k = pflow - - return pf, l_k diff --git a/graphix/flow/core.py b/graphix/flow/core.py index 05e25af1..bdf9ee7c 100644 --- a/graphix/flow/core.py +++ b/graphix/flow/core.py @@ -3,6 +3,7 @@ from __future__ import annotations import dataclasses +from collections import defaultdict from collections.abc import Sequence from copy import copy from dataclasses import dataclass @@ -22,6 +23,7 @@ _PM_co, compute_partial_order_layers, ) +from graphix.flow._partial_order import compute_topological_generations from graphix.flow.exceptions import ( FlowError, FlowGenericError, @@ -118,52 +120,16 @@ def from_measured_nodes_mapping( x_corrections = x_corrections or {} z_corrections = z_corrections or {} - nodes_set = set(og.graph.nodes) - outputs_set = frozenset(og.output_nodes) non_outputs_set = set(og.measurements) if not non_outputs_set.issuperset(x_corrections.keys() | z_corrections.keys()): raise XZCorrectionsGenericError(XZCorrectionsGenericErrorReason.IncorrectKeys) - dag = _corrections_to_dag(x_corrections, z_corrections) - partial_order_layers = _dag_to_partial_order_layers(dag) + partial_order_layers = _corrections_to_partial_order_layers( + og, x_corrections, z_corrections + ) # Raises an `XZCorrectionsError` if mappings are not well formed. - if partial_order_layers is None: - raise XZCorrectionsGenericError(XZCorrectionsGenericErrorReason.ClosedLoop) - - # If there're no corrections, the partial order has 2 layers only: outputs and measured nodes. - if len(partial_order_layers) == 0: - partial_order_layers = [outputs_set] if outputs_set else [] - if non_outputs_set: - partial_order_layers.append(frozenset(non_outputs_set)) - return XZCorrections(og, x_corrections, z_corrections, tuple(partial_order_layers)) - - # If the open graph has outputs, the first element in the output of `_dag_to_partial_order_layers(dag)` may or may not contain all or some output nodes. - if outputs_set: - if measured_layer_0 := partial_order_layers[0] - outputs_set: - # `partial_order_layers[0]` contains (some or all) outputs and measured nodes - partial_order_layers = [ - outputs_set, - frozenset(measured_layer_0), - *partial_order_layers[1:], - ] - else: - # `partial_order_layers[0]` contains only (some or all) outputs - partial_order_layers = [ - outputs_set, - *partial_order_layers[1:], - ] - - ordered_nodes = frozenset.union(*partial_order_layers) - - if not ordered_nodes.issubset(nodes_set): - raise XZCorrectionsGenericError(XZCorrectionsGenericErrorReason.IncorrectValues) - - # We include all the non-output nodes not involved in the corrections in the last layer (first measured nodes). - if unordered_nodes := frozenset(nodes_set - ordered_nodes): - partial_order_layers[-1] |= unordered_nodes - - return XZCorrections(og, x_corrections, z_corrections, tuple(partial_order_layers)) + return XZCorrections(og, x_corrections, z_corrections, partial_order_layers) def to_pattern( self: XZCorrections[Measurement], @@ -1036,26 +1002,70 @@ def _corrections_to_dag( return nx.DiGraph(relations) -def _dag_to_partial_order_layers(dag: nx.DiGraph[int]) -> list[frozenset[int]] | None: - """Return the partial order encoded in a directed graph in a layer form if it exists. 
+def _corrections_to_partial_order_layers( + og: OpenGraph[_M_co], x_corrections: Mapping[int, AbstractSet[int]], z_corrections: Mapping[int, AbstractSet[int]] +) -> tuple[frozenset[int], ...]: + """Return the partial order encoded in the correction mappings in a layer form if it exists. Parameters ---------- - dag : nx.DiGraph[int] - A directed graph. + og : OpenGraph[_M_co] + The open graph with respect to which the XZ-corrections are defined. + x_corrections : Mapping[int, AbstractSet[int]] + Mapping of X-corrections: in each (`key`, `value`) pair, `key` is a measured node, and `value` is the set of nodes on which an X-correction must be applied depending on the measurement result of `key`. + z_corrections : Mapping[int, AbstractSet[int]] + Mapping of Z-corrections: in each (`key`, `value`) pair, `key` is a measured node, and `value` is the set of nodes on which an Z-correction must be applied depending on the measurement result of `key`. Returns ------- - list[set[int]] | None - Partial order between corrected qubits in a layer form or `None` if the input directed graph is not acyclical. + tuple[frozenset[int], ...] + Partial order between the open graph's in a layer form. The set `layers[i]` comprises the nodes in layer `i`. Nodes in layer `i` are "larger" in the partial order than nodes in layer `i+1`. - """ - try: - topo_gen = reversed(list(nx.topological_generations(dag))) - except nx.NetworkXUnfeasible: - return None - return [frozenset(layer) for layer in topo_gen] + Raises + ------ + XZCorrectionsError + If the input dictionaries are not well formed. In well-formed correction dictionaries: + - Keys are a subset of the measured nodes. + - Values correspond to nodes of the open graph. + - Corrections do not form closed loops. + """ + oset = frozenset(og.output_nodes) # First layer by convention if not empty + dag: defaultdict[int, set[int]] = defaultdict( + set + ) # `i: {j}` represents `i -> j`, i.e., a correction applied to qubit `j`, conditioned on the measurement outcome of qubit `i`. + indegree_map: dict[int, int] = {} + + for corrections in [x_corrections, z_corrections]: + for measured_node, corrected_nodes in corrections.items(): + if measured_node not in oset: + for corrected_node in corrected_nodes - oset: + if corrected_node not in dag[measured_node]: # Don't include multiple edges in the dag. + dag[measured_node].add(corrected_node) + indegree_map[corrected_node] = indegree_map.get(corrected_node, 0) + 1 + + zero_indegree = og.graph.nodes - oset - indegree_map.keys() + generations = compute_topological_generations(dag, indegree_map, zero_indegree) + if generations is None: + raise XZCorrectionsGenericError(XZCorrectionsGenericErrorReason.ClosedLoop) + + if len(generations) == 0: + if oset: + return (oset,) + return () + + ordered_nodes = frozenset.union(*generations) + + if not ordered_nodes.issubset(og.graph.nodes): + raise XZCorrectionsGenericError(XZCorrectionsGenericErrorReason.IncorrectValues) + + # We include all the non-output nodes not involved in the corrections in the last layer (first measured nodes). + if unordered_nodes := frozenset(og.graph.nodes - ordered_nodes - oset): + generations = *generations[:-1], frozenset(generations[-1] | unordered_nodes) + + if oset: + return oset, *generations[::-1] + return generations[::-1] def _check_correction_function_domain( diff --git a/graphix/gflow.py b/graphix/gflow.py deleted file mode 100644 index 62a7c849..00000000 --- a/graphix/gflow.py +++ /dev/null @@ -1,950 +0,0 @@ -"""Flow finding algorithm. 
- -For a given underlying graph (G, I, O, meas_plane), this method finds a (generalized) flow [NJP 9, 250 (2007)] in polynomial time. -In particular, this outputs gflow with minimum depth, maximally delayed gflow. - -Ref: Mhalla and Perdrix, International Colloquium on Automata, -Languages, and Programming (Springer, 2008), pp. 857-868. -Ref: Backens et al., Quantum 5, 421 (2021). - -""" - -from __future__ import annotations - -from copy import deepcopy -from typing import TYPE_CHECKING - -from typing_extensions import assert_never - -import graphix.find_pflow -import graphix.opengraph -from graphix.command import CommandKind -from graphix.fundamentals import Axis, Plane -from graphix.measurements import Measurement, PauliMeasurement -from graphix.parameter import Placeholder - -if TYPE_CHECKING: - from collections.abc import Mapping - from collections.abc import Set as AbstractSet - - import networkx as nx - - from graphix.parameter import ExpressionOrFloat - from graphix.pattern import Pattern - - -# TODO: This should be ensured by type-checking. -def check_meas_planes(meas_planes: dict[int, Plane]) -> None: - """Check that all planes are valid planes.""" - for node, plane in meas_planes.items(): - if not isinstance(plane, Plane): - raise TypeError(f"Measure plane for {node} is `{plane}`, which is not an instance of `Plane`") - - -# NOTE: In a future version this function will take an `OpenGraph` object as input. -def find_gflow( - graph: nx.Graph[int], - iset: AbstractSet[int], - oset: AbstractSet[int], - meas_planes: Mapping[int, Plane], - mode: str = "single", # noqa: ARG001 Compatibility with old API -) -> tuple[dict[int, set[int]], dict[int, int]] | tuple[None, None]: - r"""Return a maximally delayed general flow (gflow) of the input open graph if it exists. - - Parameters - ---------- - graph: :class:`networkx.Graph` - Graph (including input and output). - iset: AbstractSet[int] - Set of input nodes. - oset: AbstractSet[int] - Set of output nodes. - meas_planes: Mapping[int, Plane] - Measurement planes for each qubit. meas_planes[i] is the measurement plane for qubit i. - mode: str - Deprecated. Reminiscent of old API, it will be removed in future versions. - - Returns - ------- - dict[int, set[int]] - Gflow correction function. In a given pair (key, value), value is the set of qubits to be corrected for the measurement of qubit key. - dict[int, int] - Partial order between corrected qubits, such that the pair (key, value) corresponds to (node, depth). - - or None, None - if the input open graph does not have gflow. - - Notes - ----- - This function implements the algorithm in [1], see module graphix.find_pflow. - See [1] or [2] for a definition of gflow. - - References - ---------- - [1] Mitosek and Backens, 2024 (arXiv:2410.23439). - [2] Backens et al., Quantum 5, 421 (2021). - """ - meas = {node: Measurement(Placeholder("Angle"), plane) for node, plane in meas_planes.items()} - og = graphix.opengraph.OpenGraph( - graph=graph, - input_nodes=list(iset), - output_nodes=list(oset), - measurements=meas, - ) - gf = graphix.find_pflow.find_pflow(og) - if gf is None: - return None, None # This is to comply with old API. It will be change in the future to `None`` - return gf[0], gf[1] - - -def find_flow( - graph: nx.Graph[int], - iset: set[int], - oset: set[int], - meas_planes: dict[int, Plane] | None = None, -) -> tuple[dict[int, set[int]], dict[int, int]] | tuple[None, None]: - """Causal flow finding algorithm. 
- - For open graph g with input, output, and measurement planes, this returns causal flow. - For more detail of causal flow, see Danos and Kashefi, PRA 74, 052310 (2006). - - Original algorithm by Mhalla and Perdrix, - International Colloquium on Automata, Languages, and Programming (2008), - pp. 857-868. - - Parameters - ---------- - graph: :class:`networkx.Graph` - Graph (incl. input and output) - iset: set - set of node labels for input - oset: set - set of node labels for output - meas_planes: dict(int, Plane) - measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. - Note that an underlying graph has a causal flow only if all measurement planes are Plane.XY. - If not specified, all measurement planes are interpreted as Plane.XY. - - Returns - ------- - f: list of nodes - causal flow function. f[i] is the qubit to be measured after qubit i. - l_k: dict - layers obtained by gflow algorithm. l_k[d] is a node set of depth d. - """ - check_meas_planes(meas_planes) - nodes = set(graph.nodes) - edges = set(graph.edges) - - if meas_planes is None: - meas_planes = dict.fromkeys(nodes - oset, Plane.XY) - - for plane in meas_planes.values(): - if plane != Plane.XY: - return None, None - - l_k = dict.fromkeys(nodes, 0) - f = {} - k = 1 - v_c = oset - iset - return flowaux(nodes, edges, iset, oset, v_c, f, l_k, k) - - -def flowaux( - nodes: set[int], - edges: set[tuple[int, int]], - iset: set[int], - oset: set[int], - v_c: set[int], - f: dict[int, set[int]], - l_k: dict[int, int], - k: int, -): - """Find one layer of the flow. - - Ref: Mhalla and Perdrix, International Colloquium on Automata, - Languages, and Programming (Springer, 2008), pp. 857-868. - - Parameters - ---------- - nodes: set - labels of all qubits (nodes) - edges: set - edges - iset: set - set of node labels for input - oset: set - set of node labels for output - v_c: set - correction candidate qubits - f: dict - flow function. f[i] is the qubit to be measured after qubit i. - l_k: dict - layers obtained by flow algorithm. l_k[d] is a node set of depth d. - k: int - current layer number. - meas_planes: dict - measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. - - Outputs - ------- - f: list of nodes - causal flow function. f[i] is the qubit to be measured after qubit i. - l_k: dict - layers obtained by gflow algorithm. l_k[d] is a node set of depth d. - """ - v_out_prime = set() - c_prime = set() - - for q in v_c: - nb = search_neighbor(q, edges) - p_set = nb & (nodes - oset) - if len(p_set) == 1: - # Iterate over p_set assuming there is only one element p - (p,) = p_set - f[p] = {q} - l_k[p] = k - v_out_prime |= {p} - c_prime |= {q} - # determine whether there exists flow - if not v_out_prime: - if oset == nodes: - return f, l_k - return None, None - return flowaux( - nodes, - edges, - iset, - oset | v_out_prime, - (v_c - c_prime) | (v_out_prime & (nodes - iset)), - f, - l_k, - k + 1, - ) - - -# NOTE: In a future version this function will take an `OpenGraph` object as input. -def find_pauliflow( - graph: nx.Graph[int], - iset: AbstractSet[int], - oset: AbstractSet[int], - meas_planes: Mapping[int, Plane], - meas_angles: Mapping[int, ExpressionOrFloat], - mode: str = "single", # noqa: ARG001 Compatibility with old API -) -> tuple[dict[int, set[int]], dict[int, int]] | tuple[None, None]: - r"""Return a maximally delayed Pauli flow of the input open graph if it exists. 
- - Parameters - ---------- - graph: :class:`networkx.Graph` - Graph (including input and output). - iset: AbstractSet[int] - Set of input nodes. - oset: AbstractSet[int] - Set of output nodes. - meas_planes: Mapping[int, Plane] - Measurement planes for each qubit. meas_planes[i] is the measurement plane for qubit i. - meas_angles: Mapping[int, ExpressionOrFloat] - Measurement angles for each qubit. meas_angles[i] is the measurement angle for qubit i. - mode: str - Deprecated. Reminiscent of old API, it will be removed in future versions. - - Returns - ------- - dict[int, set[int]] - Pauli flow correction function. In a given pair (key, value), value is the set of qubits to be corrected for the measurement of qubit key. - dict[int, int] - Partial order between corrected qubits, such that the pair (key, value) corresponds to (node, depth). - - or None, None - if the input open graph does not have gflow. - - Notes - ----- - This function implements the algorithm in [1], see module graphix.find_pflow. - See [1] or [2] for a definition of Pauli flow. - - References - ---------- - [1] Mitosek and Backens, 2024 (arXiv:2410.23439). - [2] Browne et al., 2007 New J. Phys. 9 250 (arXiv:quant-ph/0702212) - """ - meas = {node: Measurement(angle, meas_planes[node]) for node, angle in meas_angles.items()} - og = graphix.opengraph.OpenGraph( - graph=graph, - input_nodes=list(iset), - output_nodes=list(oset), - measurements=meas, - ) - pf = graphix.find_pflow.find_pflow(og) - if pf is None: - return None, None # This is to comply with old API. It will be change in the future to `None`` - return pf[0], pf[1] - - -def flow_from_pattern(pattern: Pattern) -> tuple[dict[int, set[int]], dict[int, int]] | tuple[None, None]: - """Check if the pattern has a valid flow. If so, return the flow and layers. - - Parameters - ---------- - pattern: Pattern - pattern to be based on - - Returns - ------- - None, None: - The tuple ``(None, None)`` is returned if the pattern does not have a valid causal flow. - f: dict[int, set[int]] - flow function. g[i] is the set of qubits to be corrected for the measurement of qubit i. - l_k: dict[int, int] - layers obtained by flow algorithm. l_k[d] is a node set of depth d. - """ - if not pattern.is_standard(strict=True): - raise ValueError("The pattern should be standardized first.") - meas_planes = pattern.get_meas_plane() - for plane in meas_planes.values(): - if plane != Plane.XY: - return None, None - graph = pattern.extract_graph() - input_nodes = pattern.input_nodes if not pattern.input_nodes else set() - output_nodes = set(pattern.output_nodes) - - layers = pattern.get_layers() - l_k = {} - for l in layers[1]: - for n in layers[1][l]: - l_k[n] = l - lmax = max(l_k.values()) if l_k else 0 - for node, val in l_k.items(): - l_k[node] = lmax - val + 1 - for output_node in pattern.output_nodes: - l_k[output_node] = 0 - - xflow, zflow = get_corrections_from_pattern(pattern) - - if verify_flow(graph, input_nodes, output_nodes, xflow): # if xflow is valid - zflow_from_xflow = {} - for node, corrections in deepcopy(xflow).items(): - cand = find_odd_neighbor(graph, corrections) - {node} - if cand: - zflow_from_xflow[node] = cand - if zflow_from_xflow != zflow: # if zflow is consistent with xflow - return None, None - return xflow, l_k - return None, None - - -def gflow_from_pattern(pattern: Pattern) -> tuple[dict[int, set[int]], dict[int, int]] | tuple[None, None]: - """Check if the pattern has a valid gflow. If so, return the gflow and layers. 
- - Parameters - ---------- - pattern: Pattern - pattern to be based on - - Returns - ------- - None, None: - The tuple ``(None, None)`` is returned if the pattern does not have a valid gflow. - g: dict[int, set[int]] - gflow function. g[i] is the set of qubits to be corrected for the measurement of qubit i. - l_k: dict[int, int] - layers obtained by gflow algorithm. l_k[d] is a node set of depth d. - """ - if not pattern.is_standard(strict=True): - raise ValueError("The pattern should be standardized first.") - graph = pattern.extract_graph() - input_nodes = set(pattern.input_nodes) if pattern.input_nodes else set() - output_nodes = set(pattern.output_nodes) - meas_planes = pattern.get_meas_plane() - - layers = pattern.get_layers() - l_k = {} - for l in layers[1]: - for n in layers[1][l]: - l_k[n] = l - lmax = max(l_k.values()) if l_k else 0 - for node, val in l_k.items(): - l_k[node] = lmax - val + 1 - for output_node in pattern.output_nodes: - l_k[output_node] = 0 - - xflow, zflow = get_corrections_from_pattern(pattern) - for node, plane in meas_planes.items(): - if plane in {Plane.XZ, Plane.YZ}: - if node not in xflow: - xflow[node] = {node} - xflow[node] |= {node} - - if verify_gflow(graph, input_nodes, output_nodes, xflow, meas_planes): # if xflow is valid - zflow_from_xflow = {} - for node, corrections in deepcopy(xflow).items(): - cand = find_odd_neighbor(graph, corrections) - {node} - if cand: - zflow_from_xflow[node] = cand - if zflow_from_xflow != zflow: # if zflow is consistent with xflow - return None, None - return xflow, l_k - return None, None - - -# TODO: Shouldn't call `find_pauliflow` -def pauliflow_from_pattern( - pattern: Pattern, - mode="single", # noqa: ARG001 Compatibility with old API -) -> tuple[dict[int, set[int]], dict[int, int]] | tuple[None, None]: - """Check if the pattern has a valid Pauliflow. If so, return the Pauliflow and layers. - - Parameters - ---------- - pattern: Pattern - pattern to be based on - mode: str - The Pauliflow finding algorithm can yield multiple equivalent solutions. So there are two options - - "single": Returns a single solution - - "all": Returns all possible solutions - - Optional. Default is "single". - - Returns - ------- - None, None: - The tuple ``(None, None)`` is returned if the pattern does not have a valid Pauli flow. - p: dict[int, set[int]] - Pauli flow function. p[i] is the set of qubits to be corrected for the measurement of qubit i. - l_k: dict[int, int] - layers obtained by Pauli flow algorithm. l_k[d] is a node set of depth d. - """ - if not pattern.is_standard(strict=True): - raise ValueError("The pattern should be standardized first.") - graph = pattern.extract_graph() - input_nodes = set(pattern.input_nodes) if pattern.input_nodes else set() - output_nodes = set(pattern.output_nodes) if pattern.output_nodes else set() - meas_planes = pattern.get_meas_plane() - meas_angles = pattern.get_angles() - - return find_pauliflow(graph, input_nodes, output_nodes, meas_planes, meas_angles) - - -def get_corrections_from_pattern(pattern: Pattern) -> tuple[dict[int, set[int]], dict[int, set[int]]]: - """Get x and z corrections from pattern. - - Parameters - ---------- - pattern: graphix.Pattern object - pattern to be based on - - Returns - ------- - xflow: dict - xflow function. xflow[i] is the set of qubits to be corrected in the X basis for the measurement of qubit i. - zflow: dict - zflow function. zflow[i] is the set of qubits to be corrected in the Z basis for the measurement of qubit i. 
- """ - nodes = pattern.extract_nodes() - xflow = {} - zflow = {} - for cmd in pattern: - if cmd.kind == CommandKind.M: - target = cmd.node - xflow_source = cmd.s_domain & nodes - zflow_source = cmd.t_domain & nodes - for node in xflow_source: - if node not in xflow: - xflow[node] = set() - xflow[node] |= {target} - for node in zflow_source: - if node not in zflow: - zflow[node] = set() - zflow[node] |= {target} - if cmd.kind == CommandKind.X: - target = cmd.node - xflow_source = cmd.domain & nodes - for node in xflow_source: - if node not in xflow: - xflow[node] = set() - xflow[node] |= {target} - if cmd.kind == CommandKind.Z: - target = cmd.node - zflow_source = cmd.domain & nodes - for node in zflow_source: - if node not in zflow: - zflow[node] = set() - zflow[node] |= {target} - return xflow, zflow - - -def search_neighbor(node: int, edges: set[tuple[int, int]]) -> set[int]: - """Find neighborhood of node in edges. This is an ancillary method for `flowaux()`. - - Parameter - ------- - node: int - target node number whose neighboring nodes will be collected - edges: set of taples - set of edges in the graph - - Outputs - ------ - N: list of ints - neighboring nodes - """ - nb = set() - for edge in edges: - if node == edge[0]: - nb |= {edge[1]} - elif node == edge[1]: - nb |= {edge[0]} - return nb - - -def get_min_depth(l_k: Mapping[int, int]) -> int: - """Get minimum depth of graph. - - Parameters - ---------- - l_k: dict - layers obtained by flow or gflow - - Returns - ------- - d: int - minimum depth of graph - """ - return max(l_k.values()) - - -def find_odd_neighbor(graph: nx.Graph[int], vertices: AbstractSet[int]) -> set[int]: - """Return the set containing the odd neighbor of a set of vertices. - - Parameters - ---------- - graph : :class:`networkx.Graph` - Underlying graph - vertices : set - set of nodes indices to find odd neighbors - - Returns - ------- - odd_neighbors : set - set of indices for odd neighbor of set `vertices`. - """ - odd_neighbors = set() - for vertex in vertices: - neighbors = set(graph.neighbors(vertex)) - odd_neighbors ^= neighbors - return odd_neighbors - - -def get_layers(l_k: Mapping[int, int]) -> tuple[int, dict[int, set[int]]]: - """Get components of each layer. - - Parameters - ---------- - l_k: dict - layers obtained by flow or gflow algorithms - - Returns - ------- - d: int - minimum depth of graph - layers: dict of set - components of each layer - """ - d = get_min_depth(l_k) - layers: dict[int, set[int]] = {k: set() for k in range(d + 1)} - for i, val in l_k.items(): - layers[val] |= {i} - return d, layers - - -def get_dependence_flow( - inputs: set[int], - flow: dict[int, set[int]], - odd_flow: dict[int, set[int]], -) -> dict[int, set[int]]: - """Get dependence flow from flow. - - Parameters - ---------- - inputs: set[int] - set of input nodes - flow: dict[int, set] - flow function. flow[i] is the set of qubits to be corrected for the measurement of qubit i. - odd_flow: dict[int, set] - odd neighbors of flow or gflow. - odd_flow[i] is the set of odd neighbors of f(i), Odd(f(i)). - - Returns - ------- - dependence_flow: dict[int, set] - dependence flow function. dependence_flow[i] is the set of qubits to be corrected for the measurement of qubit i. 
- """ - dependence_flow = {u: set() for u in inputs} - # concatenate flow and odd_flow - combined_flow = {} - for node, corrections in flow.items(): - combined_flow[node] = corrections | odd_flow[node] - for node, corrections in combined_flow.items(): - for correction in corrections: - if correction not in dependence_flow: - dependence_flow[correction] = set() - dependence_flow[correction] |= {node} - return dependence_flow - - -def get_dependence_pauliflow( - inputs: set[int], - flow: dict[int, set[int]], - odd_flow: dict[int, set[int]], - ls: tuple[set[int], set[int], set[int]], -): - """Get dependence flow from Pauli flow. - - Parameters - ---------- - inputs: set[int] - set of input nodes - flow: dict[int, set[int]] - Pauli flow function. p[i] is the set of qubits to be corrected for the measurement of qubit i. - odd_flow: dict[int, set[int]] - odd neighbors of Pauli flow or gflow. Odd(p(i)) - ls: tuple - ls = (l_x, l_y, l_z) where l_x, l_y, l_z are sets of qubits whose measurement operators are X, Y, Z, respectively. - - Returns - ------- - dependence_pauliflow: dict[int, set[int]] - dependence flow function. dependence_pauliflow[i] is the set of qubits to be corrected for the measurement of qubit i. - """ - l_x, l_y, l_z = ls - dependence_pauliflow = {u: set() for u in inputs} - # concatenate p and odd_p - combined_flow = {} - for node, corrections in flow.items(): - combined_flow[node] = (corrections - (l_x | l_y)) | (odd_flow[node] - (l_y | l_z)) - for ynode in l_y: - if ynode in corrections.symmetric_difference(odd_flow[node]): - combined_flow[node] |= {ynode} - for node, corrections in combined_flow.items(): - for correction in corrections: - if correction not in dependence_pauliflow: - dependence_pauliflow[correction] = set() - dependence_pauliflow[correction] |= {node} - return dependence_pauliflow - - -def get_layers_from_flow( - flow: dict[int, set], - odd_flow: dict[int, set], - inputs: set[int], - outputs: set[int], - ls: tuple[set[int], set[int], set[int]] | None = None, -) -> tuple[dict[int, set], int]: - """Get layers from flow (incl. gflow, Pauli flow). - - Parameters - ---------- - flow: dict[int, set] - flow function. flow[i] is the set of qubits to be corrected for the measurement of qubit i. - odd_flow: dict[int, set] - odd neighbors of flow or gflow. Odd(f(node)) - inputs: set - set of input nodes - outputs: set - set of output nodes - ls: tuple - ls = (l_x, l_y, l_z) where l_x, l_y, l_z are sets of qubits whose measurement operators are X, Y, Z, respectively. - If not None, the layers are obtained based on Pauli flow. - - Returns - ------- - layers: dict[int, set] - layers obtained from flow - depth: int - depth of the layers - - Raises - ------ - ValueError - If the flow is not valid(e.g. there is no partial order). 
- """ - layers = {} - depth = 0 - if ls is None: - dependence_flow = get_dependence_flow(inputs, odd_flow, flow) - else: - dependence_flow = get_dependence_pauliflow(inputs, flow, odd_flow, ls) - left_nodes = set(flow.keys()) - for output in outputs: - if output in left_nodes: - raise ValueError("Invalid flow") - while True: - layers[depth] = set() - for node in left_nodes: - if node not in dependence_flow or len(dependence_flow[node]) == 0 or dependence_flow[node] == {node}: - layers[depth] |= {node} - left_nodes -= layers[depth] - for node in left_nodes: - dependence_flow[node] -= layers[depth] - if len(layers[depth]) == 0: - if len(left_nodes) == 0: - layers[depth] = outputs - depth += 1 - break - raise ValueError("Invalid flow") - depth += 1 - return layers, depth - - -def verify_flow( - graph: nx.Graph, - iset: set[int], - oset: set[int], - flow: dict[int, set], - meas_planes: dict[int, Plane] | None = None, -) -> bool: - """Check whether the flow is valid. - - Parameters - ---------- - graph: :class:`networkx.Graph` - Graph (incl. input and output) - flow: dict[int, set] - flow function. flow[i] is the set of qubits to be corrected for the measurement of qubit i. - meas_planes: dict[int, str] - optional: measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. - - - Returns - ------- - valid_flow: bool - True if the flow is valid. False otherwise. - """ - if meas_planes is None: - meas_planes = {} - check_meas_planes(meas_planes) - valid_flow = True - non_outputs = set(graph.nodes) - oset - # if meas_planes is given, check whether all measurement planes are "XY" - for node, plane in meas_planes.items(): - if plane != Plane.XY or node not in non_outputs: - return False - - odd_flow = {node: find_odd_neighbor(graph, corrections) for node, corrections in flow.items()} - - try: - _, _ = get_layers_from_flow(flow, odd_flow, iset, oset) - except ValueError: - return False - # check if v ~ f(v) for each node - edges = set(graph.edges) - for node, corrections in flow.items(): - if len(corrections) > 1: - return False - correction = next(iter(corrections)) - if (node, correction) not in edges and (correction, node) not in edges: - return False - return valid_flow - - -def verify_gflow( - graph: nx.Graph, - iset: set[int], - oset: set[int], - gflow: dict[int, set], - meas_planes: dict[int, Plane], -) -> bool: - """Check whether the gflow is valid. - - Parameters - ---------- - graph: :class:`networkx.Graph` - Graph (incl. input and output) - iset: set - set of node labels for input - oset: set - set of node labels for output - gflow: dict[int, set] - gflow function. gflow[i] is the set of qubits to be corrected for the measurement of qubit i. - .. seealso:: :func:`find_gflow` - meas_planes: dict[int, str] - measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. - - Returns - ------- - valid_gflow: bool - True if the gflow is valid. False otherwise. 
- """ - check_meas_planes(meas_planes) - valid_gflow = True - non_outputs = set(graph.nodes) - oset - odd_flow = {} - for non_output in non_outputs: - if non_output not in gflow: - gflow[non_output] = set() - odd_flow[non_output] = set() - else: - odd_flow[non_output] = find_odd_neighbor(graph, gflow[non_output]) - - try: - _, _ = get_layers_from_flow(gflow, odd_flow, iset, oset) - except ValueError: - return False - - # check for each measurement plane - for node, plane in meas_planes.items(): - # index = node_order.index(node) - if plane == Plane.XY: - valid_gflow &= (node not in gflow[node]) and (node in odd_flow[node]) - elif plane == Plane.XZ: - valid_gflow &= (node in gflow[node]) and (node in odd_flow[node]) - elif plane == Plane.YZ: - valid_gflow &= (node in gflow[node]) and (node not in odd_flow[node]) - - return valid_gflow - - -def verify_pauliflow( - graph: nx.Graph, - iset: set[int], - oset: set[int], - pauliflow: dict[int, set[int]], - meas_planes: dict[int, Plane], - meas_angles: dict[int, float], -) -> bool: - """Check whether the Pauliflow is valid. - - Parameters - ---------- - graph: :class:`networkx.Graph` - Graph (incl. input and output) - iset: set - set of node labels for input - oset: set - set of node labels for output - pauliflow: dict[int, set] - Pauli flow function. pauliflow[i] is the set of qubits to be corrected for the measurement of qubit i. - meas_planes: dict[int, Plane] - measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. - meas_angles: dict[int, float] - measurement angles for each qubits. meas_angles[i] is the measurement angle for qubit i. - - Returns - ------- - valid_pauliflow: bool - True if the Pauliflow is valid. False otherwise. - """ - check_meas_planes(meas_planes) - l_x, l_y, l_z = get_pauli_nodes(meas_planes, meas_angles) - - valid_pauliflow = True - non_outputs = set(graph.nodes) - oset - odd_flow = {} - for non_output in non_outputs: - if non_output not in pauliflow: - pauliflow[non_output] = set() - odd_flow[non_output] = set() - else: - odd_flow[non_output] = find_odd_neighbor(graph, pauliflow[non_output]) - - try: - layers, depth = get_layers_from_flow(pauliflow, odd_flow, iset, oset, (l_x, l_y, l_z)) - except ValueError: - return False - node_order = [] - for d in range(depth): - node_order.extend(list(layers[d])) - - for node, plane in meas_planes.items(): - if node in l_x: - valid_pauliflow &= node in odd_flow[node] - elif node in l_z: - valid_pauliflow &= node in pauliflow[node] - elif node in l_y: - valid_pauliflow &= node in pauliflow[node].symmetric_difference(odd_flow[node]) - elif plane == Plane.XY: - valid_pauliflow &= (node not in pauliflow[node]) and (node in odd_flow[node]) - elif plane == Plane.XZ: - valid_pauliflow &= (node in pauliflow[node]) and (node in odd_flow[node]) - elif plane == Plane.YZ: - valid_pauliflow &= (node in pauliflow[node]) and (node not in odd_flow[node]) - - return valid_pauliflow - - -def get_input_from_flow(flow: dict[int, set]) -> set: - """Get input nodes from flow. - - Parameters - ---------- - flow: dict[int, set] - flow function. flow[i] is the set of qubits to be corrected for the measurement of qubit i. - - Returns - ------- - inputs: set - set of input nodes - """ - non_output = set(flow.keys()) - for correction in flow.values(): - non_output -= correction - return non_output - - -def get_output_from_flow(flow: dict[int, set]) -> set: - """Get output nodes from flow. - - Parameters - ---------- - flow: dict[int, set] - flow function. 
flow[i] is the set of qubits to be corrected for the measurement of qubit i. - - Returns - ------- - outputs: set - set of output nodes - """ - non_outputs = set(flow.keys()) - non_inputs = set() - for correction in flow.values(): - non_inputs |= correction - return non_inputs - non_outputs - - -def get_pauli_nodes( - meas_planes: dict[int, Plane], meas_angles: Mapping[int, ExpressionOrFloat] -) -> tuple[set[int], set[int], set[int]]: - """Get sets of nodes measured in X, Y, Z basis. - - Parameters - ---------- - meas_planes: dict[int, Plane] - measurement planes for each node. - meas_angles: dict[int, float] - measurement angles for each node. - - Returns - ------- - l_x: set - set of nodes measured in X basis. - l_y: set - set of nodes measured in Y basis. - l_z: set - set of nodes measured in Z basis. - """ - check_meas_planes(meas_planes) - l_x, l_y, l_z = set(), set(), set() - for node, plane in meas_planes.items(): - pm = PauliMeasurement.try_from(plane, meas_angles[node]) - if pm is None: - continue - if pm.axis == Axis.X: - l_x |= {node} - elif pm.axis == Axis.Y: - l_y |= {node} - elif pm.axis == Axis.Z: - l_z |= {node} - else: - assert_never(pm.axis) - return l_x, l_y, l_z diff --git a/graphix/opengraph.py b/graphix/opengraph.py index 125f24ce..12714532 100644 --- a/graphix/opengraph.py +++ b/graphix/opengraph.py @@ -163,7 +163,7 @@ def is_equal_structurally(self, other: OpenGraph[AbstractMeasurement]) -> bool: ----- This method verifies the open graphs have: - Truly equal underlying graphs (not up to an isomorphism). - - Equal input and output nodes. + - Equal input and output nodes. This assumes equal types as well, i.e., if ``self.input_nodes`` is a ``list`` and ``other.input_nodes`` is a ``tuple``, this method will return ``False``. It assumes the open graphs are well formed. The static typer allows comparing the structure of two open graphs with different parametric type. @@ -171,7 +171,7 @@ def is_equal_structurally(self, other: OpenGraph[AbstractMeasurement]) -> bool: return ( nx.utils.graphs_equal(self.graph, other.graph) and self.input_nodes == other.input_nodes - and other.output_nodes == other.output_nodes + and self.output_nodes == other.output_nodes ) def neighbors(self, nodes: Collection[int]) -> set[int]: diff --git a/graphix/optimization.py b/graphix/optimization.py index be7fe96a..b4dbf667 100644 --- a/graphix/optimization.py +++ b/graphix/optimization.py @@ -19,7 +19,7 @@ from graphix.clifford import Clifford from graphix.command import CommandKind, Node from graphix.flow._partial_order import compute_topological_generations -from graphix.flow.core import CausalFlow, GFlow +from graphix.flow.core import CausalFlow, GFlow, XZCorrections from graphix.flow.exceptions import ( FlowGenericError, FlowGenericErrorReason, @@ -547,6 +547,44 @@ def extract_gflow(self) -> GFlow[Measurement]: gf.check_well_formed() return gf + def extract_xzcorrections(self) -> XZCorrections[Measurement]: + """Extract the XZ-corrections from the current measurement pattern. + + Returns + ------- + XZCorrections[Measurement] + The XZ-corrections associated with the current pattern. + + Raises + ------ + XZCorrectionsError + If the extracted correction dictionaries are not well formed. + ValueError + If `N` commands in the pattern do not represent a |+⟩ state or if the pattern corrections form closed loops. + """ + x_corr: dict[int, set[int]] = {} + z_corr: dict[int, set[int]] = {} + + pre_measured_nodes = self.results.keys() # Not included in the xz-corrections. 
+
+        for m in self.m_list:
+            _update_corrections(m.node, m.s_domain - pre_measured_nodes, x_corr)
+            _update_corrections(m.node, m.t_domain - pre_measured_nodes, z_corr)
+
+        for node, domain in self.x_dict.items():
+            _update_corrections(node, domain - pre_measured_nodes, x_corr)
+
+        for node, domain in self.z_dict.items():
+            _update_corrections(node, domain - pre_measured_nodes, z_corr)
+
+        og = (
+            self.extract_opengraph()
+        )  # Raises a `ValueError` if `N` commands in the pattern do not represent a |+⟩ state.
+
+        return XZCorrections.from_measured_nodes_mapping(
+            og, x_corr, z_corr
+        )  # Raises an `XZCorrectionsError` if the input dictionaries are not well formed.
+

 def _add_correction_domain(domain_dict: dict[Node, set[Node]], node: Node, domain: set[Node]) -> None:
     """Merge a correction domain into ``domain_dict`` for ``node``.
diff --git a/graphix/pattern.py b/graphix/pattern.py
index accb6121..9d36f972 100644
--- a/graphix/pattern.py
+++ b/graphix/pattern.py
@@ -39,7 +39,7 @@
     from numpy.random import Generator

-    from graphix.flow.core import CausalFlow, GFlow
+    from graphix.flow.core import CausalFlow, GFlow, XZCorrections
     from graphix.parameter import ExpressionOrSupportsFloat, Parameter
     from graphix.sim import Backend, BackendState, Data
     from graphix.states import State
@@ -967,40 +967,28 @@ def extract_gflow(self) -> GFlow[Measurement]:
         """
         return optimization.StandardizedPattern.from_pattern(self).extract_gflow()

-    def get_layers(self) -> tuple[int, dict[int, set[int]]]:
-        """Construct layers(l_k) from dependency information.
-
-        kth layer must be measured before measuring k+1th layer
-        and nodes in the same layer can be measured simultaneously.
+    def extract_xzcorrections(self) -> XZCorrections[Measurement]:
+        """Extract the XZ-corrections from the current measurement pattern.

         Returns
         -------
-        depth : int
-            depth of graph
-        layers : dict of set
-            nodes grouped by layer index(k)
+        XZCorrections[Measurement]
+            The XZ-corrections associated with the current pattern.
+
+        Raises
+        ------
+        XZCorrectionsError
+            If the extracted correction dictionaries are not well formed.
+        ValueError
+            If `N` commands in the pattern do not represent a |+⟩ state or if the pattern corrections form closed loops.
+
+        Notes
+        -----
+        To ensure that applying the chain ``Pattern.extract_xzcorrections().to_pattern()`` to a strongly deterministic pattern returns a new pattern implementing the same unitary transformation, the XZ-corrections must be extracted from a standardized pattern. This requirement arises for the same reason that flow extraction is only guaranteed to be correct on standardized patterns.
+        This equivalence holds as long as the original pattern contains no Clifford commands, since those are discarded during open-graph extraction.
+        See the docstring of :func:`optimization.StandardizedPattern.extract_gflow` for additional information.
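+
+        Examples
+        --------
+        A minimal doctest-style sketch, adapted from ``tests/test_pattern.py`` in this change set:
+
+        >>> from graphix.command import M, X, Z
+        >>> from graphix.pattern import Pattern
+        >>> pattern = Pattern(
+        ...     input_nodes=list(range(5)),
+        ...     cmds=[M(0), M(1), M(2, s_domain={0}, t_domain={1}), X(3, domain={2}), M(3), Z(4, domain={3})],
+        ... )
+        >>> xzc = pattern.extract_xzcorrections()
+        >>> xzc.check_well_formed()  # raises `XZCorrectionsError` if the corrections are malformed
+        >>> new_pattern = xzc.to_pattern()  # see the Notes above for when this preserves the computation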
""" - self.check_runnability() # prevent infinite loop: e.g., [N(0), M(0, s_domain={0})] - dependency = self._get_dependency() - measured = self.results.keys() - self.update_dependency(measured, dependency) - not_measured = set(self.__input_nodes) - for cmd in self.__seq: - if cmd.kind == CommandKind.N and cmd.node not in self.output_nodes: - not_measured |= {cmd.node} - depth = 0 - l_k: dict[int, set[int]] = {} - k = 0 - while not_measured: - l_k[k] = set() - for i in not_measured: - if not dependency[i]: - l_k[k] |= {i} - self.update_dependency(l_k[k], dependency) - not_measured -= l_k[k] - k += 1 - depth = k - return depth, l_k + return optimization.StandardizedPattern.from_pattern(self).extract_xzcorrections() def _measurement_order_depth(self) -> list[int]: """Obtain a measurement order which reduces the depth of a pattern. @@ -1218,7 +1206,8 @@ def extract_opengraph(self) -> OpenGraph[Measurement]: graph = nx.Graph(edges) graph.add_nodes_from(nodes) - return OpenGraph(graph, self.input_nodes, self.output_nodes, measurements) + # Inputs and outputs are casted to `tuple` to replicate the behavior of `:func: graphix.opitmization.StandardizedPattern.extract_opengraph`. + return OpenGraph(graph, tuple(self.__input_nodes), tuple(self.__output_nodes), measurements) def get_vops(self, conj: bool = False, include_identity: bool = False) -> dict[int, Clifford]: """Get local-Clifford decorations from measurement or Clifford commands. diff --git a/graphix/visualization.py b/graphix/visualization.py index c69ef828..785b7a1c 100644 --- a/graphix/visualization.py +++ b/graphix/visualization.py @@ -3,14 +3,12 @@ from __future__ import annotations import math -from copy import deepcopy from typing import TYPE_CHECKING import networkx as nx import numpy as np from matplotlib import pyplot as plt -from graphix import gflow from graphix.flow.exceptions import FlowError from graphix.fundamentals import Plane from graphix.measurements import PauliMeasurement @@ -235,8 +233,9 @@ def visualize_from_pattern( print("The pattern is not consistent with flow or gflow structure.") po_layers = pattern.extract_partial_order_layers() unfolded_layers = {node: layer_idx for layer_idx, layer in enumerate(po_layers[::-1]) for node in layer} - xflow, zflow = gflow.get_corrections_from_pattern(pattern) - xzflow: dict[int, set[int]] = deepcopy(xflow) + xzc = pattern.extract_xzcorrections() + xflow, zflow = xzc.x_corrections, xzc.z_corrections + xzflow = dict(xflow) for key, value in zflow.items(): if key in xzflow: xzflow[key] |= value diff --git a/pyproject.toml b/pyproject.toml index 7d79774f..d79ba4f9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -142,10 +142,8 @@ exclude = [ '^examples/qnn\.py$', '^examples/rotation\.py$', '^examples/tn_simulation\.py$', - '^graphix/gflow\.py$', '^graphix/linalg\.py$', '^tests/test_density_matrix\.py$', - '^tests/test_gflow\.py$', '^tests/test_linalg\.py$', '^tests/test_noisy_density_matrix\.py$', '^tests/test_statevec\.py$', @@ -169,10 +167,8 @@ exclude = [ "examples/qnn.py", "examples/rotation.py", "examples/tn_simulation.py", - "graphix/gflow.py", "graphix/linalg.py", "tests/test_density_matrix.py", - "tests/test_gflow.py", "tests/test_linalg.py", "tests/test_noisy_density_matrix.py", "tests/test_statevec.py", diff --git a/tests/test_gflow.py b/tests/test_gflow.py deleted file mode 100644 index 0d938be8..00000000 --- a/tests/test_gflow.py +++ /dev/null @@ -1,597 +0,0 @@ -from __future__ import annotations - -import itertools -from typing import TYPE_CHECKING, NamedTuple - 
-import networkx as nx -import pytest -from numpy.random import PCG64, Generator - -from graphix import command -from graphix.fundamentals import Plane -from graphix.gflow import ( - find_flow, - find_gflow, - find_pauliflow, - get_corrections_from_pattern, - verify_flow, - verify_gflow, - verify_pauliflow, -) -from graphix.pattern import Pattern -from graphix.random_objects import rand_circuit - -if TYPE_CHECKING: - from collections.abc import Iterable, Iterator - -seed = 30 - - -class GraphForTest(NamedTuple): - graph: nx.Graph - inputs: set[int] - outputs: set[int] - meas_planes: dict[int, str] - meas_angles: dict[int, float] | None - label: str - flow_exist: bool - gflow_exist: bool - pauliflow_exist: bool | None - - -def _graph1() -> GraphForTest: - # no measurement - # 1 - # | - # 2 - nodes = [1, 2] - edges = [(1, 2)] - graph = nx.Graph() - graph.add_nodes_from(nodes) - graph.add_edges_from(edges) - inputs = {1, 2} - outputs = {1, 2} - meas_planes = {} - return GraphForTest( - graph, - inputs, - outputs, - meas_planes, - None, - "no measurement", - flow_exist=True, - gflow_exist=True, - pauliflow_exist=None, - ) - - -def _graph2() -> GraphForTest: - # line graph with flow and gflow - # 1 - 2 - 3 - 4 - 5 - nodes = [1, 2, 3, 4, 5] - edges = [(1, 2), (2, 3), (3, 4), (4, 5)] - graph = nx.Graph() - graph.add_nodes_from(nodes) - graph.add_edges_from(edges) - inputs = {1} - outputs = {5} - meas_planes = { - 1: Plane.XY, - 2: Plane.XY, - 3: Plane.XY, - 4: Plane.XY, - } - return GraphForTest( - graph, - inputs, - outputs, - meas_planes, - None, - "line graph with flow and gflow", - flow_exist=True, - gflow_exist=True, - pauliflow_exist=None, - ) - - -def _graph3() -> GraphForTest: - # graph with flow and gflow - # 1 - 3 - 5 - # | - # 2 - 4 - 6 - nodes = [1, 2, 3, 4, 5, 6] - edges = [(1, 3), (2, 4), (3, 4), (3, 5), (4, 6)] - graph = nx.Graph() - graph.add_nodes_from(nodes) - graph.add_edges_from(edges) - inputs = {1, 2} - outputs = {5, 6} - meas_planes = { - 1: Plane.XY, - 2: Plane.XY, - 3: Plane.XY, - 4: Plane.XY, - } - return GraphForTest( - graph, - inputs, - outputs, - meas_planes, - None, - "graph with flow and gflow", - flow_exist=True, - gflow_exist=True, - pauliflow_exist=None, - ) - - -def _graph4() -> GraphForTest: - # graph with gflow but flow - # ______ - # / | - # 1 - 4 | - # / | - # / | - # / | - # 2 - 5 | - # \ / | - # X / - # / \ / - # 3 - 6 - nodes = [1, 2, 3, 4, 5, 6] - edges = [(1, 4), (1, 6), (2, 4), (2, 5), (2, 6), (3, 5), (3, 6)] - inputs = {1, 2, 3} - outputs = {4, 5, 6} - graph = nx.Graph() - graph.add_nodes_from(nodes) - graph.add_edges_from(edges) - meas_planes = {1: Plane.XY, 2: Plane.XY, 3: Plane.XY} - return GraphForTest( - graph, - inputs, - outputs, - meas_planes, - None, - "graph with gflow but no flow", - flow_exist=False, - gflow_exist=True, - pauliflow_exist=None, - ) - - -def _graph5() -> GraphForTest: - # graph with extended gflow but flow - # 0 - 1 - # /| | - # 4 | | - # \| | - # 2 - 5 - 3 - nodes = [0, 1, 2, 3, 4, 5] - edges = [(0, 1), (0, 2), (0, 4), (1, 5), (2, 4), (2, 5), (3, 5)] - inputs = {0, 1} - outputs = {4, 5} - graph = nx.Graph() - graph.add_nodes_from(nodes) - graph.add_edges_from(edges) - meas_planes = { - 0: Plane.XY, - 1: Plane.XY, - 2: Plane.XZ, - 3: Plane.YZ, - } - return GraphForTest( - graph, - inputs, - outputs, - meas_planes, - None, - "graph with extended gflow but no flow", - flow_exist=False, - gflow_exist=True, - pauliflow_exist=None, - ) - - -def _graph6() -> GraphForTest: - # graph with no flow and no gflow - # 1 - 3 - # \ / - # 
X - # / \ - # 2 - 4 - nodes = [1, 2, 3, 4] - edges = [(1, 3), (1, 4), (2, 3), (2, 4)] - inputs = {1, 2} - outputs = {3, 4} - graph = nx.Graph() - graph.add_nodes_from(nodes) - graph.add_edges_from(edges) - meas_planes = {1: Plane.XY, 2: Plane.XY} - return GraphForTest( - graph, - inputs, - outputs, - meas_planes, - None, - "graph with no flow and no gflow", - flow_exist=False, - gflow_exist=False, - pauliflow_exist=None, - ) - - -def _graph7() -> GraphForTest: - # graph with no flow or gflow but pauliflow, No.1 - # 3 - # | - # 2 - # | - # 0 - 1 - 4 - nodes = [0, 1, 2, 3, 4] - edges = [(0, 1), (1, 2), (1, 4), (2, 3)] - inputs = {0} - outputs = {4} - graph = nx.Graph() - graph.add_nodes_from(nodes) - graph.add_edges_from(edges) - meas_planes = { - 0: Plane.XY, - 1: Plane.XY, - 2: Plane.XY, - 3: Plane.XY, - } - meas_angles = {0: 0.1, 1: 0, 2: 0.1, 3: 0} - return GraphForTest( - graph, - inputs, - outputs, - meas_planes, - meas_angles, - "graph with no flow and no gflow but pauliflow, No.1", - flow_exist=False, - gflow_exist=False, - pauliflow_exist=True, - ) - - -def _graph8() -> GraphForTest: - # graph with no flow or gflow but pauliflow, No.2 - # 1 2 3 - # | / | - # 0 - - - 4 - nodes = [0, 1, 2, 3, 4] - edges = [(0, 1), (0, 2), (0, 4), (3, 4)] - inputs = {0} - outputs = {4} - graph = nx.Graph() - graph.add_nodes_from(nodes) - graph.add_edges_from(edges) - meas_planes = { - 0: Plane.YZ, - 1: Plane.XZ, - 2: Plane.XY, - 3: Plane.YZ, - } - meas_angles = {0: 0, 1: 0, 2: 0.5, 3: 0.5} - return GraphForTest( - graph, - inputs, - outputs, - meas_planes, - meas_angles, - "graph with no flow and no gflow but pauliflow, No.2", - flow_exist=False, - gflow_exist=False, - pauliflow_exist=False, - ) - - -def _graph9() -> GraphForTest: - # graph with no flow or gflow but pauliflow, No.3 - # 0 - 1 -- 3 - # \| /| - # |\ / | - # | /\ | - # 2 -- 4 - nodes = [0, 1, 2, 3, 4] - edges = [(0, 1), (0, 4), (1, 2), (1, 3), (2, 3), (2, 4), (3, 4)] - inputs = {0} - outputs = {3, 4} - graph = nx.Graph() - graph.add_nodes_from(nodes) - graph.add_edges_from(edges) - meas_planes = {0: Plane.YZ, 1: Plane.XZ, 2: Plane.XY} - meas_angles = {0: 0, 1: 0.1, 2: 0.5} - return GraphForTest( - graph, - inputs, - outputs, - meas_planes, - meas_angles, - "graph with no flow and no gflow but pauliflow, No.3", - flow_exist=False, - gflow_exist=False, - pauliflow_exist=False, - ) - - -def generate_test_graphs() -> list[GraphForTest]: - return [ - _graph1(), - _graph2(), - _graph3(), - _graph4(), - _graph5(), - _graph6(), - _graph7(), - _graph8(), - _graph9(), - ] - - -FlowTestCaseType = dict[str, dict[str, tuple[bool, dict[int, set[int]]]]] -FlowTestDataType = tuple[GraphForTest, tuple[bool, dict[int, set[int]]]] - -FLOW_TEST_CASES: FlowTestCaseType = { - "no measurement": { - "empty flow": (True, {}), - "measure output": (False, {1: {2}}), - }, - "line graph with flow and gflow": { - "correct flow": (True, {1: {2}, 2: {3}, 3: {4}, 4: {5}}), - "acausal flow": (False, {1: {3}, 3: {2, 4}, 2: {1}, 4: {5}}), - "gflow": (False, {1: {2, 5}, 2: {3, 5}, 3: {4, 5}, 4: {5}}), - }, - "graph with flow and gflow": { - "correct flow": (True, {1: {3}, 2: {4}, 3: {5}, 4: {6}}), - "acausal flow": (False, {1: {4}, 2: {3}, 3: {4}, 4: {1}}), - "gflow": (False, {1: {3, 5}, 2: {4, 5}, 3: {5, 6}, 4: {6}}), - }, -} - - -GFLOW_TEST_CASES: FlowTestCaseType = { - "no measurement": { - "empty flow": (True, {}), - "measure output": (False, {1: {2}}), - }, - "line graph with flow and gflow": { - "correct flow": (True, {1: {2}, 2: {3}, 3: {4}, 4: {5}}), - "acausal flow": 
(False, {1: {3}, 3: {2, 4}, 2: {1}, 4: {5}}), - "gflow": (True, {1: {2, 5}, 2: {3, 5}, 3: {4, 5}, 4: {5}}), - }, - "graph with flow and gflow": { - "correct flow": (True, {1: {3}, 2: {4}, 3: {5}, 4: {6}}), - "acausal flow": (False, {1: {4}, 2: {3}, 3: {4}, 4: {1}}), - "gflow": (True, {1: {3, 5}, 2: {4, 5}, 3: {5, 6}, 4: {6}}), - }, - "graph with extended gflow but no flow": { - "correct gflow": ( - True, - {0: {1, 2, 3, 4}, 1: {2, 3, 4, 5}, 2: {2, 4}, 3: {3}}, - ), - "correct gflow 2": (True, {0: {1, 2, 4}, 1: {3, 5}, 2: {2, 4}, 3: {3}}), - "incorrect gflow": ( - False, - {0: {1, 2, 3, 4}, 1: {2, 3, 4, 5}, 2: {2, 4}, 3: {3, 4}}, - ), - "incorrect gflow 2": ( - False, - {0: {1, 3, 4}, 1: {2, 3, 4, 5}, 2: {2, 4}, 3: {3}}, - ), - }, -} - -PAULIFLOW_TEST_CASES: FlowTestCaseType = { - "graph with no flow and no gflow but pauliflow, No.1": { - "correct pauliflow": (True, {0: {1}, 1: {4}, 2: {3}, 3: {2, 4}}), - "correct pauliflow 2": (True, {0: {1, 3}, 1: {3, 4}, 2: {3}, 3: {2, 3, 4}}), - "incorrect pauliflow": (False, {0: {1}, 1: {2}, 2: {3}, 3: {4}}), - "incorrect pauliflow 2": (False, {0: {1, 3}, 1: {3, 4}, 2: {3, 4}, 3: {2, 3, 4}}), - }, - "graph with no flow and no gflow but pauliflow, No.2": { - "correct pauliflow": (True, {0: {0, 1}, 1: {1}, 2: {2}, 3: {4}}), - "correct pauliflow 2": (True, {0: {0, 1, 2}, 1: {1}, 2: {2}, 3: {1, 2, 4}}), - "incorrect pauliflow": (False, {0: {1}, 1: {1, 2}, 2: {2, 3}, 3: {4}}), - "incorrect pauliflow 2": (False, {0: {0}, 1: {1}, 2: {3}, 3: {3}}), - }, - "graph with no flow and no gflow but pauliflow, No.3": { - "correct pauliflow": (True, {0: {0, 3, 4}, 1: {1, 2}, 2: {4}}), - "correct pauliflow 2": (True, {0: {0, 2, 4}, 1: {1, 3}, 2: {2, 3, 4}}), - "incorrect pauliflow": (False, {0: {0, 3, 4}, 1: {1}, 2: {3, 4}}), - "incorrect pauliflow 2": (False, {0: {0, 3}, 1: {1, 2, 3}, 2: {2, 3, 4}}), - }, -} - - -def iterate_compatible( - graphs: Iterable[GraphForTest], - cases: FlowTestCaseType, -) -> Iterator[FlowTestDataType]: - for g in graphs: - for k, v in cases.items(): - if g.label != k: - continue - for vv in v.values(): - yield (g, vv) - - -class RandomMeasGraph(NamedTuple): - graph: nx.Graph - vin: set[int] - vout: set[int] - meas_planes: dict[int, str] - meas_angles: dict[int, float] - - -def get_rand_graph(rng: Generator, n_nodes: int, edge_prob: float = 0.3) -> RandomMeasGraph: - graph = nx.Graph() - nodes = range(n_nodes) - graph.add_nodes_from(nodes) - edge_candidates = set(itertools.product(nodes, nodes)) - {(i, i) for i in nodes} - for edge in edge_candidates: - if rng.uniform() < edge_prob: - graph.add_edge(*edge) - - input_nodes_number = rng.integers(1, len(nodes) - 1) - vin = set(rng.choice(nodes, input_nodes_number, replace=False)) - output_nodes_number = rng.integers(1, len(nodes) - input_nodes_number) - vout = set(rng.choice(list(set(nodes) - vin), output_nodes_number, replace=False)) - - meas_planes = {} - meas_plane_candidates = [Plane.XY, Plane.XZ, Plane.YZ] - meas_angles = {} - meas_angle_candidates = [0, 0.25, 0.5, 0.75] - - for node in set(graph.nodes()) - vout: - meas_planes[node] = rng.choice(meas_plane_candidates) - meas_angles[node] = rng.choice(meas_angle_candidates) - - return RandomMeasGraph(graph, vin, vout, meas_planes, meas_angles) - - -class TestGflow: - @pytest.mark.parametrize("test_graph", generate_test_graphs()) - def test_flow(self, test_graph: GraphForTest) -> None: - f, _ = find_flow( - test_graph.graph, - test_graph.inputs, - test_graph.outputs, - test_graph.meas_planes, - ) - assert test_graph.flow_exist == (f is not 
None) - - @pytest.mark.parametrize("test_graph", generate_test_graphs()) - def test_gflow(self, test_graph: GraphForTest) -> None: - g, _ = find_gflow( - test_graph.graph, - test_graph.inputs, - test_graph.outputs, - test_graph.meas_planes, - ) - assert test_graph.gflow_exist == (g is not None) - - @pytest.mark.parametrize("data", iterate_compatible(generate_test_graphs(), FLOW_TEST_CASES)) - def test_verify_flow(self, data: FlowTestDataType) -> None: - test_graph, test_case = data - expected, flow = test_case - valid = verify_flow( - test_graph.graph, - test_graph.inputs, - test_graph.outputs, - flow, - test_graph.meas_planes, - ) - assert expected == valid - - @pytest.mark.parametrize("data", iterate_compatible(generate_test_graphs(), GFLOW_TEST_CASES)) - def test_verify_gflow(self, data: FlowTestDataType) -> None: - test_graph, test_case = data - expected, gflow = test_case - - valid = verify_gflow( - test_graph.graph, - test_graph.inputs, - test_graph.outputs, - gflow, - test_graph.meas_planes, - ) - assert expected == valid - - @pytest.mark.parametrize("data", iterate_compatible(generate_test_graphs(), PAULIFLOW_TEST_CASES)) - def test_verify_pauliflow(self, data: FlowTestDataType) -> None: - test_graph, test_case = data - expected, pauliflow = test_case - angles = test_graph.meas_angles - assert angles is not None - - valid = verify_pauliflow( - test_graph.graph, - test_graph.inputs, - test_graph.outputs, - pauliflow, - test_graph.meas_planes, - angles, - ) - assert expected == valid - - def test_with_rand_circ(self, fx_rng: Generator) -> None: - # test for large graph - # graph transpiled from circuit always has a flow - circ = rand_circuit(10, 10, fx_rng) - pattern = circ.transpile().pattern - graph = pattern.extract_graph() - input_ = set(pattern.input_nodes) - output = set(pattern.output_nodes) - meas_planes = pattern.get_meas_plane() - f, _ = find_flow(graph, input_, output, meas_planes) - valid = verify_flow(graph, input_, output, f, meas_planes) - - assert valid - - # TODO: Remove after fixed - @pytest.mark.skip - def test_rand_circ_gflow(self, fx_rng: Generator) -> None: - # test for large graph - # pauli-node measured graph always has gflow - circ = rand_circuit(5, 5, fx_rng) - pattern = circ.transpile().pattern - pattern.standardize() - pattern.shift_signals() - pattern.remove_input_nodes() - pattern.perform_pauli_measurements() - graph = pattern.extract_graph() - input_ = set() - output = set(pattern.output_nodes) - meas_planes = pattern.get_meas_plane() - g, _ = find_gflow(graph, input_, output, meas_planes) - - valid = verify_gflow(graph, input_, output, g, meas_planes) - - assert valid - - @pytest.mark.parametrize("jumps", range(1, 51)) - def test_rand_graph_flow(self, fx_bg: PCG64, jumps: int) -> None: - # test finding algorithm and verification for random graphs - rng = Generator(fx_bg.jumped(jumps)) - n_nodes = 5 - graph, vin, vout, meas_planes, _ = get_rand_graph(rng, n_nodes) - f, _ = find_flow(graph, vin, vout, meas_planes) - if f: - valid = verify_flow(graph, vin, vout, f, meas_planes) - assert valid - - @pytest.mark.parametrize("jumps", range(1, 51)) - def test_rand_graph_gflow(self, fx_bg: PCG64, jumps: int) -> None: - # test finding algorithm and verification for random graphs - rng = Generator(fx_bg.jumped(jumps)) - n_nodes = 5 - graph, vin, vout, meas_planes, _ = get_rand_graph(rng, n_nodes) - - g, _ = find_gflow(graph, vin, vout, meas_planes) - if g: - valid = verify_gflow(graph, vin, vout, g, meas_planes) - assert valid - - 
@pytest.mark.parametrize("jumps", range(1, 51)) - def test_rand_graph_pauliflow(self, fx_bg: PCG64, jumps: int) -> None: - # test finding algorithm and verification for random graphs - rng = Generator(fx_bg.jumped(jumps)) - n_nodes = 5 - graph, vin, vout, meas_planes, meas_angles = get_rand_graph(rng, n_nodes) - - p, _ = find_pauliflow(graph, vin, vout, meas_planes, meas_angles) - if p: - valid = verify_pauliflow(graph, vin, vout, p, meas_planes, meas_angles) - assert valid - - def test_corrections_from_pattern(self) -> None: - pattern = Pattern(input_nodes=list(range(5))) - pattern.add(command.M(node=0)) - pattern.add(command.M(node=1)) - pattern.add(command.M(node=2, s_domain={0}, t_domain={1})) - pattern.add(command.X(node=3, domain={2})) - pattern.add(command.Z(node=4, domain={3})) - xflow, zflow = get_corrections_from_pattern(pattern) - assert xflow == {0: {2}, 2: {3}} - assert zflow == {1: {2}, 3: {4}} diff --git a/tests/test_pattern.py b/tests/test_pattern.py index 0b282cac..163bfac3 100644 --- a/tests/test_pattern.py +++ b/tests/test_pattern.py @@ -13,6 +13,7 @@ from graphix.branch_selector import ConstBranchSelector, FixedBranchSelector from graphix.clifford import Clifford from graphix.command import C, Command, CommandKind, E, M, N, X, Z +from graphix.flow.core import XZCorrections from graphix.flow.exceptions import ( FlowError, ) @@ -1041,6 +1042,48 @@ def test_extract_gflow_og(self, fx_rng: Generator) -> None: assert np.abs(np.dot(s_ref.flatten().conjugate(), s_test.flatten())) == pytest.approx(1) + # Extract xz-corrections from random circuits + @pytest.mark.parametrize("jumps", range(1, 11)) + def test_extract_xzc_rnd_circuit(self, fx_bg: PCG64, jumps: int) -> None: + rng = Generator(fx_bg.jumped(jumps)) + nqubits = 2 + depth = 2 + circuit_1 = rand_circuit(nqubits, depth, rng, use_ccx=False) + p_ref = circuit_1.transpile().pattern + xzc = p_ref.extract_xzcorrections() + xzc.check_well_formed() + p_test = xzc.to_pattern() + + for p in [p_ref, p_test]: + p.remove_input_nodes() + p.perform_pauli_measurements() + + s_ref = p_ref.simulate_pattern(rng=rng) + s_test = p_test.simulate_pattern(rng=rng) + assert np.abs(np.dot(s_ref.flatten().conjugate(), s_test.flatten())) == pytest.approx(1) + + def test_extract_xzc_empty_domains(self) -> None: + p = Pattern(input_nodes=[0], cmds=[N(1), E((0, 1))]) + xzc = p.extract_xzcorrections() + assert xzc.x_corrections == {} + assert xzc.z_corrections == {} + assert xzc.partial_order_layers == (frozenset({0, 1}),) + + def test_extract_xzc_easy_example(self) -> None: + pattern = Pattern( + input_nodes=list(range(5)), + cmds=[M(0), M(1), M(2, s_domain={0}, t_domain={1}), X(3, domain={2}), M(3), Z(4, domain={3})], + ) + + xzc = pattern.extract_xzcorrections() + xzc_ref = XZCorrections.from_measured_nodes_mapping( + pattern.extract_opengraph(), x_corrections={0: {2}, 2: {3}}, z_corrections={1: {2}, 3: {4}} + ) + assert xzc.og.isclose(xzc_ref.og) + assert xzc.x_corrections == xzc_ref.x_corrections + assert xzc.z_corrections == xzc_ref.z_corrections + assert xzc.partial_order_layers == xzc_ref.partial_order_layers + def cp(circuit: Circuit, theta: Angle, control: int, target: int) -> None: """Controlled rotation gate, decomposed.""" # noqa: D401