Last active
April 11, 2025 16:34
-
-
Save danielfaust/3e93300b858022acb364f21b71496ec8 to your computer and use it in GitHub Desktop.
Read out Engelmann FAW water meters via optical M-Bus in Python.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Read out Engelmann FAW water meters via optical M-Bus.

Engelmann radio add-on module FAW ("Engelmann Funkaufsatzmodul FAW")
https://www.engelmann.de/product/funkaufsatzmodul-faw/
TTL IR read/write head board ("Platine ttl ir lesekopf lese-schreib-Kopf EHZ Volkszähler Hichi Smartmeter")
https://www.ebay.de/itm/313525835802
Reading heat meters via the optical M-Bus interface ("Wärmezähler über optische M-Bus-Schnittstelle auslesen")
https://www.mikrocontroller.net/topic/438972?page=single
'''
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
import binascii
import datetime
import io
import json
import logging
import os
import threading
import time

import serial
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
# Configure timestamped INFO-level logging once for the whole script.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
# True: every read attempt writes the received bytes to its own file under log-files/.
LOG_TO_FILE = True
# True: read from the serial ports; False: parse each device's 'mock-data' string instead.
READ_SENSORS = True
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
# Maximum number of wake-up + request attempts per device.
READ_RETRIES = 10
# Number of 0x55 wake-up bytes:
# 2400 (bits/second) * 2.2 (seconds) = 5280 (bits) | 5280 (bits) / 10 (bits/byte) = 528 (bytes)
# 10 (bits/byte) = 1 (start bit) + 8 (data bits) + 1 (stop bit)
WRITE_55_COUNT = 528
# Pause in seconds between the wake-up bytes and the data request.
# Values tried before settling on the current one:
#   0.350
#   0.1875 = 330 / 2400 + 0.05 | EN1434-3: 330 bit periods / 2400 baud + 50 ms
#            (quoted note: "wait time of 234 ms (10 characters wait time? - actually
#            EN1434-3 prescribes 330 bit periods + 50 ms)")
SLEEP_55_SECONDS = 0.100
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
# One entry per optical read head:
#   'tty'       - serial device the read head is attached to
#   'mock-data' - hex string parsed instead of a live reading when READ_SENSORS is False
devices = [
    # fill the mock data with the data returned from the first reading of the sensor (75 bytes)
    {'tty': '/dev/ttyUSB0', 'mock-data': '68~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~16'},
]
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
# Known meters, keyed by the serial number reported in the datagram.
# 'offset' is added to the total read via M-Bus so that the result matches the
# analog display: offset = <value shown on the meter> - <value read via M-Bus>.
meters = {
    # serial                                              meter   mbus
    '12345678': {'name': 'badezimmer warmwasser', 'offset': 12345 - 1234},
    '23456789': {'name': 'badezimmer kaltwasser', 'offset': 23456 - 2345},
}
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
def _read_response(ser, file, request, expected_length):
    """Skip input up to and including the echoed ``request``, then collect the reply.

    Bytes are read one at a time. Everything received before the echo of
    ``request`` has been seen is discarded; the following ``expected_length``
    bytes are returned. When LOG_TO_FILE is set, every received byte is written
    to ``file`` as hex.

    Returns the reply bytes, or None if a read timed out or a serial error occurred.
    """
    echo_tail = b""
    datagram = b""
    is_echo = True
    try:
        while len(datagram) < expected_length:
            byte = ser.read(1)
            if not byte:
                logger.warning("no bytes received from sensor")
                print(len(datagram))
                return None
            byte_hex = binascii.hexlify(byte).decode()
            if is_echo:
                # Only the last len(request) bytes matter for spotting the echo.
                # Matching stops once the echo was found, so a reply that happens
                # to contain the request bytes can no longer truncate the result.
                echo_tail = (echo_tail + byte)[-len(request):]
            else:
                print(byte_hex, flush=True, end='')
                datagram += byte
            if LOG_TO_FILE:
                file.write(byte_hex + ' ')
                file.flush()
            if is_echo and echo_tail == request:
                logger.info("found echoed 105bfe5916, reading response")
                is_echo = False
    except serial.SerialTimeoutException as e:
        logger.warning("timeout occurred", exc_info=e)
        print(len(datagram))
        return None
    except serial.SerialException as e:
        logger.warning("serial communication error while reading response", exc_info=e)
        print(len(datagram))
        return None
    except Exception as e:
        logger.error("An unexpected error occurred while reading response", exc_info=e)
        print(len(datagram))
        return None
    print()
    if LOG_TO_FILE:
        file.write(f"\n{binascii.hexlify(datagram).decode()}\n")
        file.flush()
    logger.info("finished reading response")
    return datagram


def get_data(ser=None):
    """Wake up the meter behind ``ser`` and read one 75-byte response datagram.

    Each attempt sends WRITE_55_COUNT bytes of 0x55 without parity, pauses for
    SLEEP_55_SECONDS, switches the port to even parity, sends the request
    10 5B FE 59 16 and then reads the reply that follows the echoed request.
    Up to READ_RETRIES attempts are made. With LOG_TO_FILE set, each attempt
    writes the received bytes to its own file under log-files/.

    Args:
        ser: an open pyserial port. Its parity is changed by this function and
            is left at PARITY_EVEN afterwards. Passing None is logged as an
            unexpected error and yields None.

    Returns:
        The 75 reply bytes, or None if no attempt succeeded or an error occurred.
    """
    request = b'\x10\x5B\xFE\x59\x16'
    expected_length = 75
    try:
        for attempt in range(1, READ_RETRIES + 1):
            started = datetime.datetime.now()
            if LOG_TO_FILE:
                os.makedirs('log-files', exist_ok=True)
                file = open(f"log-files/log--{started.strftime('%Y-%m-%d--%H%M%S')}--{attempt}.txt", 'w')
            else:
                # Throw-away sink so the code below does not need to special-case logging.
                file = io.StringIO()
            with file:
                logger.info("waking up sensor, attempt %s", attempt)
                start_wakeup = time.perf_counter()
                ser.parity = serial.PARITY_NONE
                ser.write(b'\x55' * WRITE_55_COUNT)
                ser.flush()
                stop_wakeup = time.perf_counter()
                logger.info("sent wakeup sequence in %.3f seconds (probably buffered)", stop_wakeup - start_wakeup)
                if SLEEP_55_SECONDS > 0:
                    time.sleep(SLEEP_55_SECONDS)
                logger.info("sending data request sequence 105bfe5916")
                ser.parity = serial.PARITY_EVEN
                ser.write(request)
                ser.flush()
                logger.info("reading received data")
                result = _read_response(ser, file, request, expected_length)
                if result is not None:
                    return result
        return None
    except serial.SerialException as e:
        logger.warning("serial communication error", exc_info=e)
        return None
    except Exception as e:
        logger.error("An unexpected error occurred while reading response", exc_info=e)
        return None
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
# Hex markers used to locate fields inside the hex-encoded datagram.
# Last wake-up byte (55) followed by the data request 10 5b fe 59 16.
start_sequence = '55105bfe5916'
# Start of the response; the 4-byte serial number (little endian) follows directly.
# NOTE(review): presumably the M-Bus long-frame header 68 L L 68 C A CI with L = 0x45 - confirm.
intro_sequence = '68454568080072'
# Precedes the 4-byte date/time value.
# NOTE(review): presumably DIF 04 (32 bit) + VIF 6d (date and time) - confirm.
time_sequence = '046d'
# Precedes the 4-byte total (little endian), reported in liters.
# NOTE(review): presumably DIF 04 (32 bit) + VIF 13 (volume in liters) - confirm.
total_sequence = '0413'
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
def little_to_big_endian(hex_string: str) -> str:
    """Reverse the byte order of a hex string.

    The string is split into two-character groups from the left and the groups
    are returned in reverse order, e.g. '78563412' -> '12345678'. An odd-length
    string keeps its trailing single character as a group of its own.
    """
    byte_pairs = [hex_string[i:i + 2] for i in range(0, len(hex_string), 2)]
    return ''.join(reversed(byte_pairs))
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
def hex_to_datetime(hex_value: str) -> datetime.datetime:
    """Decode a 4-byte packed date/time value, given as a hex string.

    Bit layout as decoded here (byte 0 first):
        byte 0: minute in bits 0-5
        byte 1: hour   in bits 0-4
        byte 2: day    in bits 0-4, low three year bits in bits 5-7
        byte 3: month  in bits 0-3, high four year bits in bits 4-7
    The remaining bits of bytes 0 and 1 are ignored. The 7-bit year is taken
    relative to 2000.

    Raises:
        ValueError: if ``hex_value`` is not valid hex or encodes an impossible date.
        IndexError: if ``hex_value`` holds fewer than 4 bytes.
    """
    hex_bytes = bytes.fromhex(hex_value)
    minute = hex_bytes[0] & 0x3F  # n5..n0 (6 bits)
    hour = hex_bytes[1] & 0x1F    # h4..h0 (5 bits)
    day = hex_bytes[2] & 0x1F     # j4..j0 (5 bits)
    month = hex_bytes[3] & 0x0F   # M3..M0 (4 bits)
    # All four upper bits of byte 3 belong to the year; masking with 0x70
    # dropped the top bit and mis-decoded every year from 2064 on.
    year = (hex_bytes[2] >> 5) | ((hex_bytes[3] & 0xF0) >> 1)
    return datetime.datetime(2000 + year, month, day, hour, minute)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
def hex_to_decimal(msb_hex: str) -> int:
    """Return the integer value of a hex string whose most significant byte comes first.

    Raises:
        ValueError: if ``msb_hex`` is empty or not valid hex.
    """
    return int(msb_hex, 16)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
def _find_field(data, marker, start, width=8):
    """Return the ``width`` hex digits after the first byte-aligned ``marker``, or None.

    The search begins at ``start``, which must sit on a byte boundary. Hits at an
    odd distance from ``start`` straddle two bytes and are skipped. None is
    returned when the marker is missing or fewer than ``width`` digits follow it.
    """
    pos = data.find(marker, start)
    while pos >= 0 and (pos - start) % 2:
        pos = data.find(marker, pos + 1)
    if pos < 0:
        return None
    value_at = pos + len(marker)
    value = data[value_at:value_at + width]
    return value if len(value) == width else None


def process_data(data):
    """Extract serial number, timestamp and total from a hex-encoded datagram.

    The serial number follows ``intro_sequence`` and the timestamp and total
    follow ``time_sequence`` and ``total_sequence``; serial and total are stored
    little endian. For a serial listed in ``meters`` the entry receives 'now'
    (time of processing), 'time' (timestamp from the datagram) and 'total'
    (datagram total plus the meter's 'offset'). For an unknown serial a
    ready-to-paste ``meters`` entry is printed instead.

    Datagrams with a missing header, a missing or truncated field, or an
    undecodable value are skipped with a logged warning; nothing is stored.

    Args:
        data: the datagram as a lowercase hex string.
    """
    header_at = data.find(intro_sequence)
    if header_at < 0:
        logger.warning("datagram header %s not found, ignoring data", intro_sequence)
        return
    serial_at = header_at + len(intro_sequence)
    records_at = serial_at + 8
    serial_hex = data[serial_at:records_at]
    # Search behind the serial number and on byte boundaries only, so neither the
    # header nor two adjacent data bytes can be mistaken for a field marker.
    time_hex = _find_field(data, time_sequence, records_at)
    total_hex = _find_field(data, total_sequence, records_at)
    if len(serial_hex) < 8 or time_hex is None or total_hex is None:
        logger.warning("datagram is incomplete, ignoring data")
        return
    try:
        _time = hex_to_datetime(time_hex)
        _total = hex_to_decimal(little_to_big_endian(total_hex))
    except ValueError as e:
        logger.warning("datagram contains an invalid time or total value, ignoring data", exc_info=e)
        return
    _serial = little_to_big_endian(serial_hex)
    if _serial not in meters:
        print(f"Unknown serial number: {_serial}. Add it to the \"meters\" list as follows:\n'{_serial}': {{'name': '<meter name>', 'offset': <analog value as displayed on meter> - {_total}}},\n")
        return
    _meter = meters[_serial]
    _meter['now'] = datetime.datetime.now()
    _meter['time'] = _time
    _meter['total'] = _total + _meter['offset']
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
# Read every configured device (or take its mock data) and process the datagram.
for device in devices:
    raw_data_hex_str = None
    if READ_SENSORS:
        try:
            # The context manager closes the port again, even if reading fails.
            with serial.Serial(device['tty'], baudrate=2400, bytesize=8, parity=serial.PARITY_NONE, stopbits=1, timeout=2) as ser:
                raw_data = get_data(ser)
        except serial.SerialException as e:
            # A port that cannot be opened must not keep the other devices from being read.
            logger.warning("could not use serial port %s", device['tty'], exc_info=e)
            raw_data = None
        if raw_data is not None:
            raw_data_hex_str = binascii.hexlify(raw_data).decode()
    else:
        raw_data_hex_str = device['mock-data']
    print(device['tty'], raw_data_hex_str)
    print()
    if raw_data_hex_str is not None:
        process_data(raw_data_hex_str)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
print("\n--------------------------\n")
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Print one summary line for every meter that delivered a reading in this run.
for _serial, data in meters.items():
    if 'time' not in data:
        # No datagram was processed for this meter.
        continue
    _name = data['name']
    _time = data['time']
    _total = data['total']
    print(f"{_name} | {_serial} | {_time.strftime('%Y-%m-%d %H:%M')} | {_total:>5} Liter")
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment