From a73493bd33c5606043e9a14294c83b0aa891b36c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 19 Mar 2026 13:05:12 +0000 Subject: [PATCH 1/7] Initial plan From 2ed9c047f971ea8815fe4cdffde21d0b51354942 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 19 Mar 2026 13:20:11 +0000 Subject: [PATCH 2/7] Implement BucketExactCover HUBO problem class with IO support and tests Co-authored-by: havahol <5363644+havahol@users.noreply.github.com> --- qaoa/problems/__init__.py | 1 + qaoa/problems/bucketexactcover_problem.py | 400 ++++++++++++++++++++++ qaoa/utils/qaoaIO.py | 38 +- unittests/test_io.py | 123 +++++++ unittests/test_problems.py | 117 +++++++ 5 files changed, 672 insertions(+), 7 deletions(-) create mode 100644 qaoa/problems/bucketexactcover_problem.py create mode 100644 unittests/test_io.py diff --git a/qaoa/problems/__init__.py b/qaoa/problems/__init__.py index 8c5680b..328d365 100644 --- a/qaoa/problems/__init__.py +++ b/qaoa/problems/__init__.py @@ -2,6 +2,7 @@ from .qubo_problem import QUBO from .graph_problem import GraphProblem from .exactcover_problem import ExactCover +from .bucketexactcover_problem import BucketExactCover from .portfolio_problem import PortfolioOptimization from .maxkcut_binary_powertwo import MaxKCutBinaryPowerOfTwo from .maxkcut_binary_fullH import MaxKCutBinaryFullH diff --git a/qaoa/problems/bucketexactcover_problem.py b/qaoa/problems/bucketexactcover_problem.py new file mode 100644 index 0000000..2c33978 --- /dev/null +++ b/qaoa/problems/bucketexactcover_problem.py @@ -0,0 +1,400 @@ +import math +import itertools +from itertools import combinations + +import numpy as np +from qiskit import QuantumCircuit +from qiskit.circuit import Parameter + +from .base_problem import Problem + + +class BucketExactCover(Problem): + """ + Bucket Exact Cover (HUBO) problem. + + Subclass of the `Problem` class. Implements the binary-encoded exact cover + problem where routes are grouped by boat (bucket), enabling a HUBO + formulation that requires fewer qubits than the one-hot QUBO. + + Each bucket k corresponds to one boat and contains all routes for that boat. + A b_k-bit binary string selects one route (or a null route) from bucket k + via modular wrapping (Strategy B). + + Attributes: + columns (np.ndarray): Full column matrix (N_rows × N_routes). + num_buckets (int): Number of top rows used for bucket assignment. + weights (np.ndarray): (Possibly scaled) weights for valid columns. + penalty_factor (float): (Possibly scaled) penalty factor. + scale_problem (bool): Whether the problem has been scaled. + allow_infeasible (bool): Whether infeasible solutions are accepted. + N_qubits (int): Total number of qubits. + """ + + def __init__( + self, + columns, + num_buckets, + weights=None, + penalty_factor=None, + allow_infeasible=False, + scale_problem=False, + ) -> None: + super().__init__() + + self.columns = np.array(columns, dtype=float) + self.num_buckets = num_buckets + self.allow_infeasible = allow_infeasible + self.scale_problem = scale_problem + + N_rows, N_routes = self.columns.shape + n_orders = N_rows - num_buckets + + # --- Bucket structure ------------------------------------------------ + self._bucket_columns = [] + for k in range(num_buckets): + bucket_k = [r for r in range(N_routes) if self.columns[k, r] == 1] + self._bucket_columns.append(bucket_k) + + # Valid columns: those belonging to at least one bucket + valid_set = set() + for k in range(num_buckets): + valid_set.update(self._bucket_columns[k]) + self._valid_columns = sorted(valid_set) + self._valid_col_to_idx = {col: idx for idx, col in enumerate(self._valid_columns)} + + # Bucket sizes and qubit counts + self._bucket_sizes = [len(bc) for bc in self._bucket_columns] + self._bucket_qubits = [ + max(1, math.ceil(math.log2(n_k + 1))) for n_k in self._bucket_sizes + ] + self._bucket_qubit_offsets = [] + offset = 0 + for b_k in self._bucket_qubits: + self._bucket_qubit_offsets.append(offset) + offset += b_k + self.N_qubits = offset + + # --- Weights --------------------------------------------------------- + if weights is None: + orig_weights = np.zeros(len(self._valid_columns)) + else: + orig_weights = np.array(weights, dtype=float)[self._valid_columns] + self._original_weights = orig_weights.copy() + self.weights = orig_weights.copy() + + # --- Per-bucket weight statistics ------------------------------------ + bucket_mins = [] + bucket_maxs = [] + for k in range(num_buckets): + if self._bucket_sizes[k] > 0: + w_k = [ + self._original_weights[self._valid_col_to_idx[g]] + for g in self._bucket_columns[k] + ] + bucket_mins.append(min(w_k)) + bucket_maxs.append(max(w_k)) + else: + bucket_mins.append(0.0) + bucket_maxs.append(0.0) + + C_min = sum(bucket_mins) + C_max_shifted = sum(bmax - bmin for bmax, bmin in zip(bucket_maxs, bucket_mins)) + epsilon = max(1e-6, 1e-6 * C_max_shifted) + + # --- Penalty factor -------------------------------------------------- + if penalty_factor is None: + if np.all(self._original_weights == 0): + unscaled_penalty = 1.0 + else: + unscaled_penalty = C_max_shifted + epsilon + else: + unscaled_penalty = float(penalty_factor) + + self._original_penalty_factor = unscaled_penalty + self.penalty_factor = unscaled_penalty + + # --- Scaling --------------------------------------------------------- + if scale_problem and C_max_shifted > 0: + shift = C_min / num_buckets + omega = self._original_weights - shift + + lam = C_max_shifted + epsilon + denom = C_max_shifted * (1 + n_orders * (num_buckets - 1) ** 2) + s = 1.0 / denom + + self.weights = s * omega + self.penalty_factor = s * lam + + # ------------------------------------------------------------------------- + # Decoding + # ------------------------------------------------------------------------- + + def _decode(self, string): + """Decode a bitstring to a binary indicator vector over valid columns.""" + x = np.zeros(len(self._valid_columns)) + for k in range(self.num_buckets): + offset = self._bucket_qubit_offsets[k] + b_k = self._bucket_qubits[k] + n_k = self._bucket_sizes[k] + v = sum(int(string[offset + j]) * (1 << j) for j in range(b_k)) + idx = v % (n_k + 1) + if idx > 0: + global_col = self._bucket_columns[k][idx - 1] + valid_idx = self._valid_col_to_idx[global_col] + x[valid_idx] = 1.0 + return x + + # ------------------------------------------------------------------------- + # Cost functions + # ------------------------------------------------------------------------- + + def _compute_cost(self, x, w, pf): + n_orders = self.columns.shape[0] - self.num_buckets + penalty = 0.0 + if n_orders > 0: + Y_orders = self.columns[self.num_buckets :, :][:, self._valid_columns] + coverage = Y_orders @ x + penalty = np.sum((coverage - 1) ** 2) + return -(w @ x + pf * penalty) + + def cost(self, string): + """ + Calculates the (possibly scaled) cost of a given solution. + + Args: + string (str): Bitstring representing a candidate solution. + + Returns: + float: Negated cost (QAOA maximizes). + """ + x = self._decode(string) + return self._compute_cost(x, self.weights, self.penalty_factor) + + def unscaled_cost(self, string): + """ + Calculates the original unscaled cost of a given solution. + + Args: + string (str): Bitstring representing a candidate solution. + + Returns: + float: Negated unscaled cost. + """ + x = self._decode(string) + return self._compute_cost(x, self._original_weights, self._original_penalty_factor) + + def isFeasible(self, string): + """ + Checks if a bitstring represents a feasible solution. + + Args: + string (str): Bitstring representing a candidate solution. + + Returns: + bool: True if feasible. + """ + if self.allow_infeasible: + return True + n_orders = self.columns.shape[0] - self.num_buckets + if n_orders == 0: + return True + x = self._decode(string) + Y_orders = self.columns[self.num_buckets :, :][:, self._valid_columns] + coverage = Y_orders @ x + return np.allclose(coverage, np.ones(n_orders), atol=1e-7) + + # ------------------------------------------------------------------------- + # HUBO polynomial helpers + # ------------------------------------------------------------------------- + + def _mobius_inversion(self, f_vals, k): + """ + Compute multilinear polynomial coefficients from function values via + Möbius inversion on the Boolean lattice. + + f_vals[v] = f(z) where v = sum_j 2^j * z_j. + + Returns: + dict mapping frozenset(local_qubit_indices) -> coefficient + """ + result = {} + for size in range(k + 1): + for T in combinations(range(k), size): + alpha = 0.0 + for u_size in range(size + 1): + for U in combinations(T, u_size): + v = sum(1 << j for j in U) + sign = (-1) ** (size - u_size) + alpha += sign * f_vals[v] + if abs(alpha) > 1e-15: + result[frozenset(T)] = alpha + return result + + # ------------------------------------------------------------------------- + # Circuit construction + # ------------------------------------------------------------------------- + + def create_circuit(self): + """ + Creates the QAOA cost (phase separator) circuit for the HUBO problem. + + Builds the HUBO polynomial via multilinear interpolation, substitutes + q_m = (1 - Z_m)/2, and implements multi-qubit Z-rotations using + CNOT-ladder + RZ + reverse-CNOT-ladder. + + Returns: + QuantumCircuit: Parameterized circuit with parameter "x_gamma". + """ + n_orders = self.columns.shape[0] - self.num_buckets + Y_orders = self.columns[self.num_buckets :, :] if n_orders > 0 else None + + # Accumulated HUBO polynomial: frozenset(global qubit indices) -> coeff + hubo_poly = {} + + def add_to_poly(T_key, coeff): + if abs(coeff) < 1e-15: + return + hubo_poly[T_key] = hubo_poly.get(T_key, 0.0) + coeff + + # Coverage polynomials per (order, bucket) + cov_polys = {} + + for k in range(self.num_buckets): + b_k = self._bucket_qubits[k] + n_k = self._bucket_sizes[k] + offset = self._bucket_qubit_offsets[k] + num_states = 1 << b_k + + # Cost contribution values per state + cost_vals = np.zeros(num_states) + for v in range(num_states): + idx = v % (n_k + 1) + if idx > 0: + global_col = self._bucket_columns[k][idx - 1] + valid_idx = self._valid_col_to_idx[global_col] + cost_vals[v] = self.weights[valid_idx] + + # Möbius inversion for cost + cost_alpha = self._mobius_inversion(cost_vals, b_k) + for T_local, coeff in cost_alpha.items(): + T_global = frozenset(offset + j for j in T_local) + add_to_poly(T_global, coeff) + + # Coverage values per (order, state) + if n_orders > 0: + for o in range(n_orders): + cov_vals = np.zeros(num_states) + for v in range(num_states): + idx = v % (n_k + 1) + if idx > 0: + global_col = self._bucket_columns[k][idx - 1] + cov_vals[v] = float(Y_orders[o, global_col]) + cov_alpha = self._mobius_inversion(cov_vals, b_k) + cov_polys[(o, k)] = {} + for T_local, coeff in cov_alpha.items(): + T_global = frozenset(offset + j for j in T_local) + cov_polys[(o, k)][T_global] = coeff + + # Penalty terms: penalty_factor * sum_o P_o + # P_o = 1 - sum_k g_{o,k} + 2 * sum_{k 0: + for o in range(n_orders): + # Constant term: +1 + add_to_poly(frozenset(), self.penalty_factor) + + # -sum_k g_{o,k} + for k in range(self.num_buckets): + for T, coeff in cov_polys.get((o, k), {}).items(): + add_to_poly(T, -self.penalty_factor * coeff) + + # +2 * sum_{k Pauli-Z coefficient + + for T, alpha in hubo_poly.items(): + if abs(alpha) < 1e-15: + continue + T_list = sorted(T) + d = len(T_list) + factor = alpha / (2 ** d) + for u_size in range(d + 1): + for U in combinations(T_list, u_size): + U_key = frozenset(U) + sign = (-1) ** u_size + beta[U_key] = beta.get(U_key, 0.0) + sign * factor + + # Build Qiskit circuit + gamma = Parameter("x_gamma") + circuit = QuantumCircuit(self.N_qubits) + + for T_key, b_T in beta.items(): + if abs(b_T) < 1e-10: + continue + T_list = sorted(T_key) + d = len(T_list) + if d == 0: + continue # constant term — no gate needed + elif d == 1: + circuit.rz(2 * gamma * b_T, T_list[0]) + else: + # CNOT ladder + for i in range(d - 1): + circuit.cx(T_list[i], T_list[i + 1]) + circuit.rz(2 * gamma * b_T, T_list[d - 1]) + # Reverse CNOT ladder + for i in range(d - 2, -1, -1): + circuit.cx(T_list[i], T_list[i + 1]) + + self.circuit = circuit + return circuit + + # ------------------------------------------------------------------------- + # Brute force + # ------------------------------------------------------------------------- + + def brute_force_solve(self, return_num_feasible=False): + """ + Find the optimal solution by exhaustive enumeration over all bucket + value combinations. + + Args: + return_num_feasible (bool): If True, also return the count of + feasible solutions encountered. + + Returns: + str or (str, int): Optimal bitstring, and optionally the number of + feasible solutions. + """ + bucket_ranges = [range(1 << self._bucket_qubits[k]) for k in range(self.num_buckets)] + + best_val = -np.inf + best_sol = None + num_feasible = 0 + + for combo in itertools.product(*bucket_ranges): + parts = [] + for k, v in enumerate(combo): + b_k = self._bucket_qubits[k] + bits = "".join(str((v >> j) & 1) for j in range(b_k)) + parts.append(bits) + bitstring = "".join(parts) + + val = self.cost(bitstring) + if val > best_val: + best_val = val + best_sol = bitstring + if self.isFeasible(bitstring): + num_feasible += 1 + + if return_num_feasible: + return best_sol, num_feasible + return best_sol diff --git a/qaoa/utils/qaoaIO.py b/qaoa/utils/qaoaIO.py index dc0d366..90cb297 100644 --- a/qaoa/utils/qaoaIO.py +++ b/qaoa/utils/qaoaIO.py @@ -86,6 +86,15 @@ class ExactCoverProblemData(ProblemData): problem_type: str = "ExactCover" +@dataclass +class BucketExactCoverProblemData(ProblemData): + columns: np.ndarray = None + weights: np.ndarray = None + solution: np.ndarray = None + num_buckets: int = None + problem_type: str = "BucketExactCover" + + @dataclass class PortfolioOptimizationProblemData(ProblemData): risk: float = 0.0 @@ -98,6 +107,7 @@ class PortfolioOptimizationProblemData(ProblemData): # Register available problem types problem_registry: Dict[str, Type[ProblemData]] = { "ExactCover": ExactCoverProblemData, + "BucketExactCover": BucketExactCoverProblemData, "PortfolioOptimization": PortfolioOptimizationProblemData, } @@ -227,12 +237,20 @@ def from_qaoa(cls, qaoa: QAOA, opt_time = qaoa.optimization_results[k].opt_time ) - problem_data = ExactCoverProblemData( - columns = qaoa.problem.columns, - weights = qaoa.problem.weights, - solution = solution, - hamming_weight = qaoa.problem.hamming_weight - ) + if isinstance(qaoa.problem, problems.BucketExactCover): + problem_data = BucketExactCoverProblemData( + columns=qaoa.problem.columns, + weights=qaoa.problem._original_weights, + solution=solution, + num_buckets=qaoa.problem.num_buckets, + ) + else: + problem_data = ExactCoverProblemData( + columns=qaoa.problem.columns, + weights=qaoa.problem.weights, + solution=solution, + hamming_weight=qaoa.problem.hamming_weight, + ) init_method = InitMethod(str(qaoa.initialstate).split(" ")[0].split(".")[-1].upper()) mixer_str = str(qaoa.mixer).split(" ")[0].split(".")[-1].upper() @@ -264,7 +282,13 @@ def from_qaoa(cls, qaoa: QAOA, # def generate_qaoa_object(self) -> "QAOA" def get_problem_instance(self): - if isinstance(self.problem, ExactCoverProblemData): + if isinstance(self.problem, BucketExactCoverProblemData): + return problems.BucketExactCover( + columns=self.problem.columns, + weights=self.problem.weights, + num_buckets=self.problem.num_buckets, + ) + elif isinstance(self.problem, ExactCoverProblemData): return problems.ExactCover( columns = self.problem.columns, weights = self.problem.weights, diff --git a/unittests/test_io.py b/unittests/test_io.py new file mode 100644 index 0000000..cb0e21b --- /dev/null +++ b/unittests/test_io.py @@ -0,0 +1,123 @@ +""" +Round-trip IO tests for QAOAResult with BucketExactCover problems. +""" + +import os +import tempfile +import unittest + +import numpy as np + +from qaoa.utils.qaoaIO import ( + BucketExactCoverProblemData, + DepthResult, + InitMethod, + MixerMethod, + QAOAParameters, + QAOAResult, +) + + +class TestBucketExactCoverIO(unittest.TestCase): + """Round-trip IO tests for BucketExactCover.""" + + # Same small problem used in test_problems.py + _COLUMNS = np.array([ + [1, 1, 1, 0, 0, 0, 0], + [0, 0, 0, 1, 1, 1, 1], + [1, 0, 0, 1, 0, 1, 1], + [0, 1, 0, 0, 1, 0, 1], + [0, 1, 0, 1, 1, 0, 1], + [1, 0, 1, 0, 0, 1, 0], + ], dtype=float) + _WEIGHTS = np.array([20., 40., 30., 35., 10., 25., 40.]) + _NUM_BUCKETS = 2 + + def _make_result(self): + from qaoa.problems import BucketExactCover + + bec = BucketExactCover( + self._COLUMNS, + num_buckets=self._NUM_BUCKETS, + weights=self._WEIGHTS, + ) + + problem_data = BucketExactCoverProblemData( + columns=self._COLUMNS, + weights=self._WEIGHTS, + num_buckets=self._NUM_BUCKETS, + ) + + qaoa_params = QAOAParameters( + cvar=1.0, + init_method=InitMethod.PLUS, + mixer_method=MixerMethod.X, + backend="statevector_simulator", + optimizer="COBYLA", + N_qubits=bec.N_qubits, + depths={}, + ) + + return QAOAResult(problem=problem_data, qaoa_params=qaoa_params) + + def _temp_json(self): + """Return a path to a temporary JSON file and register cleanup.""" + f = tempfile.NamedTemporaryFile(suffix=".json", delete=False) + f.close() + self.addCleanup(os.remove, f.name) + return f.name + + def test_save_load_problem_type(self): + result = self._make_result() + fname = self._temp_json() + result.save(fname) + loaded = QAOAResult.load(fname) + self.assertEqual(loaded.problem.problem_type, "BucketExactCover") + + def test_save_load_columns(self): + result = self._make_result() + fname = self._temp_json() + result.save(fname) + loaded = QAOAResult.load(fname) + np.testing.assert_array_equal(loaded.problem.columns, self._COLUMNS) + + def test_save_load_weights(self): + result = self._make_result() + fname = self._temp_json() + result.save(fname) + loaded = QAOAResult.load(fname) + np.testing.assert_allclose(loaded.problem.weights, self._WEIGHTS) + + def test_save_load_num_buckets(self): + result = self._make_result() + fname = self._temp_json() + result.save(fname) + loaded = QAOAResult.load(fname) + self.assertEqual(loaded.problem.num_buckets, self._NUM_BUCKETS) + + def test_get_problem_instance_cost_matches(self): + """Reconstruct via get_problem_instance() and verify cost() matches.""" + from qaoa.problems import BucketExactCover + + result = self._make_result() + fname = self._temp_json() + result.save(fname) + loaded = QAOAResult.load(fname) + + original_bec = BucketExactCover( + self._COLUMNS, + num_buckets=self._NUM_BUCKETS, + weights=self._WEIGHTS, + ) + reconstructed_bec = loaded.get_problem_instance() + + # Test cost of [c0, c4] bitstring + test_bitstring = "10010" + self.assertAlmostEqual( + original_bec.cost(test_bitstring), + reconstructed_bec.cost(test_bitstring), + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/unittests/test_problems.py b/unittests/test_problems.py index 38729fb..ca1b32c 100644 --- a/unittests/test_problems.py +++ b/unittests/test_problems.py @@ -6,6 +6,7 @@ - QUBO: cost, create_circuit, lower-triangular - PortfolioOptimization: cost, isFeasible, penalty behavior - MaxKCutOneHot: binstringToLabels, cost, create_circuit +- BucketExactCover: cost, isFeasible, circuit, brute force, scaling, IO """ import math @@ -268,5 +269,121 @@ def test_maxkcut_one_hot_3cuts(self): self.assertIsNotNone(problem.circuit) +class TestBucketExactCover(unittest.TestCase): + """Tests for the BucketExactCover problem (HUBO formulation).""" + + # columns (6 rows × 7 cols): + # c0 c1 c2 c3 c4 c5 c6 + # boat1 [ 1 1 1 0 0 0 0 ] + # boat2 [ 0 0 0 1 1 1 1 ] + # order1 [ 1 0 0 1 0 1 1 ] + # order2 [ 0 1 0 0 1 0 1 ] + # order3 [ 0 1 0 1 1 0 1 ] + # order4 [ 1 0 1 0 0 1 0 ] + # + # weights = [20, 40, 30, 35, 10, 25, 40], num_buckets = 2 + # bucket 0 (boat1): [c0, c1, c2], n_k=3, b_k=2 → qubits 0,1 + # bucket 1 (boat2): [c3, c4, c5, c6], n_k=4, b_k=3 → qubits 2,3,4 + # N_qubits = 5 + # + # Feasible solutions (all 4 orders covered exactly once): + # [c0, c4]: weights 20+10=30 → bitstring "10010" (optimal, least cost) + # [c1, c5]: weights 40+25=65 → bitstring "01110" + # [c2, c6]: weights 30+40=70 → bitstring "11001" + + def _make_problem(self, **kwargs): + from qaoa.problems import BucketExactCover + columns = np.array([ + [1, 1, 1, 0, 0, 0, 0], + [0, 0, 0, 1, 1, 1, 1], + [1, 0, 0, 1, 0, 1, 1], + [0, 1, 0, 0, 1, 0, 1], + [0, 1, 0, 1, 1, 0, 1], + [1, 0, 1, 0, 0, 1, 0], + ], dtype=float) + weights = np.array([20., 40., 30., 35., 10., 25., 40.]) + return BucketExactCover(columns, num_buckets=2, weights=weights, **kwargs) + + # Bitstrings for known solutions + # bucket0: v=1→c0 → bits "10"; bucket1: v=2→c4 → bits "010" + _S_C0C4 = "10010" + # bucket0: v=2→c1 → bits "01"; bucket1: v=3→c5 → bits "110" + _S_C1C5 = "01110" + # bucket0: v=3→c2 → bits "11"; bucket1: v=4→c6 → bits "001" + _S_C2C6 = "11001" + # all zeros → null routes + _S_NULL = "00000" + + def test_n_qubits(self): + bec = self._make_problem() + self.assertEqual(bec.N_qubits, 5) + + def test_cost_feasible_solution(self): + bec = self._make_problem() + self.assertAlmostEqual(bec.unscaled_cost(self._S_C0C4), -30.0) + + def test_cost_suboptimal_feasible(self): + bec = self._make_problem() + self.assertAlmostEqual(bec.unscaled_cost(self._S_C1C5), -65.0) + + def test_is_feasible_true(self): + bec = self._make_problem() + self.assertTrue(bec.isFeasible(self._S_C0C4)) + self.assertTrue(bec.isFeasible(self._S_C1C5)) + self.assertTrue(bec.isFeasible(self._S_C2C6)) + + def test_is_feasible_false(self): + bec = self._make_problem() + self.assertFalse(bec.isFeasible(self._S_NULL)) + + def test_brute_force_finds_cheapest_solution(self): + bec = self._make_problem() + opt_sol = bec.brute_force_solve() + self.assertAlmostEqual(bec.unscaled_cost(opt_sol), -30.0) + + def test_create_circuit_not_none(self): + bec = self._make_problem() + circ = bec.create_circuit() + self.assertIsNotNone(circ) + + def test_circuit_has_parameter(self): + bec = self._make_problem() + circ = bec.create_circuit() + self.assertGreater(len(circ.parameters), 0) + + def test_validate_circuit(self): + bec = self._make_problem() + bec.create_circuit() + ok, report = bec.validate_circuit() + self.assertTrue(ok, msg=str(report)) + + def test_unscaled_cost_equals_cost_when_no_scaling(self): + bec = self._make_problem(scale_problem=False) + for s in [self._S_C0C4, self._S_C1C5, self._S_C2C6, self._S_NULL]: + self.assertAlmostEqual(bec.cost(s), bec.unscaled_cost(s)) + + def test_penalty_auto_computed(self): + bec = self._make_problem() + self.assertGreater(bec.penalty_factor, 0) + + def test_no_bucket_columns_ignored(self): + """Columns with all zeros in the top num_buckets rows are ignored.""" + from qaoa.problems import BucketExactCover + columns = np.array([ + [1, 1, 1, 0, 0, 0, 0, 0], # boat1: c0,c1,c2 + [0, 0, 0, 1, 1, 1, 1, 0], # boat2: c3,c4,c5,c6 + [1, 0, 0, 1, 0, 1, 1, 0], # order1 + [0, 1, 0, 0, 1, 0, 1, 0], # order2 + [0, 1, 0, 1, 1, 0, 1, 0], # order3 + [1, 0, 1, 0, 0, 1, 0, 0], # order4 + ], dtype=float) + weights = np.array([20., 40., 30., 35., 10., 25., 40., 99.]) + # c7 has all zeros in top 2 rows → ignored + bec = BucketExactCover(columns, num_buckets=2, weights=weights) + # N_qubits should be the same as the 7-column problem (c7 ignored) + self.assertEqual(bec.N_qubits, 5) + self.assertNotIn(7, bec._valid_columns) + + if __name__ == "__main__": unittest.main() From da29366f765032294f28a5e3dc4e94e3272b934e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 19 Mar 2026 13:33:44 +0000 Subject: [PATCH 3/7] Fix binary encoding: use ceil(log2(n_k)) qubits per bucket per docs/main_hubo.tex Eq. 10 Co-authored-by: havahol <5363644+havahol@users.noreply.github.com> --- qaoa/problems/bucketexactcover_problem.py | 40 +++++++++++---------- unittests/test_io.py | 4 +-- unittests/test_problems.py | 43 +++++++++++++---------- 3 files changed, 47 insertions(+), 40 deletions(-) diff --git a/qaoa/problems/bucketexactcover_problem.py b/qaoa/problems/bucketexactcover_problem.py index 2c33978..3be794a 100644 --- a/qaoa/problems/bucketexactcover_problem.py +++ b/qaoa/problems/bucketexactcover_problem.py @@ -18,8 +18,9 @@ class BucketExactCover(Problem): formulation that requires fewer qubits than the one-hot QUBO. Each bucket k corresponds to one boat and contains all routes for that boat. - A b_k-bit binary string selects one route (or a null route) from bucket k - via modular wrapping (Strategy B). + A b_k-bit binary string selects exactly one route from bucket k via modular + wrapping (Strategy B): `idx = v % n_k` where `v` is the integer value of + the binary string. This guarantees one route is always selected per bucket. Attributes: columns (np.ndarray): Full column matrix (N_rows × N_routes). @@ -63,10 +64,11 @@ def __init__( self._valid_columns = sorted(valid_set) self._valid_col_to_idx = {col: idx for idx, col in enumerate(self._valid_columns)} - # Bucket sizes and qubit counts + # Bucket sizes and qubit counts: b_k = ceil(log2(n_k)) per Eq. 10 of + # docs/main_hubo.tex. Minimum 1 qubit (handles n_k == 1 where log2=0). self._bucket_sizes = [len(bc) for bc in self._bucket_columns] self._bucket_qubits = [ - max(1, math.ceil(math.log2(n_k + 1))) for n_k in self._bucket_sizes + max(1, math.ceil(math.log2(n_k))) for n_k in self._bucket_sizes ] self._bucket_qubit_offsets = [] offset = 0 @@ -131,18 +133,20 @@ def __init__( # ------------------------------------------------------------------------- def _decode(self, string): - """Decode a bitstring to a binary indicator vector over valid columns.""" + """Decode a bitstring to a binary indicator vector over valid columns. + + Strategy B modular wrapping: idx = v % n_k always selects one route. + """ x = np.zeros(len(self._valid_columns)) for k in range(self.num_buckets): offset = self._bucket_qubit_offsets[k] b_k = self._bucket_qubits[k] n_k = self._bucket_sizes[k] v = sum(int(string[offset + j]) * (1 << j) for j in range(b_k)) - idx = v % (n_k + 1) - if idx > 0: - global_col = self._bucket_columns[k][idx - 1] - valid_idx = self._valid_col_to_idx[global_col] - x[valid_idx] = 1.0 + idx = v % n_k + global_col = self._bucket_columns[k][idx] + valid_idx = self._valid_col_to_idx[global_col] + x[valid_idx] = 1.0 return x # ------------------------------------------------------------------------- @@ -269,11 +273,10 @@ def add_to_poly(T_key, coeff): # Cost contribution values per state cost_vals = np.zeros(num_states) for v in range(num_states): - idx = v % (n_k + 1) - if idx > 0: - global_col = self._bucket_columns[k][idx - 1] - valid_idx = self._valid_col_to_idx[global_col] - cost_vals[v] = self.weights[valid_idx] + idx = v % n_k + global_col = self._bucket_columns[k][idx] + valid_idx = self._valid_col_to_idx[global_col] + cost_vals[v] = self.weights[valid_idx] # Möbius inversion for cost cost_alpha = self._mobius_inversion(cost_vals, b_k) @@ -286,10 +289,9 @@ def add_to_poly(T_key, coeff): for o in range(n_orders): cov_vals = np.zeros(num_states) for v in range(num_states): - idx = v % (n_k + 1) - if idx > 0: - global_col = self._bucket_columns[k][idx - 1] - cov_vals[v] = float(Y_orders[o, global_col]) + idx = v % n_k + global_col = self._bucket_columns[k][idx] + cov_vals[v] = float(Y_orders[o, global_col]) cov_alpha = self._mobius_inversion(cov_vals, b_k) cov_polys[(o, k)] = {} for T_local, coeff in cov_alpha.items(): diff --git a/unittests/test_io.py b/unittests/test_io.py index cb0e21b..c3260bd 100644 --- a/unittests/test_io.py +++ b/unittests/test_io.py @@ -111,8 +111,8 @@ def test_get_problem_instance_cost_matches(self): ) reconstructed_bec = loaded.get_problem_instance() - # Test cost of [c0, c4] bitstring - test_bitstring = "10010" + # Test cost of [c0, c4] bitstring (bucket0 v=0→c0, bucket1 v=1→c4) + test_bitstring = "0010" self.assertAlmostEqual( original_bec.cost(test_bitstring), reconstructed_bec.cost(test_bitstring), diff --git a/unittests/test_problems.py b/unittests/test_problems.py index ca1b32c..779303d 100644 --- a/unittests/test_problems.py +++ b/unittests/test_problems.py @@ -282,14 +282,18 @@ class TestBucketExactCover(unittest.TestCase): # order4 [ 1 0 1 0 0 1 0 ] # # weights = [20, 40, 30, 35, 10, 25, 40], num_buckets = 2 - # bucket 0 (boat1): [c0, c1, c2], n_k=3, b_k=2 → qubits 0,1 - # bucket 1 (boat2): [c3, c4, c5, c6], n_k=4, b_k=3 → qubits 2,3,4 - # N_qubits = 5 + # bucket 0 (boat1): [c0, c1, c2], n_k=3, b_k=ceil(log2(3))=2 → qubits 0,1 + # bucket 1 (boat2): [c3, c4, c5, c6], n_k=4, b_k=ceil(log2(4))=2 → qubits 2,3 + # N_qubits = 4 + # + # Bitstring layout (LSB first per bucket): + # bucket0: string[0]=z0, string[1]=z1; v=z0+2*z1; idx=v%3 + # bucket1: string[2]=z0, string[3]=z1; v=z0+2*z1; idx=v%4 # # Feasible solutions (all 4 orders covered exactly once): - # [c0, c4]: weights 20+10=30 → bitstring "10010" (optimal, least cost) - # [c1, c5]: weights 40+25=65 → bitstring "01110" - # [c2, c6]: weights 30+40=70 → bitstring "11001" + # [c0, c4]: weights 20+10=30 → "0010" (optimal, least cost) + # [c1, c5]: weights 40+25=65 → "1001" + # [c2, c6]: weights 30+40=70 → "0111" def _make_problem(self, **kwargs): from qaoa.problems import BucketExactCover @@ -304,19 +308,20 @@ def _make_problem(self, **kwargs): weights = np.array([20., 40., 30., 35., 10., 25., 40.]) return BucketExactCover(columns, num_buckets=2, weights=weights, **kwargs) - # Bitstrings for known solutions - # bucket0: v=1→c0 → bits "10"; bucket1: v=2→c4 → bits "010" - _S_C0C4 = "10010" - # bucket0: v=2→c1 → bits "01"; bucket1: v=3→c5 → bits "110" - _S_C1C5 = "01110" - # bucket0: v=3→c2 → bits "11"; bucket1: v=4→c6 → bits "001" - _S_C2C6 = "11001" - # all zeros → null routes - _S_NULL = "00000" + # Bitstrings for known feasible solutions + # bucket0: v=0→idx=0→c0, bucket1: v=1→idx=1→c4 + _S_C0C4 = "0010" + # bucket0: v=1→idx=1→c1, bucket1: v=2→idx=2→c5 + _S_C1C5 = "1001" + # bucket0: v=2→idx=2→c2, bucket1: v=3→idx=3→c6 + _S_C2C6 = "0111" + # "0000" → bucket0 v=0→c0 (covers orders 1,4), bucket1 v=0→c3 (covers orders 1,3) + # → order 1 covered twice, order 2 not covered at all → infeasible + _S_INFEASIBLE = "0000" def test_n_qubits(self): bec = self._make_problem() - self.assertEqual(bec.N_qubits, 5) + self.assertEqual(bec.N_qubits, 4) def test_cost_feasible_solution(self): bec = self._make_problem() @@ -334,7 +339,7 @@ def test_is_feasible_true(self): def test_is_feasible_false(self): bec = self._make_problem() - self.assertFalse(bec.isFeasible(self._S_NULL)) + self.assertFalse(bec.isFeasible(self._S_INFEASIBLE)) def test_brute_force_finds_cheapest_solution(self): bec = self._make_problem() @@ -359,7 +364,7 @@ def test_validate_circuit(self): def test_unscaled_cost_equals_cost_when_no_scaling(self): bec = self._make_problem(scale_problem=False) - for s in [self._S_C0C4, self._S_C1C5, self._S_C2C6, self._S_NULL]: + for s in [self._S_C0C4, self._S_C1C5, self._S_C2C6, self._S_INFEASIBLE]: self.assertAlmostEqual(bec.cost(s), bec.unscaled_cost(s)) def test_penalty_auto_computed(self): @@ -381,7 +386,7 @@ def test_no_bucket_columns_ignored(self): # c7 has all zeros in top 2 rows → ignored bec = BucketExactCover(columns, num_buckets=2, weights=weights) # N_qubits should be the same as the 7-column problem (c7 ignored) - self.assertEqual(bec.N_qubits, 5) + self.assertEqual(bec.N_qubits, 4) self.assertNotIn(7, bec._valid_columns) From 0a0578fe68d332d29bb1eb838cfc9a037cc40c70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Heitlo=20Holm?= Date: Thu, 19 Mar 2026 15:16:21 +0100 Subject: [PATCH 4/7] Fixing some comments --- qaoa/problems/bucketexactcover_problem.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/qaoa/problems/bucketexactcover_problem.py b/qaoa/problems/bucketexactcover_problem.py index 3be794a..ca71d53 100644 --- a/qaoa/problems/bucketexactcover_problem.py +++ b/qaoa/problems/bucketexactcover_problem.py @@ -64,8 +64,8 @@ def __init__( self._valid_columns = sorted(valid_set) self._valid_col_to_idx = {col: idx for idx, col in enumerate(self._valid_columns)} - # Bucket sizes and qubit counts: b_k = ceil(log2(n_k)) per Eq. 10 of - # docs/main_hubo.tex. Minimum 1 qubit (handles n_k == 1 where log2=0). + # Each bucket k has n_k columns; we need ceil(log2(n_k)) qubits to encode a choice among them. + # Use at least 1 qubit (when n_k=1, log2(1)=0). self._bucket_sizes = [len(bc) for bc in self._bucket_columns] self._bucket_qubits = [ max(1, math.ceil(math.log2(n_k))) for n_k in self._bucket_sizes @@ -135,7 +135,7 @@ def __init__( def _decode(self, string): """Decode a bitstring to a binary indicator vector over valid columns. - Strategy B modular wrapping: idx = v % n_k always selects one route. + Modular wrapping: idx = v % n_k always selects one route. """ x = np.zeros(len(self._valid_columns)) for k in range(self.num_buckets): From 8a5d29084f2fb125975417115974eec6361262da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Heitlo=20Holm?= Date: Fri, 20 Mar 2026 10:16:50 +0100 Subject: [PATCH 5/7] bug fix in write and best_bitstring from hist function --- qaoa/utils/qaoaIO.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/qaoa/utils/qaoaIO.py b/qaoa/utils/qaoaIO.py index 90cb297..058c717 100644 --- a/qaoa/utils/qaoaIO.py +++ b/qaoa/utils/qaoaIO.py @@ -238,9 +238,12 @@ def from_qaoa(cls, qaoa: QAOA, ) if isinstance(qaoa.problem, problems.BucketExactCover): + # Save full weights array (one per column); BucketExactCover expects this on load + full_weights = np.zeros(qaoa.problem.columns.shape[1]) + full_weights[qaoa.problem._valid_columns] = qaoa.problem._original_weights problem_data = BucketExactCoverProblemData( columns=qaoa.problem.columns, - weights=qaoa.problem._original_weights, + weights=full_weights, solution=solution, num_buckets=qaoa.problem.num_buckets, ) @@ -303,4 +306,10 @@ def get_problem_instance(self): exp_return=self.problem.exp_return ) - \ No newline at end of file + def best_bitstring(self) -> str: + """Return the most frequent bitstring from the histogram at the final depth.""" + if not self.qaoa_params.depths: + raise ValueError("No depth results available") + final_depth = max(self.qaoa_params.depths.keys()) + hist = self.qaoa_params.depths[final_depth].histogram + return max(hist, key=lambda bs: hist[bs]) \ No newline at end of file From 8c54240229e8e8b85431b9e46e18d824cabb96b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Heitlo=20Holm?= Date: Sat, 21 Mar 2026 12:55:44 +0100 Subject: [PATCH 6/7] Decoding the bitstrings from the histograms generated from BucketExactCover --- qaoa/problems/base_problem.py | 20 ++++++++++ qaoa/problems/bucketexactcover_problem.py | 45 +++++++++++++++++++++++ qaoa/utils/plotroutines.py | 22 +++++++++-- qaoa/utils/qaoaIO.py | 32 +++++++++++++--- 4 files changed, 110 insertions(+), 9 deletions(-) diff --git a/qaoa/problems/base_problem.py b/qaoa/problems/base_problem.py index dc6d8e0..dd30a15 100644 --- a/qaoa/problems/base_problem.py +++ b/qaoa/problems/base_problem.py @@ -147,3 +147,23 @@ def validate_circuit(self, t=1, flip=True, atol=1e-8, rtol=1e-8): if self.circuit is None: self.create_circuit() return validation.check_phase_separator_exact_problem(self, t=t, flip=flip, atol=atol, rtol=rtol) + + + def preprocess_histogram(self, hist: dict) -> dict: + """ + Optionally transform a measurement histogram before plotting or analysis. + + Subclasses that use encoded bitstrings (e.g. binary-encoded formulations + with modular wrapping as in BucketExactCover) may override this to decode + keys into a canonical format and combine counts for equivalent solutions. + The default implementation returns the histogram unchanged. + + Args: + hist (dict): Raw histogram mapping bitstrings to hit counts. + + Returns: + dict: Histogram with keys in the format used for plotting and + comparison. For problems without encoding, returns ``hist`` + unchanged. + """ + return hist \ No newline at end of file diff --git a/qaoa/problems/bucketexactcover_problem.py b/qaoa/problems/bucketexactcover_problem.py index ca71d53..f0e56da 100644 --- a/qaoa/problems/bucketexactcover_problem.py +++ b/qaoa/problems/bucketexactcover_problem.py @@ -400,3 +400,48 @@ def brute_force_solve(self, return_num_feasible=False): if return_num_feasible: return best_sol, num_feasible return best_sol + + + # ------------------------------------------------------------------------- + # Decode histogram + # ------------------------------------------------------------------------- + + def decode_histogram(self, encoded_hist: dict) -> dict: + """Collapse encoded histogram to decoded (ExactCover-format) keys. + + Multiple encoded bitstrings can map to the same column selection via modular + wrapping; their counts are summed. + + Returns: + dict: Keys are one-hot strings over all columns (ExactCover format), + values are combined hit counts. + """ + decoded = {} + N_routes = self.columns.shape[1] + for enc_bs, count in encoded_hist.items(): + x = self._decode(enc_bs) + full = np.zeros(N_routes) + for valid_idx in range(len(x)): + if x[valid_idx]: + full[self._valid_columns[valid_idx]] = 1 + key = "".join(str(int(b)) for b in full) + decoded[key] = decoded.get(key, 0) + count + return decoded + + + def preprocess_histogram(self, hist: dict) -> dict: + """ + Decode histogram keys from HUBO bitstrings to one-hot column format. + + Multiple encoded bitstrings can map to the same column selection via + modular wrapping; their counts are summed. The returned histogram uses + keys compatible with ExactCover (one-hot over all columns), enabling + direct comparison with QUBO-based exact cover results. + + Args: + hist (dict): Raw histogram with encoded HUBO bitstrings as keys. + + Returns: + dict: Histogram with one-hot keys and combined hit counts. + """ + return self.decode_histogram(hist) \ No newline at end of file diff --git a/qaoa/utils/plotroutines.py b/qaoa/utils/plotroutines.py index cd55054..af94488 100644 --- a/qaoa/utils/plotroutines.py +++ b/qaoa/utils/plotroutines.py @@ -359,18 +359,23 @@ def plot_AllOptimalParameters(qaoa, figsize=(14, 6), title=None): return fig, axs -def plot_optimalHitRatios(qaoa, optimal_solution, shots=1024, fig=None, label=None, style="", title=None, **kwargs): +def plot_optimalHitRatios(qaoa, optimal_solution, shots=1024, fig=None, label=None, style="", title=None, decode=False, **kwargs): """Plot the hit ratio for the optimal solution as a function of depth. Args: qaoa (QAOA): A QAOA instance that has been optimized. optimal_solution (array-like): Binary array representing the optimal - solution (will be converted to a bitstring). + solution (will be converted to a bitstring). When ``decode=True`` + and the problem uses encoded bitstrings (e.g. BucketExactCover), + pass the optimal in decoded (one-hot) format. shots (int): Number of shots for sampling. fig (matplotlib.figure.Figure, optional): Existing figure to draw on. label (str, optional): Legend label. style (str): Matplotlib line-style string. title (str, optional): Plot title. + decode (bool): If True, apply the problem's histogram preprocessing + (e.g. decode encoded bitstrings to one-hot for BucketExactCover). + Default False. Returns: tuple: ``(fig, ax)``. @@ -379,6 +384,8 @@ def plot_optimalHitRatios(qaoa, optimal_solution, shots=1024, fig=None, label=No hit_rates = np.zeros(qaoa.current_depth) for d in range(qaoa.current_depth): hist = qaoa.hist(qaoa.optimization_results[d + 1].get_best_angles(), shots) + if decode: + hist = qaoa.problem.preprocess_histogram(hist) if optimal_sol in hist: num_hits = hist[optimal_sol] hit_rates[d] = num_hits / shots @@ -483,14 +490,19 @@ def printBestHistogramEntries(qaoa, classical_solution=None, num_solutions=10, s print(str(best_i) + "\t" + toprint + str(best_sol) + ", " + str(best_cost) + ", " + str(best_freq) + "<-- Best obtained solution") -def plotHitProbabilities(qaoa, opt_sol, depth=None, hist_shots=2**13, **kwargs): +def plotHitProbabilities(qaoa, opt_sol, depth=None, hist_shots=2**13, decode=False, **kwargs): """Plot hit probability for the optimal solution (delegates to :func:`plotHitProbabilities_fromHist`). Args: qaoa (QAOA): A QAOA instance that has been optimized. - opt_sol (str): Optimal solution bitstring. + opt_sol (str): Optimal solution bitstring. When ``decode=True`` and the + problem uses encoded bitstrings (e.g. BucketExactCover), pass the + optimal in decoded (one-hot) format. depth (int, optional): Depth to use; defaults to ``qaoa.current_depth``. hist_shots (int): Number of shots for sampling. + decode (bool): If True, apply the problem's histogram preprocessing + (e.g. decode encoded bitstrings to one-hot for BucketExactCover). + Default False. **kwargs: Forwarded to :func:`plotHitProbabilities_fromHist`. Returns: @@ -499,6 +511,8 @@ def plotHitProbabilities(qaoa, opt_sol, depth=None, hist_shots=2**13, **kwargs): if depth is None: depth = qaoa.current_depth hist = qaoa.hist(qaoa.get_angles(depth), hist_shots) + if decode: + hist = qaoa.problem.preprocess_histogram(hist) return plotHitProbabilities_fromHist(hist, opt_sol, **kwargs) diff --git a/qaoa/utils/qaoaIO.py b/qaoa/utils/qaoaIO.py index 058c717..9e3e338 100644 --- a/qaoa/utils/qaoaIO.py +++ b/qaoa/utils/qaoaIO.py @@ -3,7 +3,7 @@ import json import numpy as np from dataclasses import dataclass, asdict, field -from typing import List, Dict, Type +from typing import List, Dict, Type, Tuple from enum import Enum import os import subprocess @@ -306,10 +306,32 @@ def get_problem_instance(self): exp_return=self.problem.exp_return ) - def best_bitstring(self) -> str: - """Return the most frequent bitstring from the histogram at the final depth.""" + def best_bitstring(self, decode: bool = False) -> Tuple[str, int]: + """Return the most frequent bitstring from the histogram at the final depth. + + Args: + decode: If True and problem is BucketExactCover, return histogram with + decoded (ExactCover-format) keys and combined counts. + Otherwise return raw encoded histogram. + """ if not self.qaoa_params.depths: raise ValueError("No depth results available") final_depth = max(self.qaoa_params.depths.keys()) - hist = self.qaoa_params.depths[final_depth].histogram - return max(hist, key=lambda bs: hist[bs]) \ No newline at end of file + hist = self.get_histogram(final_depth, decode=decode) + best_bs = max(hist, key=lambda bs: hist[bs]) + return best_bs, hist[best_bs] + + def get_histogram(self, depth: int, decode: bool = False) -> dict: + """Get histogram for a given depth. + + Args: + depth: QAOA depth. + decode: If True and problem is BucketExactCover, return histogram with + decoded (ExactCover-format) keys and combined counts. + Otherwise return raw encoded histogram. + """ + hist = self.qaoa_params.depths[depth].histogram + if decode and isinstance(self.problem, BucketExactCoverProblemData): + bec = self.get_problem_instance() + return bec.decode_histogram(hist) + return hist \ No newline at end of file From fd93ddb20f7c82ba82c900417b43027c0655e681 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Heitlo=20Holm?= Date: Mon, 23 Mar 2026 21:39:36 +0100 Subject: [PATCH 7/7] added optional scaling and more io data --- qaoa/problems/bucketexactcover_problem.py | 5 ++- qaoa/utils/qaoaIO.py | 48 ++++++++++++++++------- unittests/test_io.py | 4 +- 3 files changed, 39 insertions(+), 18 deletions(-) diff --git a/qaoa/problems/bucketexactcover_problem.py b/qaoa/problems/bucketexactcover_problem.py index f0e56da..d204ab1 100644 --- a/qaoa/problems/bucketexactcover_problem.py +++ b/qaoa/problems/bucketexactcover_problem.py @@ -30,6 +30,7 @@ class BucketExactCover(Problem): scale_problem (bool): Whether the problem has been scaled. allow_infeasible (bool): Whether infeasible solutions are accepted. N_qubits (int): Total number of qubits. + upper_bound_scaling (float): Upper bound used for scaling (defaults to 1.0) """ def __init__( @@ -40,6 +41,7 @@ def __init__( penalty_factor=None, allow_infeasible=False, scale_problem=False, + upper_bound_scaling=1.0 ) -> None: super().__init__() @@ -47,6 +49,7 @@ def __init__( self.num_buckets = num_buckets self.allow_infeasible = allow_infeasible self.scale_problem = scale_problem + self.upper_bound_scaling = upper_bound_scaling N_rows, N_routes = self.columns.shape n_orders = N_rows - num_buckets @@ -123,7 +126,7 @@ def __init__( lam = C_max_shifted + epsilon denom = C_max_shifted * (1 + n_orders * (num_buckets - 1) ** 2) - s = 1.0 / denom + s = self.upper_bound_scaling / denom self.weights = s * omega self.penalty_factor = s * lam diff --git a/qaoa/utils/qaoaIO.py b/qaoa/utils/qaoaIO.py index 9e3e338..13ad240 100644 --- a/qaoa/utils/qaoaIO.py +++ b/qaoa/utils/qaoaIO.py @@ -72,7 +72,10 @@ def from_dict(cls, data: dict) -> "ProblemData": subclass = problem_registry[problem_type] data = _list_to_numpy(data) data.pop("problem_type", None) - return subclass(**data) + # Only pass known fields for backward compatibility (missing keys use defaults) + known_fields = set(subclass.__dataclass_fields__.keys()) - {"problem_type"} + filtered = {k: v for k, v in data.items() if k in known_fields} + return subclass(**filtered) # ---------- Problem Data Subclasses ---------- @@ -92,6 +95,7 @@ class BucketExactCoverProblemData(ProblemData): weights: np.ndarray = None solution: np.ndarray = None num_buckets: int = None + upper_bound_scaling: float = 1.0 problem_type: str = "BucketExactCover" @@ -141,8 +145,10 @@ class QAOAParameters: backend: str optimizer: str N_qubits: int - #depths: List[DepthResult] = field(default_factory=list) depths: Dict[int, DepthResult] = field(default_factory=dict) + landscape_p1_angles: Dict[str, List[float]] = field(default_factory=dict) + interpolate: bool = True + shots: int = 1024 @dataclass class QAOAResult: @@ -180,14 +186,18 @@ def load(cls, filename: str) -> "QAOAResult": depths_data = data["qaoa_params"]["depths"] depths = {int(k): DepthResult(**v) for k, v in depths_data.items()} + qp = data["qaoa_params"] qaoa_params = QAOAParameters( - cvar=data["qaoa_params"]["cvar"], - init_method=InitMethod[data["qaoa_params"]["init_method"]], - mixer_method=MixerMethod[data["qaoa_params"]["mixer_method"]], - backend=data["qaoa_params"]["backend"], - optimizer=data["qaoa_params"]["optimizer"], - N_qubits=data["qaoa_params"]["N_qubits"], + cvar=qp["cvar"], + init_method=InitMethod[qp["init_method"]], + mixer_method=MixerMethod[qp["mixer_method"]], + backend=qp["backend"], + optimizer=qp["optimizer"], + N_qubits=qp["N_qubits"], depths=depths, + landscape_p1_angles=qp.get("landscape_p1_angles", {}), + interpolate=qp.get("interpolate", True), + shots=qp.get("shots", 1024), ) return cls(problem=problem, qaoa_params=qaoa_params, metadata=data.get("metadata", {})) @@ -246,6 +256,7 @@ def from_qaoa(cls, qaoa: QAOA, weights=full_weights, solution=solution, num_buckets=qaoa.problem.num_buckets, + upper_bound_scaling=getattr(qaoa.problem, "upper_bound_scaling", 1.0), ) else: problem_data = ExactCoverProblemData( @@ -270,13 +281,16 @@ def from_qaoa(cls, qaoa: QAOA, optimizer = "COBYLA" qaoa_params = QAOAParameters( - cvar = qaoa.cvar, - init_method = init_method, - mixer_method = mixer_method, - backend = qaoa.backend.name, - optimizer = optimizer, - N_qubits = qaoa.problem.N_qubits, - depths = depths + cvar=qaoa.cvar, + init_method=init_method, + mixer_method=mixer_method, + backend=qaoa.backend.name, + optimizer=optimizer, + N_qubits=qaoa.problem.N_qubits, + depths=depths, + landscape_p1_angles=getattr(qaoa, "landscape_p1_angles", {}) or {}, + interpolate=getattr(qaoa, "interpolate", True), + shots=getattr(qaoa, "shots", 1024), ) return cls(problem=problem_data, qaoa_params=qaoa_params) @@ -290,6 +304,10 @@ def get_problem_instance(self): columns=self.problem.columns, weights=self.problem.weights, num_buckets=self.problem.num_buckets, + scale_problem=True, + upper_bound_scaling=getattr( + self.problem, "upper_bound_scaling", 1.0 + ), ) elif isinstance(self.problem, ExactCoverProblemData): return problems.ExactCover( diff --git a/unittests/test_io.py b/unittests/test_io.py index c3260bd..3b956de 100644 --- a/unittests/test_io.py +++ b/unittests/test_io.py @@ -114,8 +114,8 @@ def test_get_problem_instance_cost_matches(self): # Test cost of [c0, c4] bitstring (bucket0 v=0→c0, bucket1 v=1→c4) test_bitstring = "0010" self.assertAlmostEqual( - original_bec.cost(test_bitstring), - reconstructed_bec.cost(test_bitstring), + original_bec.unscaled_cost(test_bitstring), + reconstructed_bec.unscaled_cost(test_bitstring), )