
Source code for eqc_models.ml.classifierqboost

# (C) Quantum Computing Inc., 2024.
# Import libs
import os
import sys
import time
import datetime
import json
import gc
import warnings
from functools import wraps
from multiprocessing import shared_memory, Pool, set_start_method, Manager
from multiprocessing.managers import SharedMemoryManager

import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF

from eqc_models.ml.classifierbase import ClassifierBase

# from eqc_models.ml.cvqboost_hamiltonian import get_hamiltonian_pyx
def timer(func):
    """Decorator that prints the wall-clock runtime of the wrapped function."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        beg_time = time.time()
        val = func(*args, **kwargs)
        end_time = time.time()
        tot_time = end_time - beg_time
        print(
            "Runtime of %s: %0.2f seconds!"
            % (
                func.__name__,
                tot_time,
            )
        )
        return val

    return wrapper
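
# A minimal usage sketch of the @timer decorator above. This demo function is
# hypothetical (not part of the original module); it only illustrates that the
# decorator prints the runtime and passes the return value through unchanged.
def _demo_timer():
    @timer
    def slow_sum(n):
        # A deliberately slow loop so the printed runtime is nonzero.
        return sum(range(n))

    return slow_sum(10_000_000)  # prints "Runtime of slow_sum: ... seconds!"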
class WeakClassifier:
    """A weak classifier trained on a small subset of the features."""

    def __init__(
        self,
        X_train,
        y_train,
        weak_cls_type,
        max_depth=10,
        min_samples_split=100,
        num_jobs=1,
    ):
        assert X_train.shape[0] == len(y_train)

        self.X_train = X_train
        self.y_train = y_train

        if weak_cls_type == "dct":
            self.clf = DecisionTreeClassifier(
                max_depth=max_depth,
                min_samples_split=min_samples_split,
                random_state=0,
            )
        elif weak_cls_type == "nb":
            self.clf = GaussianNB()
        elif weak_cls_type == "lg":
            self.clf = LogisticRegression(random_state=0)
        elif weak_cls_type == "gp":
            self.clf = GaussianProcessClassifier(
                kernel=1.0 * RBF(1.0),
                random_state=0,
            )
        else:
            raise ValueError(
                "Unknown weak classifier type <%s>!" % weak_cls_type
            )

    def train(self):
        self.clf.fit(self.X_train, self.y_train)

    def predict(self, X):
        return self.clf.predict(X)
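
# Usage sketch for WeakClassifier (hypothetical, illustrative only): train a
# depth-limited decision tree on one synthetic feature and predict the sign of
# that feature. None of the names below exist in the original module.
def _demo_weak_classifier():
    rng = np.random.default_rng(0)
    X = rng.random((200, 1))
    y = np.where(X[:, 0] > 0.5, 1, -1)
    clf = WeakClassifier(X, y, "dct", max_depth=3, min_samples_split=10)
    clf.train()
    return clf.predict(X[:5])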
class QBoostClassifier(ClassifierBase):
    """An implementation of the QBoost classifier that uses QCi's Dirac-3.

    Parameters
    ----------
    relaxation_schedule: Relaxation schedule used by Dirac-3;
    default: 2.

    num_samples: Number of samples used by Dirac-3; default: 1.

    lambda_coef: A penalty multiplier; default: 0.

    weak_cls_schedule: Weak classifier schedule. Is either 1, 2,
    or 3; default: 2.

    weak_cls_type: Type of weak classifier,
        - dct: Decision tree classifier
        - nb: Naive Bayesian classifier
        - lg: Logistic regression
        - gp: Gaussian process classifier
    default: lg.

    weak_max_depth: Max depth of the tree. Applied only when
    weak_cls_type="dct". Default: 10.

    weak_min_samples_split: The minimum number of samples required
    to split an internal node. Applied only when
    weak_cls_type="dct". Default: 100.

    weak_cls_strategy: Strategy used to build the weak classifiers;
    one of "multi_processing", "multi_processing_shm", or
    "sequential". Default: "multi_processing".

    weak_cls_num_jobs: Number of worker processes used to build the
    weak classifiers; when None or non-positive, os.cpu_count() is
    used. Default: None.

    Examples
    --------
    >>> from sklearn import datasets
    >>> from sklearn.preprocessing import MinMaxScaler
    >>> from sklearn.model_selection import train_test_split
    >>> iris = datasets.load_iris()
    >>> X = iris.data
    >>> y = iris.target
    >>> scaler = MinMaxScaler()
    >>> X = scaler.fit_transform(X)
    >>> for i in range(len(y)):
    ...     if y[i] == 0:
    ...         y[i] = -1
    ...     elif y[i] == 2:
    ...         y[i] = 1
    >>> X_train, X_test, y_train, y_test = train_test_split(
    ...     X,
    ...     y,
    ...     test_size=0.2,
    ...     random_state=42,
    ... )
    >>> from eqc_models.ml.classifierqboost import QBoostClassifier
    >>> obj = QBoostClassifier(
    ...     relaxation_schedule=2,
    ...     num_samples=1,
    ...     lambda_coef=0.0,
    ... )
    >>> from contextlib import redirect_stdout
    >>> import io
    >>> f = io.StringIO()
    >>> with redirect_stdout(f):
    ...     obj.fit(X_train, y_train)
    ...     y_train_prd = obj.predict(X_train)
    ...     y_test_prd = obj.predict(X_test)
    """
    def __init__(
        self,
        relaxation_schedule=2,
        num_samples=1,
        lambda_coef=0,
        weak_cls_schedule=2,
        weak_cls_type="lg",
        weak_max_depth=10,
        weak_min_samples_split=100,
        weak_cls_strategy="multi_processing",
        weak_cls_num_jobs=None,
    ):
        super(QBoostClassifier, self).__init__()

        assert weak_cls_schedule in [1, 2, 3]
        assert weak_cls_type in ["dct", "nb", "lg", "gp"]
        assert weak_cls_strategy in [
            "multi_processing",
            "multi_processing_shm",
            "sequential",
        ]

        self.relaxation_schedule = relaxation_schedule
        self.num_samples = num_samples
        self.lambda_coef = lambda_coef
        self.weak_cls_schedule = weak_cls_schedule
        self.weak_cls_type = weak_cls_type
        self.weak_max_depth = weak_max_depth
        self.weak_min_samples_split = weak_min_samples_split
        self.weak_cls_strategy = weak_cls_strategy
        if weak_cls_num_jobs is None or weak_cls_num_jobs <= 0:
            # Fall back to all available cores.
            self.weak_cls_num_jobs = os.cpu_count()
        else:
            self.weak_cls_num_jobs = int(weak_cls_num_jobs)

        self.h_list = []
        self.ind_list = []
        self.classes_ = None
    @timer
    def _build_weak_classifiers_sq(self, X, y):
        """Build the weak classifiers sequentially in the current process."""
        n_records = X.shape[0]
        n_dims = X.shape[1]

        assert len(y) == n_records

        self.h_list = []
        self.ind_list = []

        # Single-feature weak classifiers (all schedules)
        for l in range(n_dims):
            weak_classifier = WeakClassifier(
                X[:, [l]],
                y,
                self.weak_cls_type,
                self.weak_max_depth,
                self.weak_min_samples_split,
            )
            weak_classifier.train()
            self.ind_list.append([l])
            self.h_list.append(weak_classifier)

        # Feature-pair weak classifiers (schedule >= 2)
        if self.weak_cls_schedule >= 2:
            for i in range(n_dims):
                for j in range(i + 1, n_dims):
                    weak_classifier = WeakClassifier(
                        X[:, [i, j]],
                        y,
                        self.weak_cls_type,
                        self.weak_max_depth,
                        self.weak_min_samples_split,
                    )
                    weak_classifier.train()
                    self.ind_list.append([i, j])
                    self.h_list.append(weak_classifier)

        # Feature-triple weak classifiers (schedule >= 3)
        if self.weak_cls_schedule >= 3:
            for i in range(n_dims):
                for j in range(i + 1, n_dims):
                    for k in range(j + 1, n_dims):
                        weak_classifier = WeakClassifier(
                            X[:, [i, j, k]],
                            y,
                            self.weak_cls_type,
                            self.weak_max_depth,
                            self.weak_min_samples_split,
                        )
                        weak_classifier.train()
                        self.ind_list.append([i, j, k])
                        self.h_list.append(weak_classifier)

        return
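
    # Illustrative note (hypothetical helper, not part of the original
    # module): the schedules above enumerate all feature subsets of size
    # 1, 2, and 3, so the number of weak classifiers grows combinatorially
    # with the feature count n_dims.
    #
    #     from math import comb
    #
    #     def _count_weak_classifiers(n_dims, weak_cls_schedule):
    #         count = n_dims
    #         if weak_cls_schedule >= 2:
    #             count += comb(n_dims, 2)
    #         if weak_cls_schedule >= 3:
    #             count += comb(n_dims, 3)
    #         return count
    #
    #     _count_weak_classifiers(4, 3)  # iris: 4 + 6 + 4 = 14 classifiers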
    def _train_weak_classifier_mp(
        self,
        indices,
        X_subset,
        y,
        n_records,
        n_dims,
        weak_cls_type,
        weak_max_depth,
        weak_min_samples_split,
    ):
        """Train one weak classifier in a worker process.

        The n_records and n_dims arguments are not used here; they mirror
        the signature of _train_weak_classifier_shm below.
        """
        # Train the weak classifier
        weak_classifier = WeakClassifier(
            X_subset,
            y,
            weak_cls_type,
            weak_max_depth,
            weak_min_samples_split,
        )
        weak_classifier.train()

        return indices, weak_classifier
    @timer
    def _build_weak_classifiers_mp(self, X, y):
        n_records = X.shape[0]
        n_dims = X.shape[1]

        assert len(y) == n_records

        self.h_list = []
        self.ind_list = []

        num_workers = self.weak_cls_num_jobs

        print(f"Using {num_workers} workers to build weak classifiers.")

        # "fork" lets workers inherit the bound method and data; it is
        # available on POSIX systems only.
        set_start_method("fork", force=True)

        tasks = []
        for l in range(n_dims):
            tasks.append(
                (
                    [l],
                    X[:, [l]],
                    y,
                    n_records,
                    n_dims,
                    self.weak_cls_type,
                    self.weak_max_depth,
                    self.weak_min_samples_split,
                )
            )

        if self.weak_cls_schedule >= 2:
            for i in range(n_dims):
                for j in range(i + 1, n_dims):
                    tasks.append(
                        (
                            [i, j],
                            X[:, [i, j]],
                            y,
                            n_records,
                            n_dims,
                            self.weak_cls_type,
                            self.weak_max_depth,
                            self.weak_min_samples_split,
                        )
                    )

        if self.weak_cls_schedule >= 3:
            for i in range(n_dims):
                for j in range(i + 1, n_dims):
                    for k in range(j + 1, n_dims):
                        tasks.append(
                            (
                                [i, j, k],
                                X[:, [i, j, k]],
                                y,
                                n_records,
                                n_dims,
                                self.weak_cls_type,
                                self.weak_max_depth,
                                self.weak_min_samples_split,
                            )
                        )

        # Parallel execution using Pool; close() must precede join().
        with Pool(processes=num_workers) as pool:
            results = pool.starmap(self._train_weak_classifier_mp, tasks)
            pool.close()
            pool.join()

        for indices, weak_classifier in results:
            self.ind_list.append(indices)
            self.h_list.append(weak_classifier)

        return
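
    # Note on the Pool.starmap pattern above (illustrative, hypothetical
    # names): each task tuple is unpacked positionally into the parameters
    # of _train_weak_classifier_mp, and results come back in task order, so
    # ind_list and h_list stay aligned. A minimal standalone sketch:
    #
    #     from multiprocessing import Pool
    #
    #     def _scale(indices, factor):
    #         return [i * factor for i in indices]
    #
    #     with Pool(processes=2) as pool:
    #         out = pool.starmap(_scale, [([1, 2], 10), ([3], 100)])
    #     # out == [[10, 20], [300]]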
    def _train_weak_classifier_shm(
        self,
        indices,
        shm_X_name,
        shm_y_name,
        shared_list,
        n_records,
        n_dims,
        weak_cls_type,
        weak_max_depth,
        weak_min_samples_split,
    ):
        """Train a weak classifier using shared memory."""
        # Attach to the parent's shared-memory blocks by name.
        shm_X_worker = shared_memory.SharedMemory(name=shm_X_name)
        shm_y_worker = shared_memory.SharedMemory(name=shm_y_name)

        X_shared = np.ndarray(
            (n_records, n_dims), dtype=np.float32, buffer=shm_X_worker.buf
        )
        y_shared = np.ndarray(
            (n_records,), dtype=np.float32, buffer=shm_y_worker.buf
        )

        X_subset = X_shared[:, indices]

        weak_classifier = WeakClassifier(
            X_subset,
            y_shared,
            weak_cls_type,
            weak_max_depth,
            weak_min_samples_split,
        )
        weak_classifier.train()

        # Results go back to the parent through the manager-backed list.
        shared_list.append((indices, weak_classifier))

        shm_X_worker.close()
        shm_y_worker.close()
    @timer
    def _build_weak_classifiers_shm(self, X, y):
        n_records = X.shape[0]
        n_dims = X.shape[1]

        assert len(y) == n_records

        self.h_list = []
        self.ind_list = []

        num_workers = self.weak_cls_num_jobs

        print(f"Using {num_workers} workers to build weak classifiers.")

        set_start_method("fork", force=True)

        # Workers reconstruct the arrays as float32, so fix the dtype and
        # memory layout before copying into shared memory.
        X = np.ascontiguousarray(X, dtype=np.float32)
        y = np.ascontiguousarray(y, dtype=np.float32)

        with SharedMemoryManager() as shm_manager:
            shm_X = shm_manager.SharedMemory(size=X.nbytes)
            shm_y = shm_manager.SharedMemory(size=y.nbytes)

            X_shared = np.ndarray(X.shape, dtype=X.dtype, buffer=shm_X.buf)
            y_shared = np.ndarray(y.shape, dtype=y.dtype, buffer=shm_y.buf)

            np.copyto(X_shared, X)
            np.copyto(y_shared, y)

            with Manager() as manager:
                shared_list = manager.list()

                tasks = []
                for l in range(n_dims):
                    tasks.append(
                        (
                            [l],
                            shm_X.name,
                            shm_y.name,
                            shared_list,
                            n_records,
                            n_dims,
                            self.weak_cls_type,
                            self.weak_max_depth,
                            self.weak_min_samples_split,
                        )
                    )

                if self.weak_cls_schedule >= 2:
                    for i in range(n_dims):
                        for j in range(i + 1, n_dims):
                            tasks.append(
                                (
                                    [i, j],
                                    shm_X.name,
                                    shm_y.name,
                                    shared_list,
                                    n_records,
                                    n_dims,
                                    self.weak_cls_type,
                                    self.weak_max_depth,
                                    self.weak_min_samples_split,
                                )
                            )

                if self.weak_cls_schedule >= 3:
                    for i in range(n_dims):
                        for j in range(i + 1, n_dims):
                            for k in range(j + 1, n_dims):
                                tasks.append(
                                    (
                                        [i, j, k],
                                        shm_X.name,
                                        shm_y.name,
                                        shared_list,
                                        n_records,
                                        n_dims,
                                        self.weak_cls_type,
                                        self.weak_max_depth,
                                        self.weak_min_samples_split,
                                    )
                                )

                with Pool(processes=num_workers) as pool:
                    # The workers report results through shared_list, so
                    # the starmap return value (all None) is not kept.
                    pool.starmap(
                        self._train_weak_classifier_shm, tasks
                    )
                    pool.close()
                    pool.join()

                # Collect results while the manager's proxy list is alive.
                for item in list(shared_list):
                    self.ind_list.append(item[0])
                    self.h_list.append(item[1])

            shm_X.close()
            shm_X.unlink()
            shm_y.close()
            shm_y.unlink()
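
    # Minimal standalone sketch of the shared-memory pattern used above
    # (hypothetical names, illustrative only): the parent publishes an
    # array once, and each worker attaches to the same block by name
    # instead of receiving a pickled copy.
    #
    #     from multiprocessing import shared_memory
    #     import numpy as np
    #
    #     arr = np.arange(6, dtype=np.float32)
    #     shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    #     view = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    #     np.copyto(view, arr)                     # parent writes once
    #
    #     worker = shared_memory.SharedMemory(name=shm.name)  # child attaches
    #     data = np.ndarray(arr.shape, dtype=arr.dtype, buffer=worker.buf)
    #
    #     worker.close()
    #     shm.close()
    #     shm.unlink()                             # parent frees the block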
    def _infer_one_weak_classifier(self, cls_ind, X_subset):
        return self.h_list[cls_ind].predict(X_subset)

    def _infer_weak_classifiers(self, X):
        n_classifiers = len(self.h_list)

        num_workers = self.weak_cls_num_jobs

        print(f"Using {num_workers} workers for inference.")

        set_start_method("fork", force=True)

        tasks = []
        for i in range(n_classifiers):
            tasks.append((i, X[:, self.ind_list[i]]))

        with Pool(processes=num_workers) as pool:
            results = pool.starmap(self._infer_one_weak_classifier, tasks)

        return list(results)
    def fit(self, X, y):
        """
        Build a QBoost classifier from the training set (X, y).

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            The training input samples.

        y : array-like of shape (n_samples,)
            The target values.

        Returns
        -------
        Response of Dirac-3 in JSON format.
        """
        assert X.shape[0] == y.shape[0], "Inconsistent sizes!"
        assert set(y) == {-1, 1}, "Target values should be in {-1, 1}"

        self.classes_ = set(y)

        J, C, sum_constraint = self.get_hamiltonian(X, y)

        assert J.shape[0] == J.shape[1], "Inconsistent hamiltonian size!"
        assert J.shape[0] == C.shape[0], "Inconsistent hamiltonian size!"

        self.set_model(J, C, sum_constraint)

        sol, response = self.solve()

        assert len(sol) == C.shape[0], "Inconsistent solution size!"

        self.params = self.convert_sol_to_params(sol)

        assert len(self.params) == len(self.h_list), "Inconsistent size!"

        return response
    def predict_raw(self, X: np.ndarray):
        """
        Predict raw output of the classifier for input X.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)

        Returns
        -------
        y : ndarray of shape (n_samples,)
            The predicted raw output of the classifier.
        """
        n_classifiers = len(self.h_list)

        # Stack the weak predictions and take their weighted sum.
        h_vals = np.array(
            [
                self.h_list[i].predict(X[:, self.ind_list[i]])
                for i in range(n_classifiers)
            ]
        )
        y = np.tensordot(self.params, h_vals, axes=(0, 0))

        return y
    def predict(self, X: np.ndarray):
        """
        Predict classes for X.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)

        Returns
        -------
        y : ndarray of shape (n_samples,)
            The predicted classes.
        """
        y = self.predict_raw(X)
        y = np.sign(y)

        return y
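
    # Usage sketch (hypothetical names, continuing the docstring example
    # above): evaluate the fitted model with scikit-learn's accuracy_score.
    #
    #     from sklearn.metrics import accuracy_score
    #
    #     y_test_prd = obj.predict(X_test)          # labels in {-1, 1}
    #     print(accuracy_score(y_test, y_test_prd))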
    @timer
    def get_hamiltonian(
        self,
        X: np.ndarray,
        y: np.ndarray,
    ):
        X = np.array(X, dtype=np.float32)
        y = np.array(y, dtype=np.float32)

        if self.weak_cls_strategy == "multi_processing":
            self._build_weak_classifiers_mp(X, y)
        elif self.weak_cls_strategy == "multi_processing_shm":
            self._build_weak_classifiers_shm(X, y)
        elif self.weak_cls_strategy == "sequential":
            self._build_weak_classifiers_sq(X, y)

        print("Built %d weak classifiers!" % len(self.h_list))

        n_classifiers = len(self.h_list)
        n_records = X.shape[0]

        # Weak predictions on the training set, one row per classifier:
        # shape (n_classifiers, n_records).
        h_vals = np.array(
            [
                self.h_list[i].predict(X[:, self.ind_list[i]])
                for i in range(n_classifiers)
            ]
        )

        J = np.tensordot(h_vals, h_vals, axes=(1, 1))
        J += np.diag(self.lambda_coef * np.ones((n_classifiers)))
        C = -2.0 * np.tensordot(h_vals, y, axes=(1, 0))

        # J, C = get_hamiltonian_pyx(y, h_vals, self.lambda_coef, n_records)

        C = C.reshape((n_classifiers, 1))

        return J, C, 1.0
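
    # The Hamiltonian above encodes the least-squares QBoost objective.
    # With H the (n_classifiers, n_records) matrix of weak predictions,
    # J = H H^T + lambda_coef * I and C = -2 H y, so for any weight
    # vector w:
    #
    #     w^T J w + C^T w = ||H^T w - y||^2 + lambda_coef * ||w||^2 - ||y||^2
    #
    # A quick numerical check (hypothetical names, illustrative only):
    #
    #     w = np.random.rand(J.shape[0])
    #     lhs = w @ J @ w + C[:, 0] @ w
    #     rhs = (
    #         np.sum((h_vals.T @ w - y) ** 2)
    #         + self.lambda_coef * (w @ w)
    #         - y @ y
    #     )
    #     assert np.isclose(lhs, rhs)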
    def convert_sol_to_params(self, sol):
        return np.array(sol)