Foundry | Products | Technology | Company | Investor relations | Resource library | News
Contact us
Resource library
    Resource library home
    Developer resources
      Entropy quantum optimization
        Dirac Quick Start
        Dirac-3 Developer Beginner Guide
        Multibody formulation
        QUBO formulation
        QLCBO formulation
        QBoost formulation
        qci-client software package
          Getting Started
          Basic Usage
          qci-client
          Dependencies
          Index
          Module Index
        eqc-direct software package
        eqc-models software package
      Quantum random number generation
      Reservoir computing
    Applications
    Lessons
    Research and publications
    Support

Couldn’t find what you are looking for? Reach out to technical support.

Contact support
Privacy Policy | Cookie Policy | Terms of Use | Forward Looking Statements | Accessibility Statement
Terms and Conditions of Sale | End User License Agreement

© 2018-2026 Quantum Computing Inc.

Download

Default

Source code for qci_client.optimization.utilities

  • # Copyright 2023-2024, Quantum Computing Incorporated
  • """Utilities, especially for creating request bodies."""
  • import gzip
  • from io import BytesIO
  • import json
  • from math import floor
  • import sys
  • from typing import Generator, Tuple
  • from qci_client.optimization import types
  • from qci_client.optimization import enum
  • # We want to limit the memory size of each uploaded chunk to be safely below the max of
  • # 15 * MebiByte (~15MB). See https://git.qci-dev.com/qci-dev/optimization-files-api/.
  • MEMORY_MAX: int = 8 * 1000000 # 8MB
  • [docs]
  • def get_file_type(*, file: dict) -> enum.FileType:
  • """Get file type from a file."""
  • file_config_keys = list(file["file_config"].keys())
  • if len(file_config_keys) != 1:
  • raise ValueError(
  • "improper number of files specified in file_config (should be exactly one)"
  • )
  • return enum.FileType(file_config_keys[0])
  • [docs]
  • def get_file_config(*, file: dict) -> Tuple[dict, enum.FileType]:
  • """Get file configuration and file type from a file."""
  • file_type = get_file_type(file=file)
  • return file["file_config"][file_type.value], file_type
  • [docs]
  • def get_post_request_body(*, file: dict) -> types.MetadataPostRequestBody:
  • """Format metadata body."""
  • file_config, file_type = get_file_config(file=file)
  • optional_fields = {}
  • if "file_name" in file:
  • optional_fields["file_name"] = file["file_name"]
  • if file_type == enum.FileType.CONSTRAINTS:
  • return types.MetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.ConstraintsMetadataConfig(
  • constraints=types.ConstraintsMetadata(
  • num_constraints=file_config["num_constraints"],
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.GRAPH:
  • if "directed" in file_config:
  • optional_fields["directed"] = file_config["directed"]
  • if "multigraph" in file_config:
  • optional_fields["multigraph"] = file_config["multigraph"]
  • if "graph" in file_config:
  • optional_fields["graph"] = file_config["graph"]
  • return types.MetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.GraphMetadataConfig(
  • graph=types.GraphMetadata(
  • **optional_fields,
  • num_edges=file_config["num_edges"],
  • num_nodes=file_config["num_nodes"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.HAMILTONIAN:
  • return types.MetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.HamiltonianMetadataConfig(
  • hamiltonian=types.HamiltonianMetadata(
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.OBJECTIVE:
  • return types.MetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.ObjectiveMetadataConfig(
  • objective=types.ObjectiveMetadata(
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.POLYNOMIAL:
  • return types.MetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.PolynomialMetadataConfig(
  • polynomial=types.PolynomialMetadata(
  • min_degree=file_config["min_degree"],
  • max_degree=file_config["max_degree"],
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.QUBO:
  • return types.MetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.QuboMetadataConfig(
  • qubo=types.QuboMetadata(
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • raise ValueError(f"unsupported/non-input file type: '{file_type.value}'")
  • [docs]
  • def get_patch_request_body(*, file: dict) -> types.PartPatchRequestBody:
  • """Format part body."""
  • file_config, file_type = get_file_config(file=file)
  • if file_type == enum.FileType.CONSTRAINTS:
  • return types.PartPatchRequestBody(
  • file_config=types.ConstraintsPartConfig(
  • constraints=types.ConstraintsPart(data=file_config["data"])
  • ),
  • )
  • if file_type == enum.FileType.GRAPH:
  • return types.PartPatchRequestBody(
  • file_config=types.GraphPartConfig(
  • graph=types.GraphPart(
  • links=file_config["links"],
  • nodes=file_config["nodes"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.HAMILTONIAN:
  • return types.PartPatchRequestBody(
  • file_config=types.HamiltonianPartConfig(
  • hamiltonian=types.HamiltonianPart(data=file_config["data"])
  • ),
  • )
  • if file_type == enum.FileType.OBJECTIVE:
  • return types.PartPatchRequestBody(
  • file_config=types.ObjectivePartConfig(
  • objective=types.ObjectivePart(data=file_config["data"])
  • ),
  • )
  • if file_type == enum.FileType.POLYNOMIAL:
  • return types.PartPatchRequestBody(
  • file_config=types.PolynomialPartConfig(
  • polynomial=types.PolynomialPart(data=file_config["data"])
  • ),
  • )
  • if file_type == enum.FileType.QUBO:
  • return types.PartPatchRequestBody(
  • file_config=types.QuboPartConfig(
  • qubo=types.QuboPart(data=file_config["data"])
  • ),
  • )
  • raise ValueError(f"unsupported/non-input file type: '{file_type.value}'")
  • [docs]
  • def zip_payload(*, payload: types.PartPatchRequestBody) -> bytes:
  • """
  • :param payload: str - json contents of file to be zipped
  • :return: zipped request_body
  • """
  • with BytesIO() as fileobj:
  • with gzip.GzipFile(fileobj=fileobj, mode="w", compresslevel=6) as file:
  • file.write(json.dumps(payload).encode("utf-8"))
  • return fileobj.getvalue()
  • [docs]
  • def file_part_generator(*, file: dict, compress: bool) -> Generator:
  • """
  • Break file-to-upload's data dictionary into chunks, formatting correctly with each
  • returned chunk.
  • :param file: file to break up into file parts
  • :param compress: whether or not file parts are to be compressed
  • :return: generator of (part_body, part_number) tuples
  • """
  • if compress:
  • # The user has chosen to compress their files for upload we want a large
  • # chunksize to try to maximize compression for each of the chunks.
  • # Prior to merged stack, this value was 200000, which was too big in unit tests.
  • data_chunk_size_max = 20000
  • else:
  • # We are using the multipart upload as a validated sharding system that is
  • # similar to Mongo GridFS. Mongo recommends 256KB for that system, this value
  • # keeps uploaded chunks below this value. After some testing, we decided to
  • # limit this to chunks of 10000 elements for performance reasons.
  • data_chunk_size_max = 10000
  • file_config, file_type = get_file_config(file=file)
  • if file_type in enum.FILE_TYPES_JOB_INPUTS_NON_GRAPH:
  • return _data_generator(
  • file_type=file_type,
  • file_config=file_config,
  • step_length=data_chunk_size_max,
  • )
  • if file_type == enum.FileType.GRAPH:
  • return _graph_generator(
  • file_type=file_type,
  • file_config=file_config,
  • step_length=data_chunk_size_max,
  • )
  • # For results data, the n^2 sized data is in the solutions field so chunk it up.
  • if file_type in enum.FILE_TYPES_JOB_RESULTS:
  • return _results_generator(
  • file_type=file_type,
  • file_config=file_config,
  • step_length=_compute_results_step_len(data=file_config["solutions"][0]),
  • )
  • raise ValueError(f"unsupported file type: {file_type.value}")
  • def _get_size(*, obj, seen=None) -> int:
  • """
  • Recursively finds size of objects
  • :param obj: data object to recursively compute size of
  • :param seen: takes a set and is used in the recursive step only to record whether an
  • object has been counted yet.
  • :return int:
  • """
  • size = sys.getsizeof(obj)
  • if seen is None:
  • seen = set()
  • obj_id = id(obj)
  • if obj_id in seen:
  • return 0
  • # Important mark as seen *before* entering recursion to gracefully handle
  • # self-referential objects
  • seen.add(obj_id)
  • if isinstance(obj, dict):
  • size += sum(_get_size(obj=v, seen=seen) for v in obj.values())
  • size += sum(_get_size(obj=k, seen=seen) for k in obj.keys())
  • elif hasattr(obj, "__dict__"):
  • size += _get_size(obj=obj.__dict__, seen=seen)
  • elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes, bytearray)):
  • size += sum(_get_size(obj=i, seen=seen) for i in obj)
  • return size
  • def _get_soln_size(*, soln):
  • # Check whether first entry is a graph node/class assignment, e.g.,
  • # {'id': 4, 'class': 2}.
  • if isinstance(soln[0], dict):
  • return _get_size(obj=soln)
  • return sys.getsizeof(soln[0]) * len(soln)
  • def _compute_results_step_len(*, data: list) -> int:
  • """
  • Compute the step length for "chunking" the provided data.
  • Args:
  • data: A list of data
  • Returns:
  • The step length for "chunking" the data
  • """
  • # total mem size of soln vector
  • soln_mem = _get_soln_size(soln=data)
  • # num_vars * step_len < 30k => step_len < 30k/num_vars
  • chunk_ratio = MEMORY_MAX / soln_mem
  • step_len = max(floor(chunk_ratio), 1)
  • return step_len
  • def _data_generator(
  • *, file_type: enum.FileType, file_config: dict, step_length: int
  • ) -> Generator:
  • # data may be empty, so use max against 1.
  • for part_number, i in enumerate(
  • range(0, max(1, len(file_config["data"])), step_length)
  • ):
  • chunk = {
  • "file_config": {
  • file_type.value: {
  • "data": file_config["data"][i : i + step_length],
  • }
  • }
  • }
  • yield chunk, part_number + 1 # content endpoint has 1-based uploads
  • def _graph_generator(
  • *, file_type: enum.FileType, file_config: dict, step_length: int
  • ) -> Generator:
  • # links and nodes may both be empty, so use max against 1.
  • for part_number, i in enumerate(
  • range(
  • 0,
  • max(1, len(file_config["links"]), len(file_config["nodes"])),
  • step_length,
  • )
  • ):
  • chunk = {
  • "file_config": {
  • file_type.value: {
  • "links": file_config["links"][i : i + step_length],
  • "nodes": file_config["nodes"][i : i + step_length],
  • }
  • }
  • }
  • yield chunk, part_number + 1 # content endpoint has 1-based uploads
  • def _results_generator(
  • *, file_type: enum.FileType, file_config: dict, step_length: int
  • ) -> Generator:
  • for part_number, i in enumerate(
  • range(0, max(1, len(file_config["solutions"])), step_length)
  • ):
  • chunk = {"file_config": {file_type.value: {}}}
  • for key, value in file_config.items():
  • chunk["file_config"][file_type.value][key] = value[i : i + step_length]
  • yield chunk, part_number + 1 # content endpoint has 1-based uploads
Previous page

Content

  • Source code for qci_client.optimization.utilities