Source code for qci_client.utilities

  • """
  • Utilities for Python client for files REST API for optimization service.
  • Copyright 2023, Quantum Computing Incorporated
  • """
  • import gzip
  • from io import BytesIO
  • import json
  • from math import floor
  • import sys
  • from typing import Generator
  • from qci_client import enum, types
  • # We want to limit the memory size of each uploaded chunk to be safely below the max of
  • # 15 * MebiByte (~15MB). See https://git.qci-dev.com/qci-dev/optimization-files-api/.
  • MEMORY_MAX: int = 8 * 1000000 # 8MB
  • def get_post_request_body( # pylint: disable=too-many-branches,too-many-return-statements
  • *, file: dict
  • ) -> types.MetadataPostRequestBody:
  • """
  • Format metadata body.
  • """
  • file_type = enum.get_file_type(file=file)
  • file_config = file["file_config"][file_type.value]
  • optional_fields = {}
  • if "file_name" in file:
  • optional_fields["file_name"] = file["file_name"]
  • if file_type == enum.FileType.CONSTRAINTS:
  • return types.InputMetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.ConstraintsMetadataConfig(
  • constraints=types.ConstraintsMetadata(
  • num_constraints=file_config["num_constraints"],
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.GRAPH:
  • if "directed" in file_config:
  • optional_fields["directed"] = file_config["directed"]
  • if "multigraph" in file_config:
  • optional_fields["multigraph"] = file_config["multigraph"]
  • if "graph" in file_config:
  • optional_fields["graph"] = file_config["graph"]
  • return types.InputMetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.GraphMetadataConfig(
  • graph=types.GraphMetadata(
  • **optional_fields,
  • num_edges=file_config["num_edges"],
  • num_nodes=file_config["num_nodes"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.HAMILTONIAN:
  • return types.InputMetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.HamiltonianMetadataConfig(
  • hamiltonian=types.HamiltonianMetadata(
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.OBJECTIVE:
  • return types.InputMetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.ObjectiveMetadataConfig(
  • objective=types.ObjectiveMetadata(
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.POLYNOMIAL:
  • return types.InputMetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.PolynomialMetadataConfig(
  • polynomial=types.PolynomialMetadata(
  • min_degree=file_config["min_degree"],
  • max_degree=file_config["max_degree"],
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.QUBO:
  • return types.InputMetadataPostRequestBody(
  • **optional_fields,
  • file_config=types.QuboMetadataConfig(
  • qubo=types.QuboMetadata(
  • num_variables=file_config["num_variables"],
  • )
  • ),
  • )
  • if file_type == enum.FileType.GP_RESULTS:
  • return types.ResultsMetadataPostRequestBody(
  • **optional_fields,
  • user_id=file["user_id"],
  • organization_id=file["organization_id"],
  • file_config=types.GpResultsMetadataConfig(
  • graph_partitioning_results=types.GpResultsMetadata()
  • ),
  • )
  • if file_type == enum.FileType.IHO_RESULTS:
  • return types.ResultsMetadataPostRequestBody(
  • **optional_fields,
  • user_id=file["user_id"],
  • organization_id=file["organization_id"],
  • file_config=types.IhoResultsMetadataConfig(
  • ising_hamiltonian_optimization_results=types.IhoResultsMetadata()
  • ),
  • )
  • if file_type == enum.FileType.NQHO_CONTINUOUS_RESULTS:
  • return types.ResultsMetadataPostRequestBody(
  • **optional_fields,
  • user_id=file["user_id"],
  • organization_id=file["organization_id"],
  • file_config=types.NqhoContinuousResultsMetadataConfig(
  • normalized_qudit_hamiltonian_optimization_continuous_results=types.NqhoContinuousResultsMetadata() # pylint: disable=line-too-long
  • ),
  • )
  • if file_type == enum.FileType.NQHO_INTEGER_RESULTS:
  • return types.ResultsMetadataPostRequestBody(
  • **optional_fields,
  • user_id=file["user_id"],
  • organization_id=file["organization_id"],
  • file_config=types.NqhoIntegerResultsMetadataConfig(
  • normalized_qudit_hamiltonian_optimization_integer_results=types.NqhoIntegerResultsMetadata() # pylint: disable=line-too-long
  • ),
  • )
  • if file_type == enum.FileType.QLCBO_RESULTS:
  • return types.ResultsMetadataPostRequestBody(
  • **optional_fields,
  • user_id=file["user_id"],
  • organization_id=file["organization_id"],
  • file_config=types.QlcboResultsMetadataConfig(
  • quadratic_linearly_constrained_binary_optimization_results=types.QlcboResultsMetadata() # pylint: disable=line-too-long
  • ),
  • )
  • if file_type == enum.FileType.QUBO_RESULTS:
  • return types.ResultsMetadataPostRequestBody(
  • **optional_fields,
  • user_id=file["user_id"],
  • organization_id=file["organization_id"],
  • file_config=types.QuboResultsMetadataConfig(
  • quadratic_unconstrained_binary_optimization_results=types.QuboResultsMetadata() # pylint: disable=line-too-long
  • ),
  • )
  • raise ValueError(f"unsupported file type: '{file_type.value}'")
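
# Illustrative usage sketch (not part of the original module): formatting the metadata
# POST body for a small, hypothetical QUBO upload dict. The field names and values in
# `qubo_file` below are assumptions for illustration only.
#
#     qubo_file = {
#         "file_name": "example-qubo",
#         "file_config": {
#             "qubo": {
#                 "data": [
#                     {"i": 0, "j": 0, "val": 1.0},
#                     {"i": 0, "j": 1, "val": -2.0},
#                     {"i": 1, "j": 1, "val": 1.0},
#                 ],
#                 "num_variables": 2,
#             }
#         },
#     }
#     metadata_body = get_post_request_body(file=qubo_file)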

def get_patch_request_body(  # pylint: disable=too-many-return-statements
    *, file: dict
) -> types.PartPatchRequestBody:
    """Format part body."""
    file_type = enum.get_file_type(file=file)
    file_config = file["file_config"][file_type.value]

    if file_type == enum.FileType.CONSTRAINTS:
        return types.PartPatchRequestBody(
            file_config=types.ConstraintsPartConfig(
                constraints=types.ConstraintsPart(data=file_config["data"])
            ),
        )

    if file_type == enum.FileType.GRAPH:
        return types.PartPatchRequestBody(
            file_config=types.GraphPartConfig(
                graph=types.GraphPart(
                    links=file_config["links"],
                    nodes=file_config["nodes"],
                )
            ),
        )

    if file_type == enum.FileType.HAMILTONIAN:
        return types.PartPatchRequestBody(
            file_config=types.HamiltonianPartConfig(
                hamiltonian=types.HamiltonianPart(data=file_config["data"])
            ),
        )

    if file_type == enum.FileType.OBJECTIVE:
        return types.PartPatchRequestBody(
            file_config=types.ObjectivePartConfig(
                objective=types.ObjectivePart(data=file_config["data"])
            ),
        )

    if file_type == enum.FileType.POLYNOMIAL:
        return types.PartPatchRequestBody(
            file_config=types.PolynomialPartConfig(
                polynomial=types.PolynomialPart(data=file_config["data"])
            ),
        )

    if file_type == enum.FileType.QUBO:
        return types.PartPatchRequestBody(
            file_config=types.QuboPartConfig(
                qubo=types.QuboPart(data=file_config["data"])
            ),
        )

    if file_type == enum.FileType.GP_RESULTS:
        return types.PartPatchRequestBody(
            file_config=types.GpResultsPartConfig(
                graph_partitioning_results=types.GpResultsPart(
                    balances=file_config["balances"],
                    counts=file_config["counts"],
                    cut_sizes=file_config["cut_sizes"],
                    energies=file_config["energies"],
                    feasibilities=file_config["feasibilities"],
                    partitions=file_config["partitions"],
                    solutions=file_config["solutions"],
                )
            ),
        )

    if file_type == enum.FileType.IHO_RESULTS:
        return types.PartPatchRequestBody(
            file_config=types.IhoResultsPartConfig(
                ising_hamiltonian_optimization_results=types.IhoResultsPart(
                    counts=file_config["counts"],
                    energies=file_config["energies"],
                    solutions=file_config["solutions"],
                )
            ),
        )

    if file_type == enum.FileType.NQHO_CONTINUOUS_RESULTS:
        return types.PartPatchRequestBody(
            file_config=types.NqhoContinuousResultsPartConfig(
                normalized_qudit_hamiltonian_optimization_continuous_results=types.NqhoContinuousResultsPart(  # pylint: disable=line-too-long
                    counts=file_config["counts"],
                    energies=file_config["energies"],
                    solutions=file_config["solutions"],
                )
            ),
        )

    if file_type == enum.FileType.NQHO_INTEGER_RESULTS:
        return types.PartPatchRequestBody(
            file_config=types.NqhoIntegerResultsPartConfig(
                normalized_qudit_hamiltonian_optimization_integer_results=types.NqhoIntegerResultsPart(  # pylint: disable=line-too-long
                    counts=file_config["counts"],
                    energies=file_config["energies"],
                    solutions=file_config["solutions"],
                )
            ),
        )

    if file_type == enum.FileType.QLCBO_RESULTS:
        return types.PartPatchRequestBody(
            file_config=types.QlcboResultsPartConfig(
                quadratic_linearly_constrained_binary_optimization_results=types.QlcboResultsPart(  # pylint: disable=line-too-long
                    counts=file_config["counts"],
                    energies=file_config["energies"],
                    feasibilities=file_config["feasibilities"],
                    objective_values=file_config["objective_values"],
                    solutions=file_config["solutions"],
                )
            ),
        )

    if file_type == enum.FileType.QUBO_RESULTS:
        return types.PartPatchRequestBody(
            file_config=types.QuboResultsPartConfig(
                quadratic_unconstrained_binary_optimization_results=types.QuboResultsPart(  # pylint: disable=line-too-long
                    counts=file_config["counts"],
                    energies=file_config["energies"],
                    solutions=file_config["solutions"],
                )
            ),
        )

    raise ValueError(f"unsupported file type: '{file_type.value}'")
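
# Illustrative usage sketch (not part of the original module): formatting a single part
# PATCH body for the hypothetical `qubo_file` dict shown above. For large files, the
# chunked part bodies would normally come from file_part_generator instead.
#
#     part_body = get_patch_request_body(file=qubo_file)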

def zip_payload(*, payload: types.PartPatchRequestBody) -> bytes:
    """
    Gzip-compress a part body for upload.

    :param payload: part body to serialize to JSON and compress
    :return: gzip-compressed request body as bytes
    """
    with BytesIO() as fileobj:
        with gzip.GzipFile(fileobj=fileobj, mode="w", compresslevel=6) as file:
            file.write(json.dumps(payload).encode("utf-8"))

        return fileobj.getvalue()
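
# Illustrative sketch (not part of the original module): compressing a minimal part body
# and checking that it round-trips, using only the standard library.
#
#     compressed = zip_payload(payload={"file_config": {"qubo": {"data": []}}})
#     assert json.loads(gzip.decompress(compressed)) == {
#         "file_config": {"qubo": {"data": []}}
#     }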

def file_part_generator(*, file: dict, compress: bool) -> Generator:
    """
    Break a file-to-upload's data dictionary into chunks, formatting each returned
    chunk correctly.

    :param file: file to break up into file parts
    :param compress: whether file parts are to be compressed
    :return: generator of (part_body, part_number) tuples
    """
    if compress:
        # The user has chosen to compress their files for upload, so we want a large
        # chunk size to try to maximize compression for each of the chunks.
        # Prior to merged stack, this value was 200000, which was too big in unit tests.
        data_chunk_size_max = 20000
    else:
        # We are using the multipart upload as a validated sharding system that is
        # similar to Mongo GridFS. Mongo recommends 256KB for that system, and this
        # setting keeps uploaded chunks below that size. After some testing, we decided
        # to limit this to chunks of 10000 elements for performance reasons.
        data_chunk_size_max = 10000

    file_type = enum.get_file_type(file=file)
    file_config = file["file_config"][file_type.value]

    if file_type in enum.JOB_INPUTS_NON_GRAPH_FILE_TYPES:
        return _data_generator(
            file_type=file_type,
            file_config=file_config,
            step_length=data_chunk_size_max,
        )

    if file_type == enum.FileType.GRAPH:
        return _graph_generator(
            file_type=file_type,
            file_config=file_config,
            step_length=data_chunk_size_max,
        )

    # For results data, the n^2-sized data is in the solutions field, so chunk it up.
    if file_type in enum.JOB_RESULTS_FILE_TYPES:
        return _results_generator(
            file_type=file_type,
            file_config=file_config,
            step_length=_compute_results_step_len(file_config["solutions"][0]),
        )

    raise ValueError(f"unhandled file_type: {file_type.value}")
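
# Illustrative usage sketch (not part of the original module): iterating over the parts
# of the hypothetical `qubo_file` dict from above and compressing each part before it
# would be uploaded.
#
#     for part_body, part_number in file_part_generator(file=qubo_file, compress=True):
#         compressed_part = zip_payload(payload=part_body)
#         # ... PATCH `compressed_part` to the files API as part `part_number` ...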

def _get_size(obj, seen=None) -> int:
    """
    Recursively find the size of an object.

    :param obj: data object to recursively compute the size of
    :param seen: a set used in the recursive step only, to record whether an object
        has already been counted
    :return int:
    """
    size = sys.getsizeof(obj)

    if seen is None:
        seen = set()

    obj_id = id(obj)

    if obj_id in seen:
        return 0

    # Important: mark as seen *before* entering recursion to gracefully handle
    # self-referential objects.
    seen.add(obj_id)

    if isinstance(obj, dict):
        size += sum(_get_size(v, seen) for v in obj.values())
        size += sum(_get_size(k, seen) for k in obj.keys())
    elif hasattr(obj, "__dict__"):
        size += _get_size(obj.__dict__, seen)
    elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes, bytearray)):
        size += sum(_get_size(i, seen) for i in obj)

    return size
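
# Illustrative sketch (not part of the original module): the recursive size of a nested
# container counts the container plus its keys and values, unlike sys.getsizeof alone.
#
#     nested = {"solutions": [[0, 1, 1], [1, 0, 1]]}
#     assert _get_size(nested) > sys.getsizeof(nested)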

def _get_soln_size(soln):
    # Check whether the first entry is a graph node/class assignment, e.g.,
    # {'id': 4, 'class': 2}.
    if isinstance(soln[0], dict):
        return _get_size(soln)

    return sys.getsizeof(soln[0]) * len(soln)


def _compute_results_step_len(data: list) -> int:
    """
    Compute the step length for "chunking" the provided data.

    Args:
        data: A list of data

    Returns:
        The step length for "chunking" the data
    """
    # Total memory size of the solution vector.
    soln_mem = _get_soln_size(data)
    # soln_mem * step_len < MEMORY_MAX => step_len < MEMORY_MAX / soln_mem
    chunk_ratio = MEMORY_MAX / soln_mem
    step_len = max(floor(chunk_ratio), 1)

    return step_len
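
# Illustrative sketch (not part of the original module): for a solution vector whose
# in-memory size is roughly 80 kB, the step length works out to
# floor(8_000_000 / 80_000) = 100 solutions per chunk, keeping each chunk under
# MEMORY_MAX.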

def _data_generator(
    *, file_type: enum.FileType, file_config: dict, step_length: int
) -> Generator:
    # data may be empty, so use max against 1.
    for part_number, i in enumerate(
        range(0, max(1, len(file_config["data"])), step_length)
    ):
        chunk = {
            "file_config": {
                file_type.value: {
                    "data": file_config["data"][i : i + step_length],
                }
            }
        }

        yield chunk, part_number + 1  # content endpoint has 1-based uploads


def _graph_generator(
    *, file_type: enum.FileType, file_config: dict, step_length: int
) -> Generator:
    # links and nodes may both be empty, so use max against 1.
    for part_number, i in enumerate(
        range(
            0,
            max(1, len(file_config["links"]), len(file_config["nodes"])),
            step_length,
        )
    ):
        chunk = {
            "file_config": {
                file_type.value: {
                    "links": file_config["links"][i : i + step_length],
                    "nodes": file_config["nodes"][i : i + step_length],
                }
            }
        }

        yield chunk, part_number + 1  # content endpoint has 1-based uploads


def _results_generator(
    *, file_type: enum.FileType, file_config: dict, step_length: int
) -> Generator:
    for part_number, i in enumerate(
        range(0, max(1, len(file_config["solutions"])), step_length)
    ):
        chunk = {"file_config": {file_type.value: {}}}

        for key, value in file_config.items():
            chunk["file_config"][file_type.value][key] = value[i : i + step_length]

        yield chunk, part_number + 1  # content endpoint has 1-based uploads