© 2018-2026 Quantum Computing Inc.

Source code for eqc_direct.client

  • """
  • :class:`.EqcClient` contains all RPC calls to process, get system status,
  • and fetch results.
  • """
  • import logging
  • import time
  • import os
  • import warnings
  • from typing import TypedDict, List, Optional, Union
  • import grpc
  • from grpc._channel import _InactiveRpcError
  • import numpy as np
  • from . import eqc_pb2, eqc_pb2_grpc
  • from .utils import (
  • SysStatus,
  • message_to_dict,
  • PREC_MIN_RECOMMENDED_LEVELS,
  • get_decimal_places
  • )
  • class InactiveRpcError(Exception):
  • """Custom exception wrapper around grpc._channel._InactiveRpcError."""
  • class EqcResult(TypedDict):
  • """
  • EQC results object. Will not contain an energy or solution if err_code is not 0.
  • :param err_code: the error code for a given job. Full list of :code:`err_code`
  • values can be found in :class:`eqc_direct.utils.JobCodes`
  • :param err_desc: the error description for a given job submission. Full list of
  • :code:`err_desc` values can be found in :class:`eqc_direct.utils.JobCodes`
  • :param preprocessing_time: data validation and time to re-format input data for
  • running on the device in seconds
  • :param runtime: solving time in seconds for Dirac hardware
  • :param energy: energy for best solution found (float32 precision)
  • :param solution: vector representing the lowest energy solution (float32 precision)
  • :param distilled_runtime: runtime for distillation of solutions in seconds
  • :param distilled_energy: energy for distilled solution for input polynomial
  • (float32 precision)
  • :param distilled_solution: a vector representing the solution after
  • the distillation procedure is applied to the original solution
  • derived from the hardware. (float32 precision)
  • :note:
  • * solutions are length n vector of floats \
  • that sum to the device constraint
  • .. Must use native python types to ensure can be dumped to json
  • """
  • err_code: int
  • err_desc: str
  • preprocessing_time: float
  • runtime: float
  • energy: Optional[float]
  • solution: Optional[List[float]]
  • distilled_runtime: Optional[float]
  • distilled_energy: Optional[float]
  • distilled_solution: Optional[List[float]]
  • class EqcClient:
  • """
  • Provides calls to process jobs using EQC RPC server
  • :param ip_address: The IP address of the RPC server
  • :param port: The port that the RPC server is running on
  • :param max_data_size: the max send and receive message length for RPC server
  • .. note::
  • :code:`lock_id` is used by a variety of class functions.
  • It is set to an empty string by default since default for device server
  • :code:`lock_id` is also an empty string. This allows for single user
  • processing without having to acquire a device lock.
  • .. All GRPC calls follow a specific pattern:
  • .. 1. Fill in data to be sent in message stub
  • .. 2. Send data using stub service method
  • .. 3. Parse response
  • """
  • def __init__(
  • self,
  • ip_address: str = os.getenv("DEVICE_IP_ADDRESS", "localhost"),
  • port: str = os.getenv("DEVICE_PORT", "50051"),
  • max_data_size: int = 512 * 1024 * 1024,
  • ):
  • self._ip_address = ip_address
  • self._max_data_size = max_data_size
  • self._ip_add_port = ip_address + ":" + port
  • self._channel_opt = [
  • ("grpc.max_send_message_length", max_data_size),
  • ("grpc.max_receive_message_length", max_data_size),
  • ]
  • self.channel = grpc.insecure_channel(
  • self._ip_add_port,
  • options=self._channel_opt,
  • )
  • self.eqc_stub = eqc_pb2_grpc.EqcServiceStub(self.channel)
  • def submit_job( # pylint: disable=R0913, R0914
  • self,
  • poly_coefficients: np.ndarray,
  • poly_indices: np.ndarray,
  • num_variables: Optional[int] = None,
  • lock_id: str = "",
  • sum_constraint: Union[int, float] = 10000,
  • relaxation_schedule: int = 2,
  • solution_precision: Optional[float] = None,
  • ) -> dict:
  • """
  • Submits data to be processed by EQC device
  • :param poly_coefficients:
  • coefficient values for the polynomial to be minimized
  • :param poly_indices:
  • list of lists containing polynomial indices associated with
  • coefficient values for problem to be optimized.
  • :param num_variables: the total number of variables for the submitted
  • polynomial. Must not be less than the max index in :code:`poly_indices`.
  • If no value is provided, it is set to the max value in
  • :code:`poly_indices`.
  • :param lock_id: a UUID to allow for multi-user processing
  • :param sum_constraint: a normalization constraint that is applied to the
  • problem space that is used to calculate :code:`energy`. This
  • parameter will be rounded if exceeds float32 precision
  • (e.g. 7-decimal places). Value must be between 1 and 10000.
  • :param relaxation_schedule: one of four schedules, selected by an
  • integer parameter. Higher values reduce the variation in
  • the analog spin values and are therefore more likely to lead to
  • improved objective function energy for the input problem.
  • Accepts values in the set {1, 2, 3, 4}.
  • :param solution_precision: the level of precision to apply to the solutions.
  • This parameter will be rounded if exceeds float32 precision
  • (e.g. 7-decimal places). If specified a distillation method is
  • applied to the continuous solutions to map them to the submitted
  • :code:`solution_precision`. Input :code:`solution_precision` must
  • satisfy :code:`solution_precision` greater than or equal to
  • :code:`sum_constraint`/10000 in order to be valid.
  • Also :code:`sum_constraint` must be divisible by :code:`solution_precision`.
  • If :code:`solution_precision` is not specified no distillation will be
  • applied to the solution derived by the device.
  • :return: a member of :class:`eqc_direct.utils.JobCodes` as a dict
  • with the following keys:
  • - **err_code**: `int`- job submission error code
  • - **err_desc**: `str`- error code description for submission
  • """
  • # the default is applied here and in process_job; keep the two in sync
  • if solution_precision is None:
  • solution_precision = 0
  • poly_coefficients = np.array(poly_coefficients)
  • poly_indices = np.array(poly_indices)
  • coefficient_dtype = poly_coefficients.dtype
  • if not (
  • np.issubdtype(coefficient_dtype, np.integer)
  • or (
  • np.issubdtype(coefficient_dtype, np.floating)
  • and np.finfo(coefficient_dtype).bits <= 32
  • )
  • ):
  • warn_dtype_msg = (
  • f"Max precision for EQC device is float32; input type "
  • f"was dtype {np.dtype(coefficient_dtype).name}. "
  • f"Input matrix will be rounded."
  • )
  • logging.warning(warn_dtype_msg)
  • warnings.warn(warn_dtype_msg, Warning)
  • if get_decimal_places(solution_precision) > 7:
  • soln_prec_warn = (
  • "`solution_precision` precision is greater than 7 "
  • "decimal places. Will be modified on submission to "
  • "device to float32 precision"
  • )
  • logging.warning(soln_prec_warn)
  • warnings.warn(soln_prec_warn, Warning)
  • if get_decimal_places(sum_constraint) > 7:
  • sum_constraint_warn = (
  • "`sum_constraint` precision is greater than 7 decimal "
  • "places. Will be modified on submission to device "
  • "to float32 precision"
  • )
  • logging.warning(sum_constraint_warn)
  • warnings.warn(sum_constraint_warn, Warning)
  • try:
  • _, degree_poly = poly_indices.shape
  • except ValueError as err:
  • err_msg = "`poly_indices` array must be two dimensions"
  • logging.error(err_msg, exc_info=True)
  • raise ValueError(err_msg) from err
  • if not num_variables:
  • # cast to a native int so the value is not a NumPy scalar
  • num_variables = int(np.max(poly_indices))
  • # flatten rowwise for matrix
  • poly_indices = poly_indices.flatten(order="c").tolist()
  • job_input = eqc_pb2.JobInput(
  • num_variables=num_variables,
  • degree=degree_poly,
  • poly_indices=poly_indices,
  • coef_values=poly_coefficients.tolist(),
  • sum_constraint=sum_constraint,
  • relaxation_schedule=relaxation_schedule,
  • soln_precision=solution_precision,
  • lock_id=lock_id,
  • )
  • try:
  • job_results = self.eqc_stub.SubmitJob(job_input)
  • except _InactiveRpcError as exc:
  • # Make error easier to read/detect by consumers.
  • raise InactiveRpcError(
  • "EQC submit_job failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • return message_to_dict(job_results)
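The input shaping done in `submit_job` before the RPC call can be tried without a device. The sketch below is plain Python (no NumPy or gRPC) and mirrors only three steps from the listing: the two-dimension check, the row-wise flattening, and the `num_variables` default. `shape_job_input` is a hypothetical helper name used for illustration, not part of `eqc_direct`.

```python
from typing import List, Optional, Tuple


def shape_job_input(
    poly_indices: List[List[int]],
    num_variables: Optional[int] = None,
) -> Tuple[List[int], int, int]:
    """Hypothetical helper mirroring how submit_job shapes its index input."""
    rows_ok = bool(poly_indices) and all(
        isinstance(row, list) and len(row) > 0 for row in poly_indices
    )
    if not rows_ok or any(len(row) != len(poly_indices[0]) for row in poly_indices):
        raise ValueError("`poly_indices` array must be two dimensions")
    # The polynomial degree is the width of each index row
    degree = len(poly_indices[0])
    # Row-wise (C order) flattening, as in poly_indices.flatten(order="c")
    flat = [idx for row in poly_indices for idx in row]
    # Same default as the listing: fall back to the largest index
    if not num_variables:
        num_variables = max(flat)
    return flat, degree, num_variables


# Two terms, one row of indices per term
flat, degree, num_vars = shape_job_input([[0, 1], [1, 2]])
print(flat, degree, num_vars)
```

Each row of `poly_indices` pairs with one entry of `poly_coefficients`, so the flattened list has `degree` entries per coefficient.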
  • def fetch_result(self, lock_id: str = "") -> EqcResult:
  • """
  • Request last EQC job results. Returns results from the most recent
  • run on the device.
  • :param lock_id: a valid :code:`lock_id` that matches current device
  • :code:`lock_id`
  • :return: an :class:`.EqcResult` object
  • """
  • fetch_input = eqc_pb2.LockMessage(lock_id=lock_id)
  • try:
  • eqc_results = self.eqc_stub.FetchResults(fetch_input)
  • except _InactiveRpcError as exc:
  • # Make error easier to read/detect by consumers.
  • raise InactiveRpcError(
  • "EQC fetch_results failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • result = message_to_dict(eqc_results)
  • # gRPC serialization/deserialization widens the float32 values, which
  • # changes their printed digits. Results must also be native Python
  • # floats so they can be dumped to JSON. float(np.float32(num)) keeps the
  • # spurious trailing digits, so format to 7 decimal places as a string
  • # first and then convert back to float.
  • result["solution"] = [
  • float(f"{np.float32(val):.7f}") for val in result["solution"]
  • ]
  • result["distilled_solution"] = [
  • float(f"{np.float32(val):.7f}") for val in result["distilled_solution"]
  • ]
  • result["energy"] = float(f"{np.float32(result['energy']):.7f}")
  • result["distilled_energy"] = float(
  • f"{np.float32(result['distilled_energy']):.7f}"
  • )
  • return result
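The string round trip in `fetch_result` is easier to follow with a concrete value. The sketch below reproduces the effect using only the standard library: `struct` stands in for `np.float32` (an assumption made here to avoid a NumPy dependency), and both function names are hypothetical.

```python
import struct


def as_float32(value: float) -> float:
    """Round-trip a Python float through 4-byte IEEE 754 storage."""
    return struct.unpack("f", struct.pack("f", value))[0]


def clean_float32(value: float) -> float:
    """Format to 7 decimal places, then parse back to a native float."""
    return float(f"{value:.7f}")


widened = as_float32(0.1)
print(widened)                 # float32 rounding noise shows up past 7 places
print(clean_float32(widened))  # noise removed by the string round trip
```

The result of `clean_float32` is a native Python `float`, which is what `json.dumps` expects.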
  • def system_status(self) -> dict:
  • """
  • Client call to obtain EQC system status
  • :returns: a member of :class:`eqc_direct.utils.SysStatus` as a dict:
  • - **status_code**: `int`- current system status code
  • - **status_desc**: `str`- description of current system status
  • """
  • try:
  • sys_resp = self.eqc_stub.SystemStatus(eqc_pb2.Empty())
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC system_status failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • return message_to_dict(sys_resp)
  • def acquire_lock(self) -> dict:
  • """
  • Makes a single attempt to acquire exclusive lock on hardware execution.
  • Locking can be used to ensure orderly processing in multi-user environments.
  • Lock can only be acquired when no other user has acquired the lock or when
  • the system has been idle for 60 seconds while another user has the lock.
  • This idle timeout prevents one user from blocking other users from using
  • the machine even if they are not active.
  • :return:
  • a member of :class:`eqc_direct.utils.LockManageStatus` as a dict along
  • with an additional key :code:`lock_id`:
  • - **lock_id**: `str`- if acquired the current device `lock_id`
  • else empty string
  • - **status_code**: `int`- status code for lock id acquisition
  • - **status_desc**: `str`- a description for the associated status code
  • """
  • try:
  • acquire_lock_resp = self.eqc_stub.AcquireLock(eqc_pb2.Empty())
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC acquire_lock failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • return {
  • "lock_id": acquire_lock_resp.lock_id,
  • "status_code": acquire_lock_resp.lock_status.status_code,
  • "status_desc": acquire_lock_resp.lock_status.status_desc,
  • }
  • def release_lock(self, lock_id: str = "") -> dict:
  • """
  • Releases exclusive lock for running health check or submitting job
  • :param lock_id: a UUID with currently acquired exclusive device lock
  • :return: a member of :class:`eqc_direct.utils.LockManageStatus` as a dict:
  • - **status_code**: `int`- status code for lock release
  • - **status_desc**: `str`- a description for the associated status code
  • """
  • release_input = eqc_pb2.LockMessage(lock_id=lock_id)
  • try:
  • release_lock_resp = self.eqc_stub.ReleaseLock(release_input)
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC release_lock failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • return message_to_dict(release_lock_resp)
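Because `acquire_lock` and `release_lock` are separate calls, a caller may want to guarantee the release even when the work in between raises. The sketch below shows one way to do that with `try`/`finally`. `FakeClient` is a hypothetical in-memory stand-in with made-up status codes and descriptions; it only imitates the `lock_id` behaviour described in the docstrings and does not talk to a device.

```python
class FakeClient:
    """Hypothetical stand-in for EqcClient; no device or gRPC involved."""

    def __init__(self):
        self.current_lock = ""

    def acquire_lock(self) -> dict:
        # Status values here are invented for the sketch
        if self.current_lock:
            return {"lock_id": "", "status_code": 1, "status_desc": "busy"}
        self.current_lock = "demo-lock"
        return {"lock_id": self.current_lock, "status_code": 0, "status_desc": "ok"}

    def release_lock(self, lock_id: str = "") -> dict:
        if lock_id != self.current_lock:
            return {"status_code": 1, "status_desc": "mismatch"}
        self.current_lock = ""
        return {"status_code": 0, "status_desc": "ok"}


def run_with_lock(client, work):
    """Acquire the lock, run work(lock_id), and always release afterwards."""
    lock_id = client.acquire_lock()["lock_id"]
    if lock_id == "":
        raise RuntimeError("lock not acquired")
    try:
        return work(lock_id)
    finally:
        client.release_lock(lock_id=lock_id)


client = FakeClient()
print(run_with_lock(client, lambda lock_id: f"ran with {lock_id}"))
```

With a real client, `work` would typically call `process_job(..., lock_id=lock_id)`.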
  • def check_lock(self, lock_id: str = "") -> dict:
  • """
  • Checks if submitted :code:`lock_id` has execution lock on the device
  • :param lock_id: a UUID which will be checked to determine if has exclusive
  • device execution lock
  • :return: a member of :class:`eqc_direct.utils.LockCheckStatus` as a dict:
  • - **status_code**: `int`- status code for lock check
  • - **status_desc**: `str`- a description for the associated status code
  • """
  • check_input = eqc_pb2.LockMessage(lock_id=lock_id)
  • check_output = self.eqc_stub.CheckLock(check_input)
  • return message_to_dict(check_output)
  • def stop_running_process(self, lock_id: str = "") -> dict:
  • """
  • Stops a running process, either a health check or an EQC job.
  • Process locks will release automatically based on a timeout
  • which is maintained in the server code if they are
  • not released using this.
  • :param lock_id: requires a lock_id that was acquired by
  • :return:
  • a member of :class:`eqc_direct.utils.SysStatus`
  • as dict with following keys:
  • - **status_code**: `int`- the system code after stopping
  • - **status_desc**: `str`- the associated system status description
  • """
  • stop_input = eqc_pb2.LockMessage(lock_id=lock_id)
  • try:
  • stop_resp = self.eqc_stub.StopRunning(stop_input)
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC stop_running_process failed due to "
  • "grpc._channel._InactiveRpcError."
  • ) from exc
  • return message_to_dict(stop_resp)
  • def wait_for_lock(self) -> tuple:
  • """
  • Waits indefinitely for the lock, calling :func:`acquire_lock` once per second
  • :return: a tuple of the following items:
  • - **lock_id**: `str`- exclusive lock for device execution with a timeout
  • - **start_queue_ts**: `int`- time in ns at which waiting for the lock began
  • - **end_queue_ts**: `int`- time in ns at which the lock was acquired and
  • queueing ended
  • """
  • lock_id = ""
  • start_queue_ts = time.time_ns()
  • while lock_id == "":
  • sys_code = self.system_status()["status_code"]
  • # status codes of 3 and above are treated as errors here
  • if sys_code >= 3:
  • raise RuntimeError(f"System unavailable status_code: {sys_code}")
  • lock_id = self.acquire_lock()["lock_id"]
  • # only sleep if didn't get lock on device
  • if lock_id == "":
  • time.sleep(1)
  • end_queue_ts = time.time_ns()
  • return lock_id, start_queue_ts, end_queue_ts
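`wait_for_lock` polls with no upper bound on the wait. For callers that prefer to give up after some time, the same loop can be given a deadline. The sketch below is a generic variant under that assumption: `poll_until`, `interval_s`, and `timeout_s` are hypothetical names, and the "attempt" is a plain callable rather than a real `acquire_lock` call.

```python
import time
from typing import Callable, Optional


def poll_until(
    attempt: Callable[[], str],
    interval_s: float = 1.0,
    timeout_s: Optional[float] = None,
) -> str:
    """Call attempt() until it returns a non-empty string or time runs out."""
    deadline = None if timeout_s is None else time.monotonic() + timeout_s
    while True:
        value = attempt()
        if value != "":
            return value
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("no lock acquired before the deadline")
        # Only sleep when the attempt did not succeed
        time.sleep(interval_s)


# Fake attempts: the third call "acquires" a lock
replies = iter(["", "", "demo-lock"])
print(poll_until(lambda: next(replies), interval_s=0.01, timeout_s=1.0))
```

With a real client, `attempt` could be `lambda: client.acquire_lock()["lock_id"]`; leaving `timeout_s` as `None` keeps the wait unbounded, as in the listing.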
  • def system_version(self) -> dict:
  • """
  • Provides information regarding Dirac server
  • :return: a dict with a single item:
  • - **server_version**: `str` - the current gRPC server version
  • """
  • try:
  • sys_ver_resp = self.eqc_stub.ServerVersion(eqc_pb2.Empty())
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC system_version call failed due to inactive grpc channel"
  • ) from exc
  • return message_to_dict(sys_ver_resp)
  • def process_job( # pylint: disable=R0913
  • self,
  • poly_coefficients: np.ndarray,
  • poly_indices: np.ndarray,
  • num_variables: Optional[int] = None,
  • lock_id: str = "",
  • sum_constraint: Union[int, float] = 10000,
  • relaxation_schedule: int = 2,
  • solution_precision: Optional[float] = None,
  • ) -> dict:
  • """
  • Processes a job by:
  • 1. Submitting the job
  • 2. Polling the system status until the job completes or fails
  • 3. Returning the results
  • :param poly_coefficients: coefficient values for the polynomial to be minimized
  • :param poly_indices:
  • list of lists containing polynomial indices associated with
  • coefficient values for problem to be optimized.
  • :param num_variables: the total number of variables for the submitted
  • polynomial. Must not be less than the max index in :code:`poly_indices`.
  • If no value is provided, it is set to the max value in
  • :code:`poly_indices`.
  • :param lock_id: a UUID to allow for multi-user processing
  • :param sum_constraint: a normalization constraint that is applied to the
  • problem space that is used to calculate :code:`energy`. This
  • parameter will be rounded if exceeds float32 precision
  • (e.g. 7-decimal places). Value must be between 1 and 10000.
  • :param relaxation_schedule: one of four schedules, selected by an
  • integer parameter. Higher values reduce the variation in
  • the analog spin values and are therefore more likely to lead to
  • improved objective function energy for the input problem.
  • Accepts values in the set {1, 2, 3, 4}.
  • :param solution_precision: the level of precision to apply to the solutions.
  • This parameter will be rounded if exceeds float32 precision
  • (e.g. 7-decimal places). If specified a distillation method is
  • applied to the continuous solutions to map them to the submitted
  • :code:`solution_precision`. Input :code:`solution_precision` must
  • satisfy :code:`solution_precision` greater than or equal to
  • :code:`sum_constraint`/10000 in order to be valid.
  • Also :code:`sum_constraint` must be divisible by :code:`solution_precision`.
  • If :code:`solution_precision` is not specified no distillation will be
  • applied to the solution derived by the device.
  • :return: an :class:`.EqcResult` dict with two additional timing keys:
  • - start_job_ts: time in ns marking the start of job submission
  • - end_job_ts: time in ns marking the point at which the system returned to idle
  • """
  • # Must also change submit_job default set
  • if not solution_precision:
  • solution_precision = 0
  • start_job = time.time_ns()
  • submit_job_resp = self.submit_job(
  • poly_coefficients=poly_coefficients,
  • poly_indices=poly_indices,
  • num_variables=num_variables,
  • lock_id=lock_id,
  • sum_constraint=sum_constraint,
  • relaxation_schedule=relaxation_schedule,
  • solution_precision=solution_precision,
  • )
  • logging.info("Job submitted")
  • if submit_job_resp["err_code"] != 0:
  • err_msg = f"Job submission failed with response: {submit_job_resp}"
  • logging.error(err_msg, exc_info=True)
  • raise RuntimeError(err_msg)
  • sys_code = self.system_status()["status_code"]
  • while sys_code != SysStatus.IDLE["status_code"]:
  • sys_code = self.system_status()["status_code"]
  • # this is based on the error statuses are 3 and above
  • if sys_code > 3:
  • err_msg = f"System unavailable status_code: {sys_code}"
  • logging.error(err_msg, exc_info=True)
  • raise RuntimeError(err_msg)
  • # only sleep if not idle
  • if sys_code != SysStatus.IDLE["status_code"]:
  • time.sleep(1)
  • end_job = time.time_ns()
  • # pull in results after is idle
  • logging.info("Fetching results")
  • job_result = self.fetch_result(lock_id=lock_id)
  • if job_result["err_code"] != 0:
  • raise RuntimeError(
  • f"Job execution error\n"
  • f"err_code: {job_result['err_code']}\n"
  • f"err_desc: {job_result['err_desc']}"
  • )
  • job_result["start_job_ts"] = start_job
  • job_result["end_job_ts"] = end_job
  • return job_result
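The docstrings for `submit_job` and `process_job` state two rules for `solution_precision`: it must be at least `sum_constraint`/10000, and `sum_constraint` must be divisible by it. The sketch below checks both rules on the client side before submitting. `precision_is_valid` is a hypothetical helper, not part of `eqc_direct`, and the use of `Decimal` to sidestep binary floating-point remainders is a choice made for this sketch; the device's own validation may differ.

```python
from decimal import Decimal
from typing import Union

Number = Union[int, float]


def precision_is_valid(sum_constraint: Number, solution_precision: Number) -> bool:
    """Hypothetical pre-check of the two documented solution_precision rules."""
    # Going through str() keeps values such as 0.1 exact in decimal form
    total = Decimal(str(sum_constraint))
    step = Decimal(str(solution_precision))
    if step <= 0:
        # Only positive precisions are checked; the listing uses 0 for "not set"
        return False
    meets_minimum = step >= total / Decimal(10000)
    divides_evenly = total % step == 0
    return meets_minimum and divides_evenly


print(precision_is_valid(100, 0.1))
print(precision_is_valid(1, 0.00001))
print(precision_is_valid(10, 0.3))
```

A check like this only catches invalid combinations early; the `err_code` returned by `submit_job` remains the authoritative answer.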