Created
March 30, 2021 20:35
-
-
Save michalfratczak/5c9000cf6a5a89e0a83ca05ed33a1e6e to your computer and use it in GitHub Desktop.
HabDec_to_AntennaRotor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import string | |
import json | |
import time | |
from pprint import pprint | |
import threading | |
import time | |
from ws4py.client.threadedclient import WebSocketClient # pip install ws4py | |
import urllib3 | |
# Browser-like User-Agent header sent with every request made through `http`.
user_agent = {
    'user-agent': 'Mozilla/5.0 (Windows NT 6.3; rv:36.0) Gecko/20100101 Firefox/36.0',
}

# Turn off urllib3's warning output for this process.
urllib3.disable_warnings()

# Shared pool manager (10 pools) carrying the default headers above.
http = urllib3.PoolManager(num_pools=10, headers=user_agent)
def getFlights():
    """Fetch flight rows from HABITAT.

    Queries the ``flight/end_start_including_payloads`` view, using the
    current Unix time as ``startkey``, and keeps only the rows whose
    document type is ``"flight"``.

    Returns:
        list: view rows (dicts with a ``"doc"`` entry) of type ``"flight"``.
    """
    query_url = (
        "http://habitat.habhub.org/habitat/_design/flight/_view/"
        "end_start_including_payloads?startkey=[{}]&include_docs=True"
    ).format(int(time.time()))

    response = http.request("GET", query_url)
    rows = json.loads(response.data.decode("utf-8"))["rows"]

    flight_rows = []
    for row in rows:
        if row["doc"]["type"] == "flight":
            flight_rows.append(row)
    return flight_rows
def getPayloads(i_flights=None):
    """Fetch the payload configurations referenced by a set of flights.

    Downloads the rows of the ``flight/end_start_including_payloads`` view
    and keeps the ``payload_configuration`` documents whose ``_id`` is
    listed in the ``payloads`` of any given flight.

    Args:
        i_flights: flight rows as returned by ``getFlights()``. When ``None``
            the flights are fetched with ``getFlights()``. An explicitly
            empty list is honoured (no extra request is made) and yields an
            empty result.

    Returns:
        list: view rows of type ``"payload_configuration"`` that belong to
        the given flights.
    """
    query_url = (
        "http://habitat.habhub.org/habitat/_design/flight/_view/"
        "end_start_including_payloads?startkey=[{}]&include_docs=True"
    ).format(int(time.time()))

    response = http.request("GET", query_url)
    rows = json.loads(response.data.decode("utf-8"))["rows"]

    # Only fall back to a fresh query when the caller gave nothing at all;
    # an empty list means "no flights", not "please look them up again".
    if i_flights is None:
        i_flights = getFlights()

    # A set gives O(1) membership tests in the filter below.
    flying_payload_ids = set()
    for flight in i_flights:
        # Tolerate flight documents that carry no "payloads" entry.
        flying_payload_ids.update(flight["doc"].get("payloads", []))

    return [
        row
        for row in rows
        if row["doc"]["type"] == "payload_configuration"
        and row["doc"]["_id"] in flying_payload_ids
    ]
def degree_2_decimal(i_pos):
    """Convert an NMEA-style ``ddmm.mmmm`` coordinate to decimal degrees.

    The value is interpreted as ``degrees * 100 + minutes``; the minutes are
    divided by 60 and added to the whole degrees.

    Args:
        i_pos: the coordinate as a number or numeric string.

    Returns:
        float: the coordinate in decimal degrees. The sign of the input is
        preserved.

    Raises:
        ValueError: if ``i_pos`` cannot be converted to ``float``.
    """
    value = float(i_pos)
    # int() truncates toward zero, which is what is needed to split off the
    # whole degrees for negative values too. It also avoids relying on the
    # `math` module, which this file never imports.
    degrees = int(value / 100)
    minutes = value - 100.0 * degrees
    return degrees + minutes / 60.0
class SentenceParser:
    """Extract lat/lon/alt from a comma-separated HAB sentence by position.

    The positions and format flags start out unset and are expected to be
    filled in by whoever creates the parser.
    """

    def __init__(self):
        # Callsign of the payload this parser is meant for.
        self.callsign = ''
        # Indices of the tokens holding each value; -1 until assigned.
        self.lat_pos = -1
        self.lon_pos = -1
        self.alt_pos = -1
        # True when the corresponding token must go through degree_2_decimal().
        self.lat_is_NMEA = False
        self.lon_is_NMEA = False

    def parse_sentence(self, i_sentence):
        """Split a sentence on commas and pick out latitude, longitude, altitude.

        Example input: ``$$$$callsign,time,lat,lon,alt,sensor*CRC``

        Returns:
            tuple: ``(lat, lon, alt)`` where ``lat`` and ``lon`` are floats
            and ``alt`` is the raw token string taken at ``alt_pos``.
        """
        parts = i_sentence.split(',')

        lat_token = parts[self.lat_pos]
        lat = degree_2_decimal(lat_token) if self.lat_is_NMEA else float(lat_token)

        lon_token = parts[self.lon_pos]
        lon = degree_2_decimal(lon_token) if self.lon_is_NMEA else float(lon_token)

        return lat, lon, parts[self.alt_pos]
def get_payloads_data():
    """Build a ``callsign -> SentenceParser`` map from HABITAT.

    For every sentence definition of every payload returned by
    ``getPayloads(getFlights())`` a ``SentenceParser`` is created and
    configured with the positions of the latitude, longitude and altitude
    fields. A later sentence with the same callsign replaces an earlier one.

    Returns:
        dict: parsers keyed by callsign.
    """
    payload_rows = getPayloads(getFlights())
    parsers_by_callsign = {}

    print("Building payloads info from HABITAT")

    for row in payload_rows:
        for sentence_cfg in row['doc']['sentences']:
            parser = SentenceParser()
            parser.callsign = sentence_cfg['callsign']

            # NOTE(review): positions are indices into sentence_cfg['fields'],
            # while parse_sentence() applies them to the comma-split sentence
            # whose token 0 is the callsign. Confirm the field list counts the
            # callsign too, otherwise every position is off by one.
            for index, field in enumerate(sentence_cfg['fields']):
                is_coordinate = field['sensor'] == 'stdtelem.coordinate'
                name = field['name']

                if is_coordinate and name == 'latitude':
                    parser.lat_pos = index
                    if field['format'] == 'dd.mm.mmmm':
                        parser.lat_is_NMEA = True

                if is_coordinate and name == 'longitude':
                    parser.lon_pos = index
                    if field['format'] == 'dd.mm.mmmm':
                        parser.lon_is_NMEA = True

                if name == 'altitude':
                    parser.alt_pos = index

            parsers_by_callsign[parser.callsign] = parser

    return parsers_by_callsign
class HabDecClient(WebSocketClient):
    """WebSocket client that hands HABDEC sentence messages to a callback.

    Assign a callable taking one string argument to ``sentence_callback`` on
    the instance before connecting; it receives the text that follows the
    ``cmd::sentence=`` prefix of each matching message.
    """

    def received_message(self, m):
        """Forward the payload of a ``cmd::sentence=`` message to the callback.

        Messages that do not start with the prefix are ignored.
        """
        payload = m.data
        if isinstance(payload, (bytes, bytearray)):
            # str() of a bytes object gives "b'...'" on Python 3, which would
            # never match the prefix below, so decode it to text instead.
            data = payload.decode('utf-8', errors='replace')
        else:
            data = str(payload)

        cmd_type = 'cmd::sentence='
        # Alternative message type left by the author: 'cmd::info:liveprint='
        if data.startswith(cmd_type):
            # Strip only the leading prefix; the same text appearing later in
            # the payload must be left intact.
            self.sentence_callback(data[len(cmd_type):])

    def opened(self):
        """Report that the connection has been opened."""
        print("Opened Connection")

    def closed(self, code, reason=None):
        """Report that the connection has been closed, with code and reason."""
        print("Closed down", code, reason)
def _strip_sentence_framing(sentence):
    """Return `sentence` without its leading '$' marker(s) and '*CRC' tail.

    Everything up to and including the last '$' is dropped, then everything
    from the first remaining '*' onwards. Each step is a no-op when its
    delimiter is absent.
    """
    body = sentence.rpartition('$')[2]
    return body.partition('*')[0]


def main():
    """Print the position of every known payload sentence received from HABDEC.

    Loads the sentence layouts from HABITAT, connects to the HABDEC websocket
    at ``ws://localhost:5555`` and prints callsign/lat/lon/alt for each
    sentence whose callsign is known. Runs until interrupted with Ctrl-C.
    """
    payloads_info = get_payloads_data()

    def parsing_callback(sentence):
        print('parsing_callback:', sentence)

        # Remove the '$' framing so the first token is the bare callsign (the
        # form used as key in payloads_info), and cut the checksum off the
        # last token.
        sentence = _strip_sentence_framing(sentence)
        callsign = sentence.split(',')[0]

        parser = payloads_info.get(callsign)
        if parser is None:
            print('No info for this callsign: ', callsign)
            return

        try:
            lat, lon, alt = parser.parse_sentence(sentence)
        except (ValueError, IndexError) as err:
            # A truncated or corrupted sentence must not raise out of the
            # websocket callback; report it and wait for the next one.
            print('Could not parse sentence:', sentence, err)
            return

        # calculate azimuth and elevation from antenna to lat/lon/alt
        # ie. https://github.com/projecthorus/horus_utils/blob/master/horuslib/earthmaths.py
        # send to rotor
        print(callsign, lat, lon, alt)

    habdec_client = HabDecClient("ws://localhost:5555")
    habdec_client.sentence_callback = parsing_callback
    habdec_client.connect()

    # Daemon thread: it must not keep the process alive after Ctrl-C.
    threading.Thread(target=habdec_client.run_forever, daemon=True).start()

    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            return


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment