Source code for emucore_direct.client
- """
 - Client gRPC services for running on FPGA reservoir computer EmuCore developed by QCi.
 - """
 - import json
 - import os
 - import time
 - from typing import List, TypedDict
 - import grpc
 - from grpc._channel import _InactiveRpcError
 - import numpy as np
 - from emucore_direct import emucore_pb2, emucore_pb2_grpc
 - from emucore_direct.utils import (
 -     bytes_to_array,
 -     message_to_dict,
 -     prep_input_data,
 -     set_filter_coefficients,
 - )
 - grpc_service_config = json.dumps(
 -     {
 -         "methodConfig": [
 -             {
 -                 "name": [{"service": "EmuCore.EmuCoreService"}],
 -                 "retryPolicy": {
 -                     "maxAttempts": 5, 
 -                     "initialBackoff": "0.2s",
 -                     "maxBackoff": "10s",
 -                     "backoffMultiplier": 2.5,
 -                     "retryableStatusCodes": ["UNAVAILABLE"],
 -                 },
 -             }
 -         ]
 -     }
 - )
 - MAX_INPUT_SIZE = 20*1024*1024
 
- [docs]
 - class StatusMessage(TypedDict):
 -     """
 -     Structure of responses for configuration requests to EmuCore device.
 -     
 -     :param status: the status of the request
 -     :param message: a description for the recieved status
 -     """
 -     status: int
 -     message: str
 
- [docs]
 - class InactiveRpcError(Exception):
 -     """Custom exception wrapper around grpc._channel._InactiveRpcError."""
 
- [docs]
 - class EmuCoreClient:
 -     """
 -     Provides services for accessing EmuCore server
 -     :param ip_addr: the IP address of the gRPC server
 -     :param port:
 -         The port that the RPC server is running on
 -     :param max_data_size: int
 -         The max send and recieve message length for RPC server
 -     .. note::
 -        :code:`lock_id` is used by a variety of class functions.
 -        It is set to an empty string by default since default for device server
 -        :code:`lock_id` is also an empty string. This allows for single user
 -        processing without having to acquire a device lock.
 -     .. All GRPC calls follow a specific pattern:
 -     ..   1. Fill in data to be sent in message stub
 -     ..   2. Send data using stub service method
 -     ..   3. Parse response
 -     """
 -     def __init__(
 -         self,
 -         ip_addr: str = os.getenv("DEVICE_IP_ADDRESS", "localhost"),
 -         port: str = os.getenv("DEVICE_PORT", "50051"),
 -         max_data_size: int =512 * 1024 * 1024,
 -     ):
 -         self._ip_add_port = ip_addr + ":" + port
 -         self._channel_opt = [
 -             ("grpc.max_send_message_length", max_data_size),
 -             ("grpc.max_receive_message_length", max_data_size),
 -             ("grpc.service_config", grpc_service_config),
 -         ]
 -         self.channel = grpc.insecure_channel(
 -             self._ip_add_port, options=self._channel_opt
 -         )
 -         self.stub = emucore_pb2_grpc.EmuCoreServiceStub(self.channel)
 
- [docs]
 -     def check_lock(self, lock_id: str = "") -> dict:
 -         """
 -         Checks if submitted :code:`lock_id` has execution lock on the device
 -         :param lock_id: a UUID which will be checked to determine if has exclusive
 -             device execution lock
 -         :return: a dict with following keys:
 -            - **status_code**: `int`- status code for lock check
 -            - **status_desc**: `str`- a description for the associated status code
 -         """
 -         check_input = emucore_pb2.lock_message(lock_id=lock_id)
 -         check_output = self.stub.check_lock(check_input)
 -         return message_to_dict(check_output)
 
- [docs]
 -     def reservoir_reset(self,
 -                         lock_id) -> StatusMessage:
 -         """
 -         Resets a reservoir instance by clearing RAM on the server
 -         :param lock_id: a lock_id which has an active reserve on the device
 -         
 -         :return: 
 -            dictionary with with values from members of
 -            :class:`emucore_direct.types.StatusResponses`
 -         """
 -         reset_message = emucore_pb2.lock_message(lock_id=lock_id)
 -         return message_to_dict(self.stub.reservoir_reset(reset_message))
 
- [docs]
 -     def system_info(self) -> dict:
 -         """
 -         Provides system info on call
 -         
 -         :return: 
 -            dict with following keys:
 -            - **system_name**: `str`- product name
 -            - **system_version**: `str`- server version
 -            
 -         """
 -         sys_info_resp = self.stub.system_info(emucore_pb2.empty_message())
 -         return message_to_dict(sys_info_resp)
 
- [docs]
 -     def acquire_lock(self) -> dict:
 -         """
 -         Attempts to acquire exclusive lock for submitting jobs
 -         :return:
 -            a member of :class:`emucore_direct.types.LockManageStatus` as a dict along
 -            with an additional key :code:`lock_id`:
 -            - **lock_id**: `str`- if acquired the current device `lock_id`
 -              else empty string
 -            - **status**: `int`- status code for lock id acquisition
 -            - **message**: `str`- a description for the associated status code
 -         """
 -         try:
 -             acquire_lock_resp = self.stub.acquire_lock(emucore_pb2.empty_message())
 -         except _InactiveRpcError as exc:
 -             raise InactiveRpcError(
 -                 "acquire_lock failed due to grpc._channel._InactiveRpcError."
 -             ) from exc
 -         return message_to_dict(acquire_lock_resp)
 
- [docs]
 -     def release_lock(self, lock_id):
 -         """
 -         Releases exclusive lock for submitting data to reservoir
 -         :param lock_id: a UUID with currently acquired exclusive device lock
 -         :return: 
 -            a dict with the following keys:
 -            - **lock_released**: `bool`- if released is True else False
 -            - **message**: `str`- a description of release operation result
 -         """
 -         release_input = emucore_pb2.lock_message(lock_id = lock_id)
 -         try:
 -             release_lock_resp = self.stub.release_lock(release_input)
 -         except _InactiveRpcError as exc:
 -             raise InactiveRpcError(
 -                 "release_lock failed due to grpc._channel._InactiveRpcError."
 -             ) from exc
 -         return message_to_dict(release_lock_resp)
 
- [docs]
 -     def rc_config(self,
 -                lock_id: str,
 -                vbias: float,
 -                gain: float,
 -                num_nodes: int,
 -                num_taps: int) -> StatusMessage:
 -         """
 -         Configures reservoir model and how data will be processed by the reservoir.
 -         :param lock_id: a lock_id which has an active reserve on the device
 -         :param vbias: bias to apply to each node in reservoir. Range for parameter [0,1].
 -         :param gain: memory setting for system how long should inputs effect reservoir similar
 -                to beta in adaptive gradient descent range for parameter [0,1]
 -         :param num_nodes: the total number of hidden nodes to instantiate within the reservoir,
 -                a single hidden layer
 -         :param num_taps: number of connections in reservoir, generally should be set to less
 -            than the number of nodes in reservoir. Defines interconnection between nodes.
 -         :return: dictionary with with values from one of the members of
 -               :class:`emucore_direct.types.StatusResponses`           
 -         """
 -         filter_coefs = set_filter_coefficients(num_taps=num_taps)
 -         config_message = emucore_pb2.rc_config_message(lock_id=lock_id,
 -                                                    vbias = vbias,
 -                                                    gain = gain,
 -                                                    num_nodes = num_nodes,
 -                                                    num_taps = num_taps)
 -         rc_status = self.stub.rc_config(config_message)
 -         return message_to_dict(rc_status)
 
- [docs]
 -     def rc_run(self,
 -                lock_id: str,
 -                reservoir_input: List[int],):
 -         """
 -         Runs a series of data through the reservoir and returns response from device
 -         based on current reservoir configuration.
 -         :param lock_id: a lock_id which has an active reserve on the device
 -         :param reservoir_input: a list of digitized values to input to the reservoir must be
 -                less than MAX_INPUT_SIZE
 -         :return: a dictionary with the folowing keys:
 -            - **status**: `int`- the status for the reservoir submission
 -            - **message**: `str`- a description of the status for the submission
 -            - **states**: `bytes`- response from reservoir as bytes.
 -         """
 -         assert MAX_INPUT_SIZE>=len(reservoir_input), \
 -             f"Input to reservoir must be of less than or equal to {MAX_INPUT_SIZE} was {len(reservoir_input)}"
 -         run_message = emucore_pb2.rc_run_message(lock_id=lock_id,
 -                                                    input = reservoir_input,)
 -         rc_response = self.stub.rc_run(run_message)
 -         return message_to_dict(rc_response)
 
- [docs]
 -     def wait_for_lock(self):
 -         """
 -         Waits for lock indefinitely calling :func:`acquire_lock`
 -         :return: a tuple of the following items:
 -            - **lock_id**: `str`- exclusive lock for device execution with a timeout
 -            - **start_queue_ts**: `int`- time in ns when began lock acquisition.
 -            - **end_queue_ts**: `int`- time in ns when lock was acquired.
 -         """
 -         lock_id = ""
 -         start_queue_ts = time.time_ns()
 -         while lock_id == "":
 -             lock_id = self.acquire_lock()["lock_id"]
 -             
 -             if lock_id == "":
 -                 time.sleep(1)
 -         end_queue_ts = time.time_ns()
 -         return lock_id, start_queue_ts, end_queue_ts        
 
- [docs]
 -     def process_all_data(self,
 -                          lock_id: str,
 -                          input_data: np.ndarray,
 -                          num_nodes: int,
 -                          density: float,
 -                          feature_scaling: float,
 -                          max_scale_val: float=None,
 -                          weights: np.ndarray=None,
 -                          seed_val_weights: int=13):
 -         """
 -         Run dataset through reservoir:
 -         1. Get lock
 -         2. Apply scaling and random weights mask to input data
 -         3. Run data through reservoir
 -         4. Combine data from reservoir responses and reshape based on number of nodes
 -         5. Release lock
 -         :param lock_id: a UUID that currently has lock on the device
 -         :param input_data: data or series to process via reservoir
 -         :param num_nodes: the total number of hidden nodes to instantiate within the reservoir,
 -             a single hidden layer (this is also used to apply random weights to to the data
 -             as well as reshape data recieved from reservoir back to correct output dimension)
 -         :param feature_scaling: after applying max abs scalar feature scaling factor applied 
 -         :param max_scale_val: max absolute value used to scale data if provided
 -         :param seed_val_weights: seeds randomness for weigths to allow for reproducibility
 -         :note: if doing multiple runs without reset the max value mustn't exceed original
 -             data max value in order for results to be processed properly. 
 -         :return:
 -            a tuple of the following elements:
 -            
 -            - **reservoir_response**: `np.ndarray`- reservoir response represented as an array dimension of array will be nrows of input matrix by num nodes.
 -            - **max_scale_value**: `np.ndarray`- the scaling value that was applied to the input data before it was processed by the reservoir.
 -            - **weights**: `np.ndarray`- the weights that were used to apply the random mask to the data prior to being processed by the reservoir.
 -         """
 -         n_rows, n_cols = input_data.shape
 -         input_packets, max_scale_val, weights = prep_input_data(
 -             input_data=input_data,
 -             num_nodes=num_nodes,
 -             density=density,
 -             feature_scaling=feature_scaling,
 -             max_scale_val=max_scale_val,
 -             weights=weights,
 -             seed_val_weights=seed_val_weights)
 -         try:
 -             reservoir_resp = np.array([])
 -             for i in input_packets:
 -                 packet_resp = self.rc_run(
 -                     lock_id=lock_id,
 -                     reservoir_input=i)
 -                 packet_bytes = bytearray(np.array(packet_resp["states"]))            
 -                 packet_arr = bytes_to_array(input_bytes=packet_bytes)
 -                 reservoir_resp = np.concatenate((reservoir_resp, packet_arr),axis=0)
 -         except Exception as err:
 -             print("ERROR OCCURRED", err)
 -             raise RuntimeError("Error while processing data") from err
 -         return np.array(reservoir_resp).reshape(n_rows,num_nodes), max_scale_val, weights