Remote Python Module Example
Below is an example of a remote module implemented in Python. It opens a bi-directional stream on which it sends response messages after handling state events coming from the server. While handling an event, it also downloads a reference sent in the state event payload.
Adding a remote module in ocellus is done via the UI. A module of the type 'Remote' is added and then the inputs and outputs it requires can be selected based on other existing modules in the pipeline.
Installing Python Lib
To install the Python client library with pip, execute the following command:
pip install ocellus
Installing Dependencies
The dependencies for the example can be installed, for example on Ubuntu, using:
sudo apt install -y python3-pip
pip3 install opencv-python
echo -e "\nif [ -d "\$HOME/.local/bin" ] ; then\n PATH="\$HOME/.local/bin:\$PATH"\nfi" >> ~/.bashrc
source ~/.bashrc
Test Client Code
Usage: python3 ocellus_test_client.py [ip|hostname]
where the optional ip or hostname argument is the address of the machine on which ocellus is running
from __future__ import print_function
import cv2
import sys
import logging
import grpc
import numpy as np
import threading
import time
import queue
import ocellus.ocellus_module_service_pb2 as ocellus_module_service_pb2
import ocellus.ocellus_module_service_pb2_grpc as ocellus_module_service_pb2_grpc
import ocellus.ocellus_types_pb2 as ocellus_types_pb2
def handle(parentCamera):
    """Build the mocked output data returned for a state event.

    The result is an ``OutputData`` message holding two hard-coded items
    ("CoffeCup1" and "CoffeCup2"). Each item references ``parentCamera`` and
    carries a position, a quaternion and a single-point path plan.

    Args:
        parentCamera: Camera value stored in the ``parentCamera`` field of
            both items.

    Returns:
        ocellus_module_service_pb2.OutputData with the two mocked items.
    """
    types = ocellus_types_pb2

    def local_position(x, y, z):
        # Position message wrapping the coordinates in its ``local`` field.
        return types.Position(local=types.Position(x=x, y=y, z=z))

    def mock_quaternion():
        # Every quaternion in the mocked output uses the same values.
        return types.Quaternion(
            local=types.Quaternion(
                x=0.3996381,
                y=0.3996381,
                z=0.3996381,
                w=-0.721712,
            )
        )

    def mock_euler_angles():
        # Every set of Euler angles in the mocked output uses the same values.
        return types.EulerAngles(
            local=types.EulerAngles(yaw=0.1, pitch=0.1, roll=0.1)
        )

    first_path_point = types.PathPoint(
        deltaTime=0,
        position=local_position(0.1, 0.1, 0.1),
        eulerAngles=mock_euler_angles(),
        # Not required if eulerAngles is set
        quaternion=mock_quaternion(),
    )
    first_cup = types.Item(
        id=1,
        name="CoffeCup1",
        parentCamera=parentCamera,
        position=local_position(1.0, 0.5, 0.4),
        quaternion=mock_quaternion(),
        pathPlan=[first_path_point],
    )

    second_path_point = types.PathPoint(
        deltaTime=3000,
        position=local_position(10.0, 10.0, 10.0),
        # Not required if quaternion is set
        eulerAngles=mock_euler_angles(),
        quaternion=mock_quaternion(),
    )
    second_cup = types.Item(
        id=2,
        name="CoffeCup2",
        parentCamera=parentCamera,
        position=local_position(1.1, 0.6, 0.5),
        quaternion=mock_quaternion(),
        pathPlan=[second_path_point],
    )

    # mocked outputs
    return ocellus_module_service_pb2.OutputData(items=[first_cup, second_cup])
def event_stream(output_queue):
    """Generate the request messages for the bi-directional Bind stream.

    The first message yielded is a bind request for the module named
    'RemoteTestModule'. Afterwards the generator loops forever: it sleeps
    for 0.1 s, blocks until an entry is available on ``output_queue`` and
    yields that entry wrapped in a ``ModuleData`` message. ``None`` entries
    are skipped.

    Args:
        output_queue: Queue from which the outgoing payloads are taken.

    Yields:
        ocellus_module_service_pb2.ModuleData messages to send to the server.
    """
    # Initial bind request to open stream
    bind_request = ocellus_module_service_pb2.BindRequest(
        name='RemoteTestModule',
    )
    print("Sending bindRequest")
    yield ocellus_module_service_pb2.ModuleData(bindRequest=bind_request)

    while True:
        time.sleep(.1)
        payload = output_queue.get()
        if payload is None:
            # Nothing to send for this entry; wait for the next one.
            continue
        print("Sending data")
        yield ocellus_module_service_pb2.ModuleData(data=payload)
# Number of bytes in one megabyte (1024 * 1024).
MB = 1 << 20
# Options handed to grpc.insecure_channel(); both size limits are set to
# 128 MB.
GRPC_CHANNEL_OPTIONS = [
    (option_name, 128 * MB)
    for option_name in (
        'grpc.max_message_length',
        'grpc.max_receive_message_length',
    )
]
def run(host):
    """Bind to the module service on ``host`` and handle incoming events.

    Opens an insecure gRPC channel to port 50052 of ``host``, starts the
    bi-directional Bind stream and, for every event carrying data:

    * downloads the depth representation of the 'rs/POINTCLOUD' reference
      and writes it, normalized to 8 bit, to './depth.jpg';
    * downloads the 'rs/RGBA_IMAGE' reference and writes it to './rbga.png';
    * queues the mocked response built by ``handle`` for sending.

    Events without data (the acknowledgement that follows the bind request)
    are answered with an empty ``OutputData``.

    RPC errors are printed and end the call; the channel is closed on return.

    Args:
        host: IP address or hostname of the server to connect to.
    """
    target = '{}:50052'.format(host)
    with grpc.insecure_channel(target, options=GRPC_CHANNEL_OPTIONS) as channel:
        stub = ocellus_module_service_pb2_grpc.OcellusModuleServiceStub(
            channel)
        try:
            output_queue = queue.Queue()
            gen = event_stream(output_queue)
            for event in stub.Bind(gen):
                print(f"Received event: {event}")
                # First event after bind will not contain data, it will be an ack
                if not event.HasField("data"):
                    output_queue.put(ocellus_module_service_pb2.OutputData())
                    continue

                # Note that the 'rs' would be, for example, a Realsense module
                # named "rs" configured before the Remote module
                print(event.data.pointClouds['rs/POINTCLOUD'])
                refReq = ocellus_module_service_pb2.DownloadRefRequest(
                    urn=event.data.pointClouds['rs/POINTCLOUD'] +
                    "?format=depth"
                )
                refResponse = stub.DownloadRef(refReq)
                data = refResponse.pointCloud.mat32FC1Data
                rows = refResponse.pointCloud.height
                cols = refResponse.pointCloud.width
                arr = np.frombuffer(data, dtype=np.float32)
                arr = np.reshape(arr, (rows, cols))
                # np.frombuffer yields a read-only view over an immutable
                # buffer, so the array must not double as the destination;
                # passing None lets OpenCV allocate the 8-bit result.
                arr = cv2.normalize(
                    arr, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8UC1)
                cv2.imwrite('./depth.jpg', arr)
                print(len(refResponse.pointCloud.vertices))

                refReq = ocellus_module_service_pb2.DownloadRefRequest(
                    urn=event.data.rgbaImages['rs/RGBA_IMAGE']
                )
                refResponse = stub.DownloadRef(refReq)
                print(len(refResponse.data))
                with open('./rbga.png', 'wb') as f:
                    f.write(refResponse.data)

                parentCamera = event.data.cameras['rs/CAMERA']
                response = handle(parentCamera)
                output_queue.put(response)
        except grpc.RpcError as err:
            # grpc.RpcError is the public base class of the errors raised by
            # RPC invocations; the private grpc._channel._Rendezvous class
            # is not available in every grpcio release.
            print(err)
if __name__ == '__main__':
    logging.basicConfig()
    # The server address is the first command-line argument; without one the
    # client falls back to '0.0.0.0'.
    if len(sys.argv) > 1:
        host = sys.argv[1]
    else:
        host = '0.0.0.0'
    # Whenever run() returns, wait five seconds and bind again.
    while True:
        run(host)
        time.sleep(5)