Source code for eqc_models.ml.decomposition

# (C) Quantum Computing Inc., 2024.
# Import libs
import os
import sys
import time
import datetime
import json
import warnings
from functools import wraps

import numpy as np

from qci_client import QciClient
from eqc_models import QuadraticModel
from eqc_models.solvers.qciclient import (
    Dirac3CloudSolver,
    Dirac3ContinuousCloudSolver,
    Dirac1CloudSolver,
)


class DecompBase(QuadraticModel):
    """A base class for decomposition algorithms.

    Parameters
    ----------
    relaxation_schedule: Relaxation schedule used by Dirac-3; default:
        2.
    num_samples: Number of samples used by Dirac-3; default: 1.
    """

    def __init__(
        self,
        relaxation_schedule=2,
        num_samples=1,
    ):
        super().__init__(None, None, None)
        self.relaxation_schedule = relaxation_schedule
        self.num_samples = num_samples

    def _get_hamiltonian(
        self,
        X: np.array,
    ):
        pass

    def _set_model(self, J, C, sum_constraint):
        # Set Hamiltonian terms
        self._C = C
        self._J = J
        self._H = C, J
        self._sum_constraint = sum_constraint

        # Set per-variable upper bound from the sum constraint
        num_variables = C.shape[0]
        self.upper_bound = sum_constraint * np.ones((num_variables,))

        return

    def _solve(self):
        solver = Dirac3ContinuousCloudSolver()
        response = solver.solve(
            self,
            relaxation_schedule=self.relaxation_schedule,
            solution_precision=1,
            sum_constraint=self._sum_constraint,
            num_samples=self.num_samples,
        )
        sol = response["results"]["solutions"][0]

        return sol, response
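
    # A minimal sketch of how a subclass is expected to use the plumbing
    # above: build the Hamiltonian, register it with _set_model, then
    # call _solve. ``ToyDecomp`` is hypothetical, not part of this module.
    #
    # >>> class ToyDecomp(DecompBase):
    # ...     def fit(self, X):
    # ...         J = -X.T @ X
    # ...         C = np.zeros((X.shape[1], 1))
    # ...         self._set_model(J, C, 1.0)
    # ...         sol, response = self._solve()
    # ...         return sol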

    def _solve_d1_test(self):
        qubo = self._J

        # Make sure the matrix is symmetric to machine precision
        qubo = 0.5 * (qubo + qubo.transpose())

        # Instantiate the client
        qci = QciClient()

        # Create the QUBO file object
        qubo_json = {
            "file_name": "qubo_tutorial.json",
            "file_config": {
                "qubo": {"data": qubo, "num_variables": qubo.shape[0]},
            },
        }
        response_json = qci.upload_file(file=qubo_json)
        qubo_file_id = response_json["file_id"]

        # Set up the job body
        job_params = {
            "device_type": "dirac-1",
            "alpha": 1.0,
            "num_samples": 20,
        }
        job_json = qci.build_job_body(
            job_type="sample-qubo",
            job_params=job_params,
            qubo_file_id=qubo_file_id,
            job_name="tutorial_eqc1",
            job_tags=["tutorial_eqc1"],
        )
        print(job_json)

        # Run the job
        job_response_json = qci.process_job(
            job_body=job_json,
        )
        print(job_response_json)

        results = job_response_json["results"]
        energies = results["energies"]
        samples = results["solutions"]
        print("Energies:", energies)

        sol = np.array(samples[0])
        print(sol)

        return sol
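
    # Note on the symmetrization above: replacing Q with (Q + Q^T) / 2
    # leaves the quadratic form unchanged, since x^T Q x == x^T Q^T x.
    # A quick NumPy check (illustrative values only):
    #
    # >>> Q = np.array([[1.0, 2.0], [0.0, 3.0]])
    # >>> x = np.array([1.0, -1.0])
    # >>> Qs = 0.5 * (Q + Q.T)
    # >>> bool(np.isclose(x @ Q @ x, x @ Qs @ x))
    # True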

    def fit(self, X):
        pass

    def transform(self, X: np.array):
        pass

    def fit_transform(self, X):
        pass

    def get_dynamic_range(self):
        C = self._C
        J = self._J

        if C is None:
            return
        if J is None:
            return

        absc = np.abs(C)
        absj = np.abs(J)

        minc = np.min(absc[absc > 0])
        maxc = np.max(absc)
        minj = np.min(absj[absj > 0])
        maxj = np.max(absj)

        minval = min(minc, minj)
        maxval = max(maxc, maxj)

        return 10 * np.log10(maxval / minval)
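
    # The value above is the dynamic range of the model in decibels:
    # the ratio between the largest and smallest nonzero coefficient
    # magnitudes across C and J. For example (illustrative numbers),
    # coefficients spanning 0.01 to 10 give
    #
    #     10 * log10(10 / 0.01) = 30 dB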


class PCA(DecompBase):
    """
    An implementation of principal component analysis (PCA) that
    uses QCi's Dirac-3.

    Linear dimensionality reduction that projects the data onto a
    lower-dimensional space spanned by its leading principal
    components.

    Parameters
    ----------
    n_components: Number of components to keep; if n_components is not
        set, all components are kept; default: None.
    relaxation_schedule: Relaxation schedule used by Dirac-3; default:
        2.
    num_samples: Number of samples used by Dirac-3; default: 1.

    Examples
    --------
    >>> from sklearn import datasets
    >>> iris = datasets.load_iris()
    >>> X = iris.data
    >>> from sklearn.preprocessing import StandardScaler
    >>> scaler = StandardScaler()
    >>> X = scaler.fit_transform(X)
    >>> from eqc_models.ml.decomposition import PCA
    >>> from contextlib import redirect_stdout
    >>> import io
    >>> f = io.StringIO()
    >>> with redirect_stdout(f):
    ...     obj = PCA(
    ...         n_components=4,
    ...         relaxation_schedule=2,
    ...         num_samples=1,
    ...     )
    ...     X_pca = obj.fit_transform(X)
    """

    def __init__(
        self,
        n_components=None,
        relaxation_schedule=2,
        num_samples=1,
    ):
        self.n_components = n_components
        self.relaxation_schedule = relaxation_schedule
        self.num_samples = num_samples
        self.X = None
        self.X_pca = None

    def _get_hamiltonian(
        self,
        X: np.array,
    ):
        num_features = X.shape[1]

        # Quadratic term: minimizing v^T J v maximizes ||X v||^2
        J = -np.matmul(X.transpose(), X)

        assert J.shape[0] == num_features
        assert J.shape[1] == num_features

        # No linear term
        C = np.zeros((num_features, 1))

        return J, C
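
    # Why J = -X^T X: for a unit vector v, v^T J v = -||X v||^2, so the
    # ground state of the quadratic Hamiltonian maximizes the variance
    # of the data projected onto v, which is the first-principal-
    # component objective. A quick check (illustrative values only):
    #
    # >>> X = np.array([[1.0, 0.0], [2.0, 0.0], [3.0, 0.0]])
    # >>> J = -X.T @ X
    # >>> v = np.array([1.0, 0.0])
    # >>> float(v @ J @ v) == -float(np.linalg.norm(X @ v) ** 2)
    # True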

    def _get_first_component(self, X):
        J, C = self._get_hamiltonian(X)

        assert J.shape[0] == J.shape[1], "Inconsistent Hamiltonian size!"
        assert J.shape[0] == C.shape[0], "Inconsistent Hamiltonian size!"

        self._set_model(J, C, 1.0)

        sol, response = self._solve()

        assert len(sol) == C.shape[0], "Inconsistent solution size!"

        # Normalize the solution to get the right singular vector v0
        fct = np.linalg.norm(sol)
        if fct > 0:
            fct = 1.0 / fct

        v0 = fct * np.array(sol)
        v0 = v0.reshape((v0.shape[0], 1))

        # The Rayleigh quotient of -J at v0 gives the eigenvalue
        lambda0 = np.matmul(np.matmul(v0.transpose(), -J), v0)[0][0]

        assert lambda0 >= 0, "Unexpected negative eigenvalue!"

        # Map to the left singular vector: u0 = X v0 / sqrt(lambda0)
        fct = np.sqrt(lambda0)
        if fct > 0:
            fct = 1.0 / fct

        u0 = fct * np.matmul(X, v0)
        u0 = u0.reshape(-1)

        # Normalize u0
        fct = np.linalg.norm(u0)
        if fct > 0:
            fct = 1.0 / fct

        u0 = fct * u0

        return u0, response
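
    # For small problems the component found here can be cross-checked
    # against a classical eigendecomposition; a sketch (illustrative
    # only, not part of the production path):
    #
    # >>> X = np.random.RandomState(0).randn(20, 4)
    # >>> w, V = np.linalg.eigh(X.T @ X)
    # >>> v0 = V[:, -1]  # top eigenvector of X^T X
    # >>> u0 = X @ v0 / np.linalg.norm(X @ v0)
    #
    # ``u0`` should match the first component returned above up to sign.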

    def fit(self, X):
        """
        Build a PCA object from the training set X.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            The training input samples.

        Returns
        -------
        responses : dict
            A dict containing the Dirac response for each component.
        """
        num_features = X.shape[1]

        if self.n_components is None:
            n_components = num_features
        else:
            n_components = self.n_components

        n_components = min(n_components, num_features)

        self.X = X.copy()
        self.X_pca = []
        resp_hash = {}
        for i in range(n_components):
            u, resp = self._get_first_component(X)
            self.X_pca.append(u)

            # Deflate: remove the variance explained by u before
            # extracting the next component
            u = u.reshape((u.shape[0], 1))
            X = X - np.matmul(
                u,
                np.matmul(u.transpose(), X),
            )

            assert X.shape == self.X.shape, "Inconsistent size!"

            resp_hash["component_%d_response" % (i + 1)] = resp

        self.X_pca = np.array(self.X_pca).transpose()

        assert self.X_pca.shape[0] == self.X.shape[0]
        assert self.X_pca.shape[1] == n_components

        return resp_hash
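
    # A hedged usage sketch: the dict returned by ``fit`` maps
    # "component_<k>_response" to the raw Dirac response for the k-th
    # solve, e.g. (names illustrative)
    #
    # >>> responses = obj.fit(X)
    # >>> sorted(responses.keys())  # doctest: +SKIP
    # ['component_1_response', 'component_2_response']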

    def transform(self, X: np.array):
        """
        Apply dimensionality reduction to X.

        Note that in this implementation the projection is computed
        during ``fit``, so this method returns the projection of the
        training data onto the principal components extracted there.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Data, where `n_samples` is the number of samples
            and `n_features` is the number of features.

        Returns
        -------
        X_new : array-like of shape (n_samples, n_components)
            Projection of the training data onto the first principal
            components, where `n_samples` is the number of samples and
            `n_components` is the number of components.
        """
        if self.X is None:
            return

        return self.X_pca

    def fit_transform(self, X):
        """
        Fit the model with X and apply the dimensionality reduction on X.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples
            and `n_features` is the number of features.

        Returns
        -------
        X_new : ndarray of shape (n_samples, n_components)
            Transformed values.
        """
        self.fit(X)

        return self.transform(X)
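

# A minimal end-to-end sketch (hedged: running it requires QCi cloud
# access and credentials; the dataset and parameter choices are
# illustrative):
#
# >>> from sklearn import datasets
# >>> from sklearn.preprocessing import StandardScaler
# >>> X = StandardScaler().fit_transform(datasets.load_iris().data)
# >>> pca = PCA(n_components=2, relaxation_schedule=2, num_samples=1)
# >>> X_pca = pca.fit_transform(X)
# >>> X_pca.shape
# (150, 2)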