diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cut_and_eval_example.py b/cut_and_eval_example.py new file mode 100644 index 0000000..ebdc66f --- /dev/null +++ b/cut_and_eval_example.py @@ -0,0 +1,58 @@ +''' +Title: cut_and_eval.py +Description: Example of how to cut and evaluate for the purposes of +distributed reconstruction +''' + +import os, logging + +logging.disable(logging.WARNING) +os.environ["TF_CPP_MIN_LOG_LEVEL"] = "1" +os.environ["CUDA_VISIBLE_DEVICES"] = "-1" + +from cutqc.main import CutQC +from helper_functions.benchmarks import generate_circ + + +if __name__ == "__main__": + filename = "adder_example.pkl" + circ_type= 'adder' + circ_size=10 + max_width=10 + + # Generate Example Circuit and Initialize CutQC + circuit = generate_circ( + num_qubits=circ_size, + depth=1, + circuit_type=circ_type, + reg_name="q", + connected_only=True, + seed=None, + ) + + cutqc = CutQC( + name="%s_%d" % (circ_type, circ_size), + circuit=circuit, + cutter_constraints={ + "max_subcircuit_width": max_width, + "max_subcircuit_cuts": 10, + "subcircuit_size_imbalance": 2, + "max_cuts": 10, + "num_subcircuits": [2, 3, 4, 5, 6, 8], + }, + ) + + print ("--- Cut --- ") + cutqc.cut() + + if not cutqc.has_solution: + raise Exception("The input circuit and constraints have no viable cuts") + + print ("--- Evaluate ---") + cutqc.evaluate(eval_mode="sv", num_shots_fn=None) + + print ("--- Dumping CutQC Object into {} ---".format (filename)) + cutqc.save_cutqc_obj (filename) + + print ("Completed") + diff --git a/cut_and_eval_example.slurm b/cut_and_eval_example.slurm new file mode 100644 index 0000000..81aaf44 --- /dev/null +++ b/cut_and_eval_example.slurm @@ -0,0 +1,16 @@ +#!/bin/bash +#SBATCH --output='cut_and_eval_example.out' +#SBATCH --nodes=1 # node count +#SBATCH --ntasks-per-node=1 # total number of tasks across all nodes +#SBATCH --cpus-per-task=12 # cpu-cores per task (>1 if multi-threaded tasks) +#SBATCH --mem=40G # memory per cpu-core (4G is default) +#SBATCH --time=00:00:55 # total run time limit (HH:MM:SS) + +# Load Modules +module purge +module load anaconda3/2024.2 +conda activate CutQCSummer2025 +module load gurobi/12.0.0 + +python3 cut_and_eval_example.py + diff --git a/cutqc/abstract_graph_contractor.py b/cutqc/abstract_graph_contractor.py new file mode 100644 index 0000000..1bdea4d --- /dev/null +++ b/cutqc/abstract_graph_contractor.py @@ -0,0 +1,58 @@ +from abc import ABC, abstractmethod +from time import perf_counter +import numpy as np +from cutqc.post_process_helper import ComputeGraph + +class AbstractGraphContractor(ABC): + + @abstractmethod + def _compute(self): + pass + + def reconstruct(self, compute_graph: ComputeGraph, subcircuit_entry_probs: dict, num_cuts: int) -> None: + self.compute_graph = compute_graph + self.subcircuit_entry_probs = subcircuit_entry_probs + self.overhead = {"additions": 0, "multiplications": 0} + self.num_cuts = num_cuts + self._set_smart_order() + + start_time = perf_counter() + res = self._compute() + end_time = perf_counter() - start_time + self.times['compute'] = end_time + + return res + + def _set_smart_order(self) -> None: + """ + Sets the order in which Kronecker products are computed (greedy subcircuit order). + """ + subcircuit_entry_lengths = {} + for subcircuit_idx in self.subcircuit_entry_probs: + first_entry_init_meas = list(self.subcircuit_entry_probs[subcircuit_idx].keys())[0] + length = len(self.subcircuit_entry_probs[subcircuit_idx][first_entry_init_meas]) + subcircuit_entry_lengths[subcircuit_idx] = length + + # Sort according to subcircuit lengths (greedy-subcircuit-order) + self.smart_order = sorted( + subcircuit_entry_lengths.keys(), + key=lambda subcircuit_idx: subcircuit_entry_lengths[subcircuit_idx], + ) + + self.subcircuit_entry_lengths = [subcircuit_entry_lengths[i] for i in self.smart_order] + print(f"subcircuit_entry_length: {self.subcircuit_entry_lengths}", flush=True) + self.result_size = np.prod(self.subcircuit_entry_lengths) + + + def _get_subcircuit_entry_prob(self, subcircuit_idx: int): + """ + Returns The subcircuit Entry Probability for the subcircuit at index + 'SUBCIRCUIT_IDX' + """ + + subcircuit_entry_init_meas = self.compute_graph.get_init_meas(subcircuit_idx) + return self.subcircuit_entry_probs[subcircuit_idx][subcircuit_entry_init_meas] + + @abstractmethod + def _get_paulibase_probability(self, edge_bases: tuple, edges: list): + pass \ No newline at end of file diff --git a/cutqc/cutter.py b/cutqc/cutter.py index 20ff198..fea7fa0 100644 --- a/cutqc/cutter.py +++ b/cutqc/cutter.py @@ -134,11 +134,10 @@ def _add_constraints(self): """ for v in range(self.n_vertices): self.model.addConstr( + gp.quicksum( [self.vertex_var[i][v] for i in range(self.num_subcircuit)] - ), - gp.GRB.EQUAL, - 1, + ) == 1 ) """ @@ -310,41 +309,158 @@ def solve(self): def read_circ(circuit): dag = circuit_to_dag(circuit) edges = [] + node_name_ids = {} id_node_names = {} vertex_ids = {} + + topological_node_id = 0 + qubit_gate_counter = {} + + for qubit in dag.qubits: + qubit_gate_counter[qubit] = 0 + + # Assign vertices a unique id w.r.t. their topological order and a unique name + # given from its op and qubits + for vertex in dag.topological_op_nodes(): + if len(vertex.qargs) != 2: + raise Exception("vertex does not have 2 qargs!") + + arg0, arg1 = vertex.qargs + vertex_name = "%s[%d]%d %s[%d]%d" % ( + arg0._register.name, + arg0._index, + qubit_gate_counter[arg0], + arg1._register.name, + arg1._index, + qubit_gate_counter[arg1], + ) + + qubit_gate_counter[arg0] += 1 + qubit_gate_counter[arg1] += 1 + + + if vertex_name not in node_name_ids and vertex._node_id not in vertex_ids: + node_name_ids[vertex_name] = topological_node_id + id_node_names[topological_node_id] = vertex_name + vertex_ids[vertex._node_id] = topological_node_id + + topological_node_id += 1 + + # Collect edge ids + for u, v, _ in dag.edges(): + + if type(u) == DAGOpNode and type(v) == DAGOpNode: + u_id = vertex_ids[u._node_id] + v_id = vertex_ids[v._node_id] + + edges.append((u_id, v_id)) + + n_vertices = dag.size() + + return n_vertices, edges, node_name_ids, id_node_names + + +def read_circ_2(circuit): + + dag = circuit_to_dag(circuit) + edges = [] + + node_name_ids = {} + id_node_names = {} + vertex_ids = {} + curr_node_id = 0 qubit_gate_counter = {} + + all_nodes_sanity = [] + all_nodes_sanity_name = [] + for qubit in dag.qubits: qubit_gate_counter[qubit] = 0 + i = 0 for vertex in dag.topological_op_nodes(): + + all_nodes_sanity.append (id(vertex)) + if len(vertex.qargs) != 2: raise Exception("vertex does not have 2 qargs!") - + arg0, arg1 = vertex.qargs vertex_name = "%s[%d]%d %s[%d]%d" % ( arg0._register.name, arg0._index, qubit_gate_counter[arg0], + arg1._register.name, arg1._index, qubit_gate_counter[arg1], ) + + all_nodes_sanity_name.append (vertex_name) + qubit_gate_counter[arg0] += 1 qubit_gate_counter[arg1] += 1 - # print(vertex.op.label,vertex_name,curr_node_id) + + + ## Add if vertex_name not in node_name_ids and id(vertex) not in vertex_ids: + i = i + 1 + print (f"vertex name Executed! {id(vertex)}") node_name_ids[vertex_name] = curr_node_id id_node_names[curr_node_id] = vertex_name vertex_ids[id(vertex)] = curr_node_id curr_node_id += 1 - for u, v, _ in dag.edges(): - if isinstance(u, DAGOpNode) and isinstance(v, DAGOpNode): + i = 0 + for u, v, _ in dag.edges(): + # if isinstance(u, DAGOpNode) and isinstance(v, DAGOpNode): + print (f"u is in list: {id(u) in all_nodes_sanity }") + print (f"v is in list: {id(v) in all_nodes_sanity }") + + print (f"Type(u){type(u)}") + print (f"Type(v){type(v)}") + + if type(u) == DAGOpNode and type(v) == DAGOpNode: + i = i + 1 + + print (u.op.name) + print (v.op.name) + print (f"Line Executed! {i}") + + arg0, arg1 = u.qargs + + vertex_name_u = "%s[%d]%d %s[%d]%d" % ( + arg0._register.name, + arg0._index, + qubit_gate_counter[arg0], + + arg1._register.name, + arg1._index, + qubit_gate_counter[arg1], + ) + arg0, arg1 = v.qargs + vertex_name_v = "%s[%d]%d %s[%d]%d" % ( + arg0._register.name, + arg0._index, + qubit_gate_counter[arg0], + + arg1._register.name, + arg1._index, + qubit_gate_counter[arg1], + ) + print (f"u name is in list: {id(u) in all_nodes_sanity }") + print (f"v name is in list: {id(v) in all_nodes_sanity }") + + print (id(u)) + print (id(v)) + + ## Ensure end nodes are in the all node list + u_id = vertex_ids[id(u)] v_id = vertex_ids[id(v)] - edges.append((u_id, v_id)) + edges.append((u_id, v_id)) n_vertices = dag.size() return n_vertices, edges, node_name_ids, id_node_names diff --git a/cutqc/distributed_graph_contraction.py b/cutqc/distributed_graph_contraction.py new file mode 100644 index 0000000..d49bbe5 --- /dev/null +++ b/cutqc/distributed_graph_contraction.py @@ -0,0 +1,215 @@ +""" +File: distributed_graph_contraction.py +Original Author: Wei Tang (tangwei13579@gmail.com) +Current Version Author: Charles "Chuck" Garcia (chuckgarcian@utexas.edu) +Description: Distributed implementation of Wei Tang's original TensorFlow CutQC implementation. +""" + +import itertools +from time import perf_counter +from typing import List, Optional +import numpy as np +import torch +import torch.distributed as dist +from cutqc.abstract_graph_contractor import AbstractGraphContractor +from cutqc.post_process_helper import ComputeGraph + +__host_machine__ = 0 + +class DistributedGraphContractor(AbstractGraphContractor): + """ + Distributed Graph Contractor Implementation + + Args: + local_rank (int): Node identifier value + compute_backend (str): Device used for compute (Default is GPU) + + """ + def __init__(self, local_rank: Optional[int] = None, compute_backend: str = 'gpu') -> None: + self.local_rank = local_rank + + # Set up compute devices based on backend + self.mp_backend = torch.device(f"cuda:{local_rank}" if dist.get_backend() == 'nccl' else "cpu") # Deviced used MP + self.compute_device = torch.device(f"cuda:{local_rank}") if compute_backend == 'gpu' else self.mp_backend + self.is_gpu = compute_backend == 'gpu' + + print ("Worker {}, compute_device: {}".format (dist.get_rank(), self.compute_device), flush=True) + + if dist.get_rank() != __host_machine__: + self._initiate_worker_loop() + + self.times = {'compute': 0} + self.compute_graph = None + self.subcircuit_entry_probs = None + self.reconstructed_prob = None + + + def terminate_distributed_process(self): + """ + Sends signal to workers to finish their execution. + """ + termination_signal = torch.tensor([-1], dtype=torch.int64).to(self.mp_backend) + for rank in range(1, dist.get_world_size()): + dist.send(termination_signal, dst=rank) + + print(f"DESTROYING NOW! {self.times['compute']}", flush=True) + dist.destroy_process_group() + + def _get_paulibase_probability (self, edge_bases: tuple, edges: list): + """ + Returns probability contribution for the basis 'edge_bases' in the circuit + cutting decomposition. + """ + with torch.no_grad(): + self.compute_graph.assign_bases_to_edges(edge_bases=edge_bases, edges=edges) + + # Create list of kronecker product terms + flat_size = np.sum(self.subcircuit_entry_lengths) + flat = torch.empty(flat_size) + idx = 0 + + # Store all probability tensors into single flattened tensor + for size, subcircuit_idx in zip(self.subcircuit_entry_lengths, self.smart_order): + subcircuit_entry_prob = self._get_subcircuit_entry_prob(subcircuit_idx) + flat[idx:idx+size] = torch.tensor(subcircuit_entry_prob, dtype=torch.float32) + idx += size + + return flat + + def _send_distributed(self, dataset: List[torch.Tensor], num_batches: int) -> torch.Tensor: + """ + Decomposes `dataset` list into 'num_batches' number of batches and distributes + to worker processes. + """ + torch.set_default_device(self.mp_backend) + + with torch.no_grad(): + print ("LEN(DATASET): {}".format (len(dataset)), flush=True) + print ("NUMBER BATCHES: {}".format (num_batches), flush=True) + if len(dataset) < num_batches: + raise ValueError("Error 2000: Invalid number of requested batches -- Too many nodes allocated, for dataset length {} and {} number of batches".format (len(dataset), num_batches)) + + batches = torch.stack(dataset).tensor_split(num_batches) + tensor_sizes = torch.tensor(self.subcircuit_entry_lengths, dtype=torch.int64) + tensor_sizes_shape = torch.tensor(tensor_sizes.shape, dtype=torch.int64) + + if dist.get_backend() == 'gloo': + op_list = [] + # List of sending objects + for dst, batch in enumerate(batches, start=1): + op_list.extend([ + dist.P2POp(dist.isend, tensor_sizes_shape, dst), + dist.P2POp(dist.isend, tensor_sizes, dst), + dist.P2POp(dist.isend, torch.tensor(batch.shape, dtype=torch.int64), dst), + dist.P2POp(dist.isend, batch, dst), + ]) + handles = dist.batch_isend_irecv(op_list) + else: + # NCCL backend + for dst_rank, batch in enumerate(batches, start=1): + # Non-Blocking send on NCCL + dist.isend(tensor_sizes_shape, dst=dst_rank) + dist.isend(tensor_sizes, dst=dst_rank) + dist.isend(torch.tensor(batch.shape), dst=dst_rank) + dist.isend(batch.to(self.compute_device), dst=dst_rank) + + # Receive Results + output_buff = torch.zeros(self.result_size, dtype=torch.float32) + dist.reduce(output_buff, dst=0, op=dist.ReduceOp.SUM) + + return torch.mul(output_buff, (1/2**self.num_cuts)) + + def _compute(self) -> np.ndarray: + """ + Performs distributed graph contraction. Returns the reconstructed probability. + """ + edges = self.compute_graph.get_edges(from_node=None, to_node=None) + summation_terms_sequence = [] + + # Assemble sequence of uncomputed kronecker products + for edge_bases in itertools.product(["I", "X", "Y", "Z"], repeat=len(edges)): + summation_terms = self._get_paulibase_probability(edge_bases, edges) + summation_terms_sequence.append(summation_terms) + + self.compute_graph.remove_bases_from_edges(edges=self.compute_graph.edges) + + # Distribute and Execute reconstruction on nodes + num_batches = dist.get_world_size() - 1 # No batch for host + reconstructed_prob = self._send_distributed(summation_terms_sequence, num_batches) + + return reconstructed_prob.cpu().numpy() + + + def _receive_from_host(self): + """ + Receives tensors sent by host. Returns batch and unpadded sizes. + """ + torch.set_default_device(self.mp_backend) + torch.cuda.device(self.compute_device) + if (self.is_gpu): torch.cuda.device(self.compute_device) + + with torch.no_grad(): + tensor_sizes_shape = torch.empty([1], dtype=torch.int64) + dist.recv(tensor=tensor_sizes_shape, src=0) + + # Check for termination signal + if tensor_sizes_shape.item() == -1: + print(f"WORKER {dist.get_rank()} DYING", flush=True) + dist.destroy_process_group() + exit() + + # Used to unflatten + tensor_sizes = torch.empty(tensor_sizes_shape, dtype=torch.int64) + dist.recv(tensor=tensor_sizes, src=0) + + # Get shape of the batch we are receiving + batch_shape = torch.empty([2], dtype=torch.int64) + dist.recv(tensor=batch_shape, src=0) + + # Create an empty batch tensor and receive its data + batch = torch.empty(tuple(batch_shape), dtype=torch.float32) + dist.recv(tensor=batch, src=0) + + return batch_shape[0], batch, tensor_sizes + + def _initiate_worker_loop(self): + """ + Primary worker loop. + + Each worker receives a portion of the workload from the host/master node. + Once done with computation, all nodes perform a collective reduction + operation back to the host. Synchronization among nodes is provided via + barriers and blocked message passing. + """ + from pprint import pprint + + while True: + torch.cuda.device(self.compute_device) + num_batches, batch, tensor_sizes = self._receive_from_host() + + # Ensure Enough Size + gpu_free = torch.cuda.mem_get_info()[0] + batch_mem_size = batch.element_size() * torch.prod(tensor_sizes) * num_batches + assert (batch_mem_size < gpu_free), ValueError ("Error 2006: Batch of size {}, to large for GPU device of size {}".format (batch_mem_size, gpu_free)) + + # Execute kronecker products in parallel (vectorization) + torch.cuda.memory._record_memory_history() + lambda_fn = lambda x: compute_kronecker_product(x, tensor_sizes) + vec_fn = torch.func.vmap(lambda_fn) + res = vec_fn(batch) + torch.cuda.memory._dump_snapshot("compute_snap.pickle") + + del (batch) + res = res.sum(dim=0) + + # Send Back to host + dist.reduce(res.to(self.mp_backend), dst=__host_machine__, op=dist.ReduceOp.SUM) + + +from functools import reduce +def compute_kronecker_product(flattened: torch.Tensor, sizes: torch.Tensor) -> torch.Tensor: + """ + Computes sequence of Kronecker products, where operands are tensors in 'components'. + """ + tensors = torch.split(flattened, tuple(sizes)) + return reduce(torch.kron, tensors) diff --git a/cutqc/dynamic_definition.py b/cutqc/dynamic_definition.py index b738959..b1f3d52 100644 --- a/cutqc/dynamic_definition.py +++ b/cutqc/dynamic_definition.py @@ -1,20 +1,24 @@ import itertools, copy, pickle, subprocess from time import perf_counter import numpy as np +import torch from helper_functions.non_ibmq_functions import evaluate_circ from helper_functions.conversions import quasi_to_real from helper_functions.metrics import MSE from cutqc.evaluator import get_num_workers -from cutqc.graph_contraction import GraphContractor +# from cutqc.graph_contraction import GraphContractor +from cutqc.distributed_graph_contraction import DistributedGraphContractor from cutqc.helper_fun import add_times from cutqc.post_process_helper import get_reconstruction_qubit_order +from cutqc.graph_contraction import GraphContractor +import torch.distributed as dist class DynamicDefinition(object): def __init__( - self, compute_graph, data_folder, num_cuts, mem_limit, recursion_depth + self, compute_graph, data_folder, num_cuts, mem_limit, recursion_depth, pytorch_distributed=False, local_rank=None, compute_backend='gpu' ) -> None: super().__init__() self.compute_graph = compute_graph @@ -23,8 +27,11 @@ def __init__( self.mem_limit = mem_limit self.recursion_depth = recursion_depth self.dd_bins = {} - self.overhead = {"additions": 0, "multiplications": 0} + self.local_rank = local_rank + self.graph_contractor = DistributedGraphContractor (local_rank=self.local_rank, compute_backend=compute_backend) if (pytorch_distributed) else GraphContractor() + self.pytorch_distributed = pytorch_distributed + self.overhead = {"additions": 0, "multiplications": 0} self.times = {"get_dd_schedule": 0, "merge_states_into_bins": 0, "sort": 0} def build(self): @@ -42,6 +49,7 @@ def build(self): ) largest_bins = [] # [{recursion_layer, bin_id}] recursion_layer = 0 + while recursion_layer < self.recursion_depth: # print('-'*10,'Recursion Layer %d'%(recursion_layer),'-'*10) """Get qubit states""" @@ -56,25 +64,25 @@ def build(self): recursion_layer=bin_to_expand["recursion_layer"], bin_id=bin_to_expand["bin_id"], ) - pickle.dump( + pickle.dump ( dd_schedule, open("%s/dd_schedule.pckl" % self.data_folder, "wb") ) self.times["get_dd_schedule"] += perf_counter() - get_dd_schedule_begin - merged_subcircuit_entry_probs = self.merge_states_into_bins() """ Build from the merged subcircuit entries """ - graph_contractor = GraphContractor( + reconstructed_prob = self.graph_contractor.reconstruct ( compute_graph=self.compute_graph, subcircuit_entry_probs=merged_subcircuit_entry_probs, - num_cuts=self.num_cuts, - ) - reconstructed_prob = graph_contractor.reconstructed_prob - smart_order = graph_contractor.smart_order - recursion_overhead = graph_contractor.overhead + num_cuts=self.num_cuts + ) + + + smart_order = self.graph_contractor.smart_order + recursion_overhead = self.graph_contractor.overhead self.overhead["additions"] += recursion_overhead["additions"] self.overhead["multiplications"] += recursion_overhead["multiplications"] - self.times = add_times(times_a=self.times, times_b=graph_contractor.times) + self.times = add_times(times_a=self.times, times_b=self.graph_contractor.times) self.dd_bins[recursion_layer] = dd_schedule self.dd_bins[recursion_layer]["smart_order"] = smart_order @@ -107,6 +115,12 @@ def build(self): )[: self.recursion_depth] self.times["sort"] += perf_counter() - sort_begin recursion_layer += 1 + + # Terminate the parallized process + print("Compute Time: {}".format (self.graph_contractor.times["compute"])) + # if (self.pytorch_distributed): + # self.graph_contractor.terminate_distributed_process() + def initialize_dynamic_definition_schedule(self): schedule = {} @@ -326,9 +340,20 @@ def full_verify(full_circuit, complete_path_map, subcircuits, dd_bins): real_probability = quasi_to_real( quasiprobability=reconstructed_prob, mode="nearest" ) + # print (f"MSE: {MSE(target=ground_truth, obs=real_probability)}") + # print ("real_probability: {}".format (real_probability)) + # print ("real_probability.shape: {}".format (real_probability.shape)) + # print ("ground_truth: {}".format (ground_truth)) + # print ("ground_truth.shape: {}".format (ground_truth.shape)) + approximation_error = ( MSE(target=ground_truth, obs=real_probability) * 2**full_circuit.num_qubits / np.linalg.norm(ground_truth) ** 2 ) + + + # print (f"Reconstructed Error: {reconstructed_prob}") + # print (f"Real Error: {real_probability}") + return reconstructed_prob, approximation_error diff --git a/cutqc/graph_contraction.py b/cutqc/graph_contraction.py index 0c7fb80..f04b1c8 100644 --- a/cutqc/graph_contraction.py +++ b/cutqc/graph_contraction.py @@ -2,96 +2,82 @@ from time import perf_counter import numpy as np import logging, os +from cutqc.post_process_helper import ComputeGraph +from cutqc.abstract_graph_contractor import AbstractGraphContractor +import tensorflow as tf + logging.disable(logging.WARNING) os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2" -import tensorflow as tf -def compute_summation_term(*argv): - summation_term = None - for subcircuit_entry_prob in argv: - if summation_term is None: - summation_term = subcircuit_entry_prob - else: - summation_term = tf.reshape( - tf.tensordot(summation_term, subcircuit_entry_prob, axes=0), [-1] - ) - return summation_term +class GraphContractor(AbstractGraphContractor): + def __init__(self) -> None: + super().__init__() + self.times = {} + self.reconstructed_prob = None + + # Used to compute + self.compute_graph = None + self.subcircuit_entry_probs = None + self.num_cuts = None + + def _get_paulibase_probability(self, edge_bases: tuple, edges: list): + """ + Returns the probability contribution for the basis 'EDGE_BASES' in the circuit + cutting decomposition. + """ -class GraphContractor(object): - def __init__(self, compute_graph, subcircuit_entry_probs, num_cuts) -> None: - super().__init__() - self.times = {} - self.compute_graph = compute_graph - self.subcircuit_entry_probs = subcircuit_entry_probs - self.num_cuts = num_cuts - self.subcircuit_entry_lengths = {} - for subcircuit_idx in subcircuit_entry_probs: - first_entry_init_meas = list(subcircuit_entry_probs[subcircuit_idx].keys())[ - 0 - ] - length = len(subcircuit_entry_probs[subcircuit_idx][first_entry_init_meas]) - self.subcircuit_entry_lengths[subcircuit_idx] = length - self.num_qubits = 0 - for subcircuit_idx in compute_graph.nodes: - self.num_qubits += compute_graph.nodes[subcircuit_idx]["effective"] + summation_term = None + self.compute_graph.assign_bases_to_edges(edge_bases=edge_bases, edges=edges) - self.smart_order = sorted( - self.subcircuit_entry_lengths.keys(), - key=lambda subcircuit_idx: self.subcircuit_entry_lengths[subcircuit_idx], - ) - self.overhead = {"additions": 0, "multiplications": 0} - self.reconstructed_prob = self.compute() + for subcircuit_idx in self.smart_order: + subcircuit_entry_prob = self._get_subcircuit_entry_prob(subcircuit_idx) + if summation_term is None: + summation_term = subcircuit_entry_prob + else: + summation_term = tf.reshape( + tf.tensordot(summation_term, subcircuit_entry_prob, axes=0), + [-1], + ) + self.overhead["multiplications"] += len(summation_term) - def compute(self): + return summation_term + + def _compute(self): + ''' + Internal function that actualy does the reconstruct + ''' edges = self.compute_graph.get_edges(from_node=None, to_node=None) - make_dataset_begin = perf_counter() - dataset = None + partial_compute_begin = perf_counter() + reconstructed_prob = tf.zeros_like(self._get_paulibase_probability(["I"] * len(edges), edges)) + counter = 0 + + # Compute Kronecker sums over the different basis for edge_bases in itertools.product(["I", "X", "Y", "Z"], repeat=len(edges)): - self.compute_graph.assign_bases_to_edges(edge_bases=edge_bases, edges=edges) - summation_term = [] - cumulative_len = 1 - for subcircuit_idx in self.smart_order: - subcircuit_entry_init_meas = self.compute_graph.get_init_meas( - subcircuit_idx=subcircuit_idx - ) - subcircuit_entry_prob = self.subcircuit_entry_probs[subcircuit_idx][ - subcircuit_entry_init_meas - ] - summation_term.append(subcircuit_entry_prob) - cumulative_len *= len(subcircuit_entry_prob) - self.overhead["multiplications"] += cumulative_len - self.overhead["multiplications"] -= len(summation_term[0]) - dataset_elem = tf.data.Dataset.from_tensors(tuple(summation_term)) - if dataset is None: - dataset = dataset_elem - else: - dataset = dataset.concatenate(dataset_elem) + summation_term = self._get_paulibase_probability(edge_bases, edges) + reconstructed_prob = tf.add(reconstructed_prob, summation_term) + self.overhead["additions"] += len(summation_term) + counter += 1 + self.compute_graph.remove_bases_from_edges(edges=self.compute_graph.edges) - dataset = dataset.batch( - batch_size=1, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False - ) - self.times["make_dataset"] = perf_counter() - make_dataset_begin - - compute_begin = perf_counter() - dataset = dataset.map( - compute_summation_term, - num_parallel_calls=tf.data.AUTOTUNE, - deterministic=False, - ) + partial_compute_time = perf_counter() - partial_compute_begin - reconstructed_prob = None - for x in dataset: - if reconstructed_prob is None: - reconstructed_prob = x - else: - self.overhead["additions"] += len(reconstructed_prob) - reconstructed_prob += x + scale_begin = perf_counter() reconstructed_prob = tf.math.scalar_mul( 1 / 2**self.num_cuts, reconstructed_prob ).numpy() - self.times["compute"] = perf_counter() - compute_begin + scale_time = perf_counter() - scale_begin + + self.times["compute"] = ( + partial_compute_time / counter * 4 ** len(edges) + scale_time + ) + self.overhead["additions"] = int( + self.overhead["additions"] / counter * 4 ** len(edges) + ) + self.overhead["multiplications"] = int( + self.overhead["multiplications"] / counter * 4 ** len(edges) + ) return reconstructed_prob diff --git a/cutqc/main.py b/cutqc/main.py index b0d8cde..6f7052b 100644 --- a/cutqc/main.py +++ b/cutqc/main.py @@ -1,4 +1,5 @@ import subprocess, os +import pickle from time import perf_counter from cutqc.helper_fun import check_valid, add_times @@ -10,6 +11,10 @@ ) from cutqc.dynamic_definition import DynamicDefinition, full_verify +from datetime import timedelta +import torch.distributed as dist + +__host_machine__ = 0 class CutQC: """ @@ -17,27 +22,129 @@ class CutQC: cut --> evaluate results --> verify (optional) """ - def __init__(self, name, circuit, cutter_constraints, verbose): + def __init__(self, + name=None, + circuit=None, + cutter_constraints=None, + verbose=False, + pytorch_distributed=False, + reconstruct_only=False, + load_data=None, + compute_backend='gpu', + comm_backend = 'nccl', + timeout=1, + gpus_per_node = None, + world_rank = None, + world_size = None, + ): """ Args: - name : name of the input quantum circuit - circuit : the input quantum circuit - cutter_constraints : cutting constraints to satisfy + name: name of the input quantum circuit + circuit: the input quantum circuit + cutter_constraints: cutting constraints to satisfy verbose: setting verbose to True to turn on logging information. - Useful to visualize what happens, - but may produce very long outputs for complicated circuits. + Useful to visualize what happens, + but may produce very long outputs for complicated circuits. + + --- Distributed Reconstruction Related Arguments --- + + pytorch_distributed (Optional): When set to 'True', reconstruction + is executed distributed using pytorch. Otherwise when 'False', + framework used in Tensorflow, single node. Default FALSE. + + reconstruct_only (Optional): When enabled, cutqc performs only reconstructions. + Executing with Pytorch requires that this be 'TRUE'. + Default FALSE + + load_data (Optional): String of file name to load subcircuits outputs + from a previous CutQC instance. Default None. + + compute_backend (Optional): Compute processing device used if + pytorch_distributed is set to 'TRUE'. + 'cpu' for cpu and 'gpu' for gpu. Default GPU + timeout (Optional): Integer bounded wait time to prevent deadlock between nodes. + + comm_backend (Optional): message passing backend internally used by pytorch for + sending data between nodes. Default NCCL. + gpus_per_node (Optional): Number of GPUs per node in the case they are + used as the compute backend. + world_rank (Optional): Global Identifier. Default None. + world_size (Optional): Total number of nodes + """ - check_valid(circuit=circuit) + assert not (pytorch_distributed is False and reconstruct_only is True), "Executing with pytorch requires 'reconstruct_only' be true." + self.name = name self.circuit = circuit - self.cutter_constraints = cutter_constraints + self.cutter_constraints = cutter_constraints self.verbose = verbose self.times = {} + + self.compute_graph = None + self.tmp_data_folder = None + self.num_cuts = None + self.complete_path_map = None + self.subcircuits = None + self.local_rank = None + self.compute_backend = compute_backend + self.pytorch_distributed = pytorch_distributed + + if reconstruct_only: + # Multi node - Pytorch Version + if pytorch_distributed: + self.compute_backend = compute_backend + self._setup_for_dist_reconstruction (load_data, comm_backend, world_rank, world_size, gpus_per_node, timeout) + + # Single node - Tensorflow Version + else: + self._load_data(load_data) + + elif not reconstruct_only: + # Cutting, evaluation and reconstruction are occurring all at once. + self._initialize_for_serial_reconstruction(circuit) + + def _setup_for_dist_reconstruction (self, load_data, comm_backend: str, world_rank: int, world_size: int, gpus_per_node: int, timeout: int): + """ + Sets up to call the distributed kernel. Worker nodes + + Args: + comm_backend: message passing backend internally used by pytorch for + sending data between nodes + world_rank: Global Identifier + world_size: Total number of nodes + timeout: Max amount of time pytorch will let any one node wait on + a message before killing it. + """ + # GPU identifer on local compute cluster + self.local_rank = world_rank - gpus_per_node * (world_rank // gpus_per_node) + self.pytorch_distributed = True + timelimit = timedelta(hours=timeout) # Bounded wait time to prevent deadlock + + dist.init_process_group(comm_backend, rank=world_rank, world_size=world_size, timeout=timelimit) # + + # Only host should load subcircuits data + if dist.get_rank() == __host_machine__: + # Todo: I think ideally the workers should on start load their own data + self._load_data(load_data) + + def _load_data(self, load_data): + with open(load_data, 'rb') as inp: + loaded_cutqc = pickle.load(inp) + self.__dict__.update(vars(loaded_cutqc)) + + def _initialize_for_serial_reconstruction(self, circuit): + check_valid(circuit=circuit) self.tmp_data_folder = "cutqc/tmp_data" + self._setup_tmp_folder() + + def _setup_tmp_folder(self): if os.path.exists(self.tmp_data_folder): subprocess.run(["rm", "-r", self.tmp_data_folder]) os.makedirs(self.tmp_data_folder) + + def destroy_distributed (self): + self.dd.graph_contractor.terminate_distributed_process() def cut(self): """ @@ -73,9 +180,10 @@ def cut(self): ) for field in cut_solution: self.__setattr__(field, cut_solution[field]) + if "complete_path_map" in cut_solution: self.has_solution = True - self._generate_metadata() + self._generate_metadata () else: self.has_solution = False self.times["cutter"] = perf_counter() - cutter_begin @@ -104,34 +212,41 @@ def build(self, mem_limit, recursion_depth): """ if self.verbose: print("--> Build %s" % (self.name)) - - # Keep these times and discard the rest - self.times = { - "cutter": self.times["cutter"], - "evaluate": self.times["evaluate"], - } - - build_begin = perf_counter() - dd = DynamicDefinition( + + # print ("self.pytorch_distributed': {}".format (self.pytorch_distributed)) + + self.dd = DynamicDefinition( compute_graph=self.compute_graph, data_folder=self.tmp_data_folder, num_cuts=self.num_cuts, mem_limit=mem_limit, recursion_depth=recursion_depth, + pytorch_distributed=self.pytorch_distributed, + local_rank=self.local_rank, + compute_backend=self.compute_backend ) - dd.build() + self.dd.build () - self.times = add_times(times_a=self.times, times_b=dd.times) - self.approximation_bins = dd.dd_bins + self.times = add_times(times_a=self.times, times_b=self.dd.times) + self.approximation_bins = self.dd.dd_bins self.num_recursions = len(self.approximation_bins) - self.overhead = dd.overhead - self.times["build"] = perf_counter() - build_begin - self.times["build"] += self.times["cutter"] - self.times["build"] -= self.times["merge_states_into_bins"] + self.overhead = self.dd.overhead + # self.times["build"] = perf_counter() - build_begin + # self.times["build"] += self.times["cutter"] + # self.times["build"] -= self.times["merge_states_into_bins"] if self.verbose: print("Overhead = {}".format(self.overhead)) + return self.dd.graph_contractor.times["compute"] + + def save_eval_data (self, foldername: str) -> None: + ''' + Saves subcircuit evaluation data which can be used in a future + instance of `cutqc` for reconstruction. + ''' + subprocess.run(["cp", "-r", self.tmp_data_folder, foldername]) + def verify(self): verify_begin = perf_counter() reconstructed_prob, self.approximation_error = full_verify( @@ -140,8 +255,18 @@ def verify(self): subcircuits=self.subcircuits, dd_bins=self.approximation_bins, ) + + print (f"Approximate Error: {self.approximation_error}") print("verify took %.3f" % (perf_counter() - verify_begin)) + return self.approximation_error + def save_cutqc_obj (self, filename : str) -> None: + ''' + Saves CutQC instance as the pickle file 'FILENAME' + ''' + with open (filename, 'wb') as outp: + pickle.dump(self, outp, pickle.HIGHEST_PROTOCOL) + def clean_data(self): subprocess.run(["rm", "-r", self.tmp_data_folder]) @@ -174,7 +299,8 @@ def _run_subcircuits(self): if os.path.exists(self.tmp_data_folder): subprocess.run(["rm", "-r", self.tmp_data_folder]) os.makedirs(self.tmp_data_folder) - run_subcircuit_instances( + + run_subcircuit_instances ( subcircuits=self.subcircuits, subcircuit_instances=self.subcircuit_instances, eval_mode=self.eval_mode, diff --git a/cutqc_runtime/graph_contraction.py b/cutqc_runtime/graph_contraction.py index f3ce8fc..66730b0 100644 --- a/cutqc_runtime/graph_contraction.py +++ b/cutqc_runtime/graph_contraction.py @@ -60,9 +60,8 @@ def compute(self): self.compute_graph.assign_bases_to_edges(edge_bases=edge_bases, edges=edges) summation_term = None for subcircuit_idx in self.smart_order: - subcircuit_entry_prob = self.pseudo_subcircuit_entry_probs[ - subcircuit_idx - ] + subcircuit_entry_prob = self.pseudo_subcircuit_entry_probs[subcircuit_idx] + if summation_term is None: summation_term = subcircuit_entry_prob else: @@ -71,6 +70,7 @@ def compute(self): [-1], ) self.overhead["multiplications"] += len(summation_term) + if reconstructed_prob is None: reconstructed_prob = summation_term else: diff --git a/data_collection_scripts/collect_data.py b/data_collection_scripts/collect_data.py new file mode 100644 index 0000000..443ccb8 --- /dev/null +++ b/data_collection_scripts/collect_data.py @@ -0,0 +1,48 @@ +# Author: Ellie Vogel + +import os +import subprocess + +# Define the sets of variables +variable_sets = [ + {'circuit_size': 22, 'max_subcircuit_width': 20, 'circuit_type': 'adder'}, + {'circuit_size': 24, 'max_subcircuit_width': 20, 'circuit_type': 'adder'}, + {'circuit_size': 26, 'max_subcircuit_width': 20, 'circuit_type': 'adder'}, + {'circuit_size': 28, 'max_subcircuit_width': 20, 'circuit_type': 'adder'}, + {'circuit_size': 30, 'max_subcircuit_width': 20, 'circuit_type': 'adder'} +] + +# Read the SLURM script template +with open('run.slurm', 'r') as file: + slurm_template = file.read() + +# Directory to store generated SLURM scripts +slurm_scripts_dir = 'generated_slurm_scripts' +os.makedirs(slurm_scripts_dir, exist_ok=True) + +previous_job_id = None + +# Generate and submit SLURM scripts for each set of variables +for i, variables in enumerate(variable_sets): + slurm_script_content = slurm_template.format(**variables) + slurm_script_path = os.path.join(slurm_scripts_dir, f'slurm_script_{i}.slurm') + + # Write the generated SLURM script to a file + with open(slurm_script_path, 'w') as slurm_script_file: + slurm_script_file.write(slurm_script_content) + + # Construct the sbatch command + sbatch_command = ['sbatch'] + if previous_job_id: + sbatch_command.extend(['--dependency=afterok:' + previous_job_id]) + sbatch_command.append(slurm_script_path) + + # Submit the SLURM script using sbatch and capture the job ID + result = subprocess.run(sbatch_command, capture_output=True, text=True) + output = result.stdout.strip() + + # Extract job ID from sbatch output + job_id = output.split()[-1] + previous_job_id = job_id + +print('All SLURM scripts have been submitted.') \ No newline at end of file diff --git a/data_collection_scripts/example.py b/data_collection_scripts/example.py new file mode 100644 index 0000000..2330feb --- /dev/null +++ b/data_collection_scripts/example.py @@ -0,0 +1,66 @@ +import os +import math +import logging +import argparse + +logging.disable(logging.WARNING) +os.environ["TF_CPP_MIN_LOG_LEVEL"] = "1" +# Comment this line if using GPU +os.environ["CUDA_VISIBLE_DEVICES"] = "-1" + +# from cutqc_runtime.main import CutQC # Use this just to benchmark the runtime +from cutqc.main import CutQC # Use this for exact computation +# from cutqc_runtime.main import CutQC # Use this for exact computation +from helper_functions.benchmarks import generate_circ + +def main(circuit_size, max_subcircuit_width, circuit_type): + circuit_type = circuit_type + circuit = generate_circ( + num_qubits=circuit_size, + depth=1, + circuit_type=circuit_type, + reg_name="q", + connected_only=True, + seed=None, + ) + cutqc = CutQC( + name="%s_%d_%d" % (circuit_type, max_subcircuit_width, circuit_size), + circuit=circuit, + cutter_constraints={ + "max_subcircuit_width": max_subcircuit_width, + # "max_subcircuit_width": math.ceil(circuit.num_qubits / 4 * 3), + "max_subcircuit_cuts": 10, + "subcircuit_size_imbalance": 2, + "max_cuts": 10, + "num_subcircuits": [2, 3, 4, 5, 6, 7, 8], + }, + verbose=True, + ) + + print("-- Cut --") + cutqc.cut() + if not cutqc.has_solution: + raise Exception("The input circuit and constraints have no viable cuts") + print("-- Done Cutting -- \n") + + print("-- Evaluate --") + cutqc.evaluate(eval_mode="sv", num_shots_fn=None) + print("-- Done Evaluating -- \n") + + print("-- Build --") + cutqc.build(mem_limit=128, recursion_depth=1) + print("-- Done Building -- \n") + + # cutqc.verify() + # print("Cut: %d recursions." % (cutqc.num_recursions)) + # print(cutqc.approximation_bins) + cutqc.clean_data() + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run CutQC with given parameters") + parser.add_argument('--circuit_size', type=int, required=True, help='Size of the circuit') + parser.add_argument('--max_subcircuit_width', type=int, required=True, help='Max width of subcircuit') + parser.add_argument('--circuit_type', type=str, required=True, help='Circuit Type') + args = parser.parse_args() + + main(args.circuit_size, args.max_subcircuit_width, args.circuit_type) \ No newline at end of file diff --git a/example.py b/example.py deleted file mode 100644 index 2ff1b5b..0000000 --- a/example.py +++ /dev/null @@ -1,46 +0,0 @@ -import os, math, logging - -# from cutqc_runtime.main import CutQC # Use this just to benchmark the runtime - -from cutqc.main import CutQC # Use this for exact computation - -from helper_functions.benchmarks import generate_circ - -logging.disable(logging.WARNING) -os.environ["TF_CPP_MIN_LOG_LEVEL"] = "1" -# Comment this line if using GPU -os.environ["CUDA_VISIBLE_DEVICES"] = "-1" - -if __name__ == "__main__": - circuit_type = "supremacy" - circuit_size = 16 - circuit = generate_circ( - num_qubits=circuit_size, - depth=1, - circuit_type=circuit_type, - reg_name="q", - connected_only=True, - seed=None, - ) - cutqc = CutQC( - name="%s_%d" % (circuit_type, circuit_size), - circuit=circuit, - cutter_constraints={ - "max_subcircuit_width": math.ceil(circuit.num_qubits / 4 * 3), - "max_subcircuit_cuts": 10, - "subcircuit_size_imbalance": 2, - "max_cuts": 10, - "num_subcircuits": [2, 3], - }, - verbose=True, - ) - cutqc.cut() - if not cutqc.has_solution: - raise Exception("The input circuit and constraints have no viable cuts") - - # add comment - cutqc.evaluate(eval_mode="sv", num_shots_fn=None) - cutqc.build(mem_limit=32, recursion_depth=1) - print("Cut: %d recursions." % (cutqc.num_recursions)) - print(cutqc.approximation_bins) - cutqc.clean_data() diff --git a/explaining_example.md b/explaining_example.md new file mode 100644 index 0000000..86a2e7c --- /dev/null +++ b/explaining_example.md @@ -0,0 +1,205 @@ +# Explaining the Reconstruction Example Script + +This document explains the basic environment setup for distributed reconstruction, along with how to run both `cut_and_eval_example.py` and `reconstruction_example.py`. + +I assume SLURM is used to execute Python scripts; however, running without SLURM is not much different, as certain environment variables will need to be manually set. More details on this are shown below. + +## 1 - Setup + +#### Required Environment Variables + +Prior to distributed reconstruction, the following [local environment variables](https://pytorch.org/tutorials//intermediate/dist_tuto.html?highlight=init_process_group#:~:text=MASTER_PORT%3A%20A%20free,or%20a%20worker.) must be set: + +- MASTER_PORT: A free port on the machine that will host the process with rank 0 +- MASTER_ADDR: IP address of the machine that will host the process with rank 0 +- WORLD_SIZE: The total number of processes, so that the master knows how many workers to wait for +- RANK: Rank of each process, so they will know whether it is the master or a worker + + +#### SLURM + +In the given example, [SLURM](https://slurm.schedmd.com/), a compute cluster scheduler, is used to automate the process of finding a free port for the master and setting the respective environment variables. + +When executed, the `reconstruction_example.slurm` SLURM script implicitly sets rank and GPUs per node environment variables as: + +- SLURM_GPUS_ON_NODE +- SLURM_PROCID + +> (note this is on each respective machine) + +Moreover, the address and port information is explicitly set in 'reconstruct.slurm' with: + + ```console + # Each node is set the following ENV VARS + export MASTER_PORT=$(get_free_port) # Get a free Port + export WORLD_SIZE=$(($SLURM_NNODES * $SLURM_NTASKS_PER_NODE)) + master_addr=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1) + export MASTER_ADDR=$master_addr + ``` + +### Without Slurm + +The example can be run without SLURM by manually setting the environment variables and executing an instance of the example on each machine. Note that this is true only for distributed reconstruction, i.e., cut and evaluation should be done on a single, separate, isolated process. + +## 2 - Running 'cut_and_eval_example.py' + +Before reconstruction can be performed, any given circuit must first be cut and evaluated, and the results saved to a `.pkl` file. This can be done using `cutqc.save_cutqc_obj(filename)` as seen in the `cut_and_eval_example.py` file. + +Run `cut_and_eval.py` using the corresponding SLURM script: + +``` + sbatch cut_and_eval.slurm +``` + +Running the SLURM script should produce an output file containing (or printed to console, if running without SLURM): + +``` + --- Cut --- + Set parameter GURO_PAR_SPECIAL + Set parameter TokenServer to value "license.rc.princeton.edu" + --- Evaluate --- + --- Dumping CutQC Object into adder_example.pkl --- + Completed +``` + +## 3 - Running 'reconstruction_example.py' + +Once the example adder circuit is cut and the CutQC instance is saved as a pickle file, distributed reconstruction can be performed. + +### Breakdown of example + +```{.Python .numberLines .lineAnchors} +import os +from cutqc.main import CutQC + +# Environment variables +GPUS_PER_NODE = int(os.environ["SLURM_GPUS_ON_NODE"]) +WORLD_RANK = int(os.environ["SLURM_PROCID"]) +WORLD_SIZE = int(os.environ["WORLD_SIZE"]) + +if __name__ == "__main__": + full_path = 'adder_example.pkl' + compute_backend = 'GPU' + comm_backend = 'nccl' + + # Load CutQC Instance from Pickle + print(f'--- Running {full_path} ---') + cutqc = CutQC( + pytorch_distributed = True, + reconstruct_only = True, + load_data = full_path, + compute_backend = compute_backend, + comm_backend = comm_backend, + gpus_per_node = GPUS_PER_NODE, + world_rank = WORLD_RANK, + world_size = WORLD_SIZE + ) + + # Initiate Reconstruct + compute_time = cutqc.build(mem_limit=32, recursion_depth=1) + approximation_error = cutqc.verify() + + print('--- Reconstruction Complete ---') + print("Total Reconstruction Time:\t{}".format(compute_time)) + print("Approximation Error:\t{}".format(approximation_error)) + cutqc.destroy_distributed() +``` + +#### Lines 5 - 7: + + ```python + GPUS_PER_NODE = int(os.environ["SLURM_GPUS_ON_NODE"]) + WORLD_RANK = int(os.environ["SLURM_PROCID"]) + WORLD_SIZE = int(os.environ["WORLD_SIZE"]) + ``` + +As mentioned in the first section, distributed reconstruction requires process information. On initialization of a CutQC object, only the World rank, world size, and GPUs per machine are required to be passed. + +#### Lines 10 - 12: + + ```python + full_path = 'adder_example.pkl' + compute_backend = 'GPU' + comm_backend = 'nccl' + ``` + +`full_path` should be the full path to the pickled CutQC object used to cut and evaluate the original target circuit. + +`compute_backend` is the device backend used. In this case, GPU is used but CPU is also possible. In cases of memory-intensive subcircuit reconstruction problem instances, CPU may be necessary. + +`comm_backend` is the [communication backend](https://pytorch.org/docs/stable/distributed.html), which facilitates the communication of data between nodes during computation/execution. In this case, the communication backend used is [NVIDIA's NCCL](https://developer.nvidia.com/nccl). + +#### Lines 16 - 25: + + ```python + cutqc = CutQC( + pytorch_distributed = True, + reconstruct_only = True, + load_data = full_path, + compute_backend = compute_backend, + comm_backend = comm_backend, + gpus_per_node = GPUS_PER_NODE, + world_rank = WORLD_RANK, + world_size = WORLD_SIZE + ) + ``` + +A new CutQC object is created for reconstruction. The previous CutQC instance, used to cut and evaluate, is loaded internally so that the master node can send the partitioned workloads to each worker. + +Reconstruct only must be passed as `True` too ensure CutQC does not attempt to cut and instead initializes for reconstruction. The `pytorch_distributed` parameter indicates to CutQC which computaitonal framework too use (Tensorflow, or Pytorch);for multinode distributed reconstruction, it must be passed as True. + +#### Lines 28 - 29: + + ```python + compute_time = cutqc.build(mem_limit=32, recursion_depth=1) + approximation_error = cutqc.verify() + ``` + +Once the CutQC object is instantiated, the reconstruction process can be initiated by calling build. + +In addition to the explicitly passed memory limit, there is an implicit memory limit imposed by the compute device itself. In some cases, the partitioned workload may exceed the capacity of each respective GPU; if this occurs, then the distributed graph contractor will fail with the message 'Error 2006: Batch of size $M$, too large for GPU device of size N', where M is the size of batch and $N$ is memory capacity of a GPU. + +A simple solution is to increase the number of GPU nodes used; Alternatively, a compute device type, like CPU, with more memory can be used instead. + +Finally, the verify method is called in the example as a sanity check which computes the subcircuit reconstruction using TensorFlow on a single node. This can be removed, as it has no practical effect on reconstruction. + +#### Lines 34: + + ```python + cutqc.destroy_distributed() + ``` + +When reconstruction is complete, resources can be freed by calling destroy_distributed. + +### Executing and Output + +Once the circuit is cut and the results are computed, you can run parallel reconstruction by calling the SLURM script: + +``` + sbatch dist_driver.slurm +``` + +Running the SLURM script should produce an output file containing: + +``` + MASTER_ADDR=adroit-h11g3 + MASTER_PORT=31179 + WORLD_SIZE=2 + --- Running adder_example.pkl --- + self.parallel_reconstruction: True + Worker 1, compute_device: cuda:1 + --- Running adder_example.pkl --- + self.parallel_reconstruction: True + Worker 0, compute_device: cuda:0 + subcircuit_entry_length: [32, 32] + LEN(DATASET): 16 + NUMBER BATCHES: 1 + Compute Time: 1.5637431228533387 + Approximate Error: 1.2621774483536279e-29 + verify took 0.011 + --- Reconstruction Complete --- + Total Reconstruction Time: 1.5637431228533387 + Approximation Error: 1.2621774483536279e-29 + DESTROYING NOW! 1.5637431228533387 + WORKER 1 DYING +``` \ No newline at end of file diff --git a/qcg/QAOA/hw_efficient_ansatz.py b/qcg/QAOA/hw_efficient_ansatz.py index 2136f09..9a553d9 100644 --- a/qcg/QAOA/hw_efficient_ansatz.py +++ b/qcg/QAOA/hw_efficient_ansatz.py @@ -146,12 +146,12 @@ def gen_circuit(self): # print(len(theta)) p_idx = 0 for i in range(self.nq): - self.circ.u3(theta[i + p_idx], 0, 0, self.qr[i]) + self.circ.u(theta[i + p_idx], 0, 0, self.qr[i]) p_idx += self.nq # layer 2 for i in range(self.nq): - self.circ.u3(0, 0, theta[i + p_idx], self.qr[i]) + self.circ.u(0, 0, theta[i + p_idx], self.qr[i]) p_idx += self.nq if self.barriers: @@ -171,12 +171,12 @@ def gen_circuit(self): # PARAMETERIZER # layer 1 for i in range(self.nq): - self.circ.u3(theta[i + p_idx], 0, 0, self.qr[i]) + self.circ.u(theta[i + p_idx], 0, 0, self.qr[i]) p_idx += self.nq # layer 2 for i in range(self.nq): - self.circ.u3(0, 0, theta[i + p_idx], self.qr[i]) + self.circ.u(0, 0, theta[i + p_idx], self.qr[i]) p_idx += self.nq # place measurements on the end of the circuit diff --git a/README.md b/readme.md similarity index 87% rename from README.md rename to readme.md index d78ca95..0930bb7 100644 --- a/README.md +++ b/readme.md @@ -31,14 +31,10 @@ Follow the [instructions](https://support.gurobi.com/hc/en-us/articles/147996775 pip install -r requirements.txt ``` -## Example Code -For an example, run: -``` -python example.py -``` -This runs an example 16-qubit supremacy circuit. -The output qubits are in a scrambled order based on the subcircuit post-processing sequence. -A function that converts an arbitrary state of interest to the original order will be added. +> Note on Qiskit Version: If you get warnings about conditionals '==' and 'is', switching to qiskit==0.45.2 may fix the issue. + +## Example Reconstruction +See `explaning_example.md` for running the example scripts. ## Citing CutQC If you use CutQC in your work, we would appreciate it if you cite our paper: diff --git a/reconstruction_example.py b/reconstruction_example.py new file mode 100644 index 0000000..3ce4304 --- /dev/null +++ b/reconstruction_example.py @@ -0,0 +1,39 @@ +''' +Title: dist_driver.py +Description: Example of how CutQC can be used to efficiently reconstruct subcircuits +''' + +import os +from cutqc.main import CutQC + +# Environment variables set by slurm script +GPUS_PER_NODE = int(os.environ["SLURM_GPUS_ON_NODE"]) +WORLD_RANK = int(os.environ["SLURM_PROCID"]) +WORLD_SIZE = int(os.environ["WORLD_SIZE"]) + +if __name__ == "__main__": + full_path = 'adder_example.pkl' + compute_backend = 'GPU' + comm_backend = 'nccl' + + # Load CutQC Instance from Pickle + print(f'--- Running {full_path} ---') + cutqc = CutQC ( + pytorch_distributed = True, + reconstruct_only = True, + load_data = full_path, + compute_backend = compute_backend, + comm_backend = comm_backend, + gpus_per_node = GPUS_PER_NODE, + world_rank = WORLD_RANK, + world_size = WORLD_SIZE + ) + + # Initiate Reconstruct + compute_time = cutqc.build(mem_limit=32, recursion_depth=1) + approximation_error = cutqc.verify() + + print('--- Reconstruction Complete ---') + print ("Total Reconstruction Time:\t{}".format(compute_time)) + print ("Approximate Error:\t {}".format (approximation_error)) + cutqc.destroy_distributed() \ No newline at end of file diff --git a/reconstruction_example.slurm b/reconstruction_example.slurm new file mode 100644 index 0000000..a41388a --- /dev/null +++ b/reconstruction_example.slurm @@ -0,0 +1,31 @@ +#!/bin/bash +#SBATCH --output=reconstruction_example.out +#SBATCH --nodes=1 # node count +#SBATCH --ntasks-per-node=2 # total number of tasks across all nodes +#SBATCH --cpus-per-task=12 # cpu-cores per task (>1 if multi-threaded tasks) +#SBATCH --mem=40G # memory per cpu-core (4G is default) +#SBATCH --gres=gpu:4 +#SBATCH --time=00:01:55 # total run time limit (HH:MM:SS) +#SBATCH --mail-type=begin # send email when job begins +#SBATCH --mail-type=end # send email when job ends +#SBATCH --mail-type=fail # send mail if job fails + +# Setup for Multi-node Workload +export MASTER_PORT=$(get_free_port) # Get a free Port +export WORLD_SIZE=$(($SLURM_NNODES * $SLURM_NTASKS_PER_NODE)) +master_addr=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1) # Master node is first host in list of hostnames +export MASTER_ADDR=$master_addr + +# Sanity Print +echo "MASTER_ADDR="$MASTER_ADDR +echo "MASTER_PORT="$MASTER_PORT +echo "WORLD_SIZE="$WORLD_SIZE + +# Load Modules +module purge +module load anaconda3/2024.2 +conda activate CutQCSummer2025 +module load gurobi/12.0.0 + +srun python reconstruction_example.py +