Created
February 16, 2023 10:12
-
-
Save JEnoch/e11404b1ed27e96679e7f4a09c7fa832 to your computer and use it in GitHub Desktop.
A Zenoh python script demonstrating ROS2 + Zigbee/MQTT integration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This script is an example of ROS2 + Zigbee/MQTT integration thanks to Zenoh. | |
# | |
# By default, it assumes that: | |
# - a zenoh-bridge-dds is deployed and routes the ROS2 traffic to zenoh. It's configured to use "simu" as a scope. | |
# - a zenoh-bridge-mqtt is deployed and routes the MQTT traffic coming from zigbee2mqtt software | |
# | |
# This script can connect to any Zenoh endpoint using the "-e" option (a Zenoh router, the zenoh-bridge-dds or the zenoh-bridge-mqtt). | |
# Then it performs the following: | |
# | |
# - It subscribes in Zenoh to "zigbee2mqtt/device/button" to receive JSON messages from zigbee2mqtt when a Zigbee push button named "device/button" is pressed. | |
# In reaction, it publishes Twist messages in Zenoh to "simu/rt/cmd_vel" to move the ROS2 robot | |
# | |
# - It subscribes in Zenoh to "simu/rt/scan" to receive LaserScan measurements from ROS2. | |
# The measurements for the front angle (60°) are extracted, the minimal range value is determined and a proximity level is computed. | |
# If the proximity level changed from the last measurements, JSON message is published in Zenoh to "zigbee2mqtt/device/bulb/set" | |
# to change the color and the brightness of a Zigbee lightbulb. | |
# | |
# - It reacts to keyboard events on the arrow keys and spacebar, publishing Twist messages in Zenoh to "simu/rt/cmd_vel" to move the ROS2 robot | |
# | |
import sys | |
from datetime import datetime | |
import time | |
import argparse | |
import curses | |
import zenoh | |
import json | |
from dataclasses import dataclass | |
from typing import List | |
from pycdr2 import IdlStruct | |
from pycdr2.types import int8, int32, uint32, float64, float32, sequence, array | |
# | |
# ROS2 topic types definition, with serialize() and deserialize() operation added by pycdr2 | |
# | |
@dataclass | |
class Vector3(IdlStruct, typename="Vector3"): | |
x: float64 | |
y: float64 | |
z: float64 | |
@dataclass | |
class Twist(IdlStruct, typename="Twist"): | |
linear: Vector3 | |
angular: Vector3 | |
@dataclass | |
class Time(IdlStruct, typename="Time"): | |
sec: uint32 | |
nsec: uint32 | |
@dataclass | |
class Header(IdlStruct, typename="Header"): | |
stamp: Time | |
frame_id: str | |
@dataclass | |
class LaserScan(IdlStruct, typename="LaserScan"): | |
stamp_sec: uint32 | |
stamp_nsec: uint32 | |
frame_id: str | |
angle_min: float32 | |
angle_max: float32 | |
angle_increment: float32 | |
time_increment: float32 | |
scan_time: float32 | |
range_min: float32 | |
range_max: float32 | |
ranges: List[float32] | |
intensities: List[float32] | |
# The front angle to consider getting minimal front distance from LaserScan | |
PROXIMITY_SCAN_ANGLE = 60 | |
# A number of proximity level, corresponding to a number of colors to display on the lightbulb | |
NB_PROXIMITY_LEVEL = 4 | |
# The last proximity level, to not re-publish color change command if level didn't change | |
last_prox_level = 0 | |
def main(stdscr): | |
# Use stdscr to get pressed keys (arrow keys to move the robot) | |
stdscr.refresh() | |
# --- Command line argument parsing --- --- --- --- --- --- | |
parser = argparse.ArgumentParser( | |
prog='zigbee-ros2-teleop', | |
description='Zigbee -> MQTT -> Zenoh -> ROS2 teleop example') | |
parser.add_argument('--mode', '-m', dest='mode', | |
choices=['peer', 'client'], | |
type=str, | |
help='The zenoh session mode.') | |
parser.add_argument('--connect', '-e', dest='connect', | |
metavar='ENDPOINT', | |
action='append', | |
type=str, | |
help='zenoh endpoints to connect to.') | |
parser.add_argument('--listen', '-l', dest='listen', | |
metavar='ENDPOINT', | |
action='append', | |
type=str, | |
help='zenoh endpoints to listen on.') | |
parser.add_argument('--config', '-c', dest='config', | |
metavar='FILE', | |
type=str, | |
help='A configuration file.') | |
parser.add_argument('--cmd_vel', dest='cmd_vel', | |
default='simu/rt/cmd_vel', | |
type=str, | |
help='The "cmd_vel" ROS2 topic.') | |
parser.add_argument('--scan', dest='scan', | |
default='simu/rt/scan', | |
type=str, | |
help='The "scan" ROS2 topic.') | |
parser.add_argument('--button', dest='button', | |
default='zigbee2mqtt/device/button', | |
type=str, | |
help='The Zenoh key to subscribe MQTT button publications.') | |
parser.add_argument('--light', dest='light', | |
default='zigbee2mqtt/device/bulb/set', | |
type=str, | |
help='The Zenoh key to publish MQTT messages for light.') | |
parser.add_argument('--angular_scale', '-a', dest='angular_scale', | |
default='0.5', | |
type=float, | |
help='The angular scale.') | |
parser.add_argument('--linear_scale', '-x', dest='linear_scale', | |
default='0.2', | |
type=float, | |
help='The linear scale.') | |
parser.add_argument('--max_range', dest='max_range', | |
default='0.8', | |
type=float, | |
help='The maximal distance above which the light will be green.') | |
parser.add_argument('--min_range', dest='min_range', | |
default='0.2', | |
type=float, | |
help='The minimal distance below which the light be red.') | |
# arguments parsing and config preparation | |
args = parser.parse_args() | |
conf = zenoh.config_from_file(args.config) if args.config is not None else zenoh.Config() | |
if args.mode is not None: | |
conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) | |
if args.connect is not None: | |
conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) | |
if args.listen is not None: | |
conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) | |
cmd_vel = args.cmd_vel | |
scan = args.scan | |
button = args.button | |
light = args.light | |
angular_scale = args.angular_scale | |
linear_scale = args.linear_scale | |
max_range = args.max_range | |
min_range = args.min_range | |
# zenoh-net code --- --- --- --- --- --- --- --- --- --- --- | |
# initiate logging | |
zenoh.init_logger() | |
print("Openning session...") | |
session = zenoh.open(conf) | |
# to publish a Twist message to ROS2 | |
def pub_twist(linear, angular): | |
print("Pub twist: {} - {}".format(linear, angular)) | |
t = Twist(linear=Vector3(x=linear, y=0.0, z=0.0), | |
angular=Vector3(x=0.0, y=0.0, z=angular)) | |
session.put(cmd_vel, t.serialize()) | |
# Zenoh (MQTT) button subscription | |
print("Subscriber on '{}'...".format(button)) | |
def button_callback(sample): | |
# On JSON payload reception, check 'action' (single or double) and publish Twist | |
m = json.loads(sample.payload) | |
print('Received sample {}'.format(sample)) | |
if m['action'] == 'single': | |
print(' => single') | |
pub_twist(0.0, 1.0 * angular_scale) | |
time.sleep(2.0) | |
pub_twist(0.0, 0.0) | |
elif m['action'] == 'double': | |
print(' => double') | |
pub_twist(0.0, -1.0 * angular_scale) | |
time.sleep(2.0) | |
pub_twist(0.0, 0.0) | |
sub_button = session.declare_subscriber(button, button_callback) | |
# Zenoh (ROS2) scan subscription | |
print("Subscriber on '{}'...".format(scan)) | |
def scan_callback(sample): | |
global last_prox_level | |
scan = LaserScan.deserialize(sample.payload) | |
# get the minimal range value in the front angle (PROXIMITY_SCAN_ANGLE) | |
range = min(scan.ranges[0:round(PROXIMITY_SCAN_ANGLE/2)] + scan.ranges[round(360-PROXIMITY_SCAN_ANGLE/2):359]) | |
prox_level = round((max_range - range) * NB_PROXIMITY_LEVEL / (max_range - min_range)) | |
prox_level=max(min(prox_level,NB_PROXIMITY_LEVEL),0) | |
# if proximity level changed: | |
if (prox_level != last_prox_level): | |
# compute brightness and color depending the proximity level | |
brightness = 15 + (prox_level * 240) / NB_PROXIMITY_LEVEL | |
red = round(255 * prox_level / NB_PROXIMITY_LEVEL) | |
green = round(255 * (1 - (prox_level / NB_PROXIMITY_LEVEL))) | |
blue = 0 | |
print(f"range={range} => proximity_level={prox_level} => publish brightness={brightness},RGB=({red},{green},{blue}) to {light} ") | |
# publish over zenoh the JSON message setting the lightbulb brightness and color | |
# (see supported JSON attributes in https://www.zigbee2mqtt.io/devices/33943_33944_33946.html) | |
session.put(light, {'brightness': brightness, 'color': {'r': red, 'g': green, 'b': blue}}) | |
last_prox_level=prox_level | |
sub_scan = session.declare_subscriber(scan, scan_callback) | |
# Loop catching keyboard strokes (arrows+space) | |
print("Waiting commands with arrow keys or space bar to stop. Press ESC or 'q' to quit.") | |
while True: | |
c = stdscr.getch() | |
if c == curses.KEY_UP: | |
pub_twist(1.0 * linear_scale, 0.0) | |
elif c == curses.KEY_DOWN: | |
pub_twist(-1.0 * linear_scale, 0.0) | |
elif c == curses.KEY_LEFT: | |
pub_twist(0.0, 1.0 * angular_scale) | |
elif c == curses.KEY_RIGHT: | |
pub_twist(0.0, -1.0 * angular_scale) | |
elif c == 32: | |
pub_twist(0.0, 0.0) | |
elif c == 27 or c == ord('q'): | |
break | |
sub_button.undeclare() | |
sub_scan.undeclare() | |
session.close() | |
curses.wrapper(main) | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment