# Copyright (C) 2023 - 2025 ANSYS, Inc. and/or its affiliates.# SPDX-License-Identifier: MIT### Permission is hereby granted, free of charge, to any person obtaining a copy# of this software and associated documentation files (the "Software"), to deal# in the Software without restriction, including without limitation the rights# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell# copies of the Software, and to permit persons to whom the Software is# furnished to do so, subject to the following conditions:## The above copyright notice and this permission notice shall be included in all# copies or substantial portions of the Software.## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE# SOFTWARE."""Python implementation of the gRPC API Eigen example server."""fromconcurrentimportfuturesimportloggingimportclickimportdemo_eigen_wrapperimportgrpcimportnumpyasnpimportansys.eigen.python.grpc.constantsasconstantsimportansys.eigen.python.grpc.generated.grpcdemo_pb2asgrpcdemo_pb2importansys.eigen.python.grpc.generated.grpcdemo_pb2_grpcasgrpcdemo_pb2_grpc# =================================================================================================# AUXILIARY METHODS for Server operations# =================================================================================================
def check_data_type(dtype, new_dtype):
    """Validate that a newly parsed data type agrees with the one seen so far.

    Parameters
    ----------
    dtype : numpy.type
        Data type recorded from previously processed arrays, or ``None``
        when nothing has been processed yet.
    new_dtype : numpy.type
        Data type of the array about to be processed.

    Returns
    -------
    numpy.type
        ``new_dtype`` when no type was recorded yet; otherwise the recorded
        ``dtype``.

    Raises
    ------
    RuntimeError
        If a type was already recorded and it differs from ``new_dtype``.
    """
    # Only a previously recorded type can conflict with the incoming one.
    if dtype is not None and dtype != new_dtype:
        raise RuntimeError(
            "Error while processing data types... Input arguments are of different nature (such as int32, float64)."
        )

    return new_dtype if dtype is None else dtype
def check_size(size, new_size):
    """Validate that a newly parsed size agrees with the one seen so far.

    Parameters
    ----------
    size : tuple
        Size recorded from previously processed arrays, or ``None`` when
        nothing has been processed yet.
    new_size : tuple
        Size of the array about to be processed.

    Returns
    -------
    tuple
        ``new_size`` when no size was recorded yet; otherwise the recorded
        ``size``.

    Raises
    ------
    RuntimeError
        If a size was already recorded and it differs from ``new_size``.
    """
    # First array seen: its size becomes the reference.
    if size is None:
        return new_size

    # Any later array has to match the reference size.
    if size != new_size:
        raise RuntimeError(
            "Error while processing data types... Input arguments are of different sizes."
        )

    return size
class GRPCDemoServicer(grpcdemo_pb2_grpc.GRPCDemoServicer):
    """Provides methods that implement functionality of the API Eigen Example server."""

    def __init__(self) -> None:
        """No special init is required for the server.

        This may change if data is to be stored in a DB. This is to be determined.
        """
        # TODO : is it required to store the input vectors in a DB?
        super().__init__()

    # =============================================================================================
    # PUBLIC METHODS for Server operations
    # =============================================================================================

    def SayHello(self, request, context):
        """Test the greeter method to see if the server is up and running correctly.

        Parameters
        ----------
        request : HelloRequest
            Greeting request sent by the client.
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        grpcdemo_pb2.HelloReply
            Reply to greeting by the server.
        """
        click.echo("Greeting requested! Requested by: " + request.name)

        # Inform about the size of the message content
        click.echo("Size of message: " + constants.human_size(request))

        return grpcdemo_pb2.HelloReply(message="Hello, %s!" % request.name)

    def FlipVector(self, request_iterator, context):
        """Flip a given vector.

        Only the first vector received is flipped.

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the stream of vector messages provided.
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        generator of grpcdemo_pb2.Vector
            Stream of (full or partial) messages carrying the flipped vector.

        Raises
        ------
        RuntimeError
            If no vector was received.
        """
        click.echo("Vector flip requested.")

        # Process the metadata
        md = self._read_client_metadata(context)

        # Process the input messages (data type and size are not needed here)
        _, _, vector_list = self._get_vectors(request_iterator, md)

        # Fail with a clear message instead of an IndexError on empty input
        if not vector_list:
            raise RuntimeError("No vector was provided to be flipped.")

        # Flip it --> assuming that only one vector is passed
        nparray_flipped = np.flip(vector_list[0])

        # Send the response
        return self._send_vectors(context, nparray_flipped)

    def AddVectors(self, request_iterator, context):
        """Add vectors.

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the stream of vector messages provided.
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        generator of grpcdemo_pb2.Vector
            Stream of (full or partial) messages carrying the resulting vector.
        """
        click.echo("Vector addition requested.")

        # Process the metadata
        md = self._read_client_metadata(context)

        # Process the input messages
        dtype, size, vector_list = self._get_vectors(request_iterator, md)

        # Start from a zero array with the input arguments characteristics (dtype, size)
        result = np.zeros(size, dtype=dtype)

        # Add all provided vectors using the Eigen library
        for vector in vector_list:
            # Casting is needed due to interface with Eigen library... Not the desired approach,
            # but works. Ideally, vectors should be passed directly, but errors appear
            cast_vector = np.array(vector, dtype=dtype)
            result = demo_eigen_wrapper.add_vectors(result, cast_vector)

        # Send the response
        return self._send_vectors(context, result)

    def MultiplyVectors(self, request_iterator, context):
        """Multiply two vectors (dot product).

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the stream of vector messages provided.
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        generator of grpcdemo_pb2.Vector
            Stream of messages carrying the result as a one-element vector.

        Raises
        ------
        RuntimeError
            If the number of vectors received is not exactly two.
        """
        click.echo("Vector dot product requested")

        # Process the metadata
        md = self._read_client_metadata(context)

        # Process the input messages
        dtype, size, vector_list = self._get_vectors(request_iterator, md)

        # Check that the vector list contains exactly two vectors. The count is
        # formatted with %d: concatenating an int to a str would raise TypeError.
        if len(vector_list) != 2:
            raise RuntimeError(
                "Unexpected number of vectors to be multiplied: %d. Only 2 is valid."
                % len(vector_list)
            )

        # Perform the dot product of the provided vectors using the Eigen library
        # casting is needed due to interface with Eigen library... Not the desired approach,
        # but works. Ideally, vectors should be passed directly, but errors appear
        vec_1 = np.array(vector_list[0], dtype=dtype)
        vec_2 = np.array(vector_list[1], dtype=dtype)
        result = demo_eigen_wrapper.multiply_vectors(vec_1, vec_2)

        # Wrap the result as a numpy.ndarray with at least one dimension
        result = np.array(result, dtype=dtype, ndmin=1)

        # Finally, send the response
        return self._send_vectors(context, result)

    def AddMatrices(self, request_iterator, context):
        """Add matrices.

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the stream of matrix messages provided.
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        generator of grpcdemo_pb2.Matrix
            Stream of (full or partial) messages carrying the resulting matrix.
        """
        click.echo("Matrix addition requested!")

        # Process the metadata
        md = self._read_client_metadata(context)

        # Process the input messages
        dtype, size, matrix_list = self._get_matrices(request_iterator, md)

        # Start from a zero array with the input arguments characteristics (dtype, size)
        result = np.zeros(size, dtype=dtype)

        # Add all provided matrices using the Eigen library
        for matrix in matrix_list:
            # Casting is needed due to interface with Eigen library... Not the desired approach,
            # but works. Ideally, we would want to pass matrix directly, but errors appear
            cast_matrix = np.array(matrix, dtype=dtype)
            result = demo_eigen_wrapper.add_matrices(result, cast_matrix)

        # Send the response
        return self._send_matrices(context, result)

    def MultiplyMatrices(self, request_iterator, context):
        """Multiply two matrices.

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the stream of Matrix messages provided.
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        generator of grpcdemo_pb2.Matrix
            Stream of (full or partial) messages carrying the resulting matrix.

        Raises
        ------
        RuntimeError
            If the number of matrices received is not exactly two, or if the
            matrices are not square.
        """
        click.echo("Matrix multiplication requested.")

        # Process the metadata
        md = self._read_client_metadata(context)

        # Process the input messages
        dtype, size, matrix_list = self._get_matrices(request_iterator, md)

        # Check that the matrix list contains exactly two matrices. The count is
        # formatted with %d: concatenating an int to a str would raise TypeError.
        if len(matrix_list) != 2:
            raise RuntimeError(
                "Unexpected number of matrices to be multiplied: %d. You can only multiple two matrices."
                % len(matrix_list)
            )

        # Due to the previous _get_matrices method, the size of all
        # matrices is the same... check that it is a square matrix. Otherwise, no multiplication
        # is possible
        if size[0] != size[1]:
            raise RuntimeError("Only square matrices are allowed for multiplication.")

        # Perform the matrix multiplication of the provided matrices using the Eigen library
        # Casting is needed due to interface with Eigen library... Not the desired approach,
        # but works. Ideally, matrices should be passed directly, but errors appear
        mat_1 = np.array(matrix_list[0], dtype=dtype)
        mat_2 = np.array(matrix_list[1], dtype=dtype)
        result = demo_eigen_wrapper.multiply_matrices(mat_1, mat_2)

        # Finally, send the response
        return self._send_matrices(context, result)

    # =============================================================================================
    # PRIVATE METHODS for Server operations
    # =============================================================================================

    @staticmethod
    def _resolve_dtype(data_type, dtype):
        """Translate a message ``data_type`` into a numpy type and check consistency.

        Parameters
        ----------
        data_type : int
            ``data_type`` field of the incoming message.
        dtype : numpy.type
            Type recorded from previous messages, or ``None`` if none yet.

        Returns
        -------
        numpy.type
            Numpy type to use when parsing the message payload.

        Raises
        ------
        RuntimeError
            If ``data_type`` is neither ``INTEGER`` nor ``DOUBLE``, or if it
            does not match the previously recorded type.
        """
        if data_type == grpcdemo_pb2.DataType.Value("INTEGER"):
            return check_data_type(dtype, np.int32)
        if data_type == grpcdemo_pb2.DataType.Value("DOUBLE"):
            return check_data_type(dtype, np.float64)

        # Refuse to parse a payload whose element type is not known
        raise RuntimeError("Unsupported data type received: %s" % data_type)

    @staticmethod
    def _join_chunks(parts, error_message):
        """Assemble the parsed chunks of one full message into a single 1D array.

        Parameters
        ----------
        parts : list of np.ndarray
            Parsed chunks, in the order they were received.
        error_message : str
            Message of the error raised when there is nothing to assemble.

        Returns
        -------
        np.ndarray
            The only chunk when there is one; otherwise the concatenation of
            all chunks.

        Raises
        ------
        RuntimeError
            If ``parts`` is empty.
        """
        if not parts:
            raise RuntimeError(error_message)
        if len(parts) == 1:
            return parts[0]

        # Concatenate once at the end rather than once per chunk
        return np.concatenate(parts)

    def _get_vectors(self, request_iterator, md: dict):
        """Process a stream of vector messages.

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the received request messages of type Vector.
        md : dict
            Metadata provided by the client. The keys ``full-vectors`` and
            ``vec<N>-messages`` (``N`` starting at 1) are read.

        Returns
        -------
        np.type, tuple, list of np.array
            Type of data, size of the vectors, and list of vectors to process.

        Raises
        ------
        RuntimeError
            If the vectors differ in data type or size, if a data type is not
            supported, or if a reassembled vector does not have the announced
            size.
        """
        read_error = "Problems reading client full vector message..."

        # First, determine how many full vector messages are to be processed
        full_msgs = int(md.get("full-vectors"))

        # Initialize the output vector list and some aux vars
        vector_list = []
        dtype = None
        size = None

        # Loop over the expected full messages
        for msg in range(1, full_msgs + 1):
            # Find out how many partial vector messages constitute this full vector message
            chunks = int(md.get("vec%d-messages" % msg))

            # Parsed chunks of the vector being assembled
            parts = []

            # Loop over the expected chunks
            for chunk_msg in range(chunks):
                # Read the vector message
                chunk_vec = next(request_iterator)

                # Inform about the size of the message content
                click.echo("Size of message: " + constants.human_size(chunk_vec.vector_as_chunk))

                # If processing the first chunk of the message, fill in some data
                if chunk_msg == 0:
                    # Check the data type and the size of the incoming vector
                    dtype = self._resolve_dtype(chunk_vec.data_type, dtype)
                    size = check_size(size, (chunk_vec.vector_size,))

                # Parse the chunk
                parts.append(np.frombuffer(chunk_vec.vector_as_chunk, dtype=dtype))

            vector = self._join_chunks(parts, read_error)

            # Check if the final vector has the desired size
            if vector.size != size[0]:
                raise RuntimeError(read_error)

            # If everything is fine, append to vector_list
            vector_list.append(vector)

        # Return the input vector list (as a list of numpy.ndarray)
        return dtype, size, vector_list

    def _get_matrices(self, request_iterator, md: dict):
        """Process a stream of matrix messages.

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the received request messages of type ``Matrix``.
        md : dict
            Metadata provided by the client. The keys ``full-matrices`` and
            ``mat<N>-messages`` (``N`` starting at 1) are read.

        Returns
        -------
        np.type, tuple, list of np.array
            Type of data, shape of the matrices, and list of matrices to process.

        Raises
        ------
        RuntimeError
            If the matrices differ in data type or shape, if a data type is not
            supported, or if a reassembled matrix does not have the announced
            number of elements.
        """
        read_error = "Problems reading client full Matrix message..."

        # Determine how many full matrix messages are to be processed
        full_msgs = int(md.get("full-matrices"))

        # Initialize the output matrix list and some aux vars
        matrix_list = []
        dtype = None
        size = None

        # Loop over the expected full messages
        for msg in range(1, full_msgs + 1):
            # Find out how many partial matrix messages constitute this full matrix message
            chunks = int(md.get("mat%d-messages" % msg))

            # Parsed chunks of the matrix being assembled
            parts = []

            # Loop over the expected chunks
            for chunk_msg in range(chunks):
                # Read the matrix message
                chunk_mat = next(request_iterator)

                # Inform about the size of the message content
                click.echo("Size of message: " + constants.human_size(chunk_mat.matrix_as_chunk))

                # If processing the first chunk of the message, fill in some data
                if chunk_msg == 0:
                    # Check the data type and the shape of the incoming matrix
                    dtype = self._resolve_dtype(chunk_mat.data_type, dtype)
                    size = check_size(size, (chunk_mat.matrix_rows, chunk_mat.matrix_cols))

                # Parse the chunk
                parts.append(np.frombuffer(chunk_mat.matrix_as_chunk, dtype=dtype))

            matrix = self._join_chunks(parts, read_error)

            # Check if the final matrix has the desired number of elements
            if matrix.size != size[0] * size[1]:
                raise RuntimeError(read_error)

            # If everything is fine, give it its shape and append to matrix_list
            matrix_list.append(np.reshape(matrix, size))

        # Return the input matrix list (as a list of numpy.ndarray)
        return dtype, size, matrix_list

    def _read_client_metadata(self, context):
        """Return the metadata as a dictionary.

        Parameters
        ----------
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        dict
            Python-readable metadata in dictionary form. When a key appears
            more than once, the last value wins.
        """
        return {entry.key: entry.value for entry in context.invocation_metadata()}

    def _generate_md(self, message_type: str, abbrev: str, *args: np.ndarray):
        """Generate the server metadata and the chunking plan for each message.

        Parameters
        ----------
        message_type : str
            Type of message being sent. Options are ``vectors`` and ``matrices``.
        abbrev : str
            Abbreviated form of the message being sent. Options are ``vec`` and ``mat``.
        args : np.ndarray
            Arrays to be transmitted.

        Returns
        -------
        list[tuple], list[list[int]]
            Metadata to be sent by the server and, for each array, the list of
            end indices (in elements of the flattened array) of its chunks.
        """
        # Initialize the metadata with the number of arguments to be transmitted
        md = [("full-" + message_type, str(len(args)))]
        chunks = []

        # Loop over all input arguments (message numbering starts at 1)
        for idx, arg in enumerate(args, start=1):
            if arg.nbytes > constants.MAX_CHUNKSIZE:
                # Max amount of elements per chunk
                max_elems = constants.MAX_CHUNKSIZE // arg.itemsize

                # End index of every chunk: each multiple of max_elems below
                # the array size, followed by the array size itself.
                last_idx_chunk = list(range(max_elems, arg.size, max_elems))
                last_idx_chunk.append(arg.size)
            else:
                # Otherwise dealing with a single message
                last_idx_chunk = [arg.size]

            # Append the results
            md.append((abbrev + str(idx) + "-messages", str(len(last_idx_chunk))))
            chunks.append(last_idx_chunk)

        # Return the metadata and the chunks list for each vector or matrix
        return md, chunks

    def _send_vectors(self, context: grpc.ServicerContext, *args: np.ndarray):
        """Send the response vector messages.

        Parameters
        ----------
        context : grpc.ServicerContext
            gRPC context.
        args : np.ndarray
            Variable size of np.arrays to transmit.

        Yields
        ------
        grpcdemo_pb2.Vector
            Vector messages streamed (full or partial, depending on the metadata)
        """
        # Generate the metadata and info on the chunks
        md, chunks = self._generate_md("vectors", "vec", *args)

        # Send the initial metadata
        context.send_initial_metadata(md)

        # Loop over all input arguments
        for arg, vector_chunks in zip(args, chunks):
            # Each chunk starts where the previous one ended
            start_idx = 0
            for last_idx_chunk in vector_chunks:
                yield grpcdemo_pb2.Vector(
                    data_type=constants.NP_DTYPE_TO_DATATYPE[arg.dtype.type],
                    vector_size=arg.shape[0],
                    vector_as_chunk=arg[start_idx:last_idx_chunk].tobytes(),
                )
                start_idx = last_idx_chunk

    def _send_matrices(self, context: grpc.ServicerContext, *args: np.ndarray):
        """Send the response matrix messages.

        Parameters
        ----------
        context : grpc.ServicerContext
            gRPC context.
        args : np.ndarray
            Variable size of np.arrays to transmit.

        Yields
        ------
        grpcdemo_pb2.Matrix
            Matrix messages streamed (full or partial, depending on the metadata)
        """
        # Generate the metadata and info on the chunks
        md, chunks = self._generate_md("matrices", "mat", *args)

        # Send the initial metadata
        context.send_initial_metadata(md)

        # Loop over all input arguments
        for arg, matrix_chunks in zip(args, chunks):
            # Since we are dealing with matrices, flatten to a 1D array
            # (ravel only copies when it cannot return a view)
            arg_as_vec = arg.ravel()

            # Each chunk starts where the previous one ended
            start_idx = 0
            for last_idx_chunk in matrix_chunks:
                yield grpcdemo_pb2.Matrix(
                    data_type=constants.NP_DTYPE_TO_DATATYPE[arg.dtype.type],
                    matrix_rows=arg.shape[0],
                    matrix_cols=arg.shape[1],
                    matrix_as_chunk=arg_as_vec[start_idx:last_idx_chunk].tobytes(),
                )
                start_idx = last_idx_chunk
# =================================================================================================# SERVING METHODS for Server operations# =================================================================================================
def serve():
    """Deploy the API Eigen Example server.

    Starts an insecure gRPC server on port 50051 (all interfaces), backed by
    a pool of 10 worker threads, and blocks until the server terminates.
    """
    # Build the server on top of its worker pool
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)

    # Register the demo service implementation
    servicer = GRPCDemoServicer()
    grpcdemo_pb2_grpc.add_GRPCDemoServicer_to_server(servicer, grpc_server)

    # Listen, start serving and block until termination
    grpc_server.add_insecure_port("[::]:50051")
    grpc_server.start()
    grpc_server.wait_for_termination()