Skip to content

Instantly share code, notes, and snippets.

@michalfratczak
Created March 30, 2021 20:35
Show Gist options
  • Save michalfratczak/5c9000cf6a5a89e0a83ca05ed33a1e6e to your computer and use it in GitHub Desktop.
Save michalfratczak/5c9000cf6a5a89e0a83ca05ed33a1e6e to your computer and use it in GitHub Desktop.
HabDec_to_AntennaRotor
#!/usr/bin/env python3
import string
import json
import time
from pprint import pprint
import threading
import time
from ws4py.client.threadedclient import WebSocketClient # pip install ws4py
import urllib3
user_agent = {'user-agent': 'Mozilla/5.0 (Windows NT 6.3; rv:36.0) Gecko/20100101 Firefox/36.0'}
urllib3.disable_warnings()
http = urllib3.PoolManager(10, user_agent)
# get flights from HABITAT
def getFlights():
_url = (
"http://habitat.habhub.org/habitat/_design/flight/_view/end_start_including_payloads?startkey=["
+ str(int(time.time()))
+ "]&include_docs=True"
)
flights_json = http.request("GET", _url)
flights = json.loads(flights_json.data.decode("utf-8"))["rows"]
res = [f for f in flights if f["doc"]["type"] == "flight"]
return res
# get payloads from ^^^ flights
def getPayloads(i_flights=None):
_url = (
"http://habitat.habhub.org/habitat/_design/flight/_view/end_start_including_payloads?startkey=["
+ str(int(time.time()))
+ "]&include_docs=True"
)
all_docs = http.request("GET", _url)
all_docs = json.loads(all_docs.data.decode("utf-8"))["rows"]
if not i_flights:
i_flights = getFlights()
flyingPayloadsIds = []
for f in i_flights:
flyingPayloadsIds.extend(f["doc"]["payloads"])
res = []
for f in all_docs:
if (
f["doc"]["type"] == "payload_configuration"
and f["doc"]["_id"] in flyingPayloadsIds
):
res.append(f)
return res
# convert from NMEA to decimal
def degree_2_decimal(i_pos):
i_pos = float(i_pos)
degs = math.trunc(i_pos / 100)
mins = i_pos - 100.0 * degs
res = degs + mins / 60.0
return res
# parse sentences based on lat/lon/alt positional info
class SentenceParser(object):
def __init__(self):
self.callsign = ''
self.lat_pos = -1 # latitude position in sentence
self.lon_pos = -1 # longitude position in sentence
self.alt_pos = -1 # altitude position in sentence
self.lat_is_NMEA = False
self.lon_is_NMEA = False
def parse_sentence(self, i_sentence):
'''
input HAB sentence: $$$$callsign,time,lat,lon,alt,sensor*CRC
return tuple(lat,lon,alt)
'''
tokens = i_sentence.split(',')
lat = tokens[self.lat_pos]
if self.lat_is_NMEA:
lat = degree_2_decimal(lat)
else:
lat = float(lat)
lon = tokens[self.lon_pos]
if self.lon_is_NMEA:
lon = degree_2_decimal(lon)
else:
lon = float(lon)
alt = tokens[self.alt_pos]
return lat, lon, alt
# get info on sentence structure for all payloads from HABITAT
def get_payloads_data():
flights = getFlights()
payloads = getPayloads(flights)
payloads_info = {} # key: callsign, value: SentenceParser
print("Building payloads info from HABITAT")
for p in payloads:
for s in p['doc']['sentences']:
# pprint(s)
s_parser = SentenceParser()
s_parser.callsign = s['callsign']
for fi, field in enumerate(s['fields']):
# print fi, field
if field['sensor'] == 'stdtelem.coordinate' and field['name'] == 'latitude':
s_parser.lat_pos = fi
if field['format'] == 'dd.mm.mmmm':
s_parser.lat_is_NMEA = True
if field['sensor'] == 'stdtelem.coordinate' and field['name'] == 'longitude':
s_parser.lon_pos = fi
if field['format'] == 'dd.mm.mmmm':
s_parser.lon_is_NMEA = True
if field['name'] == 'altitude':
s_parser.alt_pos = fi
payloads_info[s_parser.callsign] = s_parser
# return
return payloads_info
# HABDEC connection
class HabDecClient(WebSocketClient):
def received_message(self, m):
data = str(m.data)
cmd_type = 'cmd::sentence='
# cmd_type = 'cmd::info:liveprint='
if data.startswith(cmd_type):
sentence = data.replace(cmd_type, '')
self.sentence_callback(sentence)
def opened(self):
print("Opened Connection")
def closed(self, code, reason=None):
print("Closed down", code, reason)
def main():
payloads_info = get_payloads_data()
def parsing_callback(sentence):
print('parsing_callback:', sentence)
try:
sentence = sentence[sentence.rindex('$'):]
sentence = sentence[:sentence.index('*')]
except:
pass
tokens = sentence.split(',')
callsign = tokens[0]
if callsign in payloads_info:
lat,lon,alt = payloads_info[callsign].parse_sentence(sentence)
# calculate azimuth and elevation from antenna to lat/lon/alt
# ie. https://github.com/projecthorus/horus_utils/blob/master/horuslib/earthmaths.py
# send to rotor
print(callsign, lat, lon, alt)
else:
print('No info for this callsign: ', callsign)
habdec_client = HabDecClient( "ws://localhost:5555")
habdec_client.sentence_callback = parsing_callback
habdec_client.connect()
threading.Thread( target = lambda: habdec_client.run_forever() ).start()
while 1:
try:
time.sleep(1)
except KeyboardInterrupt:
return
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment