Skip to content

Commit

Permalink
[ETHOSN] Fix tests pylint errors (#12649)
Browse files Browse the repository at this point in the history
This pr fixes pylint errors in tests/python/contrib/test_ethosn as reported in issue #11414.
  • Loading branch information
NicolaLancellotti committed Sep 1, 2022
1 parent c6516a5 commit aa6c712
Show file tree
Hide file tree
Showing 20 changed files with 158 additions and 82 deletions.
1 change: 1 addition & 0 deletions tests/lint/pylint.sh
Expand Up @@ -21,6 +21,7 @@ python3 -m pylint python/tvm --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint vta/python/vta --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint tests/python/unittest/test_tvmscript_type.py --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint tests/python/contrib/test_cmsisnn --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint tests/python/contrib/test_ethosn --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint tests/python/relay/aot/*.py --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint tests/python/ci --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint tests/python/integration/ --rcfile="$(dirname "$0")"/pylintrc
Expand Down
50 changes: 28 additions & 22 deletions tests/python/contrib/test_ethosn/infrastructure.py
Expand Up @@ -18,17 +18,17 @@
"""Arm(R) Ethos(TM)-N test functions"""

from __future__ import absolute_import, print_function
import tvm
from tvm import relay
from tvm.contrib import utils, graph_executor, download
from hashlib import md5
from itertools import zip_longest, combinations
import os
import numpy as np
from PIL import Image
import os

from . import _infrastructure
import tvm
from tvm import relay
from tvm.contrib import utils, graph_executor, download
from tvm.relay.op.contrib import partition_for_ethosn
from . import _infrastructure


def get_real_image(im_height, im_width):
Expand Down Expand Up @@ -82,43 +82,47 @@ def make_module(func, params):


def make_ethosn_composite(ethosn_expr, name):
vars = relay.analysis.free_vars(ethosn_expr)
inner_vars = [relay.Var(v.name_hint, v.type_annotation) for v in vars]
variables = relay.analysis.free_vars(ethosn_expr)
inner_vars = [relay.Var(v.name_hint, v.type_annotation) for v in variables]
func = relay.Function(inner_vars, ethosn_expr)
func = func.with_attr("Composite", name)
call = relay.Call(func, vars)
call = relay.Call(func, variables)
return call


def make_ethosn_partition(ethosn_expr):
"""Make an Ethos(TM)-N partition."""

# Create an Ethos-N global function
mod = tvm.IRModule({})
vars = relay.analysis.free_vars(ethosn_expr)
variables = relay.analysis.free_vars(ethosn_expr)
# NB: it is illegal to reuse variables inside and outside a scope in Relay
# if you want to duplicate types and names you must re-allocate them.
fresh_vars = [relay.Var(v.name_hint, v.type_annotation) for v in vars]
fresh_vars = [relay.Var(v.name_hint, v.type_annotation) for v in variables]
binds = {}
for var, fresh_var in zip(vars, fresh_vars):
for var, fresh_var in zip(variables, fresh_vars):
binds[var] = fresh_var
ethosn_expr_fresh = relay.bind(ethosn_expr, binds)
func = relay.Function(fresh_vars, ethosn_expr_fresh)
func = func.with_attr("Primitive", tvm.tir.IntImm("int32", 1))
func = func.with_attr("Inline", tvm.tir.IntImm("int32", 1))
func = func.with_attr("Compiler", "ethos-n")
func = func.with_attr("global_symbol", "ethos-n_0")
g1 = relay.GlobalVar("ethos-n_0")
mod[g1] = func
global_var = relay.GlobalVar("ethos-n_0")
mod[global_var] = func
mod = relay.transform.InferType()(mod)

# These are the vars to call the Ethos-N partition with
more_vars = relay.analysis.free_vars(ethosn_expr)
# Call the Ethos-N partition in main
call_fn1 = g1(*more_vars)
call_fn1 = global_var(*more_vars)
mod["main"] = relay.Function(more_vars, call_fn1)
return relay.transform.InferType()(mod)


def get_host_op_count(mod):
"""Return the number of host operators."""

class Counter(tvm.relay.ExprVisitor):
def __init__(self):
super().__init__()
Expand Down Expand Up @@ -219,9 +223,7 @@ def run(lib, inputs, outputs, npu=True):
return out


def build_and_run(
mod, inputs, outputs, params, device=tvm.cpu(), npu=True, expected_host_ops=0, npu_partitions=1
):
def build_and_run(mod, inputs, outputs, params, npu=True, expected_host_ops=0, npu_partitions=1):
lib = build(mod, params, npu, expected_host_ops, npu_partitions)
return run(lib, inputs, outputs, npu)

Expand Down Expand Up @@ -254,6 +256,8 @@ def inference_result(outputs):


def test_error(mod, params, err_msg):
"""Test an operator error message."""

caught = None
with tvm.transform.PassContext(
opt_level=3, config={"relay.ext.ethos-n.options": {"variant": get_ethosn_variant()}}
Expand All @@ -262,8 +266,8 @@ def test_error(mod, params, err_msg):
try:
mod = relay.transform.InferType()(mod)
relay.build(mod, params=params)
except tvm.error.TVMError as e:
caught = e.args[0]
except tvm.error.TVMError as error:
caught = error.args[0]
finally:
relay.backend.te_compiler.get().clear()

Expand All @@ -275,8 +279,8 @@ def get_conv2d(var, shape, dtype):
"""Standard convolution to test activation functions"""

weight_shape = (1, 1, shape[3], 1)
w = tvm.nd.array(np.ones(weight_shape, dtype))
weights = relay.const(w, dtype)
weights_array = tvm.nd.array(np.ones(weight_shape, dtype))
weights = relay.const(weights_array, dtype)
conv = relay.qnn.op.conv2d(
var,
weights,
Expand All @@ -300,13 +304,15 @@ def get_conv2d(var, shape, dtype):
relay.const(0, "int32"), # output zero point
out_dtype=dtype,
)
params = {"w": w, "b": b}
params = {"w": weights_array, "b": b}
return req, params


def get_conv2d_qnn_params(
dtype, input_zp, input_sc, kernel_zp, kernel_sc, kernel_h, kernel_w, channels
):
"""Return Conv2D QNN params."""

kernel_sc = (
kernel_sc.numpy() if isinstance(kernel_sc, tvm.runtime.ndarray.NDArray) else [kernel_sc]
)
Expand Down
10 changes: 8 additions & 2 deletions tests/python/contrib/test_ethosn/test_concatenate.py
Expand Up @@ -57,6 +57,8 @@ def _get_model(shapes, dtype, axis):
@requires_ethosn
@pytest.mark.parametrize("dtype", ["uint8", "int8"])
def test_concatenate(dtype):
"""Compare Concatenate output with TVM."""

trials = [
([(1, 4), (1, 6)], 1),
([(1, 16, 4), (1, 16, 4)], 1),
Expand All @@ -78,19 +80,23 @@ def test_concatenate(dtype):

@requires_ethosn
def test_concatenate_failure():
"""Check Concatenate error messages."""

trials = [
([(1, 4, 4, 4, 4), (1, 4, 4, 4, 4)], "uint8", 1, "dimensions=5, dimensions must be <= 4;"),
(
[(1, 4, 4, 4), (1, 4, 4, 4)],
"uint8",
3,
"Concatenation along the channels dimension (axis 3) requires input tensors with a multiple of 16 channels;",
"Concatenation along the channels dimension (axis 3) "
"requires input tensors with a multiple of 16 channels;",
),
(
[(1, 4, 4, 4), (1, 4, 4, 4)],
"int16",
2,
"dtype='int16', dtype must be either uint8, int8 or int32; dtype='int16', dtype must be either uint8, int8 or int32;",
"dtype='int16', dtype must be either uint8, int8 or int32; dtype='int16', "
"dtype must be either uint8, int8 or int32;",
),
(
[(2, 4, 4, 4), (2, 4, 4, 4)],
Expand Down
10 changes: 7 additions & 3 deletions tests/python/contrib/test_ethosn/test_constant_duplication.py
Expand Up @@ -36,8 +36,10 @@ def _get_model():
add_const = relay.const(add_const_value, "uint8")
a = relay.add(a, add_const)
weight_shape = (kernel_h, kernel_w, shape[3], out_channels)
w = tvm.nd.array(np.random.randint(low=0, high=255, size=weight_shape, dtype="uint8"))
weights = relay.const(w, "uint8")
weights_array = tvm.nd.array(
np.random.randint(low=0, high=255, size=weight_shape, dtype="uint8")
)
weights = relay.const(weights_array, "uint8")
conv = relay.qnn.op.conv2d(
a,
weights,
Expand Down Expand Up @@ -66,12 +68,14 @@ def _get_model():
relay.const(0, "int32"), # output zero point
out_dtype="uint8",
)
params = {"w": w, "b": b}
params = {"w": weights_array, "b": b}
return req, params


@requires_ethosn
def test_constant_duplication():
"""Test that constants are not duplicated."""

np.random.seed(0)
model, params = _get_model()
mod = tei.make_module(model, params)
Expand Down
18 changes: 11 additions & 7 deletions tests/python/contrib/test_ethosn/test_conv2d.py
Expand Up @@ -17,9 +17,9 @@

"""Arm(R) Ethos(TM)-N integration conv2d tests"""

import math
import numpy as np
import pytest
import math
import tvm
from tvm import relay
from tvm.testing import requires_ethosn
Expand Down Expand Up @@ -61,7 +61,7 @@ def _get_model(
):
"""Return a model and any parameters it may have"""
a = relay.var("a", shape=shape, dtype=dtype)
if pad == "op" or pad == "both":
if pad in ("op", "both"):
p = _get_same_padding((shape[1], shape[2]), (kernel_h, kernel_w), dilation, strides)
a = relay.nn.pad(
a,
Expand All @@ -76,12 +76,12 @@ def _get_model(
weight_shape = (kernel_h, kernel_w, shape[3] // groups, out_channels)
else:
weight_shape = (kernel_h, kernel_w, out_channels, 1)
w = tvm.nd.array(
weights_array = tvm.nd.array(
np.random.randint(
np.iinfo(dtype).min, high=np.iinfo(dtype).max + 1, size=weight_shape, dtype=dtype
)
)
weights = relay.const(w, dtype)
weights = relay.const(weights_array, dtype)
conv = relay.qnn.op.conv2d(
a,
weights,
Expand All @@ -96,7 +96,7 @@ def _get_model(
strides=strides,
groups=groups,
channels=out_channels,
padding=p if pad == "attr" or pad == "both" else (0, 0, 0, 0),
padding=p if pad in ("attr", "both") else (0, 0, 0, 0),
out_dtype="int32",
)
b = tvm.nd.array(
Expand All @@ -118,14 +118,16 @@ def _get_model(
relay.const(output_zp, "int32"), # output zero point
out_dtype=dtype,
)
params = {"w": w, "b": b}
params = {"w": weights_array, "b": b}
return req, params


@requires_ethosn
@pytest.mark.parametrize("depthwise", [False, True])
@pytest.mark.parametrize("dtype", ["uint8", "int8"])
def test_conv2d(dtype, depthwise):
"""Compare Conv2D output with TVM."""

trials = [
[(1, 17, 20, 26), 4, 3, 1, "attr", (2, 2), (1, 1), False],
[(1, 30, 27, 30), 5, 5, 3, "none", (1, 1), (1, 1), False],
Expand Down Expand Up @@ -208,6 +210,8 @@ def test_conv2d(dtype, depthwise):

@requires_ethosn
def test_conv2d_failure():
"""Check Conv2D error messages."""

trials = [
(
(1, 4, 4, 4),
Expand Down Expand Up @@ -326,7 +330,7 @@ def test_conv2d_failure():
weight_format,
err_msg,
) in trials:
model, params = _get_model(
model, _ = _get_model(
shape,
kernel_h,
kernel_w,
Expand Down
4 changes: 4 additions & 0 deletions tests/python/contrib/test_ethosn/test_depth_to_space.py
Expand Up @@ -34,6 +34,8 @@ def _get_model(shape, block, dtype, layout):
@requires_ethosn
@pytest.mark.parametrize("dtype", ["uint8", "int8"])
def test_depth_to_space(dtype):
"""Compare Depth To Space output with TVM."""

trials = [
(1, 16, 16, 16),
(1, 64, 32, 16),
Expand All @@ -59,6 +61,8 @@ def test_depth_to_space(dtype):

@requires_ethosn
def test_depth_to_space_failure():
"""Check Depth To Space error messages."""

trials = [
((2, 16, 16, 16), 2, "uint8", "NHWC", "batch size=2, batch size must = 1"),
(
Expand Down
25 changes: 12 additions & 13 deletions tests/python/contrib/test_ethosn/test_fullyconnected.py
Expand Up @@ -30,9 +30,9 @@ def _get_model(
):
"""Return a model an any parameters it may have"""
a = relay.var("a", shape=shape, dtype=dtype)
w = tvm.nd.array(np.ones(weight_shape, dtype))
weights = relay.const(w, dtype)
fc = relay.qnn.op.dense(
weights_array = tvm.nd.array(np.ones(weight_shape, dtype))
weights = relay.const(weights_array, dtype)
dense = relay.qnn.op.dense(
a,
weights,
input_zero_point=relay.const(input_zp, "int32"),
Expand All @@ -44,7 +44,7 @@ def _get_model(
)
b = tvm.nd.array(np.random.randint(0, high=255, size=(weight_shape[0],), dtype="int32"))
biasc = relay.const(b, "int32")
bias = relay.nn.bias_add(fc, biasc)
bias = relay.nn.bias_add(dense, biasc)
req = relay.qnn.op.requantize(
bias,
relay.const(input_sc * kernel_sc, "float32"), # input zero scale
Expand All @@ -53,7 +53,7 @@ def _get_model(
relay.const(output_zp, "int32"), # output zero point
out_dtype=dtype,
)
params = {"w": w, "b": b}
params = {"w": weights_array, "b": b}
return req, params


Expand All @@ -76,9 +76,8 @@ def _get_model(
],
)
def test_fullyconnected(shape, out_channels, dtype, input_zp, input_sc, kernel_zp, kernel_sc):
"""
Test fully connected offloading.
"""
"""Compare Fully Connected output with TVM."""

np.random.seed(0)
inputs = {
"a": tvm.nd.array(
Expand Down Expand Up @@ -116,6 +115,8 @@ def test_fullyconnected(shape, out_channels, dtype, input_zp, input_sc, kernel_z

@requires_ethosn
def test_fullyconnected_failure():
"""Check Fully Connected error messages."""

trials = [
(
(1, 64),
Expand All @@ -139,7 +140,8 @@ def test_fullyconnected_failure():
0,
1,
"uint8",
"Weights tensor must have I dimension equal to the number of channels of the input tensor.;",
"Weights tensor must have I dimension equal to the number"
" of channels of the input tensor.;",
),
((1024, 64), (1, 64), 0, 1, 0, 1, 0, 1, "uint8", "batch size=1024, batch size must = 1;"),
]
Expand All @@ -157,10 +159,7 @@ def test_fullyconnected_failure():
dtype,
err_msg,
) in trials:
inputs = {
"a": tvm.nd.array(np.random.randint(0, high=255, size=shape, dtype=dtype)),
}
model, params = _get_model(
model, _ = _get_model(
shape,
weight_shape,
input_zp,
Expand Down
2 changes: 2 additions & 0 deletions tests/python/contrib/test_ethosn/test_leaky_relu.py
Expand Up @@ -49,6 +49,7 @@ def _get_model(shape, input_zp, input_sc, output_zp, output_sc, dtype, alpha):
@pytest.mark.parametrize("alpha", [0.001, 0.5678])
def test_leaky_relu(dtype, shape, alpha):
"""Compare Leaky ReLU output with TVM."""

np.random.seed(0)

iinfo = np.iinfo(dtype)
Expand All @@ -75,6 +76,7 @@ def test_leaky_relu(dtype, shape, alpha):
@pytest.mark.parametrize("alpha", [-1.34, 2.32, 1, 0])
def test_leaky_relu_unsupported_alpha(dtype, shape, alpha):
"""Test unsupported values of alpha (<= 0, >= 1) in Leaky ReLU."""

iinfo = np.iinfo(dtype)
zp_min = iinfo.min

Expand Down

0 comments on commit aa6c712

Please sign in to comment.