FoundryProductsTechnologyCompanyInvestor relationsResource libraryNews
Contact us
Resource library
    Resource library home
    Developer resources
    Applications
    Lessons
    Research and publications
    Support
      Software packages
        eqc-direct software package
        qci-client software package
        uqrng-direct software package
        emucore-direct software package
        eqc-models software package
          Getting Started
          Basic Usage
          eqc_models
          Dependencies
      Spec sheets
      User guides

Couldn’t find what you are looking for? Reach out to technical support.

Contact support
Privacy PolicyCookie PolicyTerms of UseForward Looking StatementsAccessibility Statement
Terms and Conditions of SaleEnd User License Agreement

© 2018-2026 Quantum Computing Inc.

Download

Default

Source code for eqc_models.ml.classifierqboost

# (C) Quantum Computing Inc., 2024.
# Import libs
import datetime
import gc
import json
import os
import sys
import time
import warnings
from functools import wraps
from itertools import combinations
from multiprocessing import shared_memory, Pool, set_start_method, Manager
from multiprocessing.managers import SharedMemoryManager

import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF

from eqc_models.ml.classifierbase import ClassifierBase

#from eqc_models.ml.cvqboost_hamiltonian import get_hamiltonian_pyx
  • def timer(func):
  • @wraps(func)
  • def wrapper(*args, **kwargs):
  • beg_time = time.time()
  • val = func(*args, **kwargs)
  • end_time = time.time()
  • tot_time = end_time - beg_time
  • print(
  • "Runtime of %s: %0.2f seconds!"
  • % (
  • func.__name__,
  • tot_time,
  • )
  • )
  • return val
  • return wrapper
  • class WeakClassifier:
  • def __init__(
  • self,
  • X_train,
  • y_train,
  • weak_cls_type,
  • max_depth=10,
  • min_samples_split=100,
  • num_jobs=1,
  • ):
  • assert X_train.shape[0] == len(y_train)
  • self.X_train = X_train
  • self.y_train = y_train
  • if weak_cls_type == "dct":
  • self.clf = DecisionTreeClassifier(
  • max_depth=max_depth,
  • min_samples_split=min_samples_split,
  • random_state=0,
  • )
  • elif weak_cls_type == "nb":
  • self.clf = GaussianNB()
  • elif weak_cls_type == "lg":
  • self.clf = LogisticRegression(random_state=0)
  • elif weak_cls_type == "gp":
  • self.clf = GaussianProcessClassifier(
  • kernel=1.0 * RBF(1.0),
  • random_state=0,
  • )
  • else:
  • assert False, (
  • "Unknown weak classifier type <%s>!" % weak_cls_type
  • )
  • def train(self):
  • self.clf.fit(self.X_train, self.y_train)
  • def predict(self, X):
  • return self.clf.predict(X)
  • [docs]
  • class QBoostClassifier(ClassifierBase):
  • """An implementation of QBoost classifier that uses QCi's Dirac-3.
  • Parameters
  • ----------
  • relaxation_schedule: Relaxation schedule used by Dirac-3;
  • default: 2.
  • num_samples: Number of samples used by Dirac-3; default: 1.
  • lambda_coef: A penalty multiplier; default: 0.
  • weak_cls_schedule: Weak classifier schedule. Is either 1, 2,
  • or 3; default: 2.
  • weak_cls_type: Type of weak classifier
  • - dct: Decison tree classifier
  • - nb: Naive Baysian classifier
  • - lg: Logistic regression
  • - gp: Gaussian process classifier
  • default: dct.
  • weak_max_depth: Max depth of the tree. Applied only when
  • weak_cls_type="dct". Default: 10.
  • weak_min_samples_split: The minimum number of samples required
  • to split an internal node. Applied only when
  • weak_cls_type="dct". Default: 100.
  • Examples
  • -----------
  • >>> from sklearn import datasets
  • >>> from sklearn.preprocessing import MinMaxScaler
  • >>> from sklearn.model_selection import train_test_split
  • >>> iris = datasets.load_iris()
  • >>> X = iris.data
  • >>> y = iris.target
  • >>> scaler = MinMaxScaler()
  • >>> X = scaler.fit_transform(X)
  • >>> for i in range(len(y)):
  • ... if y[i] == 0:
  • ... y[i] = -1
  • ... elif y[i] == 2:
  • ... y[i] = 1
  • >>> X_train, X_test, y_train, y_test = train_test_split(
  • ... X,
  • ... y,
  • ... test_size=0.2,
  • ... random_state=42,
  • ... )
  • >>> from eqc_models.ml.classifierqboost import QBoostClassifier
  • >>> obj = QBoostClassifier(
  • ... relaxation_schedule=2,
  • ... num_samples=1,
  • ... lambda_coef=0.0,
  • ... )
  • >>> from contextlib import redirect_stdout
  • >>> import io
  • >>> f = io.StringIO()
  • >>> with redirect_stdout(f):
  • ... obj.fit(X_train, y_train)
  • ... y_train_prd = obj.predict(X_train)
  • ... y_test_prd = obj.predict(X_test)
  • """
  • def __init__(
  • self,
  • relaxation_schedule=2,
  • num_samples=1,
  • lambda_coef=0,
  • weak_cls_schedule=2,
  • weak_cls_type="lg",
  • weak_max_depth=10,
  • weak_min_samples_split=100,
  • weak_cls_strategy="multi_processing",
  • weak_cls_num_jobs=None,
  • ):
  • super(QBoostClassifier).__init__()
  • assert weak_cls_schedule in [1, 2, 3]
  • assert weak_cls_type in ["dct", "nb", "lg", "gp"]
  • assert weak_cls_strategy in [
  • "multi_processing",
  • "multi_processing_shm",
  • "sequential",
  • ]
  • self.relaxation_schedule = relaxation_schedule
  • self.num_samples = num_samples
  • self.lambda_coef = lambda_coef
  • self.weak_cls_schedule = weak_cls_schedule
  • self.weak_cls_type = weak_cls_type
  • self.weak_max_depth = weak_max_depth
  • self.weak_min_samples_split = weak_min_samples_split
  • self.weak_cls_strategy = weak_cls_strategy
  • if weak_cls_num_jobs is None or weak_cls_num_jobs <= 0:
  • self.weak_cls_num_jobs = os.cpu_count()
  • else:
  • self.weak_cls_num_jobs = int(weak_cls_num_jobs)
  • self.h_list = []
  • self.ind_list = []
  • self.classes_ = None
  • @timer
  • def _build_weak_classifiers_sq(self, X, y):
  • n_records = X.shape[0]
  • n_dims = X.shape[1]
  • assert len(y) == n_records
  • self.h_list = []
  • self.ind_list = []
  • num_workers = self.weak_cls_num_jobs
  • tasks = []
  • for l in range(n_dims):
  • weak_classifier = WeakClassifier(
  • X[:, [l]],
  • y,
  • self.weak_cls_type,
  • self.weak_max_depth,
  • self.weak_min_samples_split,
  • )
  • weak_classifier.train()
  • self.ind_list.append([l])
  • self.h_list.append(weak_classifier)
  • if self.weak_cls_schedule >= 2:
  • for i in range(n_dims):
  • for j in range(i + 1, n_dims):
  • weak_classifier = WeakClassifier(
  • X[:, [i, j]],
  • y,
  • self.weak_cls_type,
  • self.weak_max_depth,
  • self.weak_min_samples_split,
  • )
  • weak_classifier.train()
  • self.ind_list.append([i, j])
  • self.h_list.append(weak_classifier)
  • if self.weak_cls_schedule >= 3:
  • for i in range(n_dims):
  • for j in range(i + 1, n_dims):
  • for k in range(j + 1, n_dims):
  • weak_classifier = WeakClassifier(
  • X[:, [i, j, k]],
  • y,
  • self.weak_cls_type,
  • self.weak_max_depth,
  • self.weak_min_samples_split,
  • )
  • weak_classifier.train()
  • self.ind_list.append([i, j, k])
  • self.h_list.append(weak_classifier)
  • return
  • def _train_weak_classifier_mp(
  • self,
  • indices,
  • X_subset,
  • y,
  • n_records,
  • n_dims,
  • weak_cls_type,
  • weak_max_depth,
  • weak_min_samples_split,
  • ):
  • # Train the weak classifier
  • weak_classifier = WeakClassifier(
  • X_subset,
  • y,
  • weak_cls_type,
  • weak_max_depth,
  • weak_min_samples_split,
  • )
  • weak_classifier.train()
  • return indices, weak_classifier
  • @timer
  • def _build_weak_classifiers_mp(self, X, y):
  • n_records = X.shape[0]
  • n_dims = X.shape[1]
  • assert len(y) == n_records
  • self.h_list = []
  • self.ind_list = []
  • num_workers = self.weak_cls_num_jobs
  • print(f"Using {num_workers} workers to build weak classifiers.")
  • set_start_method("fork", force=True)
  • tasks = []
  • for l in range(n_dims):
  • tasks.append(
  • (
  • [l],
  • X[:, [l]],
  • y,
  • n_records,
  • n_dims,
  • self.weak_cls_type,
  • self.weak_max_depth,
  • self.weak_min_samples_split,
  • )
  • )
  • if self.weak_cls_schedule >= 2:
  • for i in range(n_dims):
  • for j in range(i + 1, n_dims):
  • tasks.append(
  • (
  • [i, j],
  • X[:, [i, j]],
  • y,
  • n_records,
  • n_dims,
  • self.weak_cls_type,
  • self.weak_max_depth,
  • self.weak_min_samples_split,
  • )
  • )
  • if self.weak_cls_schedule >= 3:
  • for i in range(n_dims):
  • for j in range(i + 1, n_dims):
  • for k in range(j + 1, n_dims):
  • tasks.append(
  • (
  • [i, j, k],
  • X[:, [i, j, k]],
  • y,
  • n_records,
  • n_dims,
  • self.weak_cls_type,
  • self.weak_max_depth,
  • self.weak_min_samples_split,
  • )
  • )
  • # Parallel execution using Pool
  • with Pool(processes=num_workers) as pool:
  • results = pool.starmap(self._train_weak_classifier_mp, tasks)
  • pool.join()
  • pool.close()
  • for indices, weak_classifier in results:
  • self.ind_list.append(indices)
  • self.h_list.append(weak_classifier)
  • return
  • def _train_weak_classifier_shm(
  • self,
  • indices,
  • shm_X_name,
  • shm_y_name,
  • shared_list,
  • n_records,
  • n_dims,
  • weak_cls_type,
  • weak_max_depth,
  • weak_min_samples_split,
  • ):
  • """Train a weak classifier using shared memory."""
  • shm_X_worker = shared_memory.SharedMemory(name=shm_X_name)
  • shm_y_worker = shared_memory.SharedMemory(name=shm_y_name)
  • X_shared = np.ndarray(
  • (n_records, n_dims), dtype=np.float32, buffer=shm_X_worker.buf
  • )
  • y_shared = np.ndarray(
  • (n_records,), dtype=np.float32, buffer=shm_y_worker.buf
  • )
  • X_subset = X_shared[:, indices]
  • weak_classifier = WeakClassifier(
  • X_subset,
  • y_shared,
  • weak_cls_type,
  • weak_max_depth,
  • weak_min_samples_split,
  • )
  • weak_classifier.train()
  • shared_list.append((indices, weak_classifier))
  • shm_X_worker.close()
  • shm_y_worker.close()
  • @timer
  • def _build_weak_classifiers_shm(self, X, y):
  • n_records = X.shape[0]
  • n_dims = X.shape[1]
  • assert len(y) == n_records
  • self.h_list = []
  • self.ind_list = []
  • num_workers = self.weak_cls_num_jobs
  • print(f"Using {num_workers} workers to build weak classifiers.")
  • set_start_method("fork", force=True)
  • X = np.ascontiguousarray(X, dtype=np.float32)
  • y = np.ascontiguousarray(y, dtype=np.float32)
  • with SharedMemoryManager() as shm_manager:
  • shm_X = shm_manager.SharedMemory(size=X.nbytes)
  • shm_y = shm_manager.SharedMemory(size=y.nbytes)
  • X_shared = np.ndarray(X.shape, dtype=X.dtype, buffer=shm_X.buf)
  • y_shared = np.ndarray(y.shape, dtype=y.dtype, buffer=shm_y.buf)
  • np.copyto(X_shared, X)
  • np.copyto(y_shared, y)
  • with Manager() as manager:
  • shared_list = manager.list()
  • tasks = []
  • for l in range(n_dims):
  • tasks.append(
  • (
  • [l],
  • shm_X.name,
  • shm_y.name,
  • shared_list,
  • n_records,
  • n_dims,
  • self.weak_cls_type,
  • self.weak_max_depth,
  • self.weak_min_samples_split,
  • )
  • )
  • if self.weak_cls_schedule >= 2:
  • for i in range(n_dims):
  • for j in range(i + 1, n_dims):
  • tasks.append(
  • (
  • [i, j],
  • shm_X.name,
  • shm_y.name,
  • shared_list,
  • n_records,
  • n_dims,
  • self.weak_cls_type,
  • self.weak_max_depth,
  • self.weak_min_samples_split,
  • )
  • )
  • if self.weak_cls_schedule >= 3:
  • for i in range(n_dims):
  • for j in range(i + 1, n_dims):
  • for k in range(j + 1, n_dims):
  • tasks.append(
  • (
  • [i, j, k],
  • shm_X.name,
  • shm_y.name,
  • shared_list,
  • n_records,
  • n_dims,
  • self.weak_cls_type,
  • self.weak_max_depth,
  • self.weak_min_samples_split,
  • )
  • )
  • with Pool(processes=num_workers) as pool:
  • results = pool.starmap(
  • self._train_weak_classifier_shm, tasks
  • )
  • pool.close()
  • pool.join()
  • for item in list(shared_list):
  • self.ind_list.append(item[0])
  • self.h_list.append(item[1])
  • shm_X.close()
  • shm_X.unlink()
  • shm_y.close()
  • shm_y.unlink()
  • def _infer_one_weak_classifier(self, cls_ind, X_subset):
  • return self.h_list[cls_ind].predict(X_subset)
  • def _infer_weak_classifiers(self, X):
  • n_classifiers = len(self.h_list)
  • num_workers = self.weak_cls_num_jobs
  • print(f"Using {num_workers} workers for inference.")
  • set_start_method("fork", force=True)
  • tasks = []
  • for i in range(n_classifiers):
  • tasks.append((i, X[:, self.ind_list[i]]))
  • with Pool(processes=num_workers) as pool:
  • results = pool.starmap(self._infer_one_weak_classifier, tasks)
  • return list(results)
  • [docs]
  • def fit(self, X, y):
  • """
  • Build a QBoost classifier from the training set (X, y).
  • Parameters
  • ----------
  • X : {array-like, sparse matrix} of shape (n_samples, n_features)
  • The training input samples.
  • y : array-like of shape (n_samples,)
  • The target values.
  • Returns
  • -------
  • Response of Dirac-3 in JSON format.
  • """
  • assert X.shape[0] == y.shape[0], "Inconsistent sizes!"
  • assert set(y) == {-1, 1}, "Target values should be in {-1, 1}"
  • self.classes_ = set(y)
  • J, C, sum_constraint = self.get_hamiltonian(X, y)
  • assert J.shape[0] == J.shape[1], "Inconsistent hamiltonian size!"
  • assert J.shape[0] == C.shape[0], "Inconsistent hamiltonian size!"
  • self.set_model(J, C, sum_constraint)
  • sol, response = self.solve()
  • assert len(sol) == C.shape[0], "Inconsistent solution size!"
  • self.params = self.convert_sol_to_params(sol)
  • assert len(self.params) == len(self.h_list), "Inconsistent size!"
  • return response
  • [docs]
  • def predict_raw(self, X: np.array):
  • """
  • Predict raw output of the classifier for input X.
  • Parameters
  • ----------
  • X : {array-like, sparse matrix} of shape (n_samples, n_features)
  • Returns
  • -------
  • y : ndarray of shape (n_samples,)
  • The predicted raw output of the classifier.
  • """
  • n_records = X.shape[0]
  • n_classifiers = len(self.h_list)
  • y = np.zeros(shape=(n_records), dtype=np.float32)
  • h_vals = np.array(
  • [
  • self.h_list[i].predict(X[:, self.ind_list[i]])
  • for i in range(n_classifiers)
  • ]
  • )
  • y = np.tensordot(self.params, h_vals, axes=(0, 0))
  • return y
  • [docs]
  • def predict(self, X: np.array):
  • """
  • Predict classes for X.
  • Parameters
  • ----------
  • X : {array-like, sparse matrix} of shape (n_samples, n_features)
  • Returns
  • -------
  • y : ndarray of shape (n_samples,)
  • The predicted classes.
  • """
  • y = self.predict_raw(X)
  • y = np.sign(y)
  • return y
  • [docs]
  • @timer
  • def get_hamiltonian(
  • self,
  • X: np.array,
  • y: np.array,
  • ):
  • X = np.array(X, dtype=np.float32)
  • y = np.array(y, dtype=np.float32)
  • if self.weak_cls_strategy == "multi_processing":
  • self._build_weak_classifiers_mp(X, y)
  • elif self.weak_cls_strategy == "multi_processing_shm":
  • self._build_weak_classifiers_shm(X, y)
  • elif self.weak_cls_strategy == "sequential":
  • self._build_weak_classifiers_sq(X, y)
  • print("Built %d weak classifiers!" % len(self.h_list))
  • n_classifiers = len(self.h_list)
  • n_records = X.shape[0]
  • h_vals = np.array(
  • [
  • self.h_list[i].predict(X[:, self.ind_list[i]])
  • for i in range(n_classifiers)
  • ]
  • )
  • J = np.tensordot(h_vals, h_vals, axes=(1, 1))
  • J += np.diag(self.lambda_coef * np.ones((n_classifiers)))
  • C = -2.0 * np.tensordot(h_vals, y, axes=(1, 0))
  • # J, C = get_hamiltonian_pyx(y, h_vals, self.lambda_coef, n_records)
  • C = C.reshape((n_classifiers, 1))
  • return J, C, 1.0
  • [docs]
  • def convert_sol_to_params(self, sol):
  • return np.array(sol)
Previous page

Content

  • Source code for eqc_models.ml.classifierqboost