Last active
January 20, 2025 20:02
-
-
Save florianpasteur/f67784d52918e80c6cd8647785d28152 to your computer and use it in GitHub Desktop.
Ant devices to MQTT for home assistant
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from openant.easy.node import Node | |
from openant.devices import ANTPLUS_NETWORK_KEY | |
from openant.devices.heart_rate import HeartRate, HeartRateData | |
from openant.devices.bike_speed_cadence import BikeCadence, BikeCadenceData | |
from openant.devices.power_meter import PowerMeter, PowerData | |
import paho.mqtt.client as mqtt | |
# Load configuration | |
def load_config(config_file="config.json"): | |
with open(config_file, "r") as f: | |
return json.load(f) | |
# MQTT client setup | |
mqtt_client = mqtt.Client() | |
def mqtt_connect(config, discovery_topic): | |
"""Connect to the MQTT broker using details from the config.""" | |
mqtt_details = config["mqtt"] | |
mqtt_client.username_pw_set(mqtt_details["username"], mqtt_details["password"]) | |
mqtt_client.connect(mqtt_details["broker"], mqtt_details["port"]) | |
mqtt_client.loop_start() | |
print("Connected to MQTT broker") | |
mqtt_client.publish(discovery_topic, json.dumps({ | |
"status": "connected", | |
"topic": discovery_topic, | |
})) | |
def setup_device(node, device_type, device_id, mqtt_base_topic, discovery_topic): | |
if device_type == "heart_rate": | |
device = HeartRate(node, device_id=device_id) | |
elif device_type == "cadence": | |
device = BikeCadence(node, device_id=device_id) | |
elif device_type == "power": | |
device = PowerMeter(node, device_id=device_id) | |
else: | |
raise ValueError(f"Unsupported device type: {device_type}") | |
def on_found(): | |
"""Callback when the device is found.""" | |
device_id = device.device_id | |
print(f"Device {device} found: ID={device_id}, Type={device_type}") | |
topic = f"{mqtt_base_topic}/{device_type}/{device_id}" | |
# Publish device discovery to the "ant/discovery" MQTT topic | |
discovery_data = { | |
"device_id": device_id, | |
"device_type": device_type, | |
"topic": topic, | |
"name": device.name, | |
# "battery": device.last_battery_data.status, | |
} | |
mqtt_client.publish(discovery_topic, json.dumps(discovery_data)) | |
print(f"Device discovery data sent to ant/discovery: {discovery_data}") | |
# Now set up the device data publishing topic | |
print(f"Using MQTT topic: {topic}") | |
def on_device_data(page: int, page_name: str, data): | |
"""Callback for receiving device data.""" | |
mqtt_client.publish(f"{topic}/raw", json.dumps(data, default=str)) | |
if hasattr(data, 'battery_percentage'): | |
battery = data.battery_percentage | |
print(f"[{device_id}] 🔋 {battery}") | |
mqtt_client.publish(f"{topic}/battery", battery) | |
if isinstance(data, HeartRateData): | |
hr = data.heart_rate | |
print(f"[{device_id}] ❤️ {hr}") | |
mqtt_client.publish(topic, hr) | |
elif isinstance(data, BikeCadenceData): | |
cadence = data.cadence | |
print(f"[{device_id}] ♾️ {cadence}") | |
mqtt_client.publish(topic, cadence) | |
elif isinstance(data, PowerData): | |
power = data.instantaneous_power | |
print(f"[{device_id}] ⚡️ {power}") | |
mqtt_client.publish(topic, power) | |
if data.cadence: | |
cadence = data.cadence | |
print(f"[{device_id}] ♾️ {cadence}") | |
mqtt_client.publish(f"{topic}/cadence", cadence) | |
device.on_device_data = on_device_data | |
device.on_found = on_found | |
return device | |
def main(config_file="config.json"): | |
# Load configuration | |
config = load_config(config_file) | |
mqtt_base_topic = config["mqtt"]["base_topic"] | |
discovery_topic = f"{mqtt_base_topic}/discovery" | |
# Connect to MQTT broker | |
mqtt_connect(config, discovery_topic) | |
# Setup ANT+ node | |
node = Node() | |
node.set_network_key(0x00, ANTPLUS_NETWORK_KEY) | |
devices = [] | |
for device in config["devices"]: | |
print(f"Setting up device: {device}") | |
devices.append(setup_device(node, device["type"], device["id"], mqtt_base_topic, discovery_topic)) | |
# Set up all supported device types for scanning | |
# for device_type in ["heart_rate", "cadence", "power"]: | |
# devices.append(setup_device(node, device_type, 0, mqtt_base_topic, discovery_topic)) | |
try: | |
print(f"Scanning for devices, press Ctrl-C to finish") | |
node.start() | |
except KeyboardInterrupt: | |
print("Closing ANT+ devices...") | |
finally: | |
for device in devices: | |
device.close_channel() | |
node.stop() | |
mqtt_client.loop_stop() | |
mqtt_client.disconnect() | |
if __name__ == "__main__": | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"mqtt": { | |
"broker": "mqtt_broker_host", | |
"username": "your_username", | |
"password": "your_password", | |
"port": 1883, | |
"base_topic": "ant" | |
}, | |
"devices": [ | |
{ "id": 1, "type": "device_type", "name": "device_name"} | |
] | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment