Skip to content

Instantly share code, notes, and snippets.

@florianpasteur
Last active January 20, 2025 20:02
Show Gist options
  • Save florianpasteur/f67784d52918e80c6cd8647785d28152 to your computer and use it in GitHub Desktop.
Save florianpasteur/f67784d52918e80c6cd8647785d28152 to your computer and use it in GitHub Desktop.
ANT+ devices to MQTT for Home Assistant
import json
from openant.easy.node import Node
from openant.devices import ANTPLUS_NETWORK_KEY
from openant.devices.heart_rate import HeartRate, HeartRateData
from openant.devices.bike_speed_cadence import BikeCadence, BikeCadenceData
from openant.devices.power_meter import PowerMeter, PowerData
import paho.mqtt.client as mqtt
# Load configuration
def load_config(config_file="config.json"):
    """Read the bridge configuration from a JSON file.

    Args:
        config_file: Path of the JSON configuration file to read.

    Returns:
        The parsed JSON document, as produced by ``json.load``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, so non-ASCII values load the same way everywhere.
    with open(config_file, "r", encoding="utf-8") as f:
        return json.load(f)
# MQTT client setup
# paho-mqtt 2.x expects a callback API version when a client is constructed,
# while 1.x has no such argument; support both. This file registers no paho
# callbacks on the client, so the version chosen does not change how it is used.
_callback_api_version = getattr(mqtt, "CallbackAPIVersion", None)
if _callback_api_version is None:
    mqtt_client = mqtt.Client()
else:
    mqtt_client = mqtt.Client(_callback_api_version.VERSION2)
def mqtt_connect(config, discovery_topic):
    """Connect to the MQTT broker using details from the config.

    Args:
        config: Parsed configuration. ``config["mqtt"]`` must contain
            ``broker`` and may contain ``port`` (defaults to 1883),
            ``username`` and ``password``.
        discovery_topic: Topic on which the "connected" status message is
            published once the connection has been initiated.

    Raises:
        KeyError: If ``config`` has no ``mqtt`` section or no ``broker`` entry.
    """
    mqtt_details = config["mqtt"]

    # Credentials are optional so that brokers allowing anonymous access can
    # be used without placeholder values in the config file.
    username = mqtt_details.get("username")
    if username is not None:
        mqtt_client.username_pw_set(username, mqtt_details.get("password"))

    mqtt_client.connect(mqtt_details["broker"], mqtt_details.get("port", 1883))
    mqtt_client.loop_start()
    print("Connected to MQTT broker")

    # Announce the bridge itself on the discovery topic.
    mqtt_client.publish(discovery_topic, json.dumps({
        "status": "connected",
        "topic": discovery_topic,
    }))
def setup_device(node, device_type, device_id, mqtt_base_topic, discovery_topic):
    """Create an ANT+ device on *node* and wire its callbacks to MQTT.

    The data callback is installed from inside ``on_found`` so that the
    per-device topic is built from the id the device object reports.

    Args:
        node: The ANT node the device is attached to.
        device_type: One of ``"heart_rate"``, ``"cadence"`` or ``"power"``.
        device_id: Device id passed to the device constructor.
        mqtt_base_topic: Prefix of the topics this device publishes to
            (``<base>/<device_type>/<device_id>``).
        discovery_topic: Topic on which the discovery message is published.

    Returns:
        The created device object with its ``on_found`` callback set.

    Raises:
        ValueError: If *device_type* is not a supported type.
    """
    if device_type == "heart_rate":
        device = HeartRate(node, device_id=device_id)
    elif device_type == "cadence":
        device = BikeCadence(node, device_id=device_id)
    elif device_type == "power":
        device = PowerMeter(node, device_id=device_id)
    else:
        raise ValueError(f"Unsupported device type: {device_type}")

    def on_found():
        """Callback when the device is found."""
        # Use the id reported by the device object, not the configured one.
        found_id = device.device_id
        print(f"Device {device} found: ID={found_id}, Type={device_type}")
        topic = f"{mqtt_base_topic}/{device_type}/{found_id}"

        # Publish device discovery to the discovery MQTT topic
        discovery_data = {
            "device_id": found_id,
            "device_type": device_type,
            "topic": topic,
            "name": device.name,
            # "battery": device.last_battery_data.status,
        }
        mqtt_client.publish(discovery_topic, json.dumps(discovery_data))
        # Log the topic actually used; it depends on the configured base topic.
        print(f"Device discovery data sent to {discovery_topic}: {discovery_data}")

        # Now set up the device data publishing topic
        print(f"Using MQTT topic: {topic}")

        def on_device_data(page: int, page_name: str, data):
            """Callback for receiving device data."""
            # Always forward the raw data object; anything json cannot encode
            # natively is stringified via default=str.
            mqtt_client.publish(f"{topic}/raw", json.dumps(data, default=str))

            if hasattr(data, 'battery_percentage'):
                battery = data.battery_percentage
                print(f"[{found_id}] 🔋 {battery}")
                mqtt_client.publish(f"{topic}/battery", battery)

            # The main measurement of each data type goes to the bare topic.
            if isinstance(data, HeartRateData):
                hr = data.heart_rate
                print(f"[{found_id}] ❤️ {hr}")
                mqtt_client.publish(topic, hr)
            elif isinstance(data, BikeCadenceData):
                cadence = data.cadence
                print(f"[{found_id}] ♾️ {cadence}")
                mqtt_client.publish(topic, cadence)
            elif isinstance(data, PowerData):
                power = data.instantaneous_power
                print(f"[{found_id}] ⚡️ {power}")
                mqtt_client.publish(topic, power)
                # Cadence from power data is only published when truthy,
                # so a value of 0 is skipped.
                if data.cadence:
                    cadence = data.cadence
                    print(f"[{found_id}] ♾️ {cadence}")
                    mqtt_client.publish(f"{topic}/cadence", cadence)

        device.on_device_data = on_device_data

    device.on_found = on_found
    return device
def main(config_file="config.json"):
    """Run the ANT+ to MQTT bridge until interrupted.

    Loads the configuration, connects to the MQTT broker, registers every
    configured device on an ANT+ node and then starts the node. Cleanup runs
    in ``finally``, so it also happens when setup or the node itself fails.

    Args:
        config_file: Path of the JSON configuration file.
    """
    # Load configuration
    config = load_config(config_file)
    mqtt_base_topic = config["mqtt"]["base_topic"]
    discovery_topic = f"{mqtt_base_topic}/discovery"

    # Connect to MQTT broker
    mqtt_connect(config, discovery_topic)

    node = None
    devices = []
    try:
        # Setup ANT+ node. This happens inside the try block so that a failure
        # here (or while registering a device) still tears down the MQTT
        # client and any devices that were already created.
        node = Node()
        node.set_network_key(0x00, ANTPLUS_NETWORK_KEY)

        for device_config in config["devices"]:
            print(f"Setting up device: {device_config}")
            devices.append(
                setup_device(
                    node,
                    device_config["type"],
                    device_config["id"],
                    mqtt_base_topic,
                    discovery_topic,
                )
            )

        # Set up all supported device types for scanning
        # for device_type in ["heart_rate", "cadence", "power"]:
        #     devices.append(setup_device(node, device_type, 0, mqtt_base_topic, discovery_topic))

        print("Scanning for devices, press Ctrl-C to finish")
        node.start()
    except KeyboardInterrupt:
        print("Closing ANT+ devices...")
    finally:
        for device in devices:
            device.close_channel()
        # The node may not exist if its construction failed.
        if node is not None:
            node.stop()
        mqtt_client.loop_stop()
        mqtt_client.disconnect()
# Run the bridge with the default config file when executed as a script.
if __name__ == "__main__":
    main()
{
"mqtt": {
"broker": "mqtt_broker_host",
"username": "your_username",
"password": "your_password",
"port": 1883,
"base_topic": "ant"
},
"devices": [
{ "id": 1, "type": "device_type", "name": "device_name"}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment