Source code for qci_client.data_converter
- """Functions for data conversion."""
 - from math import floor
 - import sys
 - import time
 - from typing import Union
 - import networkx as nx
 - import numpy as np
 - import scipy.sparse as sp
 - from qci_client import enum
 - MEMORY_MAX: int = 8 * 1000000  
 
- [docs]
 - def get_size(obj, seen=None) -> int:
 -     """
 -     Recursively finds size of objects
 -     :param obj: data object to recursively compute size of
 -     :param seen: takes a set and is used in the recursive step only to record whether an object has been counted yet.
 -     :return int:
 -     """
 -     size = sys.getsizeof(obj)
 -     if seen is None:
 -         seen = set()
 -     obj_id = id(obj)
 -     if obj_id in seen:
 -         return 0
 -     
 -     
 -     seen.add(obj_id)
 -     if isinstance(obj, dict):
 -         size += sum(get_size(v, seen) for v in obj.values())
 -         size += sum(get_size(k, seen) for k in obj.keys())
 -     elif hasattr(obj, "__dict__"):
 -         size += get_size(obj.__dict__, seen)
 -     elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes, bytearray)):
 -         size += sum(get_size(i, seen) for i in obj)
 -     return size
 
- def _get_soln_size(soln):
 -     
 -     if isinstance(soln[0], dict):
 -         return get_size(soln)
 -     return sys.getsizeof(soln[0]) * len(soln)
 
- [docs]
 - def compute_results_step_len(data: Union[np.ndarray, list]) -> int:
 -     """
 -     Compute the step length for "chunking" the providd data.
 -     Args:
 -         data: An numpy array or list of data
 -     Returns:
 -         The step length for "chunking" the data
 -     """
 -     
 -     soln_mem = _get_soln_size(data)
 -     
 -     chunk_ratio = MEMORY_MAX / soln_mem
 -     step_len = floor(chunk_ratio) if chunk_ratio >= 1 else 1
 -     return step_len
 
- [docs]
 - def data_to_json(file: dict, debug: bool = False) -> dict:
 -     """
 -     Converts data in file input into JSON-serializable dictionary that can be passed to Qatalyst REST API
 -     Args:
 -         file: file dictionary whose data of type numpy.ndarray, scipy.sparse.spmatrix, or networkx.Graph is to be converted
 -         debug: Optional, if set to True, enables debug output (default = False for no debug output)
 -     Returns:
 -         file dictionary with JSON-serializable data
 -     """
 -     start_time_s = time.perf_counter()
 -     supported_file_types = [type.value for type in enum.JOB_INPUTS_FILE_TYPES]
 -     supported_file_types.sort()
 -     supported_file_types = tuple(supported_file_types)
 -     matrix_file_types = [type.value for type in enum.JOB_INPUTS_MATRIX_FILE_TYPES]
 -     matrix_file_types.sort()
 -     matrix_file_types = tuple(matrix_file_types)
 -     file_type = enum.get_file_type(file=file).value
 -     if file_type not in supported_file_types:
 -         raise AssertionError(
 -             f"data conversion not supported for file type '{file_type}', supported "
 -             f"types are {supported_file_types}"
 -         )
 -     data = file['file_config'][file_type]['data']
 -     if file_type == "graph":
 -         if not isinstance(data, nx.Graph):
 -             raise AssertionError("file_type 'graph' data must be a networkx.Graph")
 -         file_config = {
 -             **nx.node_link_data(data),
 -             "num_edges": data.number_of_edges(),
 -             "num_nodes": data.number_of_nodes(),
 -         }
 -     elif file_type in matrix_file_types:
 -         if isinstance(data, nx.Graph):
 -             raise AssertionError(
 -                 f"file_type '{file_type}' data cannot be a networkx.Graph"
 -             )
 -         data_ls = []
 -         if sp.isspmatrix_dok(data):
 -             for idx, val in zip(data.keys(), data.values()):
 -                 
 -                 
 -                 data_ls.append({"i": int(idx[0]), "j": int(idx[1]), "val": float(val)})
 -         elif sp.isspmatrix(data) or isinstance(data, np.ndarray):
 -             data = sp.coo_matrix(data)
 -             for i, j, val in zip(
 -                 data.row.tolist(), data.col.tolist(), data.data.tolist()
 -             ):
 -                 data_ls.append({"i": i, "j": j, "val": val})
 -         else:
 -             raise ValueError(
 -                 f"file_type '{file_type}' only supports types numpy.ndarray and "
 -                 f"scipy.sparse.spmatrix, got {type(data)}"
 -             )
 -         file_config = {"data": data_ls}
 -         rows, cols = data.get_shape()
 -         if file_type == "constraints":
 -             
 -             file_config.update({"num_constraints": rows, "num_variables": cols-1})
 -         else:
 -             
 -             file_config["num_variables"] = rows
 -     else:
 -         
 -         file_config = file["file_config"][file_type]
 -     if debug:
 -         print(f"Time to convert data to json: {time.perf_counter()-start_time_s} s.")
 -     return {
 -         "file_name": file.get("file_name", f"{file_type}.json"),
 -         "file_config": {file_type: file_config}
 -     }