Created
August 1, 2019 11:50
-
-
Save atx/ef4df535f81389dea4905761ae7e05f7 to your computer and use it in GitHub Desktop.
Hacky script to export Mozilla WebThings Gateway values to InfluxDB.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
import argparse | |
import asyncio | |
import configparser | |
import datetime | |
import influxdb | |
import json | |
import pathlib | |
import requests | |
import websockets | |
from urllib.parse import urljoin | |
class Translator:
    """Forward property updates from a WebThings gateway to InfluxDB.

    The gateway's ``/things`` endpoint is polled periodically; for every
    thing that is not yet being listened to, a websocket task is started
    that writes each received ``propertyStatus`` value to InfluxDB as a
    measurement named ``<thing href>_<property name>``.
    """

    # Seconds to wait for the gateway's HTTP response. ``requests`` has no
    # default timeout, and the call runs on the event-loop thread, so an
    # unresponsive gateway would otherwise stall every websocket task.
    REQUEST_TIMEOUT = 10

    def __init__(self, config):
        """Create the HTTP session and the InfluxDB client from *config*.

        Args:
            config: Mapping of mappings (e.g. a ``configparser`` parser)
                providing ``gateway.url``, ``gateway.token``,
                ``influx.host``, ``influx.port`` and ``influx.database``.
        """
        self.gateway_url = config["gateway"]["url"]
        self.gateway_token = config["gateway"]["token"]
        self.session = requests.Session()
        self.session.headers["Accept"] = "application/json"
        self.session.headers["Authorization"] = \
            "Bearer {}".format(self.gateway_token)
        # Websocket URLs that currently have a listener task.
        self.connected_to = set()
        self.influx = influxdb.InfluxDBClient(
            host=config["influx"]["host"],
            port=int(config["influx"]["port"]),
        )
        self.influx.switch_database(config["influx"]["database"])

    @staticmethod
    def _websocket_base(gateway_url):
        """Return *gateway_url* with its http(s) scheme swapped for ws(s).

        Only the leading scheme is rewritten; a blanket
        ``str.replace("http", "ws")`` would also mangle any later
        occurrence of "http" (for example in the host name).
        """
        if gateway_url.startswith("http"):
            # "http://..." -> "ws://...", "https://..." -> "wss://..."
            return "ws" + gateway_url[len("http"):]
        return gateway_url

    @staticmethod
    def _to_float(value):
        """Return *value* as a float, or ``None`` if it is not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    def update_things(self):
        """Fetch the thing list and start a listener for each new thing.

        Returns:
            The decoded JSON body of the gateway's ``/things`` response.

        Raises:
            requests.RequestException: The request failed, timed out or
                returned an HTTP error status.
            ValueError: The response body was not valid JSON.
        """
        loop = asyncio.get_event_loop()
        response = self.session.get(urljoin(self.gateway_url, "/things"),
                                    timeout=self.REQUEST_TIMEOUT)
        # Fail with a clear HTTP error instead of iterating an error body.
        response.raise_for_status()
        jsn = response.json()
        ws_base = self._websocket_base(self.gateway_url)
        for thing in jsn:
            href = thing["href"]
            ws_url = urljoin(ws_base,
                             href + "?jwt={}".format(self.gateway_token))
            if ws_url in self.connected_to:
                continue
            print("Connecting to {}".format(href))
            self.connected_to.add(ws_url)
            loop.create_task(self._thing_task(href, ws_url))
        return jsn

    @staticmethod
    def _metric_name(thing_name, property_name):
        """Return the InfluxDB measurement name for a thing property."""
        return thing_name + "_" + property_name

    def _send_measurement(self, thing_name, property_name, value):
        """Write one data point, stamped with the current UTC time."""
        metric_name = self._metric_name(thing_name, property_name)
        # Same "...Z" text as the deprecated utcnow(): take an aware UTC
        # time and drop tzinfo so isoformat() emits no "+00:00" suffix.
        now = datetime.datetime.now(datetime.timezone.utc)
        timestamp = now.replace(tzinfo=None).isoformat() + "Z"
        print("Sending {} = {} @{}".format(metric_name, value, timestamp))
        json_body = {
            "measurement": metric_name,
            "time": timestamp,
            "fields": {
                "value": value,
            },
        }
        self.influx.write_points([json_body])

    async def _thing_task(self, thing_name, ws_url):
        """Listen on one thing's websocket and forward property values.

        When the connection ends for any reason, *ws_url* is removed from
        ``connected_to`` so the next ``update_things`` call reconnects.
        """
        try:
            async with websockets.connect(ws_url) as ws:
                while True:
                    msg = json.loads(await ws.recv())
                    if msg.get("messageType") != "propertyStatus":
                        continue
                    for property_name, raw in msg.get("data", {}).items():
                        property_value = self._to_float(raw)
                        if property_value is None:
                            # One non-numeric property must not take the
                            # whole listener down.
                            print("Skipping non-numeric {} = {!r}".format(
                                self._metric_name(thing_name, property_name),
                                raw))
                            continue
                        self._send_measurement(thing_name, property_name,
                                               property_value)
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            # Task boundary: nobody awaits this task, so report the failure
            # here rather than leaving it as an unretrieved task exception.
            print("Connection to {} lost: {!r}".format(thing_name, exc))
        finally:
            self.connected_to.discard(ws_url)

    async def run(self):
        """Refresh the thing list every 10 seconds, forever."""
        while True:
            try:
                self.update_things()
            except (requests.RequestException, ValueError) as exc:
                # A transient gateway failure should not stop the exporter;
                # try again on the next cycle.
                print("Could not refresh thing list: {!r}".format(exc))
            await asyncio.sleep(10)
if __name__ == "__main__":
    # Script entry point: load the INI config, then run the translator
    # until the process is interrupted or run() raises.
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", type=pathlib.Path,
                        default=(pathlib.Path(__file__).parent / "config.ini"))
    args = parser.parse_args()

    print("Loading config file from '{}'".format(args.config))
    config = configparser.RawConfigParser()
    # read() silently skips files it cannot open and returns the list of
    # files it did parse; an empty list means there is no configuration.
    if not config.read(args.config):
        parser.error("cannot read config file '{}'".format(args.config))

    print("Starting translator...")
    translator = Translator(config)

    # Create the loop explicitly: asyncio.get_event_loop() without a
    # current loop is deprecated and no longer creates one on newer Pythons.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # run() loops forever, so nothing needs to follow this call.
        loop.run_until_complete(translator.run())
    finally:
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment