Binary file added best_qubit_model_weights.pt
Binary file not shown.
522 changes: 522 additions & 0 deletions experiment_log.txt

Large diffs are not rendered by default.

45 changes: 45 additions & 0 deletions src/nn/best_qubit_model.py
@@ -0,0 +1,45 @@
import torch
import torch.nn as nn
import torch.nn.functional as F

class BestQubitModel(nn.Module):
def __init__(self, n_size=4, hidden_layers=3, hidden_size=128, dropout_rate=0.5):
super(BestQubitModel, self).__init__()
input_size = 2 * n_size * n_size # 32 features
output_size = n_size * n_size # 16 outputs

self.n_size = n_size
self.hidden_layers = hidden_layers
self.dropout_rate = dropout_rate

# Input layer
self.fc1 = nn.Linear(input_size, hidden_size)

# Hidden layers
self.hidden_layers_list = nn.ModuleList()
for _ in range(hidden_layers - 1):
self.hidden_layers_list.append(nn.Linear(hidden_size, hidden_size))

# Output layer
self.fc_out = nn.Linear(hidden_size, output_size)

self.dropout = nn.Dropout(p=dropout_rate) # Dropout layer with adjustable dropout rate

def forward(self, x):
# Take only the first two channels: shape becomes [batch, 2, n_size, n_size]
x = x[:, :2, :, :]
# Flatten to [batch, 2*n_size*n_size]
x = x.view(x.size(0), -1)

x = F.relu(self.fc1(x))
x = self.dropout(x) # Apply dropout

for layer in self.hidden_layers_list:
x = F.relu(layer(x))
x = self.dropout(x) # Apply dropout

x = self.fc_out(x)

# Reshape output to match target shape: [batch, 1, n_size, n_size]
x = x.view(-1, 1, self.n_size, self.n_size)
return x
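For reference, a minimal sketch (not part of this PR) of how BestQubitModel could be exercised; the three-channel input and the [batch, 1, 4, 4] output mirror the usage in src/nn_eval_main.py further down:

import torch
from src.nn.best_qubit_model import BestQubitModel

model = BestQubitModel(n_size=4, hidden_layers=3, hidden_size=128, dropout_rate=0.5)
model.eval()  # disables dropout for a deterministic forward pass
dummy = torch.rand(8, 3, 4, 4)  # hypothetical batch: x-matrix, z-matrix, unused third channel
with torch.no_grad():
    out = model(dummy)
print(out.shape)  # torch.Size([8, 1, 4, 4])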
2 changes: 1 addition & 1 deletion src/nn/brute_force_data.py
@@ -191,7 +191,7 @@ def apply(gate_name: str, gate_data: tuple) -> None:

def generate_dataset_ct(nr_samples: int, qubits: List[int],
gates: List[int],
topo_factory: Callable[[int], Topology] = None, labels_as_described:bool = False, preprocessing_type: PreprocessingType = PreprocessingType.ORIGINAL) -> Tuple[torch.Tensor, torch.Tensor]:
topo_factory: Callable[[int], Topology] = None, labels_as_described:bool = True, preprocessing_type: PreprocessingType = PreprocessingType.ORIGINAL) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Generate a dataset defined by labels_as_described and preprocessing_type.

204 changes: 204 additions & 0 deletions src/nn/nn_train_main.py
@@ -0,0 +1,204 @@
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from src.nn.best_qubit_model import BestQubitModel

def load_data(train_path, val_path):
"""
Loads the training and validation data.

Args:
train_path (str): Path to the training data file.
val_path (str): Path to the validation data file.

Returns:
(Tensor, Tensor, Tensor, Tensor): X_train, y_train, X_val, y_val
"""
train_data = torch.load(train_path)
val_data = torch.load(val_path)
X_train, y_train = train_data
X_val, y_val = val_data
return X_train, y_train, X_val, y_val

def create_dataloaders(X_train, y_train, batch_size=32):
"""
Creates the training DataLoader.

Args:
X_train (Tensor): Training inputs
y_train (Tensor): Training targets
batch_size (int): Batch size for DataLoader

Returns:
DataLoader: A DataLoader for training data
"""
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
return train_loader

def save_loss_plot(train_losses, val_losses, filename="training_loss.png"):
"""
Saves the training and validation loss plot as a PNG file.

Args:
train_losses (list of float): List containing the training loss value per epoch.
val_losses (list of float): List containing the validation loss value per epoch.
filename (str): Filename for the saved plot (default: training_loss.png).
"""
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.plot(train_losses, label="Training Loss")
plt.plot(val_losses, label="Validation Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training and Validation Loss Progression")
plt.legend()
plt.grid(True)
plt.savefig(filename)
plt.close()

def log_experiment_details(filename, model, optimizer, best_train_loss, best_val_loss, n_epochs, patience):
"""
Logs the experiment details to a text file.

Args:
filename (str): Path to the log file.
model (nn.Module): The model being trained.
optimizer (torch.optim.Optimizer): The optimizer used for training.
best_train_loss (float): The best training loss achieved.
best_val_loss (float): The best validation loss achieved.
n_epochs (int): Number of epochs the model was trained for.
patience (int): Patience for early stopping.
"""
with open(filename, 'a') as f:
f.write(f"Model: {model}\n")
f.write(f"Number of hidden layers: {model.hidden_layers}\n")
f.write(f"Optimizer: {optimizer}\n")
f.write(f"Number of epochs: {n_epochs}\n")
f.write(f"Patience: {patience}\n")
f.write(f"Best training loss: {best_train_loss:.4f}\n")
f.write(f"Best validation loss: {best_val_loss:.4f}\n")
f.write("\n" + "="*80 + "\n\n")

def custom_loss(output, target):
mse_loss = nn.MSELoss()(output, target)
penalty = torch.sum(F.relu(-output)) # Penalize negative values
return mse_loss + penalty
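
# Illustrative check of the penalty term above (made-up numbers, not from the dataset):
# for output = tensor([[-0.5, 1.0]]) and target = tensor([[0.0, 1.0]]), the MSE term is
# ((-0.5 - 0.0)**2 + 0.0) / 2 = 0.125 and torch.sum(F.relu(-output)) adds 0.5,
# so custom_loss returns 0.625 and negative predictions are actively discouraged.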


def train_model(model, train_loader, criterion, optimizer, X_train, y_train, X_val, y_val, n_epochs=30000, verbose=True, patience=1000, log_file="experiment_log.txt"):
"""
Main training loop for the model.

Args:
model (nn.Module): Neural network model.
train_loader (DataLoader): DataLoader for training data.
criterion (nn.Module): Loss function.
optimizer (torch.optim.Optimizer): Optimizer for training.
X_train (Tensor): Training inputs for occasional sample prediction.
y_train (Tensor): Training targets for occasional sample comparison.
X_val (Tensor): Validation inputs.
y_val (Tensor): Validation targets.
n_epochs (int): Number of epochs to train.
verbose (bool): If True, prints updates to terminal.
patience (int): Number of epochs without validation improvement before early stopping.
log_file (str): Path to the log file.

Returns:
None.
"""
train_losses = []
val_losses = []
best_train_loss = float('inf')
best_val_loss = float('inf')
epochs_no_improve = 0 # Counter for early stopping

for epoch in range(n_epochs):
model.train()
total_loss = 0

for batch_X, batch_y in train_loader:
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
total_loss += loss.item()

# Compute average training loss for this epoch
avg_loss = total_loss / len(train_loader)
train_losses.append(avg_loss)

# Evaluate on validation set
model.eval()
with torch.no_grad():
val_outputs = model(X_val)
val_loss = criterion(val_outputs, y_val).item()
val_losses.append(val_loss)

if verbose and epoch % 10 == 0:
current_lr = optimizer.param_groups[0]['lr']
print(f'Epoch {epoch}, Training Loss: {avg_loss:.4f}, '
f'Validation Loss: {val_loss:.4f}, LR: {current_lr:.6f}')
with torch.no_grad():
for i in range(2): # Print predictions for the first two training examples
test_input = X_train[i:i+1]
pred = model(test_input)
print(f"Example {i+1} - Predicted values:")
print(pred[0, 0])
print(f"Example {i+1} - Actual values:")
print(y_train[i, 0])

# Save only the best model so far and check early stopping
if val_loss < best_val_loss:
best_val_loss = val_loss
best_train_loss = avg_loss
torch.save(model.state_dict(), "best_qubit_model_weights.pt")
epochs_no_improve = 0

else:
epochs_no_improve += 1

if epochs_no_improve >= patience:
print(f"Early stopping triggered after {epoch + 1} epochs.")
break

save_loss_plot(train_losses, val_losses)
model.load_state_dict(torch.load("best_qubit_model_weights.pt"))

# Log the final experiment details (best losses, epochs run) for this training run
log_experiment_details(log_file, model, optimizer, best_train_loss, best_val_loss, epoch, patience)

def main():
# File paths
train_path = 'train_data_True_from_project_description.pt'
val_path = 'val_data_True_from_project_description.pt'

# Load data
X_train, y_train, X_val, y_val = load_data(train_path, val_path)

# Create model, criterion, optimizer
model = BestQubitModel(n_size=4, hidden_layers=3, hidden_size=128, dropout_rate=0.5)
criterion = custom_loss # Use the custom loss function
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-4)

# Create data loader
train_loader = create_dataloaders(X_train, y_train, batch_size=32)

# Train model with validation
train_model(
model=model,
train_loader=train_loader,
criterion=criterion,
optimizer=optimizer,
X_train=X_train,
y_train=y_train,
X_val=X_val,
y_val=y_val,
n_epochs=20000,
verbose=True
)

if __name__ == "__main__":
main()
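As a rough smoke test of the training loop above (random data and assumed tensor shapes only; the real .pt files are produced elsewhere in the project), something like the following could be run from this module:

import torch
import torch.optim as optim

X = torch.rand(64, 3, 4, 4)  # assumed layout: x-matrix, z-matrix, spare channel per sample
y = torch.rand(64, 1, 4, 4)  # assumed layout: one score per (row, column) pair
model = BestQubitModel(n_size=4)
loader = create_dataloaders(X, y, batch_size=16)
optimizer = optim.Adam(model.parameters(), lr=1e-4)
# Writes best_qubit_model_weights.pt, training_loss.png, and experiment_log.txt as side effects
train_model(model, loader, custom_loss, optimizer, X, y, X, y, n_epochs=5, patience=2)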
88 changes: 86 additions & 2 deletions src/nn_eval_main.py
@@ -2,6 +2,7 @@
import warnings
from typing import List

import torch
import numpy as np
import pandas as pd
from pauliopt.circuits import Circuit
@@ -12,6 +13,10 @@
from src.nn.brute_force_data import get_best_cnots
from src.utils import random_hscx_circuit, tableau_from_circuit

from src.nn.best_qubit_model import BestQubitModel
from src.nn.preprocess_data import PREPROCESSING_SCRIPTS, PreprocessingType


# Suppress all overflow warnings globally
np.seterr(over='ignore')

@@ -88,6 +93,61 @@ def pick_pivot_callback(G, remaining: "CliffordTableau", remaining_rows: List[in
return {"n_rep": n_rep} | collect_circuit_data(circ_out) | {"method": "optimum"}


def nn_compilation(circuit: Circuit, topology: Topology, n_rep: int):
"""
Compilation using the trained neural network to infer the best pivot qubit.
"""
model = BestQubitModel()
model.load_state_dict(torch.load("best_qubit_model_weights.pt"))
model.eval()

# Prepare the Clifford tableau from the circuit
clifford_tableau = CliffordTableau(circuit.n_qubits)
clifford_tableau = tableau_from_circuit(clifford_tableau, circuit)

# Ensure matrices are numpy arrays with the expected shape (n_qubits x n_qubits)
n_qubits = circuit.n_qubits

# Reshape x_mat and z_mat to (n, n)
x_mat = np.array(clifford_tableau.x_matrix).reshape(n_qubits, n_qubits)
z_mat = np.array(clifford_tableau.z_matrix).reshape(n_qubits, n_qubits)

# Create an input tensor of shape [1, 3, n, n]
input_tensor = torch.zeros(1, 3, n_qubits, n_qubits, dtype=torch.float32)
input_tensor[0, 0] = torch.tensor(x_mat, dtype=torch.float32)
input_tensor[0, 1] = torch.tensor(z_mat, dtype=torch.float32)
# The third channel remains zero (or filled as needed)

with torch.no_grad():
output = model(input_tensor)
output = torch.round(output).int().numpy()

# Ensure the output matrix has the expected shape (n_qubits x n_qubits)
output = output.reshape(n_qubits, n_qubits)
print(output)

# Use a large integer value to represent infinity
int_inf = np.iinfo(np.int32).max

# Collect row and column combinations based on the lowest values
combinations = []
while not np.all(output == int_inf):
min_index = np.unravel_index(np.argmin(output, axis=None), output.shape)  # note: ties resolve to the first occurrence
print(min_index)
combinations.append(min_index)
output[min_index[0], :] = int_inf
output[:, min_index[1]] = int_inf
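
# Illustrative trace of the greedy masking above (made-up scores, not model output):
# for output = [[1, 3], [2, 0]] the first argmin is (1, 1); masking row 1 and column 1
# leaves only (0, 0) below int_inf, so combinations == [(1, 1), (0, 0)]: each row and
# each column is selected exactly once, lowest predicted score first.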

combination_iterator = iter(combinations)

def pick_pivot_callback(G, remaining: "CliffordTableau", remaining_rows: List[int], choice_fn=min):
row, col = next(combination_iterator)
return row, col


circ_out = synthesize_tableau_perm_row_col(clifford_tableau, topology, pick_pivot_callback=pick_pivot_callback)
return {"n_rep": n_rep} | collect_circuit_data(circ_out) | {"method": "nn"}


def main(n_qubits: int = 4, nr_gates: int = 1000):
"""
@@ -100,23 +160,47 @@ def main(n_qubits: int = 4, nr_gates: int = 1000):
df = pd.DataFrame(columns=["n_rep", "num_qubits", "method", "h", "s", "cx", "depth"])
topo = Topology.complete(n_qubits)
for i in range(20):
print(i)
circuit = random_hscx_circuit(nr_qubits=n_qubits, nr_gates=nr_gates)

# Our compilation, i.e. the baseline from the paper
df_dictionary = pd.DataFrame([our_compilation(circuit.copy(), topo, i)])
df = pd.concat([df, df_dictionary], ignore_index=True)
print("Min", df_dictionary["cx"])

# Optimal compilation
df_dictionary = pd.DataFrame([optimal_compilation(circuit.copy(), topo, i)])
df = pd.concat([df, df_dictionary], ignore_index=True)
print("OPTIMUM", df_dictionary["cx"])

# Random compilation
df_dictionary = pd.DataFrame([random_compilation(circuit.copy(), topo, i)])
df = pd.concat([df, df_dictionary], ignore_index=True)
print("Random", df_dictionary["cx"])

# Group's first ANN compilation
df_dictionary = pd.DataFrame([nn_compilation(circuit.copy(), topo, i)])
df = pd.concat([df, df_dictionary], ignore_index=True)
print("NN", df_dictionary["cx"])

# Convert the cx column to a numerical type
df["cx"] = pd.to_numeric(df["cx"])

df.to_csv("test_clifford_synthesis.csv", index=False)
print(df.groupby("method").mean())

# Is the difference just luck?
from scipy.stats import ttest_ind

nn_cx_values = df[df["method"] == "nn"]["cx"]
random_cx_values = df[df["method"] == "random"]["cx"]
t_stat, p_value = ttest_ind(nn_cx_values, random_cx_values)

print(f"T-test results: t-statistic = {t_stat}, p-value = {p_value}")
if p_value < 0.05:
print("The difference in cx values between nn and random is statistically significant (p < 0.05).")
else:
print("The difference in cx values between nn and random is not statistically significant (p >= 0.05).")


if __name__ == "__main__":
main()
main()