Last active
April 27, 2020 05:10
-
-
Save jkent/e5667fbf35403345e6f3df511f3ff40f to your computer and use it in GitHub Desktop.
python to be used with https://github.com/Frans-Willem/ZigbeeRadioBridge
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from binascii import hexlify | |
from time import sleep | |
from zag import MHR, Radio | |
def main(): | |
def packet_handler(data, rssi, link_quality): | |
mhr, offset = MHR.decode(data) | |
print(hexlify(mhr.encode() + data[offset:]).decode('utf8')) | |
radio = Radio('/dev/ttyACM1', packet_handler) | |
_, short = radio.get_value(Radio.Param.ShortAddr) | |
_, ext = radio.get_object(Radio.Param.LongAddr, 8) | |
print('I\'m 0x%04X, aka %s' % (short, hexlify(ext).decode('utf8').upper())) | |
radio.set_value(Radio.Param.Channel, 11) | |
radio.set_value(Radio.Param.RxMode, 0) | |
radio.set_value(Radio.Param.TxMode, Radio.TxMode.SendOnCca) | |
try: | |
while True: | |
sleep(1) | |
except KeyboardInterrupt: | |
radio.shutdown() | |
if __name__ == '__main__': | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from binascii import hexlify | |
from time import sleep | |
from zag import MHR, Radio | |
def main(): | |
def packet_handler(data, rssi, link_quality): | |
mhr, offset = MHR.decode(data) | |
print(hexlify(mhr.encode() + data[offset:]).decode('utf8')) | |
radio = Radio('/dev/ttyACM0', packet_handler) | |
_, short = radio.get_value(Radio.Param.ShortAddr) | |
_, ext = radio.get_object(Radio.Param.LongAddr, 8) | |
print('I\'m 0x%04X, aka %s' % (short, hexlify(ext).decode('utf8').upper())) | |
radio.set_value(Radio.Param.Channel, 11) | |
radio.set_value(Radio.Param.RxMode, 0) | |
radio.set_value(Radio.Param.TxMode, Radio.TxMode.SendOnCca) | |
mhr = MHR() | |
mhr.frame_type = MHR.FrameType.Beacon | |
mhr.src_addr = short | |
packet = mhr.encode() + b'beacon' | |
print(hexlify(packet).decode('utf8')) | |
try: | |
while True: | |
radio.send(packet) | |
sleep(1) | |
except KeyboardInterrupt: | |
radio.shutdown() | |
if __name__ == '__main__': | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from enum import IntEnum, IntFlag, unique | |
from queue import Queue | |
import random | |
from serial import Serial | |
import struct | |
from threading import Thread | |
class Radio(object): | |
packet_magic = b'ZPB' | |
header_struct = struct.Struct('!BHH') | |
@unique | |
class Request(IntEnum): | |
Prepare = 0 | |
Transmit = 1 | |
Send = 2 | |
ChannelClear = 3 | |
On = 4 | |
Off = 5 | |
GetValue = 6 | |
SetValue = 7 | |
GetObject = 8 | |
SetObject = 9 | |
InitPendingTable = 10 | |
SetPending = 11 | |
def __str__(self): | |
return str(self.name) | |
@unique | |
class Response(IntEnum): | |
OK = 128 | |
Err = 129 | |
def __str__(self): | |
return str(self.name) | |
@unique | |
class Event(IntEnum): | |
OnPacket = 192 | |
def __str__(self): | |
return str(self.name) | |
class ResponseErr(Exception): | |
pass | |
@unique | |
class Result(IntEnum): | |
OK = 0 | |
NotSupported = 1 | |
InvalidValue = 2 | |
Error = 3 | |
def __str__(self): | |
return str(self.name) | |
@unique | |
class TransmitResult(IntEnum): | |
OK = 0 | |
Err = 1 | |
Collision = 2 | |
NoAck = 3 | |
def __str__(self): | |
return str(self.name) | |
@unique | |
class Param(IntEnum): | |
PowerMode = 0 | |
Channel = 1 | |
PanID = 2 | |
ShortAddr = 3 | |
RxMode = 4 | |
TxMode = 5 | |
TxPower = 6 | |
CcaThreshold = 7 | |
Rssi = 8 | |
LastRssi = 9 | |
LastLinkQuality = 10 | |
LongAddr = 11 | |
LastPacketTimestamp = 12 | |
# Constants | |
ChannelMin = 13 | |
ChannelMax = 14 | |
TxpowerMin = 15 | |
TxpowerMax = 16 | |
def __str__(self): | |
return str(self.name) | |
@unique | |
class RxMode(IntFlag): | |
AddressFilter = (1 << 0) | |
AutoAck = (1 << 1) | |
PollMode = (1 << 2) | |
@unique | |
class TxMode(IntFlag): | |
SendOnCca = (1 << 0) | |
def __init__(self, port, packet_handler=None): | |
self.packet_handler = packet_handler | |
self.next_request_id = random.randint(0, 65534) | |
self.waiting_request_id = None | |
self.serial = Serial(port, timeout=1) | |
self.reader_queue = Queue() | |
self.thread = Thread(target=self.reader) | |
self.thread.start() | |
def shutdown(self): | |
self.done = True | |
def reader(self): | |
self.done = False | |
while not self.done: | |
data = self.serial.read_until(Radio.packet_magic) | |
if not data.endswith(Radio.packet_magic): | |
continue | |
data = self.serial.read(Radio.header_struct.size) | |
if len(data) != Radio.header_struct.size: | |
continue | |
response, request_id, data_len = Radio.header_struct.unpack(data) | |
data = self.serial.read(data_len) | |
if len(data) != data_len: | |
continue | |
if response & 0xC0 == 0xC0: | |
event = Radio.Event(response) | |
if event == Radio.Event.OnPacket: | |
rssi, link_quality = struct.unpack('!bB', data[-2:]) | |
if self.packet_handler: | |
self.packet_handler(data[:-2], rssi, link_quality) | |
continue | |
if self.waiting_request_id != request_id: | |
continue | |
response = Radio.Response(response) | |
self.reader_queue.put((response, data)) | |
def write(self, command, data=b''): | |
request_id = self.next_request_id | |
self.next_request_id = 0 if request_id >= 65534 else request_id + 1 | |
self.waiting_request_id = request_id | |
packet = Radio.packet_magic + Radio.header_struct.pack(command.value, request_id, len(data)) + data | |
self.serial.write(packet) | |
response, data = self.reader_queue.get() | |
if response == Radio.Response.Err: | |
raise Radio.ResponseErr | |
return data | |
def prepare(self, data): | |
data = self.write(Radio.Request.Prepare, data) | |
result, = struct.unpack('!H', data) | |
result = Radio.Result(result) | |
return result, | |
def transmit(self, transmit_len): | |
data = struct.pack('!H', transmit_len) | |
data = self.write(Radio.Request.Transmit, data) | |
result, = struct.unpack('!H', data) | |
result = Radio.TransmitResult(result) | |
return result, | |
def send(self, data): | |
data = self.write(Radio.Request.Send, data) | |
result, = struct.unpack('!H', data) | |
result = Radio.TransmitResult(result) | |
return result, | |
def channel_clear(self): | |
data = self.write(Radio.Request.ChannelClear) | |
result, = struct.unpack('!H', data) | |
result = Radio.Result(result) | |
return result, | |
def on(self): | |
self.write(Radio.Request.On) | |
return Radio.Result.OK, | |
def off(self): | |
self.write(Radio.Request.Off) | |
return Radio.Result.OK, | |
def get_value(self, param): | |
data = struct.pack('!H', int(param)) | |
data = self.write(Radio.Request.GetValue, data) | |
result, value = struct.unpack('!HH', data) | |
result = Radio.Result(result) | |
return result, value | |
def set_value(self, param, value): | |
data = struct.pack('!HH', int(param), value) | |
data = self.write(Radio.Request.SetValue, data) | |
result, = struct.unpack('!H', data) | |
result = Radio.Result(result) | |
return result, | |
def get_object(self, param, expected_len): | |
data = struct.pack('!HH', int(param), expected_len) | |
data = self.write(Radio.Request.GetObject, data) | |
result, = struct.unpack('!H', data[:2]) | |
result = Radio.Result(result) | |
return result, data[2:] | |
def set_object(self, param, data): | |
data = struct.pack('!HH', int(param), len(data)) + data | |
data = self.write(Radio.Request.GetObject, data) | |
result, = struct.unpack('!H', data) | |
result = Radio.Result(result) | |
return result, | |
def init_pending_table(self): | |
data = self.write(Radio.Request.InitPendingTable) | |
result, = struct.unpack('!H', data) | |
result = Radio.Result(result) | |
return result, | |
def set_pending_short(self, index, address=None): | |
if address == None: | |
address = b'' | |
elif isinstance(address, int): | |
address = struct.pack('!H', address) | |
data = struct.pack('!B', index) + address | |
data = self.write(Radio.Request.SetPending, data) | |
result, = struct.unpack('!H', data) | |
result = Radio.Result(result) | |
return result, | |
def set_pending_ext(self, index, address=None): | |
if address == None: | |
address = b'' | |
data = struct.pack('!B', index | 0x80) + address | |
data = self.write(Radio.Request.SetPending, data) | |
result, = struct.unpack('!H', data) | |
result = Radio.Result(result) | |
return result, | |
class MHR(object): | |
@unique | |
class FrameType(IntEnum): | |
Beacon = 0 | |
Data = 1 | |
Ack = 2 | |
MacCommand = 3 | |
def __str__(self): | |
return str(self.name) | |
@unique | |
class FrameControl(IntFlag): | |
SecurityEnabled = (1 << 3) | |
FramePending = (1 << 4) | |
AckRequest = (1 << 5) | |
PanidCompression = (1 << 6) | |
def __str__(self): | |
return str(self.name) | |
@unique | |
class AddrMode(IntEnum): | |
NotPresent = 0 | |
ShortAddr = 2 | |
ExtendedAddr = 3 | |
def __str__(self): | |
return str(self.name) | |
@unique | |
class FrameVersion(IntEnum): | |
IEEE802_15_4_2003 = 0 | |
IEEE802_15_4_2006 = 1 | |
def __str__(self): | |
return str(self.name) | |
@classmethod | |
def decode(cls, packet): | |
mhr = cls() | |
offset = 0 | |
mhr._frame_control, mhr._seq_num = struct.unpack_from('!HB', packet, offset) | |
offset += 3 | |
if mhr.frame_version > MHR.FrameVersion.IEEE802_15_4_2006: | |
return None | |
if mhr.dst_addrmode != MHR.AddrMode.NotPresent: | |
mhr._dst_panid, = struct.unpack_from('!H', packet, offset) | |
offset += 2 | |
if mhr.dst_addrmode == MHR.AddrMode.ShortAddr: | |
mhr._dst_addr, = struct.unpack_from('!H', packet, offset) | |
offset += 2 | |
elif mhr.dst_addrmode == MHR.AddrMode.ExtendedAddr: | |
mhr._dst_addr = packet[offset:offset+8] | |
offset += 8 | |
if mhr.src_addrmode != MHR.AddrMode.NotPresent and not mhr.panid_compression: | |
mhr._src_panid, = struct.unpack_from('!H', packet, offset) | |
offset += 2 | |
if mhr.src_addrmode == MHR.AddrMode.ShortAddr: | |
mhr._src_addr, = struct.unpack_from('!H', packet, offset) | |
offset += 2 | |
elif mhr.src_addrmode == MHR.AddrMode.ExtendedAddr: | |
mhr._src_addr = packet[offset:offset+8] | |
offset += 8 | |
return mhr, offset | |
def __init__(self): | |
self._frame_control = 0 | |
self._seq_num = 0 | |
self._dst_panid = None | |
self._dst_addr = None | |
self._src_panid = None | |
self._src_addr = None | |
def encode(self): | |
data = struct.pack('!HB', self._frame_control, self._seq_num) | |
if self.dst_addrmode != MHR.AddrMode.NotPresent: | |
data += struct.pack('!H', self._dst_panid) | |
if self.dst_addrmode == MHR.AddrMode.ShortAddr: | |
data += struct.pack('!H', self._dst_addr) | |
elif self.dst_addrmode == MHR.AddrMode.ExtendedAddr: | |
data += self._dst_addr | |
if self.src_addrmode != MHR.AddrMode.NotPresent and not self.panid_compression: | |
data += struct.pack('!H', self._src_panid) | |
if self.src_addrmode == MHR.AddrMode.ShortAddr: | |
data += struct.pack('!H', self._src_addr) | |
elif self.src_addrmode == MHR.AddrMode.ExtendedAddr: | |
data += self._src_addr | |
return data | |
@property | |
def frame_control(self): | |
return self._frame_control | |
@frame_control.setter | |
def frame_control(self, value): | |
print('fc!') | |
self._frame_control = value | |
if self.dst_addrmode == MHR.AddrMode.NotPresent: | |
self._dst_addr = None | |
else: | |
if self.dst_addrmode == MHR.AddrMode.ShortAddr and not isinstance(self._dst_addr, int): | |
self._dst_addr = 0 | |
elif self.dst_addrmode == MHR.AddrMode.ExtendedAddr and not isinstance(self._dst_addr, bytes): | |
self._dst_addr = b'\x00' * 8 | |
if self.src_addrmode == MHR.AddrMode.NotPresent: | |
self._src_addr = None | |
else: | |
if self.src_addrmode == MHR.AddrMode.ShortAddr and not isinstance(self._src_addr, int): | |
self._src_addr = 0 | |
elif self.src_addrmode == MHR.AddrMode.ExtendedAddr and not isinstance(self._src_addr, bytes): | |
self._src_addr = b'\x00' * 8 | |
@property | |
def frame_type(self): | |
return MRH.FrameType(self._frame_control & 0x3) | |
@frame_type.setter | |
def frame_type(self, value): | |
self._frame_control &= ~0x3 | |
self._frame_control |= int(value) & 0x3 | |
@property | |
def security_enabled(self): | |
return bool(self._frame_control & 0x4) | |
@security_enabled.setter | |
def security_enabled(self, value): | |
self._frame_control &= ~0x4 | |
if value: | |
self._frame_control |= 0x4 | |
@property | |
def frame_pending(self): | |
return bool(self._frame_control & 0x8) | |
@frame_pending.setter | |
def frame_pending(self, value): | |
self._frame_control &= ~0x8 | |
if value: | |
self._frame_control |= 0x08 | |
@property | |
def ack_request(self): | |
return bool(self._frame_control & MHR.FrameControl.AckRequest) | |
@ack_request.setter | |
def ack_request(self, value): | |
self._frame_control &= ~0x10 | |
if value: | |
self._frame_control |= 0x10 | |
@property | |
def panid_compression(self): | |
return bool(self._frame_control & MHR.FrameControl.PanidCompression) | |
@panid_compression.setter | |
def panid_compression(self, value): | |
self._frame_control &= ~MHR.FrameControl.PanidCompression | |
if value: | |
self._frame_control |= MHR.FrameControl.PanidCompression | |
if self.src_addrmode == MHR.AddrMode.NotPresent: | |
self._src_panid = None | |
self._src_addr = None | |
elif self.panid_compression: | |
self._src_panid = None | |
@property | |
def dst_addrmode(self): | |
return MHR.AddrMode((self._frame_control >> 10) & 0x3) | |
@dst_addrmode.setter | |
def dst_addrmode(self, value): | |
self._frame_control &= ~0xC00 | |
self._frame_control |= (int(value) & 0x3) << 10 | |
if self.dst_addrmode == MHR.AddrMode.NotPresent: | |
self._dst_addr = None | |
self._dst_panid = None | |
else: | |
if self.dst_addrmode != MHR.AddrMode.NotPresent and self._dst_panid == None: | |
self._dst_panid = 0 | |
if self.dst_addrmode == MHR.AddrMode.ShortAddr and not isinstance(self._dst_addr, int): | |
self._dst_addr = 0 | |
elif self.dst_addrmode == MHR.AddrMode.ExtendedAddr and not isinstance(self._dst_addr, bytes): | |
self._dst_addr = b'\x00' * 8 | |
@property | |
def frame_version(self): | |
return MHR.FrameVersion((self._frame_control >> 12) & 0x3) | |
@frame_version.setter | |
def frame_version(self, value): | |
self._frame_control &= ~0x3000 | |
self._frame_control |= (int(value) & 0x3) << 12 | |
@property | |
def src_addrmode(self): | |
return MHR.AddrMode((self._frame_control >> 14) & 0x3) | |
@src_addrmode.setter | |
def src_addrmode(self, value): | |
self._frame_control &= ~0xC000 | |
self._frame_control |= (int(value) & 0x3) << 14 | |
if self.src_addrmode == MHR.AddrMode.NotPresent: | |
self._src_addr = None | |
self._src_panid = None | |
else: | |
if self.panid_compression: | |
self._src_panid = None | |
if self.src_addrmode != MHR.AddrMode.NotPresent and self._src_panid == None: | |
self._src_panid = 0 | |
if self.src_addrmode == MHR.AddrMode.ShortAddr and not isinstance(self._src_addr, int): | |
self._src_addr = 0 | |
elif self.src_addrmode == MHR.AddrMode.ExtendedAddr and not isinstance(self._src_addr, bytes): | |
self._src_addr = b'\x00' * 8 | |
@property | |
def seq_num(self): | |
return self._seq_num | |
@seq_num.setter | |
def seq_num(self, value): | |
self._seq_num = value & 0xFF | |
@property | |
def dst_panid(self): | |
return self._dst_panid | |
@dst_panid.setter | |
def dst_panid(self, value): | |
self._dst_panid = value & 0xFFFF | |
@property | |
def dst_addr(self): | |
return self._dst_addr | |
@dst_addr.setter | |
def dst_addr(self, value): | |
if isinstance(value, int): | |
self._dst_addr = value & 0xFFFF | |
if self.dst_addrmode != MHR.AddrMode.ShortAddr: | |
self.dst_addrmode = MHR.AddrMode.ShortAddr | |
elif isinstance(value, bytes) and len(value) == 8: | |
self._dst_addr = value | |
if self.dst_addrmode != MHR.AddrMode.ExtendedAddr: | |
self.dst_addrmode = MHR.AddrMode.ExtendedAddr | |
else: | |
self.dst_addrmode = MHR.AddrMode.NotPresent | |
@property | |
def src_panid(self): | |
if self.panid_compression: | |
return self._dst_panid | |
return self._src_panid | |
@src_panid.setter | |
def src_panid(self, value): | |
if value == None: | |
self.src_addrmode = MHR.AddrMode.NotPresent | |
elif value == self.dst_panid: | |
self.panid_compression = True | |
else: | |
self.panid_compression = False | |
self._src_panid = value & 0xFFFF | |
if self.src_addrmode == MHR.AddrMode.NotPresent: | |
self.src_addr = 0 | |
@property | |
def src_addr(self): | |
return self._dst_addr | |
@src_addr.setter | |
def src_addr(self, value): | |
if isinstance(value, int): | |
self._src_addr = value & 0xFFFF | |
if self.src_addrmode != MHR.AddrMode.ShortAddr: | |
self.src_addrmode = MHR.AddrMode.ShortAddr | |
elif isinstance(value, bytes) and len(value) == 8: | |
self._src_addr = value | |
if self.src_addrmode != MHR.AddrMode.ExtendedAddr: | |
self.src_addrmode = MHR.AddrMode.ExtendedAddr | |
else: | |
self.src_addrmode = MHR.AddrMode.NotPresent | |
if __name__ == '__main__': | |
import sys | |
from time import sleep | |
def packet_handler(data, rssi, link_quality): | |
print(f'Data: {data} RSSI: {rssi} Link Quality: {link_quality}') | |
port = sys.argv[1] | |
radio = Radio(port, packet_handler) | |
try: | |
radio.set_value(Radio.Param.Channel, 11) | |
radio.set_value(Radio.Param.RxMode, 0) | |
radio.set_value(Radio.Param.TxMode, Radio.TxMode.SendOnCca) | |
packet = b'Hello from ' + port.encode('utf8') + b'!' | |
while True: | |
radio.send(packet) | |
sleep(0.1) | |
except KeyboardInterrupt: | |
radio.shutdown() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment