Last active
September 15, 2020 14:19
-
-
Save prerakmody/110f80c3e1d99ac100c481d6428e3c75 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import sys | |
import copy | |
import struct | |
import pprint | |
import traceback | |
def pcd_read(filename, verbose=0, verbose_steps=0):
    """Read an ASCII or binary .pcd point-cloud file into per-field lists.

    The header is parsed by keyword (FIELDS, SIZE, TYPE, POINTS, DATA, ...)
    instead of by fixed line position, so extra comment lines or a different
    entry order do not break parsing.

    Args:
        filename: Path of the .pcd file.
        verbose: When truthy, print the parsed header, the field table and
            the number of values read.
        verbose_steps: When truthy, print every value decoded from a binary
            payload (very chatty).

    Returns:
        A tuple ``(data_op, fields_dict)``. ``data_op`` maps each field name
        to the list of its values (one entry per point). ``fields_dict`` maps
        the field's column index to ``[name, size_in_bytes, struct_format]``.

    Raises:
        ValueError: If the ``DATA`` line is missing, a required header entry
            is absent or inconsistent, a TYPE/SIZE pair has no ``struct``
            equivalent, or the payload is neither ``ascii`` nor ``binary``.
    """
    # Refer this to unpack - https://docs.python.org/3/library/struct.html#format-characters
    # (PCD TYPE, PCD SIZE) -> struct format character.
    struct_formats = {
        ('f', 4): 'f', ('f', 8): 'd',
        ('i', 1): 'b', ('i', 2): 'h', ('i', 4): 'i', ('i', 8): 'q',
        ('u', 1): 'B', ('u', 2): 'H', ('u', 4): 'I', ('u', 8): 'Q',
    }

    with open(filename, 'rb') as fp:
        data_ip = fp.read()

    # Only the first DATA line counts; a binary payload may contain the same
    # byte sequence by coincidence.
    match = re.search(rb'\nDATA\s(\S*)\n', data_ip)
    if match is None:
        raise ValueError("no 'DATA' header line found in %r" % (filename,))

    ## Step1 : Get the header (everything up to and including the DATA line)
    header = {}
    for line in data_ip[:match.end(0)].decode('utf-8').split('\n'):
        tokens = line.split()
        if tokens and not tokens[0].startswith('#'):
            header[tokens[0].upper()] = tokens[1:]

    try:
        field_names = header['FIELDS']
        field_size = [int(size) for size in header['SIZE']]
        field_type = [kind.lower() for kind in header['TYPE']]
        data_count = int(header['POINTS'][0])
        data_type = header['DATA'][0].lower()
    except (KeyError, IndexError) as err:
        raise ValueError('incomplete PCD header in %r (%r)' % (filename, err)) from err
    if not (len(field_names) == len(field_size) == len(field_type)):
        raise ValueError('FIELDS, SIZE and TYPE describe different numbers of fields')
    viewpoint = ' '.join(['VIEWPOINT'] + header.get('VIEWPOINT', []))

    if verbose:
        print (' - field_names : ', field_names)
        print (' - field_size : ', field_size)
        print (' - field_type : ', field_type)
        print (' - data_count : ', data_count, ' rows ')
        print (' - viewpoint : ', viewpoint)
        print (' - data_type : ', data_type)

    ## Step1.1 : Accumulate data on fields --> fields_dict
    # NOTE(review): the COUNT header entry is ignored; every field is assumed
    # to hold exactly one element per point.
    fields_dict = {}
    for i, field_name in enumerate(field_names):
        key = (field_type[i], field_size[i])
        if key not in struct_formats:
            raise ValueError('unsupported TYPE/SIZE %r for field %r' % (key, field_name))
        fields_dict[i] = [str(field_name), field_size[i], struct_formats[key]]
    # Explicit column order; do not rely on dict iteration order.
    columns = [fields_dict[i] for i in range(len(field_names))]
    if verbose:
        print ('\n ----- Fields -----')
        pprint.pprint(fields_dict, indent=4)

    ## Step2 - Iterate through the payload
    data_payload = data_ip[match.end(0):]
    data_op = {name: [] for name, _, _ in columns}
    if verbose:
        print ('\n ----- Output Skeleton -----')
        pprint.pprint(data_op)

    if data_type == 'ascii':
        data_count_tmp = 0
        for row in data_payload.decode('utf-8').split('\n'):
            if data_count_tmp == data_count:
                break
            row_vals = row.split()
            if not row_vals:
                continue  # blank line, e.g. the trailing newline
            for col, (field_name, _, fmt) in enumerate(columns):
                convert = float if fmt in ('f', 'd') else int
                data_op[field_name].append(convert(row_vals[col]))
            data_count_tmp += 1
    elif data_type == 'binary':
        # '=' selects native byte order with standard sizes and no alignment
        # padding, so the struct size equals the sum of the SIZE entries.
        row_struct = struct.Struct('=' + ''.join(fmt for _, _, fmt in columns))
        # Decode complete rows only, so every output list has equal length.
        rows_available = len(data_payload) // row_struct.size
        for row_idx in range(min(data_count, rows_available)):
            offset = row_idx * row_struct.size
            values = row_struct.unpack_from(data_payload, offset)
            for (field_name, field_bytes, _), value in zip(columns, values):
                if verbose_steps:
                    print ('data_count_tmp : ', row_idx + 1, ' || i : ', offset, ' || field : ', field_name, ' || data : ', value)
                offset += field_bytes
                data_op[field_name].append(value)
    else:
        raise ValueError('unsupported DATA encoding %r' % (data_type,))

    if verbose and columns:
        # All fields hold the same number of values; report the first one.
        field_name = columns[0][0]
        print ('\n ----- Output Count -----')
        print (' - Field : ', field_name, ' || Data Count : ', len(data_op[field_name]))

    return data_op, fields_dict
def pcd_write_bin(data_op, fields_dict, data_op_file, fields_required=None, verbose=0):
    """Write selected point-cloud fields to a flat float32 .bin file.

    Values are written point by point: for every row, the selected fields are
    emitted in ``fields_dict`` order, each packed with the ``'f'`` struct
    format (native byte order).

    Args:
        data_op: Mapping of field name -> list of values, as returned by
            ``pcd_read``. It is not modified.
        fields_dict: Mapping of column index -> ``[name, size, format]``;
            only the names and their order are used here.
        data_op_file: Path of the output file (overwritten if present).
        fields_required: Optional collection of field names to keep. When
            empty or ``None`` every field in ``fields_dict`` is written.
        verbose: Unused; kept for interface compatibility.

    Raises:
        ValueError: If no field is selected or the selected fields do not
            all hold the same number of values.
    """
    all_names = [fields_dict[id_][0] for id_ in fields_dict]
    if fields_required:
        # Filter without popping from data_op, so the caller's data is intact.
        field_names = [name for name in all_names if name in fields_required]
    else:
        field_names = all_names
    if not field_names:
        raise ValueError('no fields selected for writing')

    columns = [data_op[name] for name in field_names]
    row_count = len(columns[0])
    if any(len(column) != row_count for column in columns):
        raise ValueError('selected fields hold different numbers of values')

    ## Step2 - binary data
    print ('\n ----- Writing .bin file -----')
    print (' - Writing these field_names : ', field_names)
    # One pack call per row instead of one per value.
    row_struct = struct.Struct('%df' % len(field_names))
    payload = b''.join(row_struct.pack(*row) for row in zip(*columns))

    print (' - Writing to ', data_op_file)
    with open(data_op_file, 'wb') as fp:
        fp.write(payload)
if __name__ == "__main__":
    # Convert each listed .pcd file into a float32 .bin file holding
    # x, y, z and intensity for every point.
    data_pcds = ['data/1523643737.998116000.pcd']
    for data_pcd in data_pcds:
        data_op, fields_dict = pcd_read(data_pcd, verbose=1)
        # splitext removes only the trailing extension; splitting on '.pcd'
        # would cut the path at the first '.pcd' found anywhere in it.
        data_op_file = os.path.splitext(data_pcd)[0] + '_modified.bin'
        pcd_write_bin(data_op, fields_dict, data_op_file, fields_required=['x', 'y', 'z', 'intensity'], verbose=1)

        if 0:
            ## Cross Check Data :
            # numpy is needed only by this optional check and is not imported
            # at the top of this script, so import it here.
            import numpy as np
            rows_count = 4
            print ('\n ----- Original Data ----- : ', len(data_op['x']))
            for i in range(rows_count):
                print (data_op['x'][i], data_op['y'][i], data_op['z'][i], data_op['intensity'][i])

            data_bin = np.fromfile(data_op_file, dtype=np.float32)
            print ('\n ----- .bin data ----- : ', len(data_bin))
            # rows_count rows of 4 written fields each.
            print (' - ', data_bin[:rows_count * rows_count])
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
from struct import * | |
def read_pcd(filename, max_points=12):
    """Read the first points of a binary ``x y z rgba`` .pcd file.

    Every point is decoded as three 4-byte floats followed by four single
    bytes, i.e. 16 bytes per point. The header is returned as a list of
    lines, for example::

        ['# .PCD v0.7 - Point Cloud Data file format', 'VERSION 0.7',
         'FIELDS x y z rgba', 'SIZE 4 4 4 4', 'TYPE F F F U',
         'COUNT 1 1 1 1', 'WIDTH 0', 'HEIGHT 0', 'VIEWPOINT 0 0 0 1 0 0 0',
         'POINTS 212001', 'DATA binary', '']

    Args:
        filename: Path of the .pcd file.
        max_points: Maximum number of points to decode. ``None`` decodes
            every complete point in the payload. The default of 12 matches
            the number of complete points the earlier fixed 200-byte window
            covered.

    Returns:
        ``(data_header, [X, Y, Z, R, G, B, A])``. ``X``, ``Y`` and ``Z`` hold
        1-tuples of floats (the shape ``struct.unpack`` returns); ``R``,
        ``G``, ``B`` and ``A`` hold ints in 0..255. All seven lists have the
        same length.

    Raises:
        ValueError: If the file has no ``DATA`` header line.
    """
    with open(filename, 'rb') as fp:
        data = fp.read()

    ## Step1 : Get the header
    match = re.search(rb'\nDATA\s(\S*)\n', data)
    if match is None:
        raise ValueError("no 'DATA' header line found in %r" % (filename,))
    data_header = data[:match.end(0)].decode('utf-8').split('\n')
    print (' - Header : \n', data_header)

    # Step2 - Iterate through the payload
    data_payload = data[match.end(0):]
    point_bytes = 16  # 3 floats (x, y, z) + 4 colour bytes
    points_available = len(data_payload) // point_bytes
    if max_points is None:
        point_count = points_available
    else:
        point_count = min(max_points, points_available)

    X, Y, Z, R, G, B, A = [], [], [], [], [], [], []
    for idx in range(point_count):
        start = idx * point_bytes
        # The colour bytes belong to the same 16-byte record as x, y, z;
        # they must not be read from the start of the following point.
        x, y, z, c0, c1, c2, c3 = unpack('=fff4B', data_payload[start:start + point_bytes])
        X.append((x,))
        Y.append((y,))
        Z.append((z,))
        # NOTE(review): bytes are assigned to R, G, B, A in memory order, as
        # before; confirm the channel order against the PCD producer.
        R.append(c0)
        G.append(c1)
        B.append(c2)
        A.append(c3)

    return data_header, [X, Y, Z, R, G, B, A]
if __name__ == "__main__":
    # Demo run: parse a sample file and keep header and columns as globals.
    filename = '../boxes2.pcd'
    data_header, data_payload = read_pcd(filename=filename)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Topic : Visualize the KiTTI Dataset (LiDAR) | |
References | |
- LiDAR Dataset : www.cvlibs.net/datasets/kitti | |
- Data Download : www.cvlibs.net/datasets/kitti/raw_data.php | |
- Blog : | |
- Notes | |
- Tested on Python3.5 | |
""" | |
import os | |
import h5py # pip3 install h5py | |
import pykitti # pip3 install pykitti | |
import vispy # pip3 install vispy | |
import vispy.scene | |
import numpy as np | |
## 1. Convert multiple .bin files to single .h5 format | |
def data_write_h5(url_hdf5, dataset, dataset_limit=1000):
    """Store the first LiDAR frames of a dataset in a single .h5 file.

    For frame ``i`` two datasets are created: ``points_<iii>`` holding
    columns 0, 1 and 2 of the frame, and ``colors_<iii>`` holding one
    ``(1, 1, 1, 1)`` row per point.

    Args:
        url_hdf5: Path of the .h5 file to create (overwritten if present).
        dataset: Object whose ``velo`` attribute iterates over 2-D arrays,
            one per frame (here a ``pykitti.raw`` instance).
        dataset_limit: Maximum number of frames to write.
    """
    with h5py.File(url_hdf5, "w") as hf:
        for i, each in enumerate(dataset.velo):
            # Stop before writing, so exactly dataset_limit frames are stored
            # (checking `i > dataset_limit` after the write stored two extra).
            if i >= dataset_limit:
                break
            if i % 10 == 0:
                print (' - Velo File : ', i+1)
            points = each[:, [0, 1, 2]]
            # Constant colour per point, built in one call instead of a
            # per-point Python loop; integer dtype as before.
            colors = np.ones((len(points), 4), dtype=int)
            hf.create_dataset('points_%.3d' % (i), data=points)
            hf.create_dataset('colors_%.3d' % (i), data=colors)
## 2. Plot data from a single .h5 file | |
def data_plot_h5(url_hdf5, point_size=3):
    """Show the LiDAR frames stored in a .h5 file in a vispy canvas.

    Datasets whose name contains ``points`` / ``colors`` are loaded in the
    file's iteration order and paired by position. The first frame is shown
    immediately; pressing ``=`` shows the next frame and ``-`` the previous
    one. The caller still has to start the event loop (``vispy.app.run()``).

    Args:
        url_hdf5: Path of the .h5 file to read.
        point_size: Marker size handed to the scatter visual.

    Raises:
        ValueError: If the file contains no ``points`` dataset.
    """
    ## Step1 - Read Data
    points = []
    colors = []
    with h5py.File(url_hdf5, "r") as hf:
        for name in hf:
            if 'points' in name:
                points.append(np.array(hf.get(name)))
            if 'colors' in name:
                colors.append(np.array(hf.get(name)))
    if not points:
        raise ValueError('no points datasets found in %r' % (url_hdf5,))

    ## Step2 - Setup the canvas
    canvas = vispy.scene.SceneCanvas(keys='interactive', show=True)
    view = canvas.central_widget.add_view()
    view.bgcolor = '#111111'
    # Other camera names tried here: 'perspective', 'panzoom', 'arcball',
    # 'base', 'turntable'.
    view.camera = 'fly'
    view.camera.fov = 60
    view.camera.scale_factor = 0.7
    # Arrow-key bindings for the fly camera.
    # NOTE(review): tuple meaning not visible here; confirm against vispy.
    view.camera.keymap['Right'] = (1,5)
    view.camera.keymap['Left'] = (-1,5)
    view.camera.keymap['Up'] = (1,4)
    view.camera.keymap['Down'] = (-1,4)
    axis = vispy.scene.visuals.XYZAxis(parent=view.scene)
    scatter = vispy.scene.visuals.Markers(parent=view.scene)
    canvas.show()

    ## Step3 - Update function
    def update(data):
        matrix, frame_colors = data
        scatter.set_data(matrix, edge_color=None, face_color=frame_colors, size=point_size)

    ## Step4 - Update canvas with first lidar frame
    # The frame index lives in this closure rather than in a module-level
    # `t`, so the function also works when imported from another module.
    frame_index = 0
    update([points[frame_index], colors[frame_index]])

    # Step5 - Handling key events
    @canvas.events.key_press.connect
    def keypress(e):
        nonlocal frame_index
        # e.key is None for keys vispy does not recognise.
        key_name = e.key.name if e.key is not None else None
        if key_name == '=':
            frame_index = min(frame_index + 1, len(points) - 1)
        elif key_name == '-':
            frame_index = max(frame_index - 1, 0)
        else:
            return
        update([points[frame_index], colors[frame_index]])
        print(' - File Index : ', frame_index)
if __name__ == "__main__":
    ## The .h5 file written in write mode and read in plot mode
    url_hdf5 = 'lidar_kitti_plot.h5'
    data_write = 0

    if not data_write:
        ## Plot the 3D LiDAR data (data_write=0)
        ## You can use wasd and arrow keys to move around. (f,c)-> increase or decrease your z
        ## Note : Zooming does not currently work, so scrolling wont have an effect
        t = 0
        data_plot_h5(url_hdf5)
        vispy.app.run()
    else:
        ## Read the .bin files from a KITTI drive and convert to .h5 (data_write=1)
        DATA_DIR = os.path.abspath(os.path.join(os.getcwd(), '../data'))
        print (' - Data Dir : ', DATA_DIR)
        basedir = DATA_DIR
        date = '2011_09_26'
        drive = '0001'
        dataset = pykitti.raw(basedir, date, drive)  # optionally: frames=range(0, 50, 5)
        print ('\n - Read KITTI Dataset')
        data_write_h5(url_hdf5, dataset, dataset_limit=40)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Topic : Visualize the UMichigan Ford Dataset | |
References | |
- Umich Dataset : http://robots.engin.umich.edu/SoftwareData/Ford | |
- Data Download : http://robots.engin.umich.edu/uploads/SoftwareData/Ford/dataset-1-subset.tgz | |
- Blog : | |
- Notes | |
- Tested on Python3.5 | |
""" | |
## Import Libraries | |
import os | |
import h5py # pip3 install h5py | |
import vispy # pip3 install vispy | |
import vispy.scene | |
import numpy as np | |
import scipy.io as sio | |
## 1. Read a matlab file | |
def get_data(filename_mat, verbose = 0):
    """Load a .mat file and return the transposed first member of its SCAN record.

    Args:
        filename_mat: Path of the MATLAB file; it must contain a ``SCAN``
            variable.
        verbose: When truthy, print the shape of every member of the record.

    Returns:
        ``SCAN[0][0][0]`` transposed.
    """
    scan_record = sio.loadmat(filename_mat)['SCAN'][0][0]
    if verbose:
        for member in scan_record:
            print (member.shape)
    first_member = scan_record[0]
    return first_member.T
## 2. Read a directory containing matlab files and store them as hdf5
def write_datum_h5(dirname_mat, filename_h5, file_count_max = 100):
    """Convert the .mat files of a directory into one .h5 file.

    Files are processed in sorted name order; file number ``k`` (0-based) is
    stored as dataset ``data_<kkkk>`` holding the result of ``get_data``.

    Args:
        dirname_mat: Directory containing the .mat files.
        filename_h5: Path of the .h5 file to create (overwritten if present).
        file_count_max: Maximum number of files to convert.

    Returns:
        1 when the directory exists and was processed, 0 otherwise.
    """
    # isdir rather than exists: a plain file would make os.listdir fail.
    if not os.path.isdir(dirname_mat):
        return 0

    # Only .mat files can be loaded; other directory entries are skipped.
    mat_names = sorted(
        name for name in os.listdir(dirname_mat) if name.lower().endswith('.mat')
    )
    with h5py.File(filename_h5, 'w') as hf:
        # Slicing caps the output at exactly file_count_max files (the
        # previous `>` check after the write stored one extra file).
        for file_id, mat_name in enumerate(mat_names[:max(file_count_max, 0)]):
            file_tmp = os.path.join(dirname_mat, mat_name)
            print (' - File : ', file_tmp)
            data = get_data(file_tmp)
            hf.create_dataset('data_%.4d' % (file_id) , data=data)
    return 1
## 3. Read an hdf5 file
def read_datum_h5(filename_h5):
    """Load every top-level node of a .h5 file as a numpy array.

    Each node name is printed as it is read.

    Args:
        filename_h5: Path of the .h5 file.

    Returns:
        List of arrays, in the file's iteration order.
    """
    with h5py.File(filename_h5, 'r') as h5_file:
        scans = []
        for key in h5_file:
            print (' ->', key)
            node_array = np.array(h5_file.get(key))
            scans.append(node_array)
    return scans
## Plot U.Mich LiDAR data saved as .h5 file | |
def plot_data(datum, point_size=3):
    """Show a sequence of point clouds in a vispy canvas.

    The first entry of ``datum`` is shown immediately; pressing ``=`` shows
    the next entry and ``-`` the previous one. The caller still has to start
    the event loop (``vispy.app.run()``).

    Args:
        datum: Non-empty sequence of point arrays accepted by
            ``Markers.set_data``.
        point_size: Marker size handed to the scatter visual.

    Raises:
        ValueError: If ``datum`` is empty.
    """
    if len(datum) == 0:
        raise ValueError('datum must contain at least one point cloud')

    canvas = vispy.scene.SceneCanvas(keys='interactive', show=True)
    view = canvas.central_widget.add_view()
    view.bgcolor = '#111111'
    # Other camera names tried here: 'perspective', 'panzoom', 'arcball',
    # 'base', 'turntable'.
    view.camera = 'fly'
    view.camera.fov = 60
    view.camera.scale_factor = 0.7
    # Arrow-key bindings for the fly camera.
    # NOTE(review): tuple meaning not visible here; confirm against vispy.
    view.camera.keymap['Right'] = (1,5)
    view.camera.keymap['Left'] = (-1,5)
    view.camera.keymap['Up'] = (1,4)
    view.camera.keymap['Down'] = (-1,4)
    axis = vispy.scene.visuals.XYZAxis(parent=view.scene)
    scatter = vispy.scene.visuals.Markers(parent=view.scene)

    def update(data):
        scatter.set_data(data, size=point_size)

    # The frame index lives in this closure rather than in a module-level
    # `t`, so the function also works when imported from another module.
    frame_index = 0
    update(datum[frame_index])

    @canvas.events.key_press.connect
    def keypress(e):
        nonlocal frame_index
        # e.key is None for keys vispy does not recognise.
        key_name = e.key.name if e.key is not None else None
        print (' - Event : ', key_name)
        if key_name == '=':
            frame_index = min(frame_index + 1, len(datum) - 1)
        elif key_name == '-':
            frame_index = max(frame_index - 1, 0)
        else:
            return
        update(datum[frame_index])
        print(' - File Index : ', frame_index)

    canvas.show()
if __name__ == '__main__':
    ## Directory holding the .mat scans, and the .h5 file built from them
    dirname_umich_scans = './plot_vispy' #contains the .mat files
    filename_h5 = 'data_umich.h5'
    file_count_max = 1000

    ## data_write=1 : convert all .mat files into a .h5 file
    ## data_write=0 : plot the 3D LiDAR data from that .h5 file
    data_write = 1
    if not data_write:
        ## You can use wasd and arrow keys to move around. (f,c)-> increase or decrease your z
        ## Note : Zooming does not currently work, so scrolling wont have an effect
        t = 0
        datum_3d = read_datum_h5(filename_h5)
        plot_data(datum_3d, point_size=2)
        vispy.app.run()
    else:
        write_datum_h5(dirname_umich_scans, filename_h5, file_count_max)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment