Download

Source code for eqc_models.solvers.eqcdirect

  • # (C) Quantum Computing Inc., 2024.
  • from typing import Dict
  • import logging
  • import numpy as np
  • from eqc_direct.client import EqcClient
  • from eqc_models.base.base import ModelSolver, EqcModel
  • log = logging.getLogger(name=__name__)
  • class EqcDirectMixin:
  • ip_addr = None
  • port = None
  • def connect(self, ip_addr : str, port : str) -> str:
  • """ Explicitly set device address; if environment is configured with the connection, this call is not required """
  • self.ip_addr = ip_addr
  • self.port = port
  • client = self.client
  • return client.system_status()["status_desc"]
  • @property
  • def client(self):
  • params = {}
  • if self.ip_addr is not None:
  • params["ip_address"] = self.ip_addr
  • if self.port is not None:
  • params["port"] = self.port
  • return EqcClient(**params)
  • [docs]
  • class EqcDirectSolver(ModelSolver, EqcDirectMixin):
  • [docs]
  • def solve(self, model:EqcModel, relaxation_schedule:int=2, precision : float = 1.0) -> Dict:
  • poly_coefficients, poly_indices = model.sparse
  • # print(poly_indices)
  • if model.machine_slacks > 0:
  • # add a single 0 coefficient entry as the next-highest index
  • highest_idx = int(np.max(poly_indices))
  • # print("POLY HIGHEST", highest_idx)
  • for i in range(model.machine_slacks):
  • addtl_index = [0 for i in range(len(poly_indices[0]))]
  • addtl_index[-1] = highest_idx + i + 1
  • poly_indices = poly_indices.tolist() + [addtl_index]
  • poly_coefficients = poly_coefficients.tolist() + [0]
  • # print(poly_indices)
  • scval = model.sum_constraint
  • client = self.client
  • lock_id, start_ts, end_ts = client.wait_for_lock()
  • log.debug("Got device lock id %s. Wait time %f", lock_id, end_ts - start_ts)
  • resp = None
  • try:
  • log.debug("Calling device with parameters relaxation_schedule %d sum_constraint %s lock_id %s solution_precision %f",
  • relaxation_schedule, scval, lock_id, precision)
  • resp = client.process_job(poly_coefficients=poly_coefficients,
  • poly_indices=poly_indices,
  • relaxation_schedule=relaxation_schedule,
  • sum_constraint = scval,
  • lock_id = lock_id,
  • solution_precision=precision)
  • log.debug("Received response with status %s", resp["err_desc"])
  • log.debug("Runtime %f resulting in energy %f", resp["runtime"], resp["energy"])
  • log.debug("Distillation runtime %s resulting in energy %f", resp["distilled_runtime"], resp["distilled_energy"])
  • finally:
  • client.release_lock(lock_id=lock_id)
  • if resp is not None:
  • solution = resp["solution"]
  • energy = resp["energy"]
  • runtime = resp["runtime"]
  • dirac3_sol = np.array(solution)
  • log.debug("Energy %f Runtime %f Solution Size %i Solution Sum %f",
  • energy, runtime, len(dirac3_sol), sum(dirac3_sol))
  • else:
  • raise RuntimeError("FAILED TO GET RESPONSE")
  • return resp
  • [docs]
  • class Dirac3DirectSolver(EqcDirectSolver):
  • """
  • Naming this for when when other devices are available and have
  • different requirements. For instance, Dirac-3 requires the
  • summation constraint parameter, but others might not. The same
  • could be true for relaxation schedule.
  • """