- import os
- import sys
- import time
- import datetime
- import json
- import warnings
- from functools import wraps
- import numpy as np
- from qci_client import QciClient
- from eqc_models import QuadraticModel
- from eqc_models.solvers.qciclient import (
- Dirac3CloudSolver,
- Dirac3ContinuousCloudSolver,
- Dirac1CloudSolver,
- )
- class DecompBase(QuadraticModel):
- """An Base class for decomposition algorithms.
-
- Parameters
- ----------
-
- relaxation_schedule: Relaxation schedule used by Dirac-3; default:
- 2.
-
- num_samples: Number of samples used by Dirac-3; default: 1.
- """
-
- def __init__(
- self,
- relaxation_schedule=2,
- num_samples=1,
- ):
- super(self).__init__(None, None, None)
- self.relaxation_schedule = relaxation_schedule
- self.num_samples = num_samples
- def _get_hamiltonian(
- self,
- X: np.array,
- ):
- pass
- def _set_model(self, J, C, sum_constraint):
-
- self._C = C
- self._J = J
- self._H = C, J
- self._sum_constraint = sum_constraint
-
- num_variables = C.shape[0]
- self.upper_bound = sum_constraint * np.ones((num_variables,))
- return
- def _solve(self):
- solver = Dirac3ContinuousCloudSolver()
- response = solver.solve(
- self,
- relaxation_schedule=self.relaxation_schedule,
- solution_precision=1,
- sum_constraint=self._sum_constraint,
- num_samples=self.num_samples,
- )
- sol = response["results"]["solutions"][0]
- return sol, response
- def _solve_d1_test(self):
- qubo = self._J
-
- qubo = 0.5 * (qubo + qubo.transpose())
-
- qci = QciClient()
-
- qubo_json = {
- "file_name": "qubo_tutorial.json",
- "file_config": {
- "qubo": {"data": qubo, "num_variables": qubo.shape[0]},
- },
- }
- response_json = qci.upload_file(file=qubo_json)
- qubo_file_id = response_json["file_id"]
-
- job_params = {
- "device_type": "dirac-1",
- "alpha": 1.0,
- "num_samples": 20,
- }
- job_json = qci.build_job_body(
- job_type="sample-qubo",
- job_params=job_params,
- qubo_file_id=qubo_file_id,
- job_name="tutorial_eqc1",
- job_tags=["tutorial_eqc1"],
- )
- print(job_json)
-
- job_response_json = qci.process_job(
- job_body=job_json,
- )
- print(job_response_json)
- results = job_response_json["results"]
- energies = results["energies"]
- samples = results["solutions"]
- if True:
- print("Energies:", energies)
- sol = np.array(samples[0])
- print(sol)
- return sol
- def fit(self, X):
- pass
- def transform(self, X: np.array):
- pass
- def fit_transform(self, X):
- pass
- def get_dynamic_range(self):
- C = self._C
- J = self._J
- if C is None:
- return
- if J is None:
- return
- absc = np.abs(C)
- absj = np.abs(J)
- minc = np.min(absc[absc > 0])
- maxc = np.max(absc)
- minj = np.min(absj[absj > 0])
- maxj = np.max(absj)
- minval = min(minc, minj)
- maxval = max(maxc, maxj)
- return 10 * np.log10(maxval / minval)
- class PCA(DecompBase):
- """
- An implementation of Principal component analysis (PCA) that
- uses QCi's Dirac-3.
- Linear dimensionality reduction using Singular Value
- Decomposition of the data to project it to a lower dimensional
- space.
- Parameters
- ----------
-
- n_components: Number of components to keep; if n_components is not
- set all components are kept; default: None.
-
- relaxation_schedule: Relaxation schedule used by Dirac-3; default:
- 2.
-
- num_samples: Number of samples used by Dirac-3; default: 1.
- Examples
- -----------
-
- >>> from sklearn import datasets
- >>> iris = datasets.load_iris()
- >>> X = iris.data
- >>> from sklearn.preprocessing import StandardScaler
- >>> scaler = StandardScaler()
- >>> X = scaler.fit_transform(X)
- >>> from eqc_models.ml.decomposition import PCA
- >>> from contextlib import redirect_stdout
- >>> import io
- >>> f = io.StringIO()
- >>> with redirect_stdout(f):
- ... obj = PCA(
- ... n_components=4,
- ... relaxation_schedule=2,
- ... num_samples=1,
- ... )
- ... X_pca = obj.fit_transform(X)
-
- """
-
- def __init__(
- self,
- n_components=None,
- relaxation_schedule=2,
- num_samples=1,
- ):
- self.n_components = n_components
- self.relaxation_schedule = relaxation_schedule
- self.num_samples = num_samples
- self.X = None
- self.X_pca = None
- def _get_hamiltonian(
- self,
- X: np.array,
- ):
- num_records = X.shape[0]
- num_features = X.shape[1]
- J = -np.matmul(X.transpose(), X)
- assert J.shape[0] == num_features
- assert J.shape[1] == num_features
- C = np.zeros((num_features, 1))
- return J, C
- def _get_first_component(self, X):
- J, C = self._get_hamiltonian(X)
- assert J.shape[0] == J.shape[1], "Inconsistent hamiltonian size!"
- assert J.shape[0] == C.shape[0], "Inconsistent hamiltonian size!"
- self._set_model(J, C, 1.0)
- sol, response = self._solve()
- assert len(sol) == C.shape[0], "Inconsistent solution size!"
- fct = np.linalg.norm(sol)
- if fct > 0:
- fct = 1.0 / fct
- v0 = fct * np.array(sol)
- v0 = v0.reshape((v0.shape[0], 1))
- lambda0 = np.matmul(np.matmul(v0.transpose(), -J), v0)[0][0]
- assert lambda0 >= 0, "Unexpected negative eigenvalue!"
- fct = np.sqrt(lambda0)
- if fct > 0:
- fct = 1.0 / fct
- u0 = fct * np.matmul(X, v0)
- u0 = u0.reshape(-1)
- fct = np.linalg.norm(u0)
- if fct > 0:
- fct = 1.0 / fct
- u0 = fct * u0
- return u0, response
- def fit(self, X):
- """
- Build a PCA object from the training set X.
- Parameters
- ----------
- X : {array-like, sparse matrix} of shape (n_samples, n_features)
- The training input samples.
- Returns
- -------
- responses.
- A dirct containing Dirac responses.
- """
- num_features = X.shape[1]
- if self.n_components is None:
- n_components = num_features
- else:
- n_components = self.n_components
- n_components = min(n_components, num_features)
- self.X = X.copy()
- self.X_pca = []
- resp_hash = {}
- for i in range(n_components):
- u, resp = self._get_first_component(X)
- self.X_pca.append(u)
- u = u.reshape((u.shape[0], 1))
- X = X - np.matmul(
- u,
- np.matmul(u.transpose(), X),
- )
- assert X.shape == self.X.shape, "Inconsistent size!"
- resp_hash["component_%d_response" % (i + 1)] = resp
-
- self.X_pca = np.array(self.X_pca).transpose()
- assert self.X_pca.shape[0] == self.X.shape[0]
- assert self.X_pca.shape[1] == n_components
- return resp_hash