Source code for qci_client.optimization.client
- """Client for QCi's optimization API."""
- import concurrent.futures
- import time
- from typing import Optional, Union
- import warnings
- import requests
- from requests.adapters import HTTPAdapter, Retry
- from requests.compat import urljoin
- from requests_futures.sessions import FuturesSession
- from qci_client.utilities import log_to_console, now_utc_ms
- import qci_client.auth.client
- from qci_client.optimization import utilities
- from qci_client.optimization.data_converter import data_to_json
- from qci_client.optimization import enum
- from qci_client.utilities import raise_for_status
- RESULTS_CHECK_INTERVAL_S = 2.5
- RETRY_TOTAL = 7
- BACKOFF_FACTOR = 2
- STATUS_FORCELIST = [502, 503, 504]
- [docs]
- class OptimizationClient:  
-     """Used to run optimization jobs against QCi hardware."""
-     def __init__(  
-         self,
-         *,
-         url: Optional[str] = None,
-         api_token: Optional[str] = None,
-         timeout: Optional[float] = None,
-         max_workers: int = 8,
-         compress: bool = False,
-     ):
-         """
-         Provides access to QCi's public API for running optimization problems on Dirac
-         devices, including file uploads/downloads and submitting/retrieving entire jobs.
-         :param url: url basepath to API endpoint, including scheme, if None, then falls
-             back to QCI_API_URL environment variable
-         :param api_token: refresh token for authenticating to API, if None, then falls
-             back to QCI_TOKEN environment variable
-         :param timeout: number of seconds before timing out requests, None waits
-             indefinitely
-         :param max_workers: number of threads for concurrent file download calls
-         :param compress: compress file metadata and parts before uploading
-         """
-         
-         self._auth_client = qci_client.auth.client.AuthClient(
-             url=url, api_token=api_token, timeout=timeout
-         )
-         self._max_workers = max_workers
-         self._compress = compress
-         
-         self._session = requests.Session()
-         self._session.mount(
-             "https://",
-             HTTPAdapter(
-                 max_retries=Retry(
-                     total=RETRY_TOTAL,
-                     backoff_factor=BACKOFF_FACTOR,
-                     status_forcelist=STATUS_FORCELIST,
-                 )
-             ),
-         )
-     @property
-     def url(self) -> str:
-         """Return API URL."""
-         return self._auth_client.url
-     @property
-     def api_token(self) -> str:
-         """Return API token."""
-         return self._auth_client.api_token
-     @property
-     def timeout(self) -> Optional[float]:
-         """Return timeout setting."""
-         return self._auth_client.timeout
-     @property
-     def max_workers(self) -> int:
-         """Return maximum number of concurrent workers for file operations."""
-         return self._max_workers
-     @property
-     def compress(self) -> bool:
-         """Return file compression usage flag."""
-         return self._compress
-     @property
-     def jobs_url(self):
-         """Get jobs URL."""
-         return urljoin(self.url, "optimization/v1/jobs")
- [docs]
-     def get_job_id_url(self, *, job_id: str) -> str:
-         """Get job URL with job ID."""
-         return f"{self.jobs_url}/{job_id}"
- [docs]
-     def get_job_status_url(self, *, job_id: str) -> str:
-         """Get job-status URL using job ID."""
-         return f"{self.get_job_id_url(job_id=job_id)}/status"
- [docs]
-     def get_job_metrics_url(self, job_id: str) -> str:
-         """Get legacy job-metrics URL using job ID."""
-         return f"{self.get_job_id_url(job_id=job_id)}/metrics"
- [docs]
-     def get_job_metrics_v1_url(self, job_id: str) -> str:
-         """Get v1 job-metrics URL using job ID."""
-         return f"{self.get_job_id_url(job_id=job_id)}/metrics/v1"
- [docs]
-     def get_job_metrics_v2_url(self, job_id: str) -> str:
-         """Get v2 job-metrics URL using job ID."""
-         return f"{self.get_job_id_url(job_id=job_id)}/metrics/v2"
- [docs]
-     def get_job_allocations_url(self) -> str:
-         """Get job-allocations URL."""
-         return f"{self.jobs_url}/allocations"
-     @property
-     def files_url(self):
-         """Get files URL."""
-         return urljoin(self.url, "optimization/v1/files")
- [docs]
-     def get_file_id_url(self, *, file_id: str) -> str:
-         """Get file URL with file ID."""
-         return f"{self.files_url}/{file_id}"
- [docs]
-     def get_file_contents_url(self, *, file_id: str, part_number: int) -> str:
-         """Get file contents URL with file ID and file part number."""
-         return f"{self.get_file_id_url(file_id=file_id)}/contents/{str(part_number)}"
- [docs]
-     def upload_file(self, *, file: dict) -> dict:
-         """
-         Upload file (metadata and then parts concurrently). Returns dict with file ID.
-         """
-         
-         
-         file = data_to_json(file=file)
-         with FuturesSession(max_workers=self.max_workers) as session:
-             session.mount(
-                 "https://",
-                 HTTPAdapter(
-                     max_retries=Retry(
-                         total=RETRY_TOTAL,
-                         backoff_factor=BACKOFF_FACTOR,
-                         status_forcelist=STATUS_FORCELIST,
-                     )
-                 ),
-             )
-             post_response_future = session.post(
-                 self.files_url,
-                 headers=self._auth_client.headers_without_connection_close,
-                 timeout=self.timeout,
-                 json=utilities.get_post_request_body(file=file),
-             )
-             for response_future in concurrent.futures.as_completed(
-                 [post_response_future], self.timeout
-             ):
-                 response = response_future.result()
-                 raise_for_status(response=response)
-             file_id = response.json()["file_id"]
-             file_part_generator = utilities.file_part_generator(
-                 file=file, compress=self.compress
-             )
-             patch_response_futures = []
-             if self.compress:
-                 for part_body, part_number in file_part_generator:
-                     patch_response_futures.append(
-                         session.patch(
-                             self.get_file_contents_url(
-                                 file_id=file_id, part_number=part_number
-                             ),
-                             headers=self._auth_client.headers_without_connection_close,
-                             timeout=self.timeout,
-                             data=utilities.zip_payload(
-                                 payload=utilities.get_patch_request_body(file=part_body)
-                             ),
-                         )
-                     )
-             else:
-                 for part_body, part_number in file_part_generator:
-                     patch_response_futures.append(
-                         session.patch(
-                             self.get_file_contents_url(
-                                 file_id=file_id, part_number=part_number
-                             ),
-                             headers=self._auth_client.headers_without_connection_close,
-                             timeout=self.timeout,
-                             json=utilities.get_patch_request_body(file=part_body),
-                         )
-                     )
-             
-             
-             for response_future in concurrent.futures.as_completed(
-                 patch_response_futures
-             ):
-                 raise_for_status(response=response_future.result())
-         return {"file_id": file_id}
- [docs]
-     def download_file(self, *, file_id: str) -> dict:
-         """Download file (metadata and then parts concurrently)."""
-         
-         
-         with FuturesSession(max_workers=self.max_workers) as session:
-             session.mount(
-                 "https://",
-                 HTTPAdapter(
-                     max_retries=Retry(
-                         total=RETRY_TOTAL,
-                         backoff_factor=BACKOFF_FACTOR,
-                         status_forcelist=STATUS_FORCELIST,
-                     )
-                 ),
-             )
-             get_response_future = session.get(
-                 self.get_file_id_url(file_id=file_id),
-                 headers=self._auth_client.headers_without_connection_close,
-                 timeout=self.timeout,
-             )
-             for response_future in concurrent.futures.as_completed(
-                 [get_response_future], self.timeout
-             ):
-                 response = response_future.result()
-                 raise_for_status(response=response_future.result())
-             
-             file = {**response.json()}
-             
-             file.pop("last_accessed_rfc3339")
-             file.pop("upload_date_rfc3339")
-             get_response_futures = [
-                 session.get(
-                     self.get_file_contents_url(
-                         file_id=file_id, part_number=part_number
-                     ),
-                     headers=self._auth_client.headers_without_connection_close,
-                     timeout=self.timeout,
-                 )
-                 for part_number in range(1, file["num_parts"] + 1)
-             ]
-             
-             for response_future in concurrent.futures.as_completed(
-                 get_response_futures
-             ):
-                 raise_for_status(response=response_future.result())
-             
-             for response_future in get_response_futures:
-                 file_part = response_future.result().json()
-                 
-                 for file_type, file_type_config in file_part["file_config"].items():
-                     if file_type not in file["file_config"]:
-                         file["file_config"][file_type] = {}
-                     for key, value in file_type_config.items():
-                         if key not in file["file_config"][file_type]:
-                             file["file_config"][file_type][key] = []
-                         file["file_config"][file_type][key] += value
-         return file
- [docs]
-     def submit_job(self, *, job_body: dict) -> dict:
-         """
-         Submit a job via a request to QCi's optimization API.
-         Args:
-             job_body: formatted json body that includes all parameters for the job
-         Returns:
-             Response from POST call to API (see :meth:`get_job_results`)
-         """
-         response = self._session.post(
-             self.jobs_url,
-             json=job_body,
-             headers=self._auth_client.headers_without_connection_close,
-             timeout=self.timeout,
-         )
-         raise_for_status(response=response)
-         return response.json()
- [docs]
-     def cancel_job(self, *, job_id: str) -> dict:
-         """
-         Cancel a job via a PATCH request to QCi's optimization API.
-         Only SUBMITTED, QUEUED, and RUNNING jobs will be successfully cancelled, raising
-         an exception otherwise.
-         Args:
-             job_id: ID of job
-         Returns:
-             Response from PATCH call to API
-         """
-         response = self._session.patch(
-             self.get_job_id_url(job_id=job_id),
-             json={
-                 "job_status": {
-                     "cancelled_at_rfc3339nano": now_utc_ms(),
-                 },
-             },
-             headers=self._auth_client.headers_without_connection_close,
-             timeout=self.timeout,
-         )
-         raise_for_status(response=response)
-         return response.json()
- [docs]
-     def get_job_results(self, *, job_id: str) -> dict:
-         """
-         Get job_info, status, and results of a (possibly uncompleted) job by its ID.
-         Args:
-             job_id: ID of job
-         Returns:
-             Dictionary with latest job_info, status, and results.
-         """
-         job_info = self.get_job_response(job_id=job_id)
-         status = enum.JobStatus(self.get_job_status(job_id=job_id)["status"])
-         if status == enum.JobStatus.COMPLETED:
-             
-             file = self.download_file(file_id=job_info["job_result"]["file_id"])
-             results = utilities.get_file_config(file=file)[0]
-         else:
-             results = None
-         return {"job_info": job_info, "status": status.value, "results": results}
- [docs]
-     def get_job_status(self, *, job_id: str) -> dict:
-         """
-         Get the status of a job by its ID.
-         Args:
-             job_id: ID of job
-         Returns:
-             Response from GET call to API
-         """
-         response = self._session.get(
-             self.get_job_status_url(job_id=job_id),
-             headers=self._auth_client.headers_without_connection_close,
-             timeout=self.timeout,
-         )
-         raise_for_status(response=response)
-         return response.json()
- [docs]
-     def get_job_metrics(self, *, job_id: str) -> dict:
-         """
-         Get the metrics for a job by its ID. v2 metrics are tried first, then
-         legacy/v1 metrics.
-         Args:
-             job_id: ID of job
-         Returns:
-             Response from GET call to API
-         """
-         
-         response = self._session.get(
-             self.get_job_metrics_v2_url(job_id=job_id),
-             headers=self._auth_client.headers_without_connection_close,
-             timeout=self.timeout,
-         )
-         
-         if response.status_code == requests.codes["not_found"]:
-             response = self._session.get(
-                 self.get_job_metrics_url(job_id=job_id),  
-                 headers=self._auth_client.headers_without_connection_close,
-                 timeout=self.timeout,
-             )
-         raise_for_status(response=response)
-         
-         
-         
-         if response.json().get("job_metrics") is None:
-             response = self._session.get(
-                 self.get_job_metrics_url(job_id=job_id),  
-                 headers=self._auth_client.headers_without_connection_close,
-                 timeout=self.timeout,
-             )
-             raise_for_status(response=response)
-         return response.json()
- [docs]
-     def get_job_response(self, *, job_id: str) -> dict:
-         """
-         Get a response for a job by id, which may/may not be finished.
-         :param job_id: ID of job
-         :return dict: json response
-         """
-         response = self._session.get(
-             self.get_job_id_url(job_id=job_id),
-             headers=self._auth_client.headers_without_connection_close,
-             timeout=self.timeout,
-         )
-         raise_for_status(response=response)
-         return response.json()
- [docs]
-     def get_allocations(self) -> dict:
-         """
-         Get allocations for running jobs on different device classes.
-         :return dict: json response
-         """
-         response = self._session.get(
-             self.get_job_allocations_url(),
-             headers=self._auth_client.headers_without_connection_close,
-             timeout=self.timeout,
-         )
-         raise_for_status(response=response)
-         return response.json()
- [docs]
-     def build_job_body(  
-         self,
-         *,
-         job_type: Union[enum.JobType, str],
-         job_params: dict,
-         qubo_file_id: Optional[str] = None,
-         graph_file_id: Optional[str] = None,
-         hamiltonian_file_id: Optional[str] = None,
-         objective_file_id: Optional[str] = None,
-         constraints_file_id: Optional[str] = None,
-         polynomial_file_id: Optional[str] = None,
-         job_name: Optional[str] = None,
-         job_tags: Optional[list] = None,
-     ) -> dict:
-         """
-         Constructs body for job submission requests.
-         Args:
-             job_type: an enum.JobType or one of the string values defined in enum.JobType
-             job_params: dict of params to be passed to job submission in "params" key
-             qubo_file_id: file id from files API for uploaded qubo
-             graph_file_id: file id from files API for uploaded graph
-             hamiltonian_file_id: file id from files API for uploaded hamiltonian
-             objective_file_id: file id from files API for uploaded objective
-             constraints_file_id: file id from files API for uploaded constraints
-             polynomial_file_id: file id from files API for uploaded polynomial
-             job_name: user specified name for job submission
-             job_tags: user specified labels for classifying and filtering user jobs after submission
-         Returns:
-             None
-         """
-         problem_config = {}
-         device_config = {}
-         
-         job_type = enum.JobType(job_type)
-         device_type_param = job_params.get("device_type")
-         if device_type_param is None:
-             raise ValueError(
-                 "no 'device_type' specified in job_params, must be one of "
-                 f"{enum.DEVICE_TYPES_SORTED}"
-             )
-         
-         device_type = enum.DeviceType(device_type_param)
-         num_samples = job_params.get("num_samples")
-         if num_samples is not None:
-             
-             device_config["num_samples"] = num_samples
-         if job_type == enum.JobType.GRAPH_PARTITIONING:
-             if device_type not in enum.DEVICE_TYPES_QUBIT:
-                 raise ValueError(
-                     f"{job_type.value} not supported on {device_type.value}"
-                 )
-             problem_type = enum.ProblemType.GP
-             if not graph_file_id:
-                 raise AssertionError(
-                     "graph_file_id must be specified for the given job_type "
-                     f"'{job_type.value}'"
-                 )
-             problem_config["graph_file_id"] = graph_file_id
-             if "num_paritions" in job_params:
-                 
-                 problem_config["num_paritions"] = job_params["num_paritions"]
-             if "alpha" in job_params:
-                 
-                 problem_config["alpha"] = job_params["alpha"]
-             if "gamma" in job_params:
-                 
-                 problem_config["gamma"] = job_params["gamma"]
-         elif job_type == enum.JobType.SAMPLE_CONTRAINT:
-             if device_type not in enum.DEVICE_TYPES_QUBIT:
-                 raise ValueError(
-                     f"{job_type.value} not supported on {device_type.value}"
-                 )
-             problem_type = enum.ProblemType.QLCBO
-             if not constraints_file_id:
-                 raise AssertionError(
-                     "At least constraints_file_id must be specified for job_type "
-                     f"'{job_type.value}'"
-                 )
-             problem_config["constraints_file_id"] = constraints_file_id
-             problem_config["objective_file_id"] = objective_file_id  
-             if "alpha" in job_params:
-                 
-                 problem_config["alpha"] = job_params["alpha"]
-             if "atol" in job_params:
-                 
-                 problem_config["atol"] = job_params["atol"]
-         elif job_type == enum.JobType.SAMPLE_HAMILTONIAN:
-             if device_type not in enum.DEVICE_TYPES_NORMALIZED_QUDIT:
-                 raise ValueError(
-                     f"{job_type.value} not supported on {device_type.value}"
-                 )
-             problem_type = enum.ProblemType.NQHO
-             
-             device_type = enum.DeviceType.DIRAC3_NORMALIZED_QUDIT
-             if "solution_precision" in job_params:
-                 
-                 warnings.warn(
-                     "the 'solution_precision' key is no longer supported as is ignored, instead "
-                     "client-side postprocessing can be used to distill continuous solutions"
-                 )
-             if "sum_constraint" in job_params:
-                 
-                 device_config["sum_constraint"] = job_params["sum_constraint"]
-             if "relaxation_schedule" in job_params:
-                 
-                 device_config["relaxation_schedule"] = job_params["relaxation_schedule"]
-             if "mean_photon_number" in job_params:
-                 
-                 device_config["mean_photon_number"] = job_params["mean_photon_number"]
-             if "quantum_fluctuation_coefficient" in job_params:
-                 
-                 device_config["quantum_fluctuation_coefficient"] = job_params[
-                     "quantum_fluctuation_coefficient"
-                 ]
-             if (not hamiltonian_file_id and not polynomial_file_id) or (
-                 hamiltonian_file_id and polynomial_file_id
-             ):
-                 raise AssertionError(
-                     "exactly one of hamiltonian_file_id or polynomial_file_id must be "
-                     f"specified for job_type='{job_type.value}'"
-                 )
-             if hamiltonian_file_id:
-                 problem_config["hamiltonian_file_id"] = (
-                     hamiltonian_file_id  
-                 )
-             else:
-                 problem_config["polynomial_file_id"] = polynomial_file_id
-         elif job_type == enum.JobType.SAMPLE_HAMILTONIAN_INTEGER:
-             if device_type not in enum.DEVICE_TYPES_QUDIT:
-                 raise ValueError(
-                     f"{job_type.value} not supported on {device_type.value}"
-                 )
-             problem_type = enum.ProblemType.QHO
-             
-             device_type = enum.DeviceType.DIRAC3_QUDIT
-             if "num_levels" not in job_params:
-                 raise AssertionError("num_levels is a required field")
-             device_config["num_levels"] = job_params["num_levels"]
-             if "relaxation_schedule" in job_params:
-                 
-                 device_config["relaxation_schedule"] = job_params["relaxation_schedule"]
-             if "mean_photon_number" in job_params:
-                 
-                 device_config["mean_photon_number"] = job_params["mean_photon_number"]
-             if "quantum_fluctuation_coefficient" in job_params:
-                 
-                 device_config["quantum_fluctuation_coefficient"] = job_params[
-                     "quantum_fluctuation_coefficient"
-                 ]
-             if (not hamiltonian_file_id and not polynomial_file_id) or (
-                 hamiltonian_file_id and polynomial_file_id
-             ):
-                 raise AssertionError(
-                     "exactly one of hamiltonian_file_id or polynomial_file_id must be "
-                     f"specified for job_type='{job_type.value}'"
-                 )
-             if hamiltonian_file_id:
-                 problem_config["hamiltonian_file_id"] = (
-                     hamiltonian_file_id  
-                 )
-             else:
-                 problem_config["polynomial_file_id"] = polynomial_file_id
-         elif job_type == enum.JobType.SAMPLE_HAMILTONIAN_ISING:
-             if device_type not in enum.DEVICE_TYPES_QUBIT:
-                 raise ValueError(
-                     f"{job_type.value} not supported on {device_type.value}"
-                 )
-             problem_type = enum.ProblemType.IHO
-             if (not hamiltonian_file_id and not polynomial_file_id) or (
-                 hamiltonian_file_id and polynomial_file_id
-             ):
-                 raise AssertionError(
-                     "exactly one of hamiltonian_file_id or polynomial_file_id must be "
-                     f"specified for job_type='{job_type.value}'"
-                 )
-             if hamiltonian_file_id:
-                 problem_config["hamiltonian_file_id"] = (
-                     hamiltonian_file_id  
-                 )
-             else:
-                 problem_config["polynomial_file_id"] = polynomial_file_id
-         elif job_type == enum.JobType.SAMPLE_QUBO:
-             if device_type not in enum.DEVICE_TYPES_QUBIT:
-                 raise ValueError(
-                     f"{job_type.value} not supported on {device_type.value}"
-                 )
-             problem_type = enum.ProblemType.QUBO
-             if not qubo_file_id:
-                 raise AssertionError(
-                     f"qubo_file_id must be specified for job_type '{job_type.value}'"
-                 )
-             problem_config["qubo_file_id"] = qubo_file_id
-         else:
-             raise ValueError(f"unsupported job_type '{job_type.value}'")
-         job_submission: dict = {
-             "problem_config": {problem_type.value: problem_config},
-             "device_config": {device_type.value: device_config},
-         }
-         if job_name is not None:
-             
-             job_submission["job_name"] = job_name
-         if job_tags is not None:
-             
-             job_submission["job_tags"] = job_tags
-         return {"job_submission": job_submission}
- [docs]
-     def process_job(
-         self, *, job_body: dict, wait: bool = True, verbose: bool = True
-     ) -> dict:
-         """
-         :param job_body: formatted json dict for body of job submission request
-         :param wait: wait synchronously for job to complete
-         :param verbose: track operations' progress using timestamped console logs
-         :return:
-             if wait is True, then dict with job_info, status, and results and results
-                 fields (results is None if job is not successfully COMPLETED)
-             if wait is False, then response dict from submitted job, which includes
-                 job_id for subsequent retrieval (see :meth:`get_job_results`)
-         """
-         dirac_allocation = self.get_allocations()["allocations"]["dirac"]
-         dirac_allocation_s = dirac_allocation["seconds"]
-         log = f"Dirac allocation balance = {dirac_allocation_s} s"
-         if not dirac_allocation["metered"]:
-             log += " (unmetered)"
-         log_to_console(log=log, verbose=verbose)
-         submit_job_response = self.submit_job(job_body=job_body)
-         job_id = submit_job_response["job_id"]
-         log_to_console(log=f"Job submitted: job_id='{job_id}'", verbose=verbose)
-         if wait:
-             status = enum.JobStatus.SUBMITTED
-             while status not in enum.JOB_STATUSES_FINAL:
-                 latest_status = enum.JobStatus(
-                     self.get_job_status(job_id=job_id)["status"]
-                 )
-                 if latest_status != status:
-                     status = latest_status
-                     log_to_console(log=status.value, verbose=verbose)
-                 time.sleep(RESULTS_CHECK_INTERVAL_S)
-             dirac_allocation = self.get_allocations()["allocations"]["dirac"]
-             dirac_allocation_s = dirac_allocation["seconds"]
-             log = f"Dirac allocation balance = {dirac_allocation_s} s"
-             if not dirac_allocation["metered"]:
-                 log += " (unmetered)"
-             log_to_console(log=log, verbose=verbose)
-             return self.get_job_results(job_id=job_id)
-         return submit_job_response
- [docs]
-     def list_files(self) -> dict:
-         """
-         List files (metadata only).
-         :return: dict containing list of files
-         """
-         response = self._session.get(
-             self.files_url,
-             headers=self._auth_client.headers_without_connection_close,
-             timeout=self.timeout,
-         )
-         raise_for_status(response=response)
-         return response.json()