Download

Source code for eqc_models.graph.partition

  • from typing import Tuple
  • import numpy as np
  • import scipy.sparse as sp
  • import networkx as nx
  • from math import modf
  • from eqc_models.graph.base import GraphModel
  • [docs]
  • class GraphPartitionModel(GraphModel):
  • """
  • A model for graph partitioning into `k` parts with objective and constraints
  • derived from the Laplacian matrix and additional penalties for balance and constraints.
  • """
  • def __init__(self, G: nx.Graph, k: int = 2, weight: str = "weight", alpha: float = 1.0, beta_obj: float = 1.0,
  • gamma: float = 1.0):
  • """
  • Parameters:
  • -----------
  • G : nx.Graph
  • The graph to partition.
  • k : int
  • The number of partitions.
  • weight : str
  • The key for edge weights in the graph.
  • alpha : float
  • The penalty multiplier for balance constraints.
  • beta_obj : float
  • The penalty multiplier for minimizing edge cuts (Laplacian term).
  • gamma : float
  • The penalty multiplier for assignment constraints.
  • """
  • self._G = G
  • self._k = k
  • self._weight = weight
  • self._alpha = alpha
  • self._beta_obj = beta_obj
  • self._gamma = gamma
  • self._laplacian = nx.laplacian_matrix(G, weight=weight)
  • self._num_nodes = G.number_of_nodes()
  • self._sorted_nodes = sorted(G.nodes)
  • self._constraints_offset = 0
  • self._balanced_partition_offset = 0
  • self.set_and_validate_k()
  • self._objective_matrix = self.initialize_model()
  • super().__init__(self._G)
  • [docs]
  • def set_and_validate_k(self):
  • """
  • Sets k and encoding length for a graph problem
  • """
  • # modf(x) = (fractional, integer) decomposition.
  • # Make sure fractional portion is zero. Convert to int if so.
  • assert modf(self._k)[0] == 0, "'k' must be an integer."
  • # it's an int, so set self.k
  • self._k = int(self._k)
  • # Verify k >= 2
  • assert self._k >= 2, f"ERROR, k={self._k}: k must be greater than or equal to 2."
  • # Verify that k makes sense
  • assert self._k <= self._num_nodes, (
  • f"ERROR, k={self._k}: k must be less than number of nodes or variables. k = {self._k} and "
  • f"number of nodes = {self._num_nodes}"
  • )
  • [docs]
  • def initialize_model(self):
  • """
  • Build the objective matrix and constraints for the k-partition problem.
  • """
  • if self._k == 2:
  • # For 2 partitions, construct a simpler QUBO from the Laplacian matrix
  • return self.get_two_partition_qubo()
  • else:
  • # For k > 2, construct a block-diagonal Laplacian with balance and constraints
  • laplacian_blocks = 0.5 * sp.block_diag([self._laplacian] * self._k, format="csr")
  • balance_term = self.get_balanced_partition_term()
  • constraints = self.get_constraints()
  • return (
  • self._alpha * balance_term
  • + self._gamma * constraints
  • + self._beta_obj * laplacian_blocks
  • )
  • [docs]
  • def get_balanced_partition_term(self) -> sp.spmatrix:
  • """
  • Construct the quadratic penalty term for balanced partitions.
  • """
  • I_k = sp.identity(self._k)
  • Ones_n = np.ones((self._num_nodes, self._num_nodes))
  • balanced_partition_term = sp.kron(I_k, Ones_n, format="csr")
  • balanced_partition_term -= (
  • 2 * self._num_nodes / self._k * sp.identity(balanced_partition_term.shape[0])
  • )
  • self._balanced_partition_offset = self._num_nodes**2 / self._k
  • return balanced_partition_term
  • [docs]
  • def get_constraints(self) -> sp.spmatrix:
  • """
  • Construct the quadratic penalty term for assignment constraints.
  • """
  • I_n = sp.identity(self._num_nodes)
  • Ones_k = np.ones((self._k, self._k))
  • constraints = sp.kron(Ones_k, I_n, format="csr")
  • constraints -= 2 * sp.identity(constraints.shape[0])
  • self._constraints_offset = self._num_nodes
  • return constraints
  • [docs]
  • def get_two_partition_qubo(self) -> sp.spmatrix:
  • """
  • Construct the QUBO matrix for two partitions using adjacency and penalties.
  • """
  • Garr = nx.to_scipy_sparse_matrix(self._G, weight=self._weight, nodelist=self._sorted_nodes)
  • Q = (
  • self._alpha * np.ones(Garr.shape, dtype=np.float32)
  • - self._beta_obj * Garr
  • )
  • degrees = Garr.sum(axis=1).A1 # Convert sparse matrix to 1D array
  • diag = self._beta_obj * degrees - self._alpha * (self._num_nodes - 1)
  • np.fill_diagonal(Q, diag)
  • return sp.csr_matrix(Q)
  • [docs]
  • def evaluate(self, solution: np.ndarray) -> float:
  • """
  • Evaluate the objective function for a given solution.
  • """
  • assert len(solution) == self._objective_matrix.shape[0], "Solution size mismatch."
  • return float(solution.T @ self._objective_matrix @ solution)
  • [docs]
  • def decode(self, solution: np.ndarray) -> dict:
  • """
  • Decode the solution vector into a partition assignment.
  • """
  • if self._k == 2:
  • return {node: int(solution[i]) for i, node in enumerate(self._sorted_nodes)}
  • else:
  • partitions, nodes = np.where(solution.reshape((self._k, self._num_nodes)) == 1)
  • return {self._sorted_nodes[node]: int(partition) for partition, node in zip(partitions, nodes)}
  • [docs]
  • def costFunction(self) -> Tuple[np.ndarray, np.ndarray]:
  • """
  • Return the linear and quadratic components of the objective function.
  • """
  • Q = self._objective_matrix
  • h = Q.diagonal()
  • J = 2 * sp.triu(Q, k=1).tocsr() # Extract upper triangular part for quadratic terms
  • return h, J