Created
September 1, 2020 23:03
-
-
Save michalfratczak/4ff0785b5f7c5da8a4273552097a1cf6 to your computer and use it in GitHub Desktop.
fldigi logger
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python | |
''' | |
This script reads characters from fldigi and scans for UKHAS sentences https://ukhas.org.uk/communication:protocol | |
For every valid sentence, log to file. Works only for sentences with CRC16 checksum, XOR is not supported. | |
Run fldigi with these parameters | |
dl-fldigi.exe --hab --arq-server-address 127.0.0.1 --arq-server-port 7777 | |
or | |
fldigi.exe --arq-server-address 127.0.0.1 --arq-server-port 7777 | |
''' | |
from __future__ import print_function | |
import string | |
import socket | |
import select | |
import time | |
import re | |
import traceback | |
import threading | |
import queue | |
from datetime import datetime | |
def crc(i_str):
    '''
    Return the CRC16 checksum of i_str as four lowercase hex digits.

    The checksum uses polynomial 0x1021 with an initial value of 0xFFFF and
    is computed MSB-first over the character codes of the input.

    i_str: the sentence text to checksum. Byte strings are mapped
           one byte -> one character; any other object is converted with str().
    Returns: a 4-character, zero-padded, lowercase hexadecimal string.
    '''
    if isinstance(i_str, (bytes, bytearray)) and not isinstance(i_str, str):
        # latin-1 maps every byte to the code point with the same value, so
        # ord() below sees the raw byte values.
        i_str = i_str.decode('latin-1')
    else:
        i_str = str(i_str)

    checksum = 0xffff
    for char in i_str:
        checksum = (checksum ^ (ord(char) << 8)) & 0xffff
        for _ in range(8):
            if checksum & 0x8000:
                checksum = (checksum << 1) ^ 0x1021
            else:
                checksum <<= 1
            # Keep the register at 16 bits; without the mask the integer
            # grows by one bit per shift for the whole length of the input.
            checksum &= 0xffff

    return '{:04x}'.format(checksum)
def make_connection(host='localhost', port='7777'):
    '''
    Open a non-blocking TCP connection to the fldigi ARQ server.

    host: host name or address to connect to.
    port: port number, as an int or a numeric string.
    Returns: the socket object. On failure the error is printed and the
             (unconnected) socket is still returned, so the caller always
             receives a socket.
    '''
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        port = int(port)
        sock.connect((host, port))
        sock.setblocking(False)
    except Exception:
        # Deliberately best-effort: report and carry on. Catching Exception
        # (not a bare except) lets Ctrl-C / SystemExit propagate.
        print("fldigi2habboy: make_connection failed.")
        print(traceback.format_exc())
    return sock
def read_from_fldigi(sock, callback = None):
    '''
    Read data from sock in a loop and hand every received chunk to callback.

    sock: a connected socket to read from.
    callback: optional callable invoked as callback(chunk) with the raw data
              returned by sock.recv().

    The loop ends when the peer closes the connection or on
    KeyboardInterrupt. Any other exception (including one raised by the
    callback) is printed and the loop continues.
    '''
    timeout_in_seconds = 10
    while True:
        try:
            ready = select.select([sock], [], [], timeout_in_seconds)
            if not ready[0]:
                print("Waiting for some fldigi data ...")
                continue
            chunk = sock.recv(4096)
            if not chunk:
                # recv() returning no data means the peer closed the
                # connection; select() would report the socket readable
                # forever, so stop instead of spinning.
                print("fldigi closed the connection.")
                return
            if callback:
                callback(chunk)
        except KeyboardInterrupt:
            return
        except Exception:
            print(traceback.format_exc())
class UkhasSentenceScanner(object):
    '''
    Incremental scanner for UKHAS sentences in a character stream.

    1. push() some characters into internal buffer
    2. if a valid sentence is found, run callback(sentence)

    The callback receives a (sentence_without_crc, crc) tuple. Only
    sentences whose CRC16 checksum matches are reported.
    '''

    def __init__(self):
        # Text received so far that has not yet produced a sentence.
        self.__buff__ = ''
        # Groups: 1 leading text, 2 sentence body after the '$' run,
        # 3 the '*' separator, 4 four checksum characters, 5 trailing text.
        self.__rex__ = re.compile( r'''(.*?)\$+(.+)(\*)(\w{4})(.*)''' )
        self.__sentence_callback__ = print

    def set_callback(self, cb):
        '''Set the callable invoked with (sentence, crc) for each valid sentence.'''
        self.__sentence_callback__ = cb

    def push(self, chars):
        '''
        Append chars to the buffer and report every valid sentence found.

        chars: text to add. Byte strings (as returned by socket.recv) are
               decoded one byte -> one character before being buffered.
        '''
        if isinstance(chars, (bytes, bytearray)) and not isinstance(chars, str):
            chars = chars.decode('latin-1')
        # Drop the backlog if no sentence was recognised for a long time.
        if len(self.__buff__) > 5000:
            self.__buff__ = ''
        self.__buff__ += chars
        for sentence in self.__scan__():
            self.__sentence_callback__(sentence)

    def __scan__(self):
        '''
        Scan the buffer line by line and return the valid sentences found.

        After the scan the buffer holds the last line if it did not match
        (it may be an incomplete sentence), otherwise it is empty.
        Returns: a list of (sentence_without_crc, crc) tuples.
        '''
        sentences = []
        for line in self.__buff__.split('\n'):
            stripped = line.strip()
            mo = self.__rex__.match(stripped)
            if not mo:
                print('no MO', stripped)
                self.__buff__ = line  # sentence might be not complete yet
                continue
            self.__buff__ = ''
            sentence_no_crc = mo.group(2)
            _crc = mo.group(4)
            computed = crc(sentence_no_crc)
            if computed.lower() == _crc.lower():
                print('CRC ok:', sentence_no_crc, _crc, computed)
                sentences.append( (sentence_no_crc, _crc) )
            else:
                # Corrupted sentence: report it on stdout but do not pass it on.
                print('CRC not valid:', sentence_no_crc, _crc, computed)
        return sentences
def log_to_file(line, filename = None):
    '''
    Append line to a log file, prefixed with the current local timestamp.

    line: text to log.
    filename: path of the log file. When empty or None, a file named after
              the current date is used in the current directory.
    Each entry is written as a newline followed by "<timestamp> => <line>".
    '''
    # Fall back to a per-day log file in the working directory.
    target = filename or ('./' + datetime.now().strftime('%Y_%m_%d_fldigi.txt'))
    stamp = datetime.now().isoformat()
    stamp = stamp.replace('T', ' ')
    entry = ''.join(['\n', stamp, ' => ', line])
    with open(target, 'a') as log_file:
        log_file.write(entry)
def main():
    '''
    Connect to fldigi and log every sentence reported by the scanner.

    Builds a UkhasSentenceScanner whose callback writes "<sentence>*<crc>"
    to the log file, then feeds it everything read from the fldigi socket.
    '''
    def _log_sentence(sent_and_crc):
        # Re-join the sentence body and its checksum the way they were sent.
        body = sent_and_crc[0]
        checksum = sent_and_crc[1]
        log_to_file(body + '*' + checksum)

    print("Scanner")
    scanner = UkhasSentenceScanner()
    scanner.set_callback(_log_sentence)

    print("fldigi connection")
    con = make_connection()
    print("Connected to fldigi")

    print("Go !")
    read_from_fldigi(con, scanner.push)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment