Created
September 10, 2024 05:41
-
-
Save magomi/126d9c751826f9d2d9e3b6d5b24e6ef7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import time

import serial
# Path of the CSV log file currently in use; assigned by setup_file().
file_name = ''
# True once setup_file() has created the log file and written its header;
# log_measurement() writes nothing while this is False.
file_initialized = False
def parse_command(xs: str) -> list:
    """
    Parse a string with parentheses into a nested list.

    Adopted from https://stackoverflow.com/a/17141899/300188

    Lists are surrounded by parentheses and list entries are separated
    by whitespace. Unbalanced parentheses are detected and reported.

    Examples:
    * `x (y z)` -> `['x', ['y', 'z']]`
    * `ab cd ef` -> `['ab', 'cd', 'ef']`
    * `a (b (c)` -> invalid because of a missing closing parenthesis

    Args:
        xs: The text to parse.

    Returns:
        The top-level list of tokens. Every parenthesised group appears
        as a nested list at the position where it occurred.

    Raises:
        ValueError: If a `)` has no matching `(`, or a `(` is never closed.
    """
    # stack[0] is the result list; stack[-1] is the list currently being filled.
    stack = [[]]
    # Surround parentheses with spaces so that a plain whitespace split
    # yields them as separate tokens.
    for x in xs.replace('(', ' ( ').replace(')', ' ) ').split():
        if x == '(':
            stack[-1].append([])
            stack.append(stack[-1][-1])
        elif x == ')':
            stack.pop()
            # Popping the root list means this `)` closed nothing.
            if not stack:
                raise ValueError('opening bracket is missing')
        else:
            stack[-1].append(x)
    if len(stack) > 1:
        raise ValueError('closing bracket is missing')
    return stack.pop()
def execute_command(line: str) -> None:
    """
    Parse a given command and execute the matching function.

    The command line has to be a nested list of keywords/parameters
    that are separated by spaces and nested by parentheses. The first
    keyword is the name of the command that shall be executed.

    Example:
    `setup_file(filename (column1 column2))` will be interpreted as
    the `setup_file` command. Its parameter list has two entries: first
    the `filename` of the file and second a list of names for the data
    columns within that file.

    Args:
        line: The command text, without any transport prefix.

    Raises:
        ValueError: If the command is empty, unknown, has unbalanced
            parentheses, or `setup_file` is given without a
            parenthesised parameter list.
    """
    tokens = parse_command(line)
    if not tokens:
        raise ValueError('empty command')
    command = tokens[0]
    if command == 'setup_file':
        # The parameters must arrive as one parenthesised group; a bare
        # word here would otherwise be indexed character by character.
        if len(tokens) < 2 or not isinstance(tokens[1], list):
            raise ValueError('setup_file requires a parameter list')
        setup_file(tokens[1])
    else:
        raise ValueError(f'unknown method (\'{command}\')')
def setup_file(params) -> None:
    """
    Create a new log file (csv formatted) based on the given
    parameters list.

    The first entry contains the name of the log file, the second
    entry is a list of column headers. The very first column header
    is always 'timestamp'. The header line is written with `;` as
    separator and ends with a trailing `;`.

    The module-level `file_name` / `file_initialized` state is only
    updated after the header has been written, so a failed attempt
    does not redirect logging to a file that was never created.

    Args:
        params: `[file_name, [column, ...]]`.

    Raises:
        ValueError: If fewer than two entries are given.
        OSError: If the file cannot be created or written.
    """
    global file_name
    global file_initialized
    if len(params) < 2:
        raise ValueError('setup_file expects a file name and a list of column headers')
    name, columns = params[0], params[1]
    header = f'timestamp;{";".join(columns)};\n'
    # Explicit encoding: the platform default may not be able to encode
    # every character of a column name.
    with open(name, 'w', encoding='utf-8') as f:
        f.write(header)
    file_name = name
    file_initialized = True
def log_measurement(line: str) -> None:
    """
    Append a given line to the log file, prefixed with a timestamp.

    The entry is written as `<ISO timestamp>;<line>` followed by a
    newline. Nothing is written unless the log file has already been
    initialized (see the module-level `file_initialized` flag).

    Args:
        line: The measurement text to append after the timestamp.

    Raises:
        OSError: If the log file cannot be opened or written.
    """
    if not file_initialized:
        return
    # Explicit encoding: the text originates from UTF-8 decoded serial
    # data, which the platform default encoding may not be able to encode.
    with open(file_name, 'a', encoding='utf-8') as f:
        f.write(f'{datetime.datetime.now().isoformat()};{line}\n')
def _strip_prefix(line: str, prefix: str) -> str:
    """
    Return `line` without a leading `prefix`, stripped of surrounding
    whitespace.

    Only a prefix at the very start is removed; later occurrences of the
    same text inside the payload are kept. A line that does not start
    with `prefix` is returned stripped but otherwise unchanged.
    """
    if line.startswith(prefix):
        line = line[len(prefix):]
    return line.strip()


if __name__ == '__main__':
    # todo: port as a parameter (config, cmdline)
    ser_port = 'COM24'
    # Keep retrying, both while the port is not yet available and after an
    # established connection is lost.
    while True:
        try:
            with serial.Serial(port=ser_port, baudrate=115200, timeout=1) as ser:
                # todo: logging instead of print statements
                print(f'connected to {ser.portstr}')
                while True:
                    # errors='replace': a corrupted byte on the wire must not
                    # terminate the logger.
                    line = ser.readline().decode('utf-8', errors='replace').strip()
                    if len(line) > 1:
                        if line.startswith('cmd'):
                            try:
                                execute_command(_strip_prefix(line, 'cmd: '))
                            except (ValueError, IndexError, OSError) as err:
                                # A malformed command is reported and skipped
                                # instead of stopping the logger.
                                print(f'ignoring invalid command ({err})')
                        elif line.startswith('log'):
                            log_measurement(_strip_prefix(line, 'log: '))
                        else:
                            print(line)
                    time.sleep(0.0001)
        except serial.SerialException:
            print(f'unable to connect to {ser_port}; trying again in 5 seconds')
            time.sleep(5)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment