Skip to content

Instantly share code, notes, and snippets.

@danielfaust
Last active April 11, 2025 16:34
Show Gist options
  • Save danielfaust/3e93300b858022acb364f21b71496ec8 to your computer and use it in GitHub Desktop.
Save danielfaust/3e93300b858022acb364f21b71496ec8 to your computer and use it in GitHub Desktop.
Read out Engelmann FAW water meters via optical M-Bus in Python.
'''
Engelmann Funkaufsatzmodul FAW
https://www.engelmann.de/product/funkaufsatzmodul-faw/
Platine ttl ir lesekopf lese-schreib-Kopf EHZ Volkszähler Hichi Smartmeter
https://www.ebay.de/itm/313525835802
Wärmezähler über optische M-Bus-Schnittstelle auslesen
https://www.mikrocontroller.net/topic/438972?page=single
'''
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import io
import os
import time
import json
import serial
import logging
import datetime
import binascii
import threading
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
LOG_TO_FILE = True
READ_SENSORS = True
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
READ_RETRIES = 10
WRITE_55_COUNT = 528 # 2400 (bits/second) * 2.2 (seconds) = 5280 (bits) | 5280 (bits) / 10 (bits/byte) = 528 (bytes) | 10 (bits/byte) = 1 (start bit) + 8 (data bits) + 1 (stop bit)
SLEEP_55_SECONDS = 0.350
SLEEP_55_SECONDS = 0.1875 # 330 / 2400 + 0.05 | EN1434-3: 330 bitperioden / 2400 baud + 50 ms "Wartezeit von 234ms (10 Zeichen Wartezeit? - Eigentlich schreibt EN1434-3 330 Bitperioden + 50ms vor)"
SLEEP_55_SECONDS = 0.100
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
devices = [
# fill the mock data with the data returned from the first reading of the sensor (75 bytes)
{'tty': '/dev/ttyUSB0', 'mock-data': '68~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~16'},
]
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
meters = {
# serial meter mbus
'12345678': {'name':'badezimmer warmwasser', 'offset': 12345 - 1234},
'23456789': {'name':'badezimmer kaltwasser', 'offset': 23456 - 2345},
}
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def get_data(ser=None):
try:
result = None
attempt = 0
while attempt < READ_RETRIES:
attempt += 1
NOW = datetime.datetime.now()
file = io.StringIO()
if LOG_TO_FILE:
if not os.path.exists('log-files'):
os.makedirs('log-files')
file = open(f"log-files/log--{NOW.strftime('%Y-%m-%d--%H%M%S')}--{attempt}.txt", 'w')
with file:
logger.info(f"waking up sensor, attempt {attempt}")
start_wakeup = time.perf_counter()
ser.parity = serial.PARITY_NONE
ser.write(b'\x55' * WRITE_55_COUNT)
ser.flush()
stop_wakeup = time.perf_counter()
logger.info(f"sent wakeup sequence in {stop_wakeup - start_wakeup:.3f} seconds (probably buffered)")
if SLEEP_55_SECONDS > 0:
time.sleep(SLEEP_55_SECONDS)
logger.info("sending data request sequence 105bfe5916")
read_data = b'\x10\x5B\xFE\x59\x16'
ser.parity = serial.PARITY_EVEN
ser.write(read_data)
ser.flush()
result = b""
datagram = b""
is_echo = True
logger.info("reading received data")
while True:
try:
if len(datagram) == 75:
print()
if LOG_TO_FILE:
file.write(f"\n{binascii.hexlify(datagram).decode()}\n")
file.flush()
logger.info("finished reading response")
return result
byte = ser.read(1)
if not byte:
logger.warning("no bytes received from sensor")
print(len(datagram))
result = None
break
result += byte
byte_array_hex = binascii.hexlify(byte)
byte_array_hex_str = byte_array_hex.decode()
if not is_echo:
print(byte_array_hex_str, flush=True, end='')
datagram += byte
if LOG_TO_FILE:
file.write(byte_array_hex_str + ' ')
file.flush()
if binascii.hexlify(result[-5:]).decode() == '105bfe5916':
logger.info("found echoed 105bfe5916, reading response")
result = b""
is_echo = False
except serial.SerialTimeoutException as e:
logger.warning("timeout occurred", exc_info=e)
print(len(datagram))
result = None
break
except serial.SerialException as e:
logger.warning("serial communication error while reading response", exc_info=e)
print(len(datagram))
result = None
break
except Exception as e:
logger.error("An unexpected error occurred while reading response", exc_info=e)
print(len(datagram))
result = None
break
return result
except serial.SerialException as e:
logger.warning("serial communication error", exc_info=e)
return None
except Exception as e:
logger.error("An unexpected error occurred while reading response", exc_info=e)
return None
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
start_sequence = '55105bfe5916'
intro_sequence = '68454568080072'
time_sequence = '046d'
total_sequence = '0413'
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def little_to_big_endian(hex_string):
bytes_list = [hex_string[i:i+2] for i in range(0, len(hex_string), 2)]
bytes_list.reverse()
return ''.join(bytes_list)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def hex_to_datetime(hex_value):
hex_bytes = bytes.fromhex(hex_value)
minute = hex_bytes[0] & 0x3F # n5..n0 (6 bits)
hour = hex_bytes[1] & 0x1F # h4..h0 (5 bits)
day = hex_bytes[2] & 0x1F # j4..j0 (5 bits)
month = hex_bytes[3] & 0x0F # M3..M0 (4 bits)
year = ((hex_bytes[2] >> 5) | ((hex_bytes[3] & 0x70) >> 1))
return datetime.datetime(2000 + year, month, day, hour, minute)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def hex_to_decimal(msb_hex):
return int(msb_hex, 16)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def process_data(data):
_serial_index = data.find(intro_sequence) + len(intro_sequence)
_serial = data[_serial_index:_serial_index+8]
_serial = little_to_big_endian(_serial)
_time_index = data.find(time_sequence) + len(time_sequence)
_time = data[_time_index:_time_index+8]
_time = hex_to_datetime(_time)
_total_index = data.find(total_sequence) + len(total_sequence)
_total = data[_total_index:_total_index+8]
_total = hex_to_decimal(little_to_big_endian(_total))
if _serial not in meters:
print(f"Unknown serial number: {_serial}. Add it to the \"meters\" list as follows:\n'{_serial}': {{'name': '<meter name>', 'offset': <analog value as displayed on meter> - {_total}}},\n")
else:
_meter = meters[_serial]
_meter['now'] = datetime.datetime.now()
_meter['time'] = _time
_meter['total'] = _total + _meter['offset']
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
for device in devices:
if True:
raw_data_hex_str = None
if READ_SENSORS:
ser = serial.Serial(device['tty'], baudrate=2400, bytesize=8, parity=serial.PARITY_NONE, stopbits=1, timeout=2)
raw_data = get_data(ser)
ser.close()
if raw_data is not None:
raw_data_hex_str = binascii.hexlify(raw_data).decode()
else:
raw_data_hex_str = device['mock-data']
print(device['tty'], raw_data_hex_str)
print()
if raw_data_hex_str is not None:
process_data(raw_data_hex_str)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
print(f"\n--------------------------\n")
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
for _serial, data in meters.items():
if 'time' in data:
_name = data['name']
_time = data['time']
_total = data['total']
print(f"{_name} | {_serial} | {_time.strftime('%Y-%m-%d %H:%M')} | {_total:>5} Liter")
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment