import math
import numpy as np
from onnx import numpy_helper
from omlt.neuralnet.layer import (
ConvLayer2D,
DenseLayer,
IndexMapper,
InputLayer,
PoolingLayer2D,
)
from omlt.neuralnet.network_definition import NetworkDefinition
_ACTIVATION_OP_TYPES = ["Relu", "Sigmoid", "LogSoftmax", "Tanh", "Softplus"]
_POOLING_OP_TYPES = ["MaxPool"]
class NetworkParser:
    """Parse an ONNX model graph into an OMLT ``NetworkDefinition``.

    The parser walks the ONNX graph once per :meth:`parse_network` call;
    all intermediate state lives on the instance and is cleared between
    calls by :meth:`_reset_state`.

    References
    ----------
    * https://github.com/onnx/onnx/blob/master/docs/Operators.md
    """

    def __init__(self):
        self._reset_state()

    def _reset_state(self):
        """Clear all per-parse state so the parser instance can be reused."""
        self._graph = None
        self._initializers = None
        self._constants = None
        self._nodes = None
        self._nodes_by_output = None
        self._inputs = None
        self._outputs = None
        self._node_stack = None
        self._node_map = None
        # scratch dicts populated by parse_network; reset here for
        # consistency with the rest of the per-parse state
        self._weights = None
        self._biases = None
        self._activations = None
[docs] def parse_network(self, graph, scaling_object, input_bounds):
self._reset_state()
self._graph = graph
# initializers contain constant data
initializers = dict()
for initializer in self._graph.initializer:
initializers[initializer.name] = numpy_helper.to_array(initializer)
self._initializers = initializers
# Build graph
nodes = dict()
nodes_by_output = dict()
inputs = set()
outputs = set()
self._node_map = dict()
network = NetworkDefinition(
scaling_object=scaling_object, scaled_input_bounds=input_bounds
)
network_input = None
for input in self._graph.input:
nodes[input.name] = ("input", input.type, [])
nodes_by_output[input.name] = input.name
inputs.add(input.name)
# onnx inputs are tensors. Flatten tensors to a vector.
dim_value = None
size = []
for dim in input.type.tensor_type.shape.dim:
if dim.dim_value > 0:
if dim_value is None:
dim_value = 1
size.append(dim.dim_value)
dim_value *= dim.dim_value
if dim_value is None:
raise ValueError(
f'All dimensions in graph "{graph.name}" input tensor have 0 value.'
)
assert network_input is None
network_input = InputLayer(size)
self._node_map[input.name] = network_input
network.add_layer(network_input)
if network_input is None:
raise ValueError(f'No valid input layer found in graph "{graph.name}".')
self._nodes = nodes
self._nodes_by_output = nodes_by_output
self._inputs = inputs
self._outputs = outputs
# The node.output field contains the name of this node
# output.
# Here map output name to node name.
for node in self._graph.node:
for output in node.output:
nodes_by_output[output] = node.name
self._constants = dict()
for node in self._graph.node:
# add node not connected to anything
self._nodes[node.name] = ("node", node, [])
# Map inputs by their output name
node_inputs = [
nodes_by_output[input]
for input in node.input
if input not in initializers
]
if node_inputs:
# Now connect inputs to the current node
for input in node_inputs:
self._nodes[input][2].append(node.name)
elif node.op_type == "Constant":
for output in node.output:
value = _parse_constant_value(node)
self._constants[output] = value
else:
raise ValueError(
f'Nodes must have inputs or have op_type "Constant". Node "{node.name}" has no inputs and op_type "{node.op_type}".'
)
# traverse graph
self._node_stack = list(inputs)
self._weights = dict()
self._biases = dict()
self._activations = dict()
while self._node_stack:
node_name = self._node_stack.pop()
type_, node, next_nodes = self._nodes[node_name]
# no need to process inputs or outputs
if type_ == "node":
new_layer, new_layer_inputs = self._visit_node(node, next_nodes)
if new_layer is not None:
network.add_layer(new_layer)
for layer_input in new_layer_inputs:
network.add_edge(layer_input, new_layer)
else:
for next in next_nodes:
self._node_stack.append(next)
return network
def _visit_node(self, node, next_nodes):
if node.op_type == "MatMul":
next_nodes, new_layer, new_layer_inputs = self._consume_dense_nodes(
node, next_nodes
)
elif node.op_type == "Gemm":
next_nodes, new_layer, new_layer_inputs = self._consume_gemm_dense_nodes(
node, next_nodes
)
elif node.op_type == "Conv":
next_nodes, new_layer, new_layer_inputs = self._consume_conv_nodes(
node, next_nodes
)
elif node.op_type == "Reshape":
next_nodes = self._consume_reshape_nodes(node, next_nodes)
new_layer = new_layer_inputs = None
elif node.op_type in _POOLING_OP_TYPES:
next_nodes, new_layer, new_layer_inputs = self._consume_pool_nodes(
node, next_nodes
)
else:
raise Exception(f"Unhandled node type {node.op_type}")
for next in next_nodes:
self._node_stack.append(next)
return new_layer, new_layer_inputs
def _consume_dense_nodes(self, node, next_nodes):
"""Starting from a MatMul node, consume nodes to form a dense Ax + b node."""
if node.op_type != "MatMul":
raise ValueError(
f"{node.name} is a {node.op_type} node, only MatMul nodes can be used as starting points for consumption."
)
if len(node.input) != 2:
raise ValueError(
f"{node.name} input has {len(node.input)} dimensions, only nodes with 2 input dimensions can be used as starting points for consumption."
)
[in_0, in_1] = list(node.input)
input_layer, transformer = self._node_input_and_transformer(in_0)
node_weights = self._initializers[in_1]
if len(next_nodes) != 1:
raise ValueError(
f"Next nodes must have length 1, {next_nodes} has length {len(next_nodes)}"
)
# expect 'Add' node ahead
type_, node, maybe_next_nodes = self._nodes[next_nodes[0]]
if type_ != "node":
raise TypeError(f"Expected a node next, got a {type_} instead.")
if node.op_type != "Add":
raise ValueError(
f"The first node to be consumed, {node.name}, is a {node.op_type} node. Only Add nodes are supported."
)
# extract biases
next_nodes = maybe_next_nodes
[in_0, in_1] = list(node.input)
if in_0 in self._initializers:
node_biases = self._initializers[in_0]
elif in_1 in self._initializers:
node_biases = self._initializers[in_1]
else:
raise ValueError(f"Node inputs were not found in graph initializers.")
if len(node_weights.shape) != 2:
raise ValueError(f"Node weights must be a 2-dimensional matrix.")
if node_weights.shape[1] != node_biases.shape[0]:
raise ValueError(
f"Node weights has {node_weights.shape[1]} columns; node biases has {node_biases.shape[0]} rows. These must be equal."
)
if len(node.output) != 1:
raise ValueError(
f"Node output is {node.output} but should be a single value."
)
input_output_size = _get_input_output_size(input_layer, transformer)
output_size = input_output_size[:-1] + [node_weights.shape[1]]
activation = "linear"
if len(next_nodes) == 1:
# check if Relu
type_, maybe_node, maybe_next_nodes = self._nodes[next_nodes[0]]
if maybe_node.op_type in _ACTIVATION_OP_TYPES:
node = maybe_node
activation = maybe_node.op_type.lower()
next_nodes = maybe_next_nodes
dense_layer = DenseLayer(
input_output_size,
output_size,
node_weights,
node_biases,
activation=activation,
input_index_mapper=None,
)
self._node_map[node.name] = dense_layer
self._node_map[node.output[0]] = dense_layer
return next_nodes, dense_layer, [input_layer]
def _consume_gemm_dense_nodes(self, node, next_nodes):
"""Starting from a Gemm node, consume nodes to form a dense aAB + bC node."""
if node.op_type != "Gemm":
raise ValueError(
f"{node.name} is a {node.op_type} node, only Gemm nodes can be used as starting points for consumption."
)
if len(node.input) != 3:
raise ValueError(
f"{node.name} input has {len(node.input)} dimensions, only nodes with 3 input dimensions can be used as starting points for consumption."
)
attr = _collect_attributes(node)
alpha = attr["alpha"]
beta = attr["beta"]
[in_0, in_1, in_2] = list(node.input)
input_layer, transformer = self._node_input_and_transformer(in_0)
weights = self._initializers[in_1]
# transpose B
if attr["transB"] == 1:
weights = np.transpose(weights)
biases = self._initializers[in_2]
input_output_size = _get_input_output_size(input_layer, transformer)
# output is the same size as input except for the last dimension
output_size = input_output_size[:-1] + [weights.shape[1]]
activation = "linear"
if len(next_nodes) == 1:
# check if Relu
type_, maybe_node, maybe_next_nodes = self._nodes[next_nodes[0]]
if maybe_node.op_type in _ACTIVATION_OP_TYPES:
node = maybe_node
activation = node.op_type.lower()
next_nodes = maybe_next_nodes
weights = weights * alpha
biases = beta * biases
dense_layer = DenseLayer(
input_output_size,
output_size,
weights,
biases,
activation=activation,
input_index_mapper=transformer,
)
self._node_map[node.name] = dense_layer
self._node_map[node.output[0]] = dense_layer
return next_nodes, dense_layer, [input_layer]
def _consume_conv_nodes(self, node, next_nodes):
"""
Starting from a Conv node, consume nodes to form a convolution node with
(optional) activation function.
"""
if node.op_type != "Conv":
raise ValueError(
f"{node.name} is a {node.op_type} node, only Conv nodes can be used as starting points for consumption."
)
if len(node.input) not in [2, 3]:
raise ValueError(
f"{node.name} input has {len(node.input)} dimensions, only nodes with 2 or 3 input dimensions can be used as starting points for consumption."
)
if len(node.input) == 2:
[in_0, in_1] = list(node.input)
in_2 = None
else:
[in_0, in_1, in_2] = list(node.input)
input_layer, transformer = self._node_input_and_transformer(in_0)
input_output_size = _get_input_output_size(input_layer, transformer)
weights = self._initializers[in_1]
[out_channels, in_channels, *kernel_shape] = weights.shape
if in_2 is None:
biases = np.zeros(out_channels)
else:
biases = self._initializers[in_2]
attr = _collect_attributes(node)
strides = attr["strides"]
# check only kernel shape and stride are set
if attr["kernel_shape"] != kernel_shape:
raise ValueError(
f"Kernel shape attribute {attr['kernel_shape']} does not match initialized kernel shape {kernel_shape}."
)
if len(kernel_shape) != len(strides):
raise ValueError(
f"Initialized kernel shape {kernel_shape} has {len(kernel_shape)} dimensions. Strides attribute has {len(strides)} dimensions. These must be equal."
)
if len(input_output_size) != len(kernel_shape) + 1:
raise ValueError(
f"Input/output size ({input_output_size}) must have one more dimension than initialized kernel shape ({kernel_shape})."
)
# Check input, output have correct dimensions
if biases.shape != (out_channels,):
raise ValueError(
f"Biases shape {biases.shape} must match output weights channels {(out_channels,)}."
)
if in_channels != input_output_size[0]:
raise ValueError(
f"Input/output size ({input_output_size}) first dimension must match input weights channels ({in_channels})."
)
# Other attributes are not supported
if "dilations" in attr and attr["dilations"] != [1, 1]:
raise ValueError(
f"{node} has non-identity dilations ({attr['dilations']}). This is not supported."
)
if attr["group"] != 1:
raise ValueError(
f"{node} has multiple groups ({attr['group']}). This is not supported."
)
if "pads" in attr and np.any(attr["pads"]):
raise ValueError(
f"{node} has non-zero pads ({attr['pads']}). This is not supported."
)
# generate new nodes for the node output
padding = 0
output_size = [out_channels]
for w, k, s in zip(input_output_size[1:], kernel_shape, strides):
new_w = int((w - k + 2 * padding) / s) + 1
output_size.append(new_w)
activation = "linear"
if len(next_nodes) == 1:
# check if Relu
type_, maybe_node, maybe_next_nodes = self._nodes[next_nodes[0]]
if maybe_node.op_type in _ACTIVATION_OP_TYPES:
node = maybe_node
activation = maybe_node.op_type.lower()
next_nodes = maybe_next_nodes
# convolute image one channel at the time
# expect 2d image with channels
if len(input_output_size) != 3:
raise ValueError(
f"Expected a 2D image with channels, got {input_output_size}."
)
conv_layer = ConvLayer2D(
input_output_size,
output_size,
strides,
weights,
activation=activation,
input_index_mapper=transformer,
)
self._node_map[node.name] = conv_layer
self._node_map[node.output[0]] = conv_layer
return next_nodes, conv_layer, [input_layer]
def _consume_reshape_nodes(self, node, next_nodes):
"""Parse a Reshape node."""
if node.op_type != "Reshape":
raise ValueError(
f"{node.name} is a {node.op_type} node, only Reshape nodes can be used as starting points for consumption."
)
if len(node.input) != 2:
raise ValueError(
f"{node.name} input has {len(node.input)} dimensions, only nodes with 2 input dimensions can be used as starting points for consumption."
)
[in_0, in_1] = list(node.input)
input_layer = self._node_map[in_0]
new_shape = self._constants[in_1]
output_size = np.empty(input_layer.output_size).reshape(new_shape).shape
transformer = IndexMapper(input_layer.output_size, list(output_size))
self._node_map[node.output[0]] = (transformer, input_layer)
return next_nodes
    def _consume_pool_nodes(self, node, next_nodes):
        """
        Starting from a MaxPool node, consume nodes to form a pooling node with
        (optional) activation function.

        Returns (next_nodes, pooling_layer, [input_layer]); an immediately
        following activation node is folded into the pooling layer.

        Raises ValueError on unsupported attributes (indices output, pads,
        dilations, auto_pad, multiple batches) or mismatched dimensions.
        """
        if node.op_type not in _POOLING_OP_TYPES:
            raise ValueError(
                f"{node.name} is a {node.op_type} node, only MaxPool nodes can be used as starting points for consumption."
            )
        # only max pooling is supported (_POOLING_OP_TYPES == ["MaxPool"])
        pool_func_name = "max"

        # ONNX network should not contain indices output from MaxPool - not supported by OMLT
        if len(node.output) != 1:
            raise ValueError(
                f"The ONNX contains indices output from MaxPool. This is not supported by OMLT."
            )
        if len(node.input) != 1:
            raise ValueError(
                f"{node.name} input has {len(node.input)} dimensions, only nodes with 1 input dimension can be used as starting points for consumption."
            )

        input_layer, transformer = self._node_input_and_transformer(node.input[0])
        input_output_size = _get_input_output_size(input_layer, transformer)

        # currently only support 2D image with channels.
        if len(input_output_size) == 4:
            # this means there is an extra dimension for number of batches
            # batches not supported, so only accept if they're not there or there is only 1 batch
            if input_output_size[0] != 1:
                raise ValueError(
                    f"{node.name} has {input_output_size[0]} batches, only a single batch is supported."
                )
            # drop the singleton batch dimension
            input_output_size = input_output_size[1:]

        in_channels = input_output_size[0]

        attr = _collect_attributes(node)
        # NOTE(review): the first kernel_shape entry is treated as the kernel
        # depth and the remainder as the spatial kernel shape — confirm this
        # matches PoolingLayer2D's interpretation of kernel_depth.
        kernel_depth = attr["kernel_shape"][0]
        kernel_shape = attr["kernel_shape"][1:]
        # strides is optional; default to 1 along each spatial kernel axis
        strides = attr["strides"] if "strides" in attr else [1] * len(kernel_shape)

        # check only kernel shape, stride, storage order are set
        # everything else is not supported
        if "dilations" in attr and attr["dilations"] != [1, 1]:
            raise ValueError(
                f"{node.name} has non-identity dilations ({attr['dilations']}). This is not supported."
            )
        if "pads" in attr and np.any(attr["pads"]):
            raise ValueError(
                f"{node.name} has non-zero pads ({attr['pads']}). This is not supported."
            )
        if ("auto_pad" in attr) and (attr["auto_pad"] != "NOTSET"):
            raise ValueError(
                f"{node.name} has autopad set ({attr['auto_pad']}). This is not supported."
            )
        if len(kernel_shape) != len(strides):
            raise ValueError(
                f"Kernel shape {kernel_shape} has {len(kernel_shape)} dimensions. Strides attribute has {len(strides)} dimensions. These must be equal."
            )
        if len(input_output_size) != len(kernel_shape) + 1:
            raise ValueError(
                f"Input/output size ({input_output_size}) must have one more dimension than kernel shape ({kernel_shape})."
            )

        # ceil_mode == 1 rounds output spatial sizes up instead of down
        output_shape_wrapper = math.floor
        if "ceil_mode" in attr and attr["ceil_mode"] == 1:
            output_shape_wrapper = math.ceil

        # pooling output size per spatial axis: round((w - k) / s + 1)
        output_size = [in_channels]
        for i in range(1, len(input_output_size)):
            output_size.append(
                output_shape_wrapper(
                    (input_output_size[i] - kernel_shape[i - 1]) / strides[i - 1] + 1
                )
            )

        activation = "linear"
        if len(next_nodes) == 1:
            # check if the next node is an activation to fold into this layer
            type_, maybe_node, maybe_next_nodes = self._nodes[next_nodes[0]]
            if maybe_node.op_type in _ACTIVATION_OP_TYPES:
                node = maybe_node
                activation = maybe_node.op_type.lower()
                next_nodes = maybe_next_nodes

        pooling_layer = PoolingLayer2D(
            input_output_size,
            output_size,
            strides,
            pool_func_name,
            tuple(kernel_shape),
            kernel_depth,
            activation=activation,
            input_index_mapper=transformer,
        )
        self._node_map[node.name] = pooling_layer
        self._node_map[node.output[0]] = pooling_layer

        return next_nodes, pooling_layer, [input_layer]
def _node_input_and_transformer(self, node_name):
maybe_layer = self._node_map[node_name]
if isinstance(maybe_layer, tuple):
transformer, input_layer = maybe_layer
return input_layer, transformer
else:
return maybe_layer, None
def _collect_attributes(node):
r = dict()
for attr in node.attribute:
if attr.type == 1: # FLOAT
r[attr.name] = attr.f
elif attr.type == 2: # INT
r[attr.name] = int(attr.i)
elif attr.type == 4: # TENSOR
r[attr.name] = numpy_helper.to_array(attr.t)
pass
elif attr.type == 7: # INTS
r[attr.name] = list(attr.ints)
else:
raise RuntimeError(f"unhandled attribute type {attr.type}")
return r
def _parse_constant_value(node):
attr = _collect_attributes(node)
value = attr["value"]
return value
def _get_input_output_size(input_layer, transformer):
if transformer is not None:
return transformer.output_size
return input_layer.output_size