Remote Python Module Example

Below is an example of a remote module implemented in python. It opens up a bi-directional stream in which it passes response messages after handling state events coming from the server. It also downloads a reference sent in the state event payload during the event handling.

Once a remote module has been started, it's inputs need to be mapped in the Ocellus UI as any normal module would by specifying it's input parameters. The remote module needs to make its input parameters known in it's initial Bind payload explained in its documentation.

Building Python Lib

To build the python clients. Execute the following command:

python3 -m grpc_tools.protoc \
        --python_out=./ \
        --grpc_python_out=./ \
        protos/ocellus_module_service.proto \
        protos/ocellus_state_service.proto

Installing Dependencies

The dependencies for the above can be installed on ubuntu, for example, using:

sudo apt install -y python3-pip
pip3 install grpcio-tools
echo -e "\nif [ -d "\$HOME/.local/bin" ] ; then\n    PATH="\$HOME/.local/bin:\$PATH"\nfi" >> ~/.bashrc
source ~/.bashrc

Test Client Code

Usage: python3 ocellus_test_client.py [ip|hostname] Where the ip or hostname is where ocellus is running

from __future__ import print_function

import cv2
import sys
import datetime
import imageio
import logging
import grpc
import numpy as np
import threading
import time
import queue
import ocellus_module_service_pb2
import ocellus_module_service_pb2_grpc


mock_response = ocellus_module_service_pb2.OutputData(
    items=[
        ocellus_module_service_pb2.Item(
            id=1,
            name="CoffeCup1",
            position=ocellus_module_service_pb2.Position(
                x=1.0,
                y=0.5,
                z=0.4
            ),
            quaternion=ocellus_module_service_pb2.Quaternion(
                x=0.3996381,
                y=0.3996381,
                z=0.3996381,
                w=-0.721712
            ),
            pathPlan=[ocellus_module_service_pb2.PathPoint(
                deltaTime=0,
                position=ocellus_module_service_pb2.Position(
                    x=0.0,
                    y=0.0,
                    z=0.0
                ),
                eulerAngles=ocellus_module_service_pb2.EulerAngles(
                    yaw=0.0,
                    pitch=0.0,
                    roll=0.0
                ),
            )
        ]),
        ocellus_module_service_pb2.Item(
            id=2,
            name="CoffeCup2",
            position=ocellus_module_service_pb2.Position(
                x=1.1,
                y=0.6,
                z=0.5
            ),
            quaternion=ocellus_module_service_pb2.Quaternion(
                x=0.3996381,
                y=0.3996381,
                z=0.3996381,
                w=-0.721712
            ),
            pathPlan=[ocellus_module_service_pb2.PathPoint(
                deltaTime=3000,
                position=ocellus_module_service_pb2.Position(
                    x=10.0,
                    y=10.0,
                    z=10.0
                ),
                # Not required if quartinion is set
                # eulerAngles=ocellus_module_service_pb2.EulerAngles(),
                quaternion=ocellus_module_service_pb2.Quaternion(
                    x=0.3996381,
                    y=0.3996381,
                    z=0.3996381,
                    w=-0.721712
                )
            )
        ])
    ]
)


def handle(event):
    # process inputs
    #  -CoffeeAnalyzerIntrinsics
    #  -CoffeeAnalyzerContours
    #  -CoffeeAnalyzerItems

    # mocked outputs
    return mock_response


def event_stream(output_queue):
    # Initial bind request to open stream
    bindRequest = ocellus_module_service_pb2.BindRequest(
        name='RemoteTestModule',
        states=['*'],  # 'filling_cup'],
        inputs=[
            ocellus_module_service_pb2.BindRequest.IOData(
                name="CoffeeAnalyzerCamera",
                dataType=ocellus_module_service_pb2.BindRequest.DataType.DATA_TYPE_CAMERA
            ),
            ocellus_module_service_pb2.BindRequest.IOData(
                name="CoffeeAnalyzerPointcloud",
                dataType=ocellus_module_service_pb2.BindRequest.DataType.DATA_TYPE_POINTCLOUD
            ),
            ocellus_module_service_pb2.BindRequest.IOData(
                name="CoffeeAnalyzerImage",
                dataType=ocellus_module_service_pb2.BindRequest.DataType.DATA_TYPE_RGBA_IMAGE
            ),
            ocellus_module_service_pb2.BindRequest.IOData(
                name="CoffeeAnalyzerImage2",
                dataType=ocellus_module_service_pb2.BindRequest.DataType.DATA_TYPE_RGBA_IMAGE
            ),
        ],
        outputs=[
            ocellus_module_service_pb2.BindRequest.IOData(
                name="RemoteTestModulePathPlans",
                dataType=ocellus_module_service_pb2.BindRequest.DataType.DATA_TYPE_ITEMS
            ),
        ]
    )
    print("Sending bindRequest")
    yield ocellus_module_service_pb2.ModuleData(bindRequest=bindRequest)

    while True:
        time.sleep(.1)
        item = output_queue.get()
        if item is not None:
            print("Sending data")
            yield ocellus_module_service_pb2.ModuleData(data=item)


MB = 1024 * 1024
GRPC_CHANNEL_OPTIONS = [('grpc.max_message_length', 64 * MB),
                        ('grpc.max_receive_message_length', 64 * MB)]


def run(host):
    with grpc.insecure_channel('{}:50052'.format(host), options=GRPC_CHANNEL_OPTIONS) as channel:
        stub = ocellus_module_service_pb2_grpc.OcellusModuleServiceStub(
            channel)
        try:
            lock = threading.Lock()
            output_queue = queue.Queue()
            gen = event_stream(output_queue)
            for event in stub.Bind(gen):
                print("Received event:")
                # print(event)
                print(event.data.pointClouds['CoffeeAnalyzerPointcloud'])

                refReq = ocellus_module_service_pb2.DownloadRefRequest(
                    urn=event.data.pointClouds['CoffeeAnalyzerPointcloud'] + "?format=depth"
                )
                refResponse = stub.DownloadRef(refReq)
                data = refResponse.pointCloud.mat32FC1Data
                rows = refResponse.pointCloud.height
                cols = refResponse.pointCloud.width

                arr = np.frombuffer(data, dtype=np.float32)
                arr = np.reshape(arr, (rows, cols))
                arr = cv2.normalize(arr, arr, 0, 255, cv2.NORM_MINMAX, cv2.CV_8UC1)
                cv2.imwrite('./depth.jpg', arr)

                print(len(refResponse.pointCloud.vertices))
                refReq = ocellus_module_service_pb2.DownloadRefRequest(
                    urn=event.data.rgbaImages['CoffeeAnalyzerImage']
                )
                refResponse = stub.DownloadRef(refReq)
                print(len(refResponse.data))
                with open('./rbga.png', 'wb') as f:
                    f.write(refResponse.data)
                refReq = ocellus_module_service_pb2.DownloadRefRequest(
                    urn=event.data.rgbaImages['CoffeeAnalyzerImage2']
                )
                refResponse = stub.DownloadRef(refReq)
                print(len(refResponse.data))
                response = handle(event)
                output_queue.put(response)

        except grpc._channel._Rendezvous as err:
            print(err)


if __name__ == '__main__':
    logging.basicConfig()
    host = sys.argv[1] if 1 < len(sys.argv) else '0.0.0.0'
    while True:
        run(host)
        time.sleep(5)

Top

State Change Simulation Code

The state can be changed using the python code below.

Usage: python3 ocellus_test_state_client.py filling_cup

from __future__ import print_function
import sys
import logging

import grpc

import ocellus_state_service_pb2
import ocellus_state_service_pb2_grpc


def run(state):
    with grpc.insecure_channel('localhost:50052') as channel:
        stub = ocellus_state_service_pb2_grpc.OcellusStateServiceStub(channel)
        msg = ocellus_state_service_pb2.SetStateRequest(
            state=state
        )
        try:
            response = stub.SetState(msg)
            print(response)
        except grpc._channel._Rendezvous as err:
            print(err)


if __name__ == '__main__':
    logging.basicConfig()
    run(sys.argv[1])