diff --git a/quantumreservoirpy/reservoirs.py b/quantumreservoirpy/reservoirs.py index acce901..543c15a 100644 --- a/quantumreservoirpy/reservoirs.py +++ b/quantumreservoirpy/reservoirs.py @@ -1,220 +1,217 @@ -import numpy as np - -# from multiprocessing import Pool # could be used for parallel execution of reservoirs in multiplexing approach - -from itertools import combinations -from tqdm import tqdm - -from quantumreservoirpy.reservoirbase import QReservoir -from quantumreservoirpy.util import memory_to_mean, create_shifted_array -from quantumreservoirpy.statistic import Statistic - - -class Static(QReservoir): - def run(self, timeseries, **kwargs): - transpile = kwargs.pop("transpile", True) - tqdm_disable = kwargs.pop("tqdm", False) - self.precision = kwargs.pop("precision", None) - shots = kwargs.pop("shots", 10**3) - - num_timesteps = len(timeseries) - self.shots_taken = {} - - for nr in tqdm( - range(1, self.num_reservoirs + 1), - desc="Running reservoirs", - disable=tqdm_disable, - ): - circ = self.circuit( - timeseries, - merge_registers=False, - transpile=transpile, - reservoir_number=nr, - ).reverse_bits() - - self.stat = {} # create new statistics - shots_total = 0 - new_shots = shots - - while True: - kwargs["shots"] = new_shots - self._job = self.backend.run(circ, **kwargs) - counts = self._job.result().get_counts() - - shots_total += new_shots - # print(shots, shots_total) - states_list, var_list = self.measurementStatistics( - counts, num_timesteps - ) - # print(shots_total, states_list[0]) - - if not self.precision: - break - else: - v = np.amax(np.concatenate(var_list)) - # print("max var=", v) - new_shots = int(v / (self.precision) ** 2) - shots_total - if new_shots <= 0: - break - - states = ( - np.stack(states_list, axis=0) - if nr == 1 - else np.hstack((states, np.stack(states_list, axis=0))) - ) - self.variances = ( - np.stack(var_list, axis=0) - if nr == 1 - else np.hstack((self.variances, np.stack(var_list, axis=0))) - ) - self.shots_taken[nr] = shots_total - - self.states = states - - return states - - def state(self): - return self.states - - def predict(self, num_pred, model, from_series, **kwargs): - kwargs["tqdm"] = True - timeplex = kwargs.pop("timeplex", 1) - M = min(num_pred + len(from_series), self.memory) - - predictions = np.zeros( - num_pred + len(from_series), dtype=np.array(from_series).dtype - ) - - if np.ndim(from_series) == 2: - prediction_dimension = np.shape(from_series)[1] - predictions = np.zeros( - (num_pred + len(from_series), prediction_dimension), - dtype=np.array(from_series).dtype, - ) - - predictions[: len(from_series)] = from_series - - for i in tqdm(range(num_pred), desc="Predicting..."): - curidx = len(from_series) + i - states = self.run(predictions[:curidx][-M:], **kwargs) - - if timeplex > 1: - states = create_shifted_array(states, timeplex) - predictions[curidx] = model.predict(states[-1].reshape((1, -1))) - - return predictions # [-num_pred:] - - def measurementStatistics(self, counts, num_timesteps): - states_list = [] - var_list = [] - - # number of measurements per time step - num_meas_pt = int(len(list(counts.keys())[0]) / num_timesteps) - - for t in range(num_timesteps): - sl = [] - vl = [] - indices = range(num_meas_pt) - for k in range(1, self.degree + 1): - for O in combinations(indices, k): - statkey = str(t) + " " + str(O) - self.stat.setdefault(statkey, Statistic()) - Static.__add_counts(self.stat[statkey], O, counts, t) - sl.append(self.stat[statkey].get_E()) - vl.append(self.stat[statkey].get_Variance()) - states_list.append(np.array(sl)) - var_list.append(np.array(vl)) - - return states_list, var_list - - @staticmethod - def __add_counts(stat, Obs, counts, t): - # tmp_counts=0 - # tmp_vals=[] - for key, count in counts.items(): - key_t = key.split()[t] - val = ( - 0.5 - - np.prod(1 - 2.0 * np.array([int(key_t[ind_O]) for ind_O in Obs])) / 2 - ) - stat.add_sample(val, count) - # tmp_counts+=count - # tmp_vals.append(val*count) - # print(" -> ", np.var(np.array(tmp_vals))) - - -class Incremental(QReservoir): - def __init__(self, n_qubits, memory=np.inf, backend=None, num_features=-1) -> None: - super().__init__(n_qubits, memory, backend) - - if num_features > 0: - self.num_features = num_features - - def run(self, timeseries, **kwargs): - transpile = kwargs.pop("transpile", True) - M = min(len(timeseries), self.memory) - timeseries_splitted = [timeseries[: i + 1][-M:] for i in range(len(timeseries))] - - total = len(timeseries_splitted) - - with tqdm(total=total) as pbar: - pbar.set_description("Creating circuits...") - circuits = [ - self.circuit( - series, merge_registers=False, transpile=transpile - ).reverse_bits() - for series in timeseries_splitted - ] - - pbar.set_description("Running job...") - self._job = self.backend.run(circuits, memory=True, **kwargs) - - result = self._job.result() - - if not hasattr(self, "num_features"): - self.num_features = memory_to_mean(result.get_memory(0)).size - - states = np.zeros((len(timeseries_splitted), self.num_features)) - - pbar.set_description("Analyzing... ") - for idx, _ in enumerate(timeseries_splitted): - memory = self._job.result().get_memory(idx) - - avg = memory_to_mean(memory)[-self.num_features :] - states[idx, self.num_features - len(avg) :] = avg - pbar.update(1) - - return states - - def __run(self, timeseries, **kwargs): - transpile = kwargs.pop("transpile", True) - circ = self.circuit( - timeseries, merge_registers=False, transpile=transpile - ).reverse_bits() - self._job = self.backend.run(circ, memory=True, **kwargs) - results = self._job.result() - - avg = memory_to_mean(results.get_memory()) - if hasattr(self, "num_features"): - avg = avg[-self.num_features :] - temp = np.zeros(self.num_features) - temp[self.num_features - len(avg) :] = avg - return temp - - return avg - - def predict(self, num_pred, model, from_series, **kwargs): - M = min(num_pred + len(from_series), self.memory) - - shape = np.array(np.shape(from_series)) - shape[0] += num_pred - predictions = np.zeros(shape=shape) - predictions[: len(from_series)] = from_series - - for i in tqdm(range(num_pred), desc="Predicting..."): - curidx = len(from_series) + i - states = self.__run(predictions[:curidx][-M:], kwargs=kwargs) - pred_state = states.reshape((1, -1)) - predictions[curidx] = model.predict(pred_state) - - return predictions[-num_pred:] +import numpy as np + +# from multiprocessing import Pool # could be used for parallel execution of reservoirs in multiplexing approach + +from itertools import combinations +from tqdm import tqdm + +from quantumreservoirpy.reservoirbase import QReservoir +from quantumreservoirpy.util import memory_to_mean, create_shifted_array +from quantumreservoirpy.statistic import Statistic + + +class Static(QReservoir): + def run(self, timeseries, **kwargs): + transpile = kwargs.pop("transpile", True) + tqdm_disable = kwargs.pop("tqdm", False) + self.precision = kwargs.pop("precision", None) + shots = kwargs.pop("shots", 10**3) + + num_timesteps = len(timeseries) + self.shots_taken = {} + + for nr in tqdm( + range(1, self.num_reservoirs + 1), + desc="Running reservoirs", + disable=tqdm_disable, + ): + circ = self.circuit( + timeseries, + merge_registers=False, + transpile=transpile, + reservoir_number=nr, + ).reverse_bits() + + self.stat = {} # create new statistics + shots_total = 0 + new_shots = shots + + while True: + kwargs["shots"] = new_shots + self._job = self.backend.run(circ, **kwargs) + counts = self._job.result().get_counts() + + shots_total += new_shots + # print(shots, shots_total) + states_list, var_list = self.measurementStatistics( + counts, num_timesteps + ) + # print(shots_total, states_list[0]) + + if not self.precision: + break + else: + v = np.amax(np.concatenate(var_list)) + # print("max var=", v) + new_shots = int(v / (self.precision) ** 2) - shots_total + if new_shots <= 0: + break + + states = ( + np.stack(states_list, axis=0) + if nr == 1 + else np.hstack((states, np.stack(states_list, axis=0))) + ) + self.variances = ( + np.stack(var_list, axis=0) + if nr == 1 + else np.hstack((self.variances, np.stack(var_list, axis=0))) + ) + self.shots_taken[nr] = shots_total + + self.states = states + + return states + + def state(self): + return self.states + + def predict(self, num_pred, model, from_series, **kwargs): + kwargs["tqdm"] = True + timeplex = kwargs.pop("timeplex", 1) + M = min(num_pred + len(from_series), self.memory) + + predictions = np.zeros( + num_pred + len(from_series), dtype=np.array(from_series).dtype + ) + + if np.ndim(from_series) == 2: + prediction_dimension = np.shape(from_series)[1] + predictions = np.zeros( + (num_pred + len(from_series), prediction_dimension), + dtype=np.array(from_series).dtype, + ) + + predictions[: len(from_series)] = from_series + + for i in tqdm(range(num_pred), desc="Predicting..."): + curidx = len(from_series) + i + states = self.run(predictions[:curidx][-M:], **kwargs) + + if timeplex > 1: + states = create_shifted_array(states, timeplex) + predictions[curidx] = model.predict(states[-1].reshape((1, -1))) + + return predictions # [-num_pred:] + + def measurementStatistics(self, counts, num_timesteps): + states_list = [] + var_list = [] + + # number of measurements per time step + num_meas_pt = int(len(list(counts.keys())[0]) / num_timesteps) + + for t in range(num_timesteps): + sl = [] + vl = [] + indices = range(num_meas_pt) + for k in range(1, self.degree + 1): + for O in combinations(indices, k): + statkey = str(t) + " " + str(O) + self.stat.setdefault(statkey, Statistic()) + Static.__add_counts(self.stat[statkey], O, counts, t) + sl.append(self.stat[statkey].get_E()) + vl.append(self.stat[statkey].get_Variance()) + states_list.append(np.array(sl)) + var_list.append(np.array(vl)) + + return states_list, var_list + + @staticmethod + def __add_counts(stat, Obs, counts, t): + # tmp_counts=0 + # tmp_vals=[] + for key, count in counts.items(): + key_t = key.split()[t] + val=-np.prod(1 - 2.0 * np.array([int(key_t[ind_O]) for ind_O in Obs])) + stat.add_sample(val, count) + # tmp_counts+=count + # tmp_vals.append(val*count) + # print(" -> ", np.var(np.array(tmp_vals))) + + +class Incremental(QReservoir): + def __init__(self, n_qubits, memory=np.inf, backend=None, num_features=-1) -> None: + super().__init__(n_qubits, memory, backend) + + if num_features > 0: + self.num_features = num_features + + def run(self, timeseries, **kwargs): + transpile = kwargs.pop("transpile", True) + M = min(len(timeseries), self.memory) + timeseries_splitted = [timeseries[: i + 1][-M:] for i in range(len(timeseries))] + + total = len(timeseries_splitted) + + with tqdm(total=total) as pbar: + pbar.set_description("Creating circuits...") + circuits = [ + self.circuit( + series, merge_registers=False, transpile=transpile + ).reverse_bits() + for series in timeseries_splitted + ] + + pbar.set_description("Running job...") + self._job = self.backend.run(circuits, memory=True, **kwargs) + + result = self._job.result() + + if not hasattr(self, "num_features"): + self.num_features = memory_to_mean(result.get_memory(0)).size + + states = np.zeros((len(timeseries_splitted), self.num_features)) + + pbar.set_description("Analyzing... ") + for idx, _ in enumerate(timeseries_splitted): + memory = self._job.result().get_memory(idx) + + avg = memory_to_mean(memory)[-self.num_features :] + states[idx, self.num_features - len(avg) :] = avg + pbar.update(1) + + return states + + def __run(self, timeseries, **kwargs): + transpile = kwargs.pop("transpile", True) + circ = self.circuit( + timeseries, merge_registers=False, transpile=transpile + ).reverse_bits() + self._job = self.backend.run(circ, memory=True, **kwargs) + results = self._job.result() + + avg = memory_to_mean(results.get_memory()) + if hasattr(self, "num_features"): + avg = avg[-self.num_features :] + temp = np.zeros(self.num_features) + temp[self.num_features - len(avg) :] = avg + return temp + + return avg + + def predict(self, num_pred, model, from_series, **kwargs): + M = min(num_pred + len(from_series), self.memory) + + shape = np.array(np.shape(from_series)) + shape[0] += num_pred + predictions = np.zeros(shape=shape) + predictions[: len(from_series)] = from_series + + for i in tqdm(range(num_pred), desc="Predicting..."): + curidx = len(from_series) + i + states = self.__run(predictions[:curidx][-M:], kwargs=kwargs) + pred_state = states.reshape((1, -1)) + predictions[curidx] = model.predict(pred_state) + + return predictions[-num_pred:] diff --git a/quantumreservoirpy/stabilizer.py b/quantumreservoirpy/stabilizer.py index 15c28f9..c6edc98 100644 --- a/quantumreservoirpy/stabilizer.py +++ b/quantumreservoirpy/stabilizer.py @@ -1,245 +1,262 @@ -from itertools import combinations, product -import numpy as np -from qiskit import QuantumCircuit, ClassicalRegister, QuantumRegister, AncillaRegister -from qiskit.quantum_info import random_clifford, Clifford, Pauli -from qiskit.circuit.library import PauliEvolutionGate - -from quantumreservoirpy.util import randomIsing, get_Ising_circuit -from quantumreservoirpy.reservoirs import Static - -from typing import Iterable - - -class Stabilizer(Static): - def __init__( - self, - n_qubits, - n_meas, # number of stabilizer generators - tableau: dict | None = None, # if specified, overrides the tableau generation - codestate_preparation_circ: ( - Iterable[QuantumCircuit] | None - ) = None, # if None, will generate a random stabilizer code - memory=np.inf, - backend=None, - degree=1, - num_reservoirs=1, - standard=False, - isingparams=None, - decode=True, # danger zone: this is only for testing - ) -> None: - super().__init__( - n_qubits + 1, memory, backend, degree=degree, num_reservoirs=num_reservoirs - ) - self.n_meas = n_meas - self.decode = decode - - if not isingparams: - steps = 1 - dt = 1.645 - # top = limitrange(list(combinations(range(n_qubits), 2))) - top = list(combinations(range(n_qubits), 2)) - self.U = {} - self.isingparams = {} - for nr in range(1, num_reservoirs + 1): - ( - self.U[nr], - self.isingparams[nr], - ) = randomIsing(n_qubits, top, steps, dt) - else: - self.U = {} - for nr in range(1, num_reservoirs + 1): - self.U[nr] = get_Ising_circuit(n_qubits, isingparams[nr]) - - if tableau != None: - if ( - len(tableau["stabilizer"]) != n_meas - or len(tableau["destabilizer"]) != n_meas - ): - raise Exception( - "Error: the number of entries of the tableau does not match the dimension of the stabilizer" - ) - self.tableau = tableau - else: - self.tableau = Stabilizer.generate_tableau( - n_qubits, n_meas, codestate_preparation_circ - ) - - def before(self, circuit): - if self.decode: - circuit.add_register(AncillaRegister(self.n_meas)) - - def during(self, circuit, timestep, reservoirnumber): - circuit.barrier() - - # encode - for k in range(self.n_meas): - beta = 3**k - pauliop = Pauli(self.tableau["destabilizer"][k]) - evo = PauliEvolutionGate(pauliop, -beta / 2 * np.pi * timestep) - circuit.append(evo, range(self.n_qubits - 1)) - - # reservoir - circuit.append(self.U[reservoirnumber], range(self.n_qubits - 1)) - - # decode - cr = ClassicalRegister(self.n_meas) - circuit.add_register(cr) - - if self.decode: - Stabilizer.decoder(circuit, self.tableau) - - @staticmethod - def generate_tableau( - n_qubits: int, - n_meas: int, - codestate_preparation_circ: Iterable[QuantumCircuit] | None = None, - ): - """generates a tableau for a stabilizer code based on 2**k codestate preparation circuits""" - - logical_qubits = n_qubits - n_meas # also called k - - if codestate_preparation_circ == None: # generate random stabilizer code - tableau = random_clifford(n_qubits).to_dict() - - # turn the last k stabilizers into logical Zs - # tableau["logical_z"] = tableau["stabilizer"][n_meas:] #these are just for QEC fun, not useful here - tableau["stabilizer"] = tableau["stabilizer"][:n_meas] - - # turn the last k destabilizers into logical Xs - # tableau["logical_x"] = tableau["destabilizer"][n_meas:] - tableau["destabilizer"] = tableau["destabilizer"][:n_meas] - - elif len(codestate_preparation_circ) != 2**logical_qubits: - print( - "Error : number of code state preparation circuits does not match the dimension of the codespace" - ) - return - - else: - tableau = Clifford(codestate_preparation_circ[0]).to_dict() - for circ in codestate_preparation_circ[1:]: - circ_tableau = Clifford(circ).to_dict() - to_pop = [] - - for i in range(len(tableau["stabilizer"])): - if tableau["stabilizer"][i] not in circ_tableau["stabilizer"]: - to_pop.append(i) - - for i in to_pop: - tableau["stabilizer"].pop(i) - tableau["destabilizer"].pop(i) - print(tableau) - - # check the stabilizer has the right dimension - if ( - len(tableau["stabilizer"]) != n_meas - or len(tableau["destabilizer"]) != n_meas - ): - print("Error : something went wrong with tableau generation") - print(tableau) - - return tableau - - @staticmethod - def binary_array_to_integer(binary_array): - binary_string = "".join(map(str, binary_array.astype(int))) - decimal_integer = int(binary_string, 2) - return decimal_integer - - @staticmethod - def indices_of_ones(input_list, n): - indices = [ - n - 1 - index for index, value in enumerate(input_list) if value == 1.0 - ] - return indices # n-1-index because of Endian encoding of qiskit - - @staticmethod - def get_parity_measurements(x): - G_list = [] - for i in range(x.shape[0] - 1): - tmp = np.zeros(x.shape[0]) - tmp[i] = 1 - tmp[i + 1] = 1 - G_list.append(tmp) - G = np.array(G_list) - return np.remainder(G @ x, 2) - - @staticmethod - def build_decoder_map(n, standard): - decoder = {} - if standard: - for origin in list(product((0, 1), repeat=n - 1)): - p = np.array(origin) - p = Stabilizer.binary_array_to_integer(p) - flips = Stabilizer.indices_of_ones(origin, n - 1) - decoder[p] = flips - else: - for origin in list(product((0, 1), repeat=n)): - p = Stabilizer.get_parity_measurements(np.array(origin)) - p = Stabilizer.binary_array_to_integer(p) - flips = Stabilizer.indices_of_ones(origin, n) - if p in decoder: - if len(decoder[p]) > len(flips): - decoder[p] = flips - else: - decoder[p] = flips - return decoder - - @staticmethod - def apply_operations_for_integers(circ, c, integer_dict): - with circ.switch(c) as case: - for key, value in integer_dict.items(): - if value: - with case(key): - circ.x(value) - - @staticmethod - def decoder(circuit: QuantumCircuit, code_tableau: dict): - """ - Given a n-qubit state and a stabilizer code, detects errors and applies the operations - to project the state back to the codespace. - - Args: - circuit : state preparation circuit - code_tableau : dictionary {"stabilizer": [], "destabilizer": []} each list has n-k elements - - Returns: - circuit : state preparation circuit with syndrome measurement and error correction appended - """ - - n_qubits = circuit.num_qubits - n_meas = len(code_tableau["stabilizer"]) - - qr = circuit.qregs[0] - cr = circuit.cregs[-1] - ar = circuit.ancillas - - circuit.barrier() - - # syndrome measurement operations - circuit.reset(ar) - circuit.h(ar) - - for j in range(n_meas): - P = code_tableau["stabilizer"][j] - if P[0] == -1: - circuit.z(ar[j]) - for i in range(1, len(P)): - if P[i] == "X": - circuit.cx(ar[j], qr[i - 1]) - elif P[i] == "Y": - circuit.cy(ar[j], qr[i - 1]) - elif P[i] == "Z": - circuit.cz(ar[j], qr[i - 1]) - - circuit.h(ar) - - for j in range(n_meas): - circuit.measure(ar[j], cr[j]) - circuit.barrier() - - for j in range(n_meas): - with circuit.if_test((cr[j], 1)): - circuit.pauli(code_tableau["destabilizer"][j][1:], qr[:-1]) - - return circuit +from itertools import combinations, product +import numpy as np +from qiskit import QuantumCircuit, ClassicalRegister, QuantumRegister, AncillaRegister +from qiskit.quantum_info import random_clifford, Clifford, Pauli +from qiskit.circuit.library import PauliEvolutionGate +from quantumreservoirpy.util import randomIsing, get_Ising_circuit +from quantumreservoirpy.reservoirs import Static +from typing import Iterable + +class Stabilizer(Static): + def __init__( + self, + n_qubits, + n_meas, # number of stabilizer generators + tableau: dict | None = None, # if specified, overrides the tableau generation + codestate_preparation_circ: ( + Iterable[QuantumCircuit] | None + ) = None, # if None, will generate a random stabilizer code + memory=np.inf, + backend=None, + degree=1, + num_reservoirs=1, + standard=False, + isingparams=None, + decode=True, # danger zone: this is only for testing + ) -> None: + super().__init__( + n_qubits , memory, backend, degree=degree, num_reservoirs=num_reservoirs + ) + self.n_meas = n_meas + self.decode = decode + + if not isingparams: + steps = 1 + dt = 1.645 + # top = limitrange(list(combinations(range(n_qubits), 2))) + top = list(combinations(range(n_qubits), 2)) + self.U = {} + self.isingparams = {} + for nr in range(1, num_reservoirs + 1): + ( + self.U[nr], + self.isingparams[nr], + ) = randomIsing(n_qubits, top, steps, dt) + else: + self.U = {} + for nr in range(1, num_reservoirs + 1): + self.U[nr] = get_Ising_circuit(n_qubits, isingparams[nr]) + + if tableau != None: + if ( + len(tableau["stabilizer"]) != n_meas + or len(tableau["destabilizer"]) != n_meas + ): + raise Exception( + "Error: the number of entries of the tableau does not match the dimension of the stabilizer" + ) + self.tableau = tableau + else: + self.tableau = Stabilizer.generate_tableau( + n_qubits, n_meas, codestate_preparation_circ + ) + + def before(self, circuit): + if self.decode: + circuit.add_register(AncillaRegister(self.n_meas)) + + def during(self, circuit, timestep, reservoirnumber): + circuit.barrier() + + # encode + for k in range(self.n_meas): + beta = 3**(k/self.n_meas)/2*np.pi + pauliop = Pauli(self.tableau["destabilizer"][k]) + evo = PauliEvolutionGate(pauliop, -beta * timestep) + circuit.append(evo, range(self.n_qubits )) + circuit.barrier() + # reservoir + circuit.append(self.U[reservoirnumber], range(self.n_qubits)) + + # decode + cr = ClassicalRegister(self.n_meas) + circuit.add_register(cr) + + if self.decode: + Stabilizer.decoder(circuit, self.tableau) + + @staticmethod + def generate_tableau( + n_qubits: int, + n_meas: int, + codestate_preparation_circ: Iterable[QuantumCircuit] | None = None, + ): + """generates a tableau for a stabilizer code based on 2**k codestate preparation circuits""" + + logical_qubits = n_qubits - n_meas # also called k + + if codestate_preparation_circ == None: # generate random stabilizer code + tableau = random_clifford(n_qubits).to_dict() + + # turn the last k stabilizers into logical Zs + # tableau["logical_z"] = tableau["stabilizer"][n_meas:] #these are just for QEC fun, not useful here + tableau["stabilizer"] = tableau["stabilizer"][:n_meas] + + # turn the last k destabilizers into logical Xs + # tableau["logical_x"] = tableau["destabilizer"][n_meas:] + tableau["destabilizer"] = tableau["destabilizer"][:n_meas] + + elif len(codestate_preparation_circ) != 2**logical_qubits: + print( + "Error : number of code state preparation circuits does not match the dimension of the codespace" + ) + return + + else: + tableau = Clifford(codestate_preparation_circ[0]).to_dict() + for circ in codestate_preparation_circ[1:]: + circ_tableau = Clifford(circ).to_dict() + to_pop = [] + + for i in range(len(tableau["stabilizer"])): + if tableau["stabilizer"][i] not in circ_tableau["stabilizer"]: + to_pop.append(i) + + for i in to_pop: + tableau["stabilizer"].pop(i) + tableau["destabilizer"].pop(i) + + # check the stabilizer has the right dimension + if ( + len(tableau["stabilizer"]) != n_meas + or len(tableau["destabilizer"]) != n_meas + ): + print("Error : something went wrong with tableau generation") + print(tableau) + ''' + # Construct stabilizers: Z on qubit i, I elsewhere + stabilizers = [] + for i in range(n_meas): + pauli_str = ["I"] * n_qubits + pauli_str[i] = "Z" + stabilizers.append("+" + "".join(pauli_str)) + + # Construct destabilizers: X on qubit i, I elsewhere + destabilizers = [] + for i in range(n_meas): + pauli_str = ["I"] * n_qubits + pauli_str[i] = "X" + destabilizers.append("+" + "".join(pauli_str)) + + tableau = { + "stabilizer": stabilizers, + "destabilizer": destabilizers, + } + + print("stab,destab") + print(tableau["stabilizer"], tableau["destabilizer"]) + ''' + return tableau + + @staticmethod + def binary_array_to_integer(binary_array): + binary_string = "".join(map(str, binary_array.astype(int))) + decimal_integer = int(binary_string, 2) + return decimal_integer + + @staticmethod + def indices_of_ones(input_list, n): + indices = [ + n - 1 - index for index, value in enumerate(input_list) if value == 1.0 + ] + return indices # n-1-index because of Endian encoding of qiskit + + @staticmethod + def get_parity_measurements(x): + G_list = [] + for i in range(x.shape[0] - 1): + tmp = np.zeros(x.shape[0]) + tmp[i] = 1 + tmp[i + 1] = 1 + G_list.append(tmp) + G = np.array(G_list) + return np.remainder(G @ x, 2) + + @staticmethod + def build_decoder_map(n, standard): + decoder = {} + if standard: + for origin in list(product((0, 1), repeat=n - 1)): + p = np.array(origin) + p = Stabilizer.binary_array_to_integer(p) + flips = Stabilizer.indices_of_ones(origin, n - 1) + decoder[p] = flips + else: + for origin in list(product((0, 1), repeat=n)): + p = Stabilizer.get_parity_measurements(np.array(origin)) + p = Stabilizer.binary_array_to_integer(p) + flips = Stabilizer.indices_of_ones(origin, n) + if p in decoder: + if len(decoder[p]) > len(flips): + decoder[p] = flips + else: + decoder[p] = flips + return decoder + + @staticmethod + def apply_operations_for_integers(circ, c, integer_dict): + with circ.switch(c) as case: + for key, value in integer_dict.items(): + if value: + with case(key): + circ.x(value) + + @staticmethod + def decoder(circuit: QuantumCircuit, code_tableau: dict): + """ + Given a n-qubit state and a stabilizer code, detects errors and applies the operations + to project the state back to the codespace. + + Args: + circuit : state preparation circuit + code_tableau : dictionary {"stabilizer": [], "destabilizer": []} each list has n-k elements + + Returns: + circuit : state preparation circuit with syndrome measurement and error correction appended + """ + + n_qubits = circuit.num_qubits + n_meas = len(code_tableau["stabilizer"]) + + qr = circuit.qregs[0] + cr = circuit.cregs[-1] + ar = circuit.ancillas + + circuit.barrier() + + # syndrome measurement operations + circuit.reset(ar) + circuit.h(ar) + + for j in range(n_meas): + P = code_tableau["stabilizer"][j] + P_aux=P[1:][::-1] + if P[0] == str('-'): + circuit.z(ar[j]) + for i in range(0, len(P_aux)): + if P_aux[i] == "X": + circuit.cx(ar[j], qr[i]) + elif P_aux[i] == "Y": + circuit.cy(ar[j], qr[i]) + elif P_aux[i] == "Z": + circuit.cz(ar[j], qr[i]) + + circuit.h(ar) + + for j in range(n_meas): + circuit.measure(ar[j], cr[j]) + circuit.barrier() + for j in range(n_meas): + with circuit.if_test((cr[j], 1)): + circuit.pauli(code_tableau["destabilizer"][j][1:], qr) + return circuit