ACAS-Xu ONNX Parser
# Loading ONNX Network and Creating resulting neural network expressions using Big M Encoding

import tensorflow as tf
import numpy as np
import pandas as pd

from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession

config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)

import onnx
import onnx2keras
import pickle
# Check whether a name contains "bias"
def does_name_contain_bias(inpt):
    if inpt.find("bias") >= 0:
        return True
    return False

# Check whether it is a bias-addition layer node or not
def check_bias(node):
    is_op_type_add = False
    does_input_type_contain_bias = False
    if node.op_type == "Add":
        is_op_type_add = True
    for inpt in node.input:
        if does_name_contain_bias(inpt):
            does_input_type_contain_bias = True
            break
    return is_op_type_add and does_input_type_contain_bias

# Check whether it is a matmul layer node or not
def check_matmul(node):
    is_op_type_matmul = False
    if node.op_type == "MatMul":
        is_op_type_matmul = True
    return is_op_type_matmul

# Check whether it is a relu layer node or not
def check_relu(node):
    is_op_type_relu = False
    if node.op_type == "Relu":
        is_op_type_relu = True
    return is_op_type_relu
# Initializing the different kinds of objects
def init_layer_obj():
    layer_obj = {}
    layer_obj["matmul_node"] = None
    layer_obj["bias_node"] = None
    layer_obj["relu_node"] = None
    layer_obj["family"] = None
    layer_obj["nodes"] = {}
    return layer_obj

def init_node_obj():
    node_obj = {}
    node_obj["inputs"] = []
    node_obj["weights"] = []
    node_obj["bias"] = None
    return node_obj

def init_input_map(node_list):
    input_map = {}
    for node in node_list:
        input_map[node] = 0
    return input_map

# Build a node name from its family index: non-negative indices are input/hidden
# families ("X_<family>_<idx>"), -1 is the output family ("Y_<idx>"), and -2
# refers to the last hidden family.
def get_node_name(prefix, layers, family_idx, idx):
    if family_idx >= 0:
        return prefix + str(family_idx) + f"_{idx}"
    elif family_idx == -1:
        return 'Y' + f"_{idx}"
    elif family_idx == -2:
        return prefix + str(len(layers) - 2) + f"_{idx}"

def init_family():
    family = {}
    family['nodes'] = []
    family['contains_relu'] = False
    return family
def get_node_expressions(onnx_model):
    # First Pass
    ## Note: Here 'node' refers to nodes in the ONNX graph (similar to layers) and not the actual node in the neural network.
    layers = [None]
    input_hash_table = {}
    layer_obj = init_layer_obj()
    prefix = "X_"
    counter = 1
    new_layer_flag = True
    for idx, node in enumerate(onnx_model.graph.node):
        for ipt in node.input:
            input_hash_table[ipt] = counter
        if check_matmul(node):
            layer_obj["matmul_node"] = node
            new_layer_flag = False
        elif (not new_layer_flag) and check_bias(node):
            layer_obj["bias_node"] = node
            layer_obj["family"] = prefix + str(counter)
        elif (not new_layer_flag) and check_relu(node):
            layer_obj["relu_node"] = node
        if (idx == len(onnx_model.graph.node) - 1) or check_matmul(onnx_model.graph.node[idx + 1]):
            layers.append(layer_obj)
            layer_obj = init_layer_obj()
            counter += 1
            new_layer_flag = True
    layers[-1]['family'] = 'Y'
    # Second Pass
    for inpt in onnx_model.graph.initializer:
        name = inpt.name
        dims = inpt.dims
        weights = np.array(inpt.float_data).reshape(dims)
        layer_idx = input_hash_table[name]
        layer_obj = layers[layer_idx]
        try:
            layer_family_idx = int(layer_obj['family'].split('_')[-1])
        except ValueError:
            # The output layer's family is 'Y', which has no numeric suffix
            layer_family_idx = -1
        if does_name_contain_bias(name):
            for idx, weight in enumerate(weights):
                node_name = get_node_name(prefix, layers, layer_family_idx, idx)
                try:
                    layer_obj["nodes"][node_name]["bias"] = float(weight)
                except KeyError:
                    node_obj = init_node_obj()
                    layer_obj["nodes"][node_name] = node_obj
                    layer_obj["nodes"][node_name]["bias"] = float(weight)
        else:
            for outer_idx, weight_row in enumerate(weights):
                for inner_idx, weight in enumerate(weight_row):
                    node_name = get_node_name(prefix, layers, layer_family_idx, inner_idx)
                    try:
                        layer_obj["nodes"][node_name]["weights"].append(float(weight))
                        layer_obj["nodes"][node_name]["inputs"].append(get_node_name(prefix, layers, layer_family_idx - 1, outer_idx))
                    except KeyError:
                        node_obj = init_node_obj()
                        layer_obj["nodes"][node_name] = node_obj
                        layer_obj["nodes"][node_name]["weights"].append(float(weight))
                        layer_obj["nodes"][node_name]["inputs"].append(get_node_name(prefix, layers, layer_family_idx - 1, outer_idx))
    # Converting into mathematical equations
    node_expressions = {}
    for layer in layers[1:]:
        for key, node in layer['nodes'].items():
            node_expressions[key] = {}
            node_expressions[key]['exp_terms'] = [(w, x) for w, x in zip(node['weights'], node['inputs'])] + [(1, node['bias'])]
            node_expressions[key]['family'] = layer['family']
            node_expressions[key]['contains_relu'] = layer['relu_node'] is not None
    return node_expressions
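# Illustrative shape of the result above (the names and weights shown here are
# placeholders for illustration, not taken from any particular network):
#   node_expressions["X_1_0"] = {
#       "exp_terms": [(w_0, "X_0_0"), (w_1, "X_0_1"), ..., (1, bias)],
#       "family": "X_1",
#       "contains_relu": True,
#   }
# i.e. a list of (coefficient, input-node-name) pairs followed by a (1, bias) term.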
def get_family_list(node_expressions):
    current_family = ""
    family_list = []
    family = init_family()
    for key in node_expressions.keys():
        node = node_expressions[key]
        if node['family'] != current_family:
            family_list.append(family)
            current_family = node['family']
            family = init_family()
        family['nodes'].append(key)
        if node['contains_relu']:
            family['contains_relu'] = True
    family_list.append(family)
    family_list = family_list[1:]
    return family_list
def get_feature_ai(node_expressions):
    """
    Feature Description: Accumulator after Input
    """
    value = 0
    for key in node_expressions.keys():
        node = node_expressions[key]
        if node['family'] == 'X_1':
            # Count the terms whose input comes from the input family 'X_0'
            # (the trailing (1, bias) term never matches).
            value += np.sum([str(x[1]).startswith('X_0_') for x in node['exp_terms']])
    return value
def get_feature_oa(node_expressions):
    """
    Feature Description: Output after Accumulator
    """
    value = 0
    for key in node_expressions.keys():
        node = node_expressions[key]
        if (node['family'] == 'Y') and (not node['contains_relu']):
            value += 1
    return value

def get_feature_or(node_expressions):
    """
    Feature Description: Output after ReLU
    """
    value = 0
    for key in node_expressions.keys():
        node = node_expressions[key]
        if (node['family'] == 'Y') and node['contains_relu']:
            value += 1
    return value

def get_feature_ra(node_expressions):
    """
    Feature Description: ReLU after Accumulator
    """
    value = 0
    for key in node_expressions.keys():
        node = node_expressions[key]
        if node['contains_relu']:
            value += 1
    return value

def get_feature_ar(node_expressions):
    """
    Feature Description: Accumulator after ReLU
    """
    family_list = get_family_list(node_expressions)
    value = 0
    for i in range(len(family_list) - 1):
        if family_list[i]['contains_relu']:
            value += len(family_list[i]['nodes']) * len(family_list[i + 1]['nodes'])
    return value
features = [
    ('AI', 'Accumulator after Input', get_feature_ai),
    ('OA', 'Output after Accumulator', get_feature_oa),
    ('OR', 'Output after ReLU', get_feature_or),
    ('AR', 'Accumulator after ReLU', get_feature_ar),
    ('RA', 'ReLU after Accumulator', get_feature_ra),
]
DIR = "../"
onnx_dir = DIR + "onnx/"

import glob
# onnx_filenames = glob.glob(onnx_dir + "*")
onnx_filenames = [onnx_dir + x.lower() + ".onnx" for x in pd.read_pickle('../high_timeout_run.pkl')['Network_Id'].values]

from tqdm import tqdm

rows = []
for onnx_filename in tqdm(onnx_filenames):
    model_name = onnx_filename.split('/')[-1][:-5]
    onnx_model = onnx.load(onnx_dir + f'{model_name}.onnx')
    node_expressions = get_node_expressions(onnx_model)
    row = {}
    row['Network_Id'] = model_name
    for name, description, fn in features:
        row[name] = fn(node_expressions)
    rows.append(row)

df = pd.DataFrame(rows)
df = df.sort_values('Network_Id').reset_index(drop=True)
df = df.applymap(lambda s: s.upper() if isinstance(s, str) else s)
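# --------------------------------------------------------------------------
# Illustrative sketch (not part of the parser above): how a node's exp_terms
# could be turned into standard Big-M constraints for its ReLU. The constant
# M and the plain-string constraint format are assumptions made purely for
# illustration; any MILP modelling interface could be used instead.
def big_m_constraints_for_node(name, node, M=1e6):
    # Pre-activation a = sum_i w_i * x_i + b, read straight from exp_terms,
    # whose final (1, bias) entry is the constant offset.
    *weighted, (_, bias) = node['exp_terms']
    pre = " + ".join(f"{w}*{x}" for w, x in weighted) + f" + {bias}"
    if not node['contains_relu']:
        # Purely affine node: its output equals the pre-activation.
        return [f"{name} == {pre}"]
    # Big-M encoding of name = ReLU(a) with a binary indicator d:
    #   name >= a,  name >= 0,  name <= a + M*(1 - d),  name <= M*d
    d = f"d_{name}"
    return [
        f"{name} >= {pre}",
        f"{name} >= 0",
        f"{name} <= {pre} + {M}*(1 - {d})",
        f"{name} <= {M}*{d}",
        f"{d} binary",
    ]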