Created
August 21, 2023 00:30
-
-
Save kzaremski/43513ec02aca157f049dca1d8f77d509 to your computer and use it in GitHub Desktop.
CC of CO RockSat-X 2022-2023 Telemetry to CSV/Logfile Exporter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #/usr/bin/python | |
| """ | |
| Konstantin Zaremski <konstantin.zaremski@gmail.com> | |
| <kzaremsk@msudenver.edu> | |
| <kzaremski@student.cccs.edu> | |
| August 18, 2023 | |
| CC of CO RockSat-X 2022-23 Telemetry to CSV Converter | |
| This simple python program converts the archaic telemetry format | |
| that the payload transmits in to CSV data for import in to MS Excel | |
| or another program for data analysis. | |
| Operation | |
| rocksat_2023_telem_to_csv.py <input_file_path> <output_file_path> | |
| This is the default behavior and usage of the program. The first | |
| option should be the absolute or relative path to the telemetry | |
| output file provided by NASA telemetry engineers or techs. The | |
| second option should be the output file (csv format) that contains | |
| the converted telemetry sensor data. | |
| rocksat_2023_telem_to_csv.py <input_file_path> <output_file_path> | |
| -l <log_output_file_path> | |
| rocksat_2023_telem_to_csv.py <input_file_path> <output_file_path> | |
| --log <log_output_file_path> | |
| If you desire to have payload log events extracted from the | |
| telemetry output, use the option "-l" or "--log", followed by | |
| an additional output file path where the log events that have | |
| been extracted will be saved in plain text format. | |
| """ | |
| # Import dependencies | |
| import sys | |
| # Map telemetry variables against their sensor counterparts | |
| sensors = { | |
| "t": "Time", | |
| "T_i": "BME680 (Internal) Temperature", | |
| "T_e": "BME680 (External) Temperature", | |
| "d": "VL54L0X Distance", | |
| "a_x": "BNO055 Acceleration (x)", | |
| "a_y": "BNO055 Acceleration (y)", | |
| "a_z": "BNO055 Acceleration (z)", | |
| } | |
| # Main Method | |
| def main(args): | |
| # Empty Variables | |
| inPath = None | |
| outPath = None | |
| logPath = None | |
| # Parse Arguments | |
| if len(args) == 0: | |
| return print("Please specity input and output file paths.") | |
| elif len(args) == 1: | |
| return print("Please specify an output path.") | |
| # Primary Operation Mode, set the arguments | |
| elif len(args) == 2: | |
| inPath = args[0] | |
| outPath = args[1] | |
| elif len(args) == 3 and (arg[2] == "-l" or arg[2] == "--log"): | |
| return print("Please specify an output path for the payload log file since you have elected to export it with \"-l\" or \"--log\"!") | |
| # Mode that includes the logfile | |
| else: | |
| inPath = args[0] | |
| outPath = args[1] | |
| logPath = args[3] | |
| # Empty arrays representing the lines of the respective output files | |
| dataLines = [",".join(sensors.values())] | |
| logLines = [] | |
| # Read in the input file | |
| inputBlobString = [] | |
| with open(inPath, "r", errors="ignore") as inputFile: | |
| # Read in all the lines at once and then close the file | |
| # The payload does not produce telemtry data at the scale | |
| # that we would be needing to read in the file chunks at a time. | |
| inputBlobString = inputFile.read() | |
| # Clean up the file prior to input | |
| # Trim newlines | |
| inputBlobString = inputBlobString.replace("\n", "") | |
| # Trim off the opening brackets | |
| inputBlobString = inputBlobString.replace("[", "") | |
| # Trim off NASA/Wallops additions | |
| inputBlobString = inputBlobString.replace("%% Header %%", "") | |
| inputBlobString = inputBlobString.replace("%% %%", "") | |
| # Split in to lines based on the closing brackets | |
| inputLines = inputBlobString.split("]") | |
| # Data Messages | |
| dataObjects = [] | |
| # For each line | |
| for telemetryMessage in inputLines: | |
| # Split the line | |
| messageParts = telemetryMessage.split(">") | |
| # If invalid, skip | |
| if len(messageParts) != 2: | |
| continue | |
| # Extract the time that the payload says that it sent the message | |
| payloadTime = messageParts[0] | |
| # Determine if it is a log line or data line | |
| messageString = messageParts[1] | |
| dataLine = messageString[0:2] == "d:" or (messageString[1] == "_" and messageString[3] == ":") | |
| # If is a data line, parse as such | |
| if dataLine: | |
| # Data Message | |
| dataObject = {} | |
| # Individual measurements | |
| measurements = messageParts[1].split(",") | |
| # For each measurement | |
| for measurement in measurements: | |
| # Split the measurement in to its respective key-value pair | |
| measurementParts = measurement.split(":") | |
| if len(measurementParts) != 2: | |
| print(f"INVALID: {measurement}") | |
| continue | |
| # Add it to the data object | |
| dataObject[sensors[measurementParts[0]]] = measurementParts[1] | |
| # Time for the measurements | |
| dataObject["Time"] = payloadTime | |
| # Add the dataObject to the dataObjects array | |
| dataObjects.append(dataObject) | |
| # Otherwise, it is a log line, parse as such | |
| else: | |
| outLine = f"[{payloadTime}] {messageString}" | |
| logLines.append(outLine) | |
| # For each data element | |
| for dataObject in dataObjects: | |
| dataLine = "" | |
| # For each column in the header | |
| for sensor in sensors.values(): | |
| if sensor in dataObject.keys(): | |
| dataLine = dataLine + str(dataObject[sensor]) + "," | |
| else: | |
| dataLine = dataLine + "," | |
| # Add the data line to the dataLins | |
| dataLine = dataLine[:-1] | |
| dataLines.append(dataLine) | |
| # Write to the output files | |
| if outPath != None: | |
| with open(outPath, "w") as file: | |
| file.write("\n".join(dataLines) + "\n") | |
| if logPath != None: | |
| with open(logPath, "w") as file: | |
| file.write("\n".join(logLines) + "\n") | |
| # Print a message to the user | |
| print("Done!") | |
| # Runtime (if being run directly and not being imported by another file) | |
| if __name__ == "__main__": | |
| main(sys.argv[1:]) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment