Source code for ansys.eigen.python.grpc.client

"""Python implementation of the gRPC API Eigen Example client."""

import grpc
import numpy as np

import ansys.eigen.python.grpc.constants as constants
import ansys.eigen.python.grpc.generated.grpcdemo_pb2 as grpcdemo_pb2
import ansys.eigen.python.grpc.generated.grpcdemo_pb2_grpc as grpcdemo_pb2_grpc


[docs] class DemoGRPCClient: """Provides the API Eigen Example client class for interacting via gRPC.""" def __init__(self, ip="127.0.0.1", port=50051, timeout=1, test=None): """Initialize connection to the API Eigen server. Parameters ---------- ip : str, optional IP or DNS to which to connect. The default is "127.0.0.1". port : int, optional Port to connect to. The default is 50051. timeout : int, optional Number of seconds to wait before returning a timeout in the connection. The default is 1. test : object, optional Test GRPCDemoStub to connect to. The default is ``None``. This argument is only intended for test purposes. Raises ------ IOError Error if the client was unable to connect to the server. """ # For test purposes, provide a stub directly if test is not None: self._stub = test return self._stub = None self._channel_str = "%s:%d" % (ip, port) self.channel = grpc.insecure_channel(self._channel_str) # Verify connection try: grpc.channel_ready_future(self.channel).result(timeout=timeout) except grpc.FutureTimeoutError: raise IOError("Unable to connect to server at %s" % self._channel_str) # Set up the stub self._stub = grpcdemo_pb2_grpc.GRPCDemoStub(self.channel) print("Connected to server at %s:%d" % (ip, port)) # ================================================================================================= # PUBLIC METHODS for Client operations # =================================================================================================
[docs] def request_greeting(self, name): """Method that requests a greeting from the server. Parameters ---------- name : str Name of the "client". For example, "Michael". """ # Build the greeting request request = grpcdemo_pb2.HelloRequest(name=name) # Send the request response = self._stub.SayHello(request) # Show the server's response print("The server answered: " + response.message)
[docs] def flip_vector(self, vector): """Flip the position of a numpy.ndarray vector such that [A, B, C, D] --> [D, C, B, A]. Parameters ---------- vector : numpy.ndarray Vector to flip. Returns ------- numpy.ndarray Flipped vector. """ # Generate the metadata and the amount of chunks per vector md, chunks = self._generate_md("vectors", "vec", vector) # Build the stream (i.e. generator) vector_gen = self._generate_vector_stream(chunks, vector) # Call the server method and retrieve the result response_iterator = self._stub.FlipVector(vector_gen, metadata=md) # Now, convert to a numpy.ndarray to continue nominal operations (outside the client) nparray = self._read_nparray_from_vector(response_iterator) # Return only the first element (expecting a single vector) return nparray[0]
[docs] def add_vectors(self, *args): """Add numpy.ndarray vectors using the Eigen library on the server side. Returns ------- numpy.ndarray Result of the given numpy.ndarrays. """ # Generate the metadata and the amount of chunks per vector md, chunks = self._generate_md("vectors", "vec", *args) # Build the stream (i.e. generator) vector_iterator = self._generate_vector_stream(chunks, *args) # Call the server method and retrieve the result response_iterator = self._stub.AddVectors(vector_iterator, metadata=md) # Now, convert to a numpy.ndarray to continue nominal operations (outside the client) nparray = self._read_nparray_from_vector(response_iterator) # Return only the first element (expecting a single vector) return nparray[0]
[docs] def multiply_vectors(self, *args): """Multiply numpy.ndarray vectors using the Eigen library on the server side. Returns ------- numpy.ndarray Result of the multiplication of numpy.ndarray vectors. Despite returning a numpy.ndarray, the result only contains one value because it is a dot product. """ # Generate the metadata and the amount of chunks per vector md, chunks = self._generate_md("vectors", "vec", *args) # Build the stream (generator) vector_iterator = self._generate_vector_stream(chunks, *args) # Call the server method and retrieve the result response_iterator = self._stub.MultiplyVectors(vector_iterator, metadata=md) # Convert to a numpy.ndarray to continue nominal operations (outside the client) nparray = self._read_nparray_from_vector(response_iterator) # Return only the first element (expecting a single vector) return nparray[0]
[docs] def add_matrices(self, *args): """Add numpy.ndarray matrices using the Eigen library on the server side. Returns ------- numpy.ndarray Resulting numpy.ndarray of the matrices addition. """ # Generate the metadata and the amount of chunks per Matrix md, chunks = self._generate_md("matrices", "mat", *args) # Build the stream (i.e. generator) matrix_iterator = self._generate_matrix_stream(chunks, *args) # Call the server method and retrieve the result response_iterator = self._stub.AddMatrices(matrix_iterator, metadata=md) # Now, convert to a numpy.ndarray to continue nominal operations (outside the client) nparray = self._read_nparray_from_matrix(response_iterator) # Return only the first element (expecting a single matrix) return nparray[0]
[docs] def multiply_matrices(self, *args): """Multiply numpy.ndarray matrices using the Eigen library on the server side. Returns ------- numpy.ndarray Resulting numpy.ndarray of the matrices' multiplication. """ # Generate the metadata and the amount of chunks per matrix md, chunks = self._generate_md("matrices", "mat", *args) # Build the stream (i.e. generator) matrix_iterator = self._generate_matrix_stream(chunks, *args) # Call the server method and retrieve the result response_iterator = self._stub.MultiplyMatrices(matrix_iterator, metadata=md) # Convert to a numpy.ndarray to continue nominal operations (outside the client) nparray = self._read_nparray_from_matrix(response_iterator) # Return only the first element (expecting a single matrix) return nparray[0]
# ================================================================================================= # PRIVATE METHODS for Client operations # ================================================================================================= def _generate_md(self, message_type: str, abbrev: str, *args: np.ndarray): # Initialize the metadata and the chunks list for each full message md = [] chunks = [] # Find how many arguments are transmitting md.append(("full-" + message_type, str(len(args)))) # Loop over all input arguments idx = 1 for arg in args: # Perform some argument input sanity checks if message_type == "vectors" and abbrev == "vec": self._sanity_check_vector(arg) elif message_type == "matrices" and abbrev == "mat": self._sanity_check_matrix(arg) else: raise RuntimeError("Invalid usage of _generate_md function.") # Check the size of the arrays # If size is surpassed, determine chunks needed if arg.nbytes > constants.MAX_CHUNKSIZE: # Determine how many chunks are needed # # Max amount of elements per chunk max_elems = constants.MAX_CHUNKSIZE // arg.itemsize # Bulk number of chunks needed bulk_chunks = arg.size // max_elems # The remainder amount of elements (if any) remainder = arg.size % max_elems # This list provides the last index up to which to # process in each partial vector or matrix message last_idx_chunk = [] for i in range(1, bulk_chunks + 1): last_idx_chunk.append(i * max_elems) # Take into account that if there is a remainder, include # one last partial vector or matrix message if remainder != 0: last_idx_chunk.append(arg.size) # Append the results md.append((abbrev + str(idx) + "-messages", str(len(last_idx_chunk)))) chunks.append(last_idx_chunk) else: # Otherwise deal with a single message... Append results. md.append((abbrev + str(idx) + "-messages", str(1))) chunks.append([arg.size]) # Increase idx by 1 idx += 1 # Return the metadata and the chunks list for each vector or matrix return md, chunks def _generate_vector_stream(self, chunks: "list[list[int]]", *args: np.ndarray): # Loop over all input arguments for arg, vector_chunks in zip(args, chunks): # Perform some argument input sanity checks self._sanity_check_vector(arg) # If sanity checks are fine... yield the corresponding vector message # # Loop over the chunk indices processed_idx = 0 for last_idx_chunk in vector_chunks: # Use tmp_idx in yield function and update the processed_idx afterwards tmp_idx = processed_idx processed_idx = last_idx_chunk # Yield! yield grpcdemo_pb2.Vector( data_type=constants.NP_DTYPE_TO_DATATYPE[arg.dtype.type], vector_size=arg.shape[0], vector_as_chunk=arg[tmp_idx:last_idx_chunk].tobytes(), ) def _generate_matrix_stream(self, chunks: "list[list[int]]", *args: np.ndarray): # Loop over all input arguments for arg, matrix_chunks in zip(args, chunks): # Perform some argument input sanity checks. self._sanity_check_matrix(arg) # If sanity checks are fine... yield the corresponding matrix message # # When dealing with matrices, ravel it to a 1D array (avoids copy) arg_as_vec = arg.ravel() # Loop over the chunk indices processed_idx = 0 for last_idx_chunk in matrix_chunks: # Use tmp_idx in yield function and update the processed_idx afterwards tmp_idx = processed_idx processed_idx = last_idx_chunk # Yield yield grpcdemo_pb2.Matrix( data_type=constants.NP_DTYPE_TO_DATATYPE[arg.dtype.type], matrix_rows=arg.shape[0], matrix_cols=arg.shape[1], matrix_as_chunk=arg_as_vec[tmp_idx:last_idx_chunk].tobytes(), ) def _read_nparray_from_vector(self, response_iterator): # Get the metadata response_md = response_iterator.initial_metadata() # Parse the server's metadata full_msg, chunks_per_msg = self._parse_server_metadata(response_md) # Initialize the output list resulting_vectors = [] # Start processing messages independently for msg in range(full_msg): # Init the resulting numpy.ndarray to None, its size and its type result = None result_size = 0 result_dtype = None # Loop over the available chunks per message for chunk_idx in range(chunks_per_msg[msg]): # Read a message vector = next(response_iterator) # If it is the first chunk being processed, parse dtype and size if chunk_idx == 0: if vector.data_type == grpcdemo_pb2.DataType.Value("INTEGER"): result_dtype = np.int32 elif vector.data_type == grpcdemo_pb2.DataType.Value("DOUBLE"): result_dtype = np.float64 result_size = vector.vector_size # Parse the chunk if result is None: result = np.frombuffer(vector.vector_as_chunk, dtype=result_dtype) else: tmp = np.frombuffer(vector.vector_as_chunk, dtype=result_dtype) result = np.concatenate((result, tmp)) # Check if the final vector has the desired size if result.size != result_size: raise RuntimeError("Problems reading server full Vector message...") else: # If everything is fine, append to resulting_vectors list resulting_vectors.append(result) # Return the resulting_vectors list return resulting_vectors def _read_nparray_from_matrix(self, response_iterator): # Get the metadata response_md = response_iterator.initial_metadata() # Parse the server's metadata full_msg, chunks_per_msg = self._parse_server_metadata(response_md) # Initialize the output list resulting_matrices = [] # Start processing messages independently for msg in range(full_msg): # Init the resulting numpy.ndarray to None, its size (rows,cols), and its type result = None result_rows = 0 result_cols = 0 result_dtype = None # Loop over the available chunks per message for chunk_idx in range(chunks_per_msg[msg]): # Read a message matrix = next(response_iterator) # If it is the first chunk being processing, parse dtype and size (rows,cols) if chunk_idx == 0: if matrix.data_type == grpcdemo_pb2.DataType.Value("INTEGER"): result_dtype = np.int32 elif matrix.data_type == grpcdemo_pb2.DataType.Value("DOUBLE"): result_dtype = np.float64 result_rows = matrix.matrix_rows result_cols = matrix.matrix_cols # Parse the chunk if result is None: result = np.frombuffer(matrix.matrix_as_chunk, dtype=result_dtype) else: tmp = np.frombuffer(matrix.matrix_as_chunk, dtype=result_dtype) result = np.concatenate((result, tmp)) # Check if the final matrix has the desired size if result.size != result_rows * result_cols: raise RuntimeError("Problems reading server full matrix message...") else: # If everything is fine, append to resulting_matrices list resulting_matrices.append( np.reshape( result, ( result_rows, result_cols, ), ) ) # Return the resulting_matrices list return resulting_matrices def _sanity_check_vector(self, arg): # Perform some argument input sanity checks. if type(arg) is not np.ndarray: raise RuntimeError("Invalid argument. Only numpy.ndarrays are allowed.") elif arg.dtype.type not in constants.NP_DTYPE_TO_DATATYPE.keys(): raise RuntimeError( "Invalid argument. Only numpy.ndarrays of type int32 and float64 are allowed." ) elif arg.ndim != 1: raise RuntimeError("Invalid argument. Only 1D numpy.ndarrays are allowed.") def _sanity_check_matrix(self, arg): # Perform some argument input sanity checks. if type(arg) is not np.ndarray: raise RuntimeError("Invalid argument. Only numpy.ndarrays are allowed.") elif arg.dtype.type not in constants.NP_DTYPE_TO_DATATYPE.keys(): raise RuntimeError( "Invalid argument. Only numpy.ndarrays of type int32 and float64 are allowed." ) elif arg.ndim != 2: raise RuntimeError("Invalid argument. Only 2D numpy.ndarrays are allowed.") def _parse_server_metadata(self, response_md: "list[tuple]"): # Init the return variables: amount of full messages received # and partial messages per full message full_msg = 0 chunks_per_msg = [] # Find out how many full messages are to be processed for md in response_md: if md[0] == "full-vectors" or md[0] == "full-matrices": full_msg = int(md[1]) # Identify the chunks per message (only if successful previously) if full_msg != 0: for i in range(1, full_msg + 1): for md in response_md: if md[0] == "vec%d-messages" % i or md[0] == "mat%d-messages" % i: chunks_per_msg.append(int(md[1])) return full_msg, chunks_per_msg