Skip to main content

rpc_helpers


func native_grpc_call

Call Type: async

Calls the provided gRPC method by invoking it directly on the channel.

View Source
async def native_grpc_call(metadata, full_method_name, method_desc, request, classes, channel):
'''
Calls the provided gRPC method by invoking it directly on the channel.
'''
# Get the classes for request and response, needed to deserialize
# and serialize messages from the channel correctly
req_class, rep_class = classes

if method_desc.server_streaming:
# Server-streaming call
call = channel.unary_stream(
full_method_name,
request_serializer=req_class.SerializeToString,
response_deserializer=rep_class.FromString
)
responses = []
# In this case, call responds with a wrapper that is an async
# generator function
async for resp in call(request, metadata=metadata):
responses.append(resp)
return responses[-1] # Just the last response is needed
else:
# Unary call
call = channel.unary_unary(
full_method_name,
request_serializer=req_class.SerializeToString,
response_deserializer=rep_class.FromString
)
return await call(request, metadata=metadata)


func generate_request

Call Type: normal

Generates a protobuf request object for an RPC given a sender ID.

View Source
def generate_request():
'''
Generates a protobuf request object for an RPC given a
sender ID.
'''
return Request(
timestamp=Timestamp().GetCurrentTime()
)


func generate_response

Call Type: normal

Generates a protobuf response object for an RPC given a response type and optional response string.

View Source
def generate_response(resp_type, resp_string=""):
'''
Generates a protobuf response object for an RPC given a
response type and optional response string.
'''
return Response(
status=resp_type,
response_string=resp_string,
timestamp=Timestamp().GetCurrentTime()
)