native
func now_ts
Call Type: normal
Get the current time as a Google Protobuf Timestamp.
Returns the current time as a Google Protobuf Timestamp object. This is useful for setting the timestamp field inside a Request object.
Returns
google.protobuf.timestamp_pb2.Timestamp
View Source
def now_ts() -> Timestamp:
"""Get the current time as a Google Protobuf Timestamp.
Returns the current time as a Google Protobuf Timestamp object.
This is useful for setting the timestamp field inside a Request
object.
Returns:
Timestamp: current timestamp as a Google Protobuf Timestamp object.
"""
ts = Timestamp()
ts.GetCurrentTime()
return ts
func error_to_api_response
Call Type: normal
Get the corresponding Python API Response for an error code.
Returns a Python API Response for a corresponding gRPC error code. Allows transformation of gRPC exceptions into a unified Response.
Arguments
arg error (grpc.aio.AioRpcError)
Returns
Response
View Source
def error_to_api_response(error: grpc.aio.AioRpcError) -> Response:
"""Get the corresponding Python API Response for an error code.
Returns a Python API Response for a corresponding gRPC error code.
Allows transformation of gRPC exceptions into a unified Response.
Args:
error (grpc.aio.AioRpcError): input gRPC Exception object.
Returns:
Response: Python API Response object.
"""
ts = now_ts()
# Note: gRPC error codes start from 0, API Response codes start from 2
return Response(status=error.code().value[0] + 2, response_string=error.details(), timestamp=ts)
func run_unary
Call Type: async
Runs a unary gRPC method and returns a Python API Response.
Runs a unary gRPC method, gets the response (or error), and translates it into a Python API Response.
Arguments
arg method_coro (typing.Any) STUB.METHOD e.g.
ControlStub.Connect.
arg request_pb (typing.Any)
arg metadata (Optional[list]) identity
parameter to access kernel services. An identity set to
internal signals to the kernel that the RPC request originates
from an onboard client.
arg timeout (Optional[int]) None indicates
no timeout.
View Source
async def run_unary(method_coro: Any, request_pb: Any, metadata: Optional[list]=[('identity', 'internal')], timeout: Optional[int]=None) -> Response:
"""Runs a unary gRPC method and returns a Python API Response.
Runs a unary gRPC method, gets the response (or error), and translates
it into a Python API Response.
Args:
method_coro (Any): an awaitable stub coroutine `STUB.METHOD` e.g.
ControlStub.Connect.
request_pb (Any): Protobuf object input for the method coroutine.
metadata (Optional[list]):
metadata object for gRPC. The metadata must include an `identity`
parameter to access kernel services. An `identity` set to
`internal` signals to the kernel that the RPC request originates
from an onboard client.
timeout (Optional[int]): timeout for the RPC request, `None` indicates
no timeout.
"""
ts = now_ts()
request_pb.request.timestamp.CopyFrom(ts)
try:
resp_pb = await method_coro(request_pb, metadata=metadata, timeout=timeout)
return resp_pb
except grpc.aio.AioRpcError as e:
return error_to_api_response(e)
func run_streaming
Call Type: async
Runs a streaming gRPC method and returns a Python API Response.
Runs a streaming gRPC method, gets the response (or error), and translates it into a Python API Response. This method will only return the last response it receives from the RPC.
Arguments
arg method_coro (typing.Any) STUB.METHOD e.g.
ControlStub.TakeOff.
arg request_pb (typing.Any)
arg metadata (Optional[list]) identity parameter to access kernel services.
An identity set to internal signals to the kernel that the
RPC request originates from an onboard client.
arg timeout (Optional[int]) None indicates
no timeout. It is generally not recommended to add a timeout to a
streaming method, since most have non-deterministic time of completion.
View Source
async def run_streaming(method_coro: Any, request_pb: Any, metadata: Optional[list]=[('identity', 'internal')], timeout: Optional[int]=None) -> Response:
"""Runs a streaming gRPC method and returns a Python API Response.
Runs a streaming gRPC method, gets the response (or error), and translates
it into a Python API Response. This method will only return the _last_
response it receives from the RPC.
Args:
method_coro (Any): an async generator stub coroutine `STUB.METHOD` e.g.
ControlStub.TakeOff.
request_pb (Any): Protobuf object input for the method coroutine.
metadata (Optional[list]): metadata object for gRPC. The metadata
must include an `identity` parameter to access kernel services.
An `identity` set to `internal` signals to the kernel that the
RPC request originates from an onboard client.
timeout (Optional[int]): timeout for the RPC request, `None` indicates
no timeout. It is generally not recommended to add a timeout to a
streaming method, since most have non-deterministic time of completion.
"""
ts = now_ts()
request_pb.request.timestamp.CopyFrom(ts)
call = method_coro(request_pb, metadata=metadata, timeout=timeout)
last = None
try:
async for msg in call:
last = msg # Guaranteed at least one response
return last
except grpc.aio.AioRpcError as e:
return error_to_api_response(e)