Source code for eqc_models.graph.partition
- from typing import Tuple
- import numpy as np
- import scipy.sparse as sp
- import networkx as nx
- from math import modf
- from eqc_models.graph.base import GraphModel
- [docs]
- class GraphPartitionModel(GraphModel):
- """
- A model for graph partitioning into `k` parts with objective and constraints
- derived from the Laplacian matrix and additional penalties for balance and constraints.
- """
- def __init__(self, G: nx.Graph, k: int = 2, weight: str = "weight", alpha: float = 1.0, beta_obj: float = 1.0,
- gamma: float = 1.0):
- """
- Parameters:
- -----------
- G : nx.Graph
- The graph to partition.
- k : int
- The number of partitions.
- weight : str
- The key for edge weights in the graph.
- alpha : float
- The penalty multiplier for balance constraints.
- beta_obj : float
- The penalty multiplier for minimizing edge cuts (Laplacian term).
- gamma : float
- The penalty multiplier for assignment constraints.
- """
- self._G = G
- self._k = k
- self._weight = weight
- self._alpha = alpha
- self._beta_obj = beta_obj
- self._gamma = gamma
- self._laplacian = nx.laplacian_matrix(G, weight=weight)
- self._num_nodes = G.number_of_nodes()
- self._sorted_nodes = sorted(G.nodes)
- self._constraints_offset = 0
- self._balanced_partition_offset = 0
- self.set_and_validate_k()
- self._objective_matrix = self.initialize_model()
- super().__init__(self._G)
- [docs]
- def set_and_validate_k(self):
- """
- Sets k and encoding length for a graph problem
- """
-
-
- assert modf(self._k)[0] == 0, "'k' must be an integer."
-
- self._k = int(self._k)
-
- assert self._k >= 2, f"ERROR, k={self._k}: k must be greater than or equal to 2."
-
- assert self._k <= self._num_nodes, (
- f"ERROR, k={self._k}: k must be less than number of nodes or variables. k = {self._k} and "
- f"number of nodes = {self._num_nodes}"
- )
- [docs]
- def initialize_model(self):
- """
- Build the objective matrix and constraints for the k-partition problem.
- """
- if self._k == 2:
-
- return self.get_two_partition_qubo()
- else:
-
- laplacian_blocks = 0.5 * sp.block_diag([self._laplacian] * self._k, format="csr")
- balance_term = self.get_balanced_partition_term()
- constraints = self.get_constraints()
- return (
- self._alpha * balance_term
- + self._gamma * constraints
- + self._beta_obj * laplacian_blocks
- )
- [docs]
- def get_balanced_partition_term(self) -> sp.spmatrix:
- """
- Construct the quadratic penalty term for balanced partitions.
- """
- I_k = sp.identity(self._k)
- Ones_n = np.ones((self._num_nodes, self._num_nodes))
- balanced_partition_term = sp.kron(I_k, Ones_n, format="csr")
- balanced_partition_term -= (
- 2 * self._num_nodes / self._k * sp.identity(balanced_partition_term.shape[0])
- )
- self._balanced_partition_offset = self._num_nodes**2 / self._k
- return balanced_partition_term
- [docs]
- def get_constraints(self) -> sp.spmatrix:
- """
- Construct the quadratic penalty term for assignment constraints.
- """
- I_n = sp.identity(self._num_nodes)
- Ones_k = np.ones((self._k, self._k))
- constraints = sp.kron(Ones_k, I_n, format="csr")
- constraints -= 2 * sp.identity(constraints.shape[0])
- self._constraints_offset = self._num_nodes
- return constraints
- [docs]
- def get_two_partition_qubo(self) -> sp.spmatrix:
- """
- Construct the QUBO matrix for two partitions using adjacency and penalties.
- """
- Garr = nx.to_scipy_sparse_matrix(self._G, weight=self._weight, nodelist=self._sorted_nodes)
- Q = (
- self._alpha * np.ones(Garr.shape, dtype=np.float32)
- - self._beta_obj * Garr
- )
- degrees = Garr.sum(axis=1).A1
- diag = self._beta_obj * degrees - self._alpha * (self._num_nodes - 1)
- np.fill_diagonal(Q, diag)
- return sp.csr_matrix(Q)
- [docs]
- def evaluate(self, solution: np.ndarray) -> float:
- """
- Evaluate the objective function for a given solution.
- """
- assert len(solution) == self._objective_matrix.shape[0], "Solution size mismatch."
- return float(solution.T @ self._objective_matrix @ solution)
- [docs]
- def decode(self, solution: np.ndarray) -> dict:
- """
- Decode the solution vector into a partition assignment.
- """
- if self._k == 2:
- return {node: int(solution[i]) for i, node in enumerate(self._sorted_nodes)}
- else:
- partitions, nodes = np.where(solution.reshape((self._k, self._num_nodes)) == 1)
- return {self._sorted_nodes[node]: int(partition) for partition, node in zip(partitions, nodes)}
- [docs]
- def costFunction(self) -> Tuple[np.ndarray, np.ndarray]:
- """
- Return the linear and quadratic components of the objective function.
- """
- Q = self._objective_matrix
- h = Q.diagonal()
- J = 2 * sp.triu(Q, k=1).tocsr()
- return h, J