Foundry | Products | Technology | Company | Investor relations | Resource library | News
Contact us
Resource library
    Introduction to Dirac-3 home
    Dirac-3 Developer Beginner Guide
    Dirac-3 user guide
    Dirac-3 spec sheet
    Entropy Quantum Computing overview
    Multibody formulation
    qci-client software package
      Getting Started
      Basic Usage
      qci-client
      Dependencies
      Index
      Module Index
    eqc-direct software package

Couldn’t find what you are looking for? Reach out to technical support.

Contact support
Privacy Policy | Cookie Policy | Terms of Use | Forward Looking Statements | Accessibility Statement
Terms and Conditions of Sale | End User License Agreement

© 2018-2026 Quantum Computing Inc.

Download

Source code for qci_client.optimization.utilities

  • # Copyright 2023-2024, Quantum Computing Incorporated
  • """Utilities, especially for creating request bodies."""
  • import gzip
  • from io import BytesIO
  • import json
  • from math import floor
  • import sys
  • from typing import Generator, Tuple
  • from qci_client.optimization import types
  • from qci_client.optimization import enum
  • # We want to limit the memory size of each uploaded chunk to be safely below the max of
  • # 15 * MebiByte (~15MB). See https://git.qci-dev.com/qci-dev/optimization-files-api/.
  • MEMORY_MAX: int = 8 * 1000000 # 8MB
  • [docs]
  • def get_file_type(*, file: dict) -> enum.FileType:
  • """Get file type from a file."""
  • file_config_keys = list(file["file_config"].keys())
  • if len(file_config_keys) != 1:
  • raise ValueError(
  • "improper number of files specified in file_config (should be exactly one)"
  • )
  • return enum.FileType(file_config_keys[0])
  • [docs]
  • def get_file_config(*, file: dict) -> Tuple[dict, enum.FileType]:
  • """Get file configuration and file type from a file."""
  • file_type = get_file_type(file=file)
  • return file["file_config"][file_type.value], file_type
  • [docs]
  • def get_post_request_body(*, file: dict) -> types.MetadataPostRequestBody:
  • """Format metadata body."""
  • file_config, file_type = get_file_config(file=file)
  • optional_fields = {}
  • if "file_name" in file:
  • optional_fields["file_name"] = file["file_name"]
  • if file_type == enum.FileType.CONSTRAINTS:
  • return types.MetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.ConstraintsMetadataConfig(
  • constraints=types.ConstraintsMetadata(
  • num_constraints=file_config["num_constraints"],
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.GRAPH:
  • if "directed" in file_config:
  • optional_fields["directed"] = file_config["directed"]
  • if "multigraph" in file_config:
  • optional_fields["multigraph"] = file_config["multigraph"]
  • if "graph" in file_config:
  • optional_fields["graph"] = file_config["graph"]
  • return types.MetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.GraphMetadataConfig(
  • graph=types.GraphMetadata(
  • **optional_fields,
  • num_edges=file_config["num_edges"],
  • num_nodes=file_config["num_nodes"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.HAMILTONIAN:
  • return types.MetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.HamiltonianMetadataConfig(
  • hamiltonian=types.HamiltonianMetadata(
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.OBJECTIVE:
  • return types.MetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.ObjectiveMetadataConfig(
  • objective=types.ObjectiveMetadata(
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.POLYNOMIAL:
  • return types.MetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.PolynomialMetadataConfig(
  • polynomial=types.PolynomialMetadata(
  • min_degree=file_config["min_degree"],
  • max_degree=file_config["max_degree"],
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.QUBO:
  • return types.MetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.QuboMetadataConfig(
  • qubo=types.QuboMetadata(
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • raise ValueError(f"unsupported/non-input file type: '{file_type.value}'")
  • [docs]
  • def get_patch_request_body(*, file: dict) -> types.PartPatchRequestBody:
  • """Format part body."""
  • file_config, file_type = get_file_config(file=file)
  • if file_type == enum.FileType.CONSTRAINTS:
  • return types.PartPatchRequestBody(
  • file_config=types.ConstraintsPartConfig(
  • constraints=types.ConstraintsPart(data=file_config["data"])
  • ),
  • )
  • if file_type == enum.FileType.GRAPH:
  • return types.PartPatchRequestBody(
  • file_config=types.GraphPartConfig(
  • graph=types.GraphPart(
  • links=file_config["links"],
  • nodes=file_config["nodes"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.HAMILTONIAN:
  • return types.PartPatchRequestBody(
  • file_config=types.HamiltonianPartConfig(
  • hamiltonian=types.HamiltonianPart(data=file_config["data"])
  • ),
  • )
  • if file_type == enum.FileType.OBJECTIVE:
  • return types.PartPatchRequestBody(
  • file_config=types.ObjectivePartConfig(
  • objective=types.ObjectivePart(data=file_config["data"])
  • ),
  • )
  • if file_type == enum.FileType.POLYNOMIAL:
  • return types.PartPatchRequestBody(
  • file_config=types.PolynomialPartConfig(
  • polynomial=types.PolynomialPart(data=file_config["data"])
  • ),
  • )
  • if file_type == enum.FileType.QUBO:
  • return types.PartPatchRequestBody(
  • file_config=types.QuboPartConfig(
  • qubo=types.QuboPart(data=file_config["data"])
  • ),
  • )
  • raise ValueError(f"unsupported/non-input file type: '{file_type.value}'")
  • [docs]
  • def zip_payload(*, payload: types.PartPatchRequestBody) -> bytes:
  • """
  • :param payload: str - json contents of file to be zipped
  • :return: zipped request_body
  • """
  • with BytesIO() as fileobj:
  • with gzip.GzipFile(fileobj=fileobj, mode="w", compresslevel=6) as file:
  • file.write(json.dumps(payload).encode("utf-8"))
  • return fileobj.getvalue()
  • [docs]
  • def file_part_generator(*, file: dict, compress: bool) -> Generator:
  • """
  • Break file-to-upload's data dictionary into chunks, formatting correctly with each
  • returned chunk.
  • :param file: file to break up into file parts
  • :param compress: whether or not file parts are to be compressed
  • :return: generator of (part_body, part_number) tuples
  • """
  • if compress:
  • # The user has chosen to compress their files for upload we want a large
  • # chunksize to try to maximize compression for each of the chunks.
  • # Prior to merged stack, this value was 200000, which was too big in unit tests.
  • data_chunk_size_max = 20000
  • else:
  • # We are using the multipart upload as a validated sharding system that is
  • # similar to Mongo GridFS. Mongo recommends 256KB for that system, this value
  • # keeps uploaded chunks below this value. After some testing, we decided to
  • # limit this to chunks of 10000 elements for performance reasons.
  • data_chunk_size_max = 10000
  • file_config, file_type = get_file_config(file=file)
  • if file_type in enum.FILE_TYPES_JOB_INPUTS_NON_GRAPH:
  • return _data_generator(
  • file_type=file_type,
  • file_config=file_config,
  • step_length=data_chunk_size_max,
  • )
  • if file_type == enum.FileType.GRAPH:
  • return _graph_generator(
  • file_type=file_type,
  • file_config=file_config,
  • step_length=data_chunk_size_max,
  • )
  • # For results data, the n^2 sized data is in the solutions field so chunk it up.
  • if file_type in enum.FILE_TYPES_JOB_RESULTS:
  • return _results_generator(
  • file_type=file_type,
  • file_config=file_config,
  • step_length=_compute_results_step_len(data=file_config["solutions"][0]),
  • )
  • raise ValueError(f"unsupported file type: {file_type.value}")
  • def _get_size(*, obj, seen=None) -> int:
  • """
  • Recursively finds size of objects
  • :param obj: data object to recursively compute size of
  • :param seen: takes a set and is used in the recursive step only to record whether an
  • object has been counted yet.
  • :return int:
  • """
  • size = sys.getsizeof(obj)
  • if seen is None:
  • seen = set()
  • obj_id = id(obj)
  • if obj_id in seen:
  • return 0
  • # Important mark as seen *before* entering recursion to gracefully handle
  • # self-referential objects
  • seen.add(obj_id)
  • if isinstance(obj, dict):
  • size += sum(_get_size(obj=v, seen=seen) for v in obj.values())
  • size += sum(_get_size(obj=k, seen=seen) for k in obj.keys())
  • elif hasattr(obj, "__dict__"):
  • size += _get_size(obj=obj.__dict__, seen=seen)
  • elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes, bytearray)):
  • size += sum(_get_size(obj=i, seen=seen) for i in obj)
  • return size
  • def _get_soln_size(*, soln):
  • # Check whether first entry is a graph node/class assignment, e.g.,
  • # {'id': 4, 'class': 2}.
  • if isinstance(soln[0], dict):
  • return _get_size(obj=soln)
  • return sys.getsizeof(soln[0]) * len(soln)
  • def _compute_results_step_len(*, data: list) -> int:
  • """
  • Compute the step length for "chunking" the provided data.
  • Args:
  • data: A list of data
  • Returns:
  • The step length for "chunking" the data
  • """
  • # total mem size of soln vector
  • soln_mem = _get_soln_size(soln=data)
  • # num_vars * step_len < 30k => step_len < 30k/num_vars
  • chunk_ratio = MEMORY_MAX / soln_mem
  • step_len = max(floor(chunk_ratio), 1)
  • return step_len
  • def _data_generator(
  • *, file_type: enum.FileType, file_config: dict, step_length: int
  • ) -> Generator:
  • # data may be empty, so use max against 1.
  • for part_number, i in enumerate(
  • range(0, max(1, len(file_config["data"])), step_length)
  • ):
  • chunk = {
  • "file_config": {
  • file_type.value: {
  • "data": file_config["data"][i : i + step_length],
  • }
  • }
  • }
  • yield chunk, part_number + 1 # content endpoint has 1-based uploads
  • def _graph_generator(
  • *, file_type: enum.FileType, file_config: dict, step_length: int
  • ) -> Generator:
  • # links and nodes may both be empty, so use max against 1.
  • for part_number, i in enumerate(
  • range(
  • 0,
  • max(1, len(file_config["links"]), len(file_config["nodes"])),
  • step_length,
  • )
  • ):
  • chunk = {
  • "file_config": {
  • file_type.value: {
  • "links": file_config["links"][i : i + step_length],
  • "nodes": file_config["nodes"][i : i + step_length],
  • }
  • }
  • }
  • yield chunk, part_number + 1 # content endpoint has 1-based uploads
  • def _results_generator(
  • *, file_type: enum.FileType, file_config: dict, step_length: int
  • ) -> Generator:
  • for part_number, i in enumerate(
  • range(0, max(1, len(file_config["solutions"])), step_length)
  • ):
  • chunk = {"file_config": {file_type.value: {}}}
  • for key, value in file_config.items():
  • chunk["file_config"][file_type.value][key] = value[i : i + step_length]
  • yield chunk, part_number + 1 # content endpoint has 1-based uploads
Previous page

Content

  • Source code for qci_client.optimization.utilities