Add reference variables to test
Signed-off-by: Max H. Gerlach <git@maxgerlach.de>
maxhgerlach committed Aug 27, 2021
1 parent 9506a38 commit a2466d3
Showing 1 changed file with 99 additions and 75 deletions.
174 changes: 99 additions & 75 deletions test/parallel/test_tensorflow.py
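In short: each in-place broadcast test below is wrapped in a use_resource loop so that hvd.broadcast_ is exercised with both TF1-style reference variables (RefVariable) and resource variables (ResourceVariable); the reference-variable case is skipped under eager execution, where only resource variables exist. A minimal sketch of the pattern the tests cover (illustrative only; the single 1-D variable, root rank 0, and the setup around it are assumptions, not part of this commit):

# Illustrative sketch, not part of the commit. Assumes Horovod built with
# TensorFlow support and TF >= 2.6 (the in-place broadcast op needs it),
# launched with one or more Horovod processes.
import tensorflow as tf
import horovod.tensorflow as hvd
from tensorflow.python.ops import resource_variable_ops
from tensorflow.python.ops import variables as tf_ops_variables

hvd.init()

for use_resource in [False, True]:
    if not use_resource and tf.executing_eagerly():
        continue  # reference variables only exist in graph mode
    initial_value = tf.ones([17]) * hvd.rank()
    if use_resource:
        var = resource_variable_ops.ResourceVariable(initial_value)
    else:
        var = tf_ops_variables.RefVariable(initial_value)
    # Broadcast rank 0's value into every rank's variable, in place; the
    # returned tensor is expected to share the variable's underlying buffer.
    broadcasted, = hvd.broadcast_([var], 0)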
@@ -30,6 +30,9 @@
import tensorflow as tf
from horovod.tensorflow.util import _executing_eagerly
from tensorflow.python.framework import ops
from tensorflow.python.ops import resource_variable_ops
from tensorflow.python.ops import variables as tf_ops_variables

import warnings

import horovod.tensorflow as hvd
@@ -2243,30 +2246,37 @@ def test_horovod_broadcast_inplace_cpu(self):
                  tf.float64, tf.bool]
        dims = [1, 2, 3]
        root_ranks = list(range(size))
        for dtype, dim, root_rank in itertools.product(dtypes, dims, root_ranks):
            with tf.device("/cpu:0"):
                if dtype == tf.bool:
                    initial_value = tf.cast((tf.ones([17] * dim) * rank) % 2, dtype)
                else:
                    initial_value = tf.cast(tf.ones([17] * dim) * rank, dtype)
                if not hvd._executing_eagerly():
                    var = tf.Variable(initial_value)
                    init = tf.compat.v1.global_variables_initializer()
                    self.evaluate(init)
                else:
                    var = self.tfe.Variable(initial_value)
                root_tensor = tf.ones([17] * dim) * root_rank
                if dtype == tf.bool:
                    root_tensor = root_tensor % 2
                broadcasted_tensor, = hvd.broadcast_([var], root_rank)
                self.assertEqual(var.dtype.base_dtype, dtype)
                self.assertEqual(broadcasted_tensor.dtype.base_dtype, dtype)
                np.testing.assert_array_equal(self.evaluate(broadcasted_tensor), self.evaluate(var),
                                              err_msg="broadcasted_var and var may not differ, actually they should have the same underlying buffer")
                self.assertTrue(
                    self.evaluate(tf.reduce_all(tf.equal(
                        tf.cast(root_tensor, tf.int32), tf.cast(broadcasted_tensor, tf.int32)))),
                    "Inplace hvd.broadcast_ produces incorrect broadcasted variable value")
        for use_resource in [False, True]:
            if not use_resource and _executing_eagerly():
                continue
            for dtype, dim, root_rank in itertools.product(dtypes, dims, root_ranks):
                with tf.device("/cpu:0"):
                    if dtype == tf.bool:
                        initial_value = tf.cast((tf.ones([17] * dim) * rank) % 2, dtype)
                    else:
                        initial_value = tf.cast(tf.ones([17] * dim) * rank, dtype)
                    if not hvd._executing_eagerly():
                        if use_resource:
                            var = resource_variable_ops.ResourceVariable(initial_value)
                        else:
                            var = tf_ops_variables.RefVariable(initial_value)
                        init = tf.compat.v1.global_variables_initializer()
                        self.evaluate(init)
                    else:
                        assert use_resource
                        var = self.tfe.Variable(initial_value)
                    root_tensor = tf.ones([17] * dim) * root_rank
                    if dtype == tf.bool:
                        root_tensor = root_tensor % 2
                    broadcasted_tensor, = hvd.broadcast_([var], root_rank)
                    self.assertEqual(var.dtype.base_dtype, dtype)
                    self.assertEqual(broadcasted_tensor.dtype.base_dtype, dtype)
                    np.testing.assert_array_equal(self.evaluate(broadcasted_tensor), self.evaluate(var),
                                                  err_msg="broadcasted_var and var may not differ, actually they should have the same underlying buffer")
                    self.assertTrue(
                        self.evaluate(tf.reduce_all(tf.equal(
                            tf.cast(root_tensor, tf.int32), tf.cast(broadcasted_tensor, tf.int32)))),
                        "Inplace hvd.broadcast_ produces incorrect broadcasted variable value")


    def test_horovod_broadcast_inplace_gpu(self):
@@ -2294,32 +2304,39 @@ def test_horovod_broadcast_inplace_gpu(self):
        dtypes = [tf.int64, tf.float16, tf.float32, tf.float64]
        dims = [1, 2, 3]
        root_ranks = list(range(size))
        for counter, (dtype, dim, root_rank) in enumerate(itertools.product(dtypes, dims, root_ranks)):
            with tf.device("/gpu:%d" % local_rank):
                if dtype == tf.bool:
                    initial_value = tf.cast((tf.ones([17] * dim) * rank) % 2, dtype)
                else:
                    initial_value = tf.cast(tf.ones([17] * dim) * rank, dtype)
                root_tensor = tf.ones([17] * dim) * root_rank
                if dtype == tf.bool:
                    root_tensor = root_tensor % 2
                if not hvd._executing_eagerly() or not tf.compat.v1.resource_variables_enabled():
                    var = tf.Variable(initial_value)
                    init = tf.compat.v1.global_variables_initializer()
                    self.evaluate(init)
                else:
                    var = self.tfe.Variable(initial_value)
                broadcasted_tensor, = hvd.broadcast_([var], root_rank)
                self.assertEqual(var.dtype.base_dtype, dtype)
                self.assertEqual(broadcasted_tensor.dtype.base_dtype, dtype)
                np.testing.assert_array_equal(self.evaluate(broadcasted_tensor), self.evaluate(var),
                                              err_msg="broadcasted_var and var may not differ, actually they should have the same underlying buffer")
                self.assertTrue(
                    self.evaluate(tf.reduce_all(tf.equal(
                        tf.cast(root_tensor, tf.int32), tf.cast(broadcasted_tensor, tf.int32)))),
                    "Inplace hvd.broadcast_ produces incorrect broadcasted variable value")
        for use_resource in [False, True]:
            if not use_resource and _executing_eagerly():
                continue
            for counter, (dtype, dim, root_rank) in enumerate(itertools.product(dtypes, dims, root_ranks)):
                with tf.device("/gpu:%d" % local_rank):
                    if dtype == tf.bool:
                        initial_value = tf.cast((tf.ones([17] * dim) * rank) % 2, dtype)
                    else:
                        initial_value = tf.cast(tf.ones([17] * dim) * rank, dtype)
                    root_tensor = tf.ones([17] * dim) * root_rank
                    if dtype == tf.bool:
                        root_tensor = root_tensor % 2
                    if not hvd._executing_eagerly():
                        if use_resource:
                            var = resource_variable_ops.ResourceVariable(initial_value)
                        else:
                            var = tf_ops_variables.RefVariable(initial_value)
                        init = tf.compat.v1.global_variables_initializer()
                        self.evaluate(init)
                    else:
                        assert use_resource
                        var = self.tfe.Variable(initial_value)
                    broadcasted_tensor, = hvd.broadcast_([var], root_rank)
                    self.assertEqual(var.dtype.base_dtype, dtype)
                    self.assertEqual(broadcasted_tensor.dtype.base_dtype, dtype)
                    np.testing.assert_array_equal(self.evaluate(broadcasted_tensor), self.evaluate(var),
                                                  err_msg="broadcasted_var and var may not differ, actually they should have the same underlying buffer")
                    self.assertTrue(
                        self.evaluate(tf.reduce_all(tf.equal(
                            tf.cast(root_tensor, tf.int32), tf.cast(broadcasted_tensor, tf.int32)))),
                        "Inplace hvd.broadcast_ produces incorrect broadcasted variable value")

    def test_horovod_broadcast_inplace_multiple(self):
    def test_horovod_broadcast_inplace_multiple_cpu(self):
        """Test that the inplace broadcast correctly broadcasts multiple variables on CPU."""
        if LooseVersion(tf.__version__) < LooseVersion('2.6.0'):
            self.skipTest("Custom Ops using resource variables only work with TF 2.6+")
@@ -2335,32 +2352,39 @@ def test_horovod_broadcast_inplace_multiple(self):
        dtypes = [tf.float32]
        dims = [1, 2, 3]
        root_ranks = list(range(size))
        for dtype, root_rank in itertools.product(dtypes, root_ranks):
            with tf.device("/cpu:0"):
                variables = []
                root_tensors = []
                for dim in dims:
                    initial_value = tf.cast(tf.ones([17] * dim) * rank, dtype)
                    if not hvd._executing_eagerly():
                        var = tf.Variable(initial_value, name=f"{dim}_dim_var")
                        init = tf.compat.v1.global_variables_initializer()
                        self.evaluate(init)
                    else:
                        var = self.tfe.Variable(initial_value, name=f"{dim}_dim_var")
                    root_tensor = tf.ones([17] * dim) * root_rank
                    variables.append(var)
                    root_tensors.append(root_tensor)

                broadcasted_tensors = hvd.broadcast_(variables, root_rank)
                for broadcasted_tensor, var, root_tensor in zip(broadcasted_tensors, variables, root_tensors):
                    self.assertEqual(var.dtype.base_dtype, dtype)
                    self.assertEqual(broadcasted_tensor.dtype.base_dtype, dtype)
                    np.testing.assert_array_equal(self.evaluate(broadcasted_tensor), self.evaluate(var),
                                                  err_msg="broadcasted_var and var may not differ, actually they should have the same underlying buffer")
                    self.assertTrue(
                        self.evaluate(tf.reduce_all(tf.equal(
                            tf.cast(root_tensor, tf.int32), tf.cast(broadcasted_tensor, tf.int32)))),
                        "Inplace hvd.broadcast_ produces incorrect broadcasted variable value")
        for use_resource in [False, True]:
            if not use_resource and _executing_eagerly():
                continue
            for dtype, root_rank in itertools.product(dtypes, root_ranks):
                with tf.device("/cpu:0"):
                    variables = []
                    root_tensors = []
                    for dim in dims:
                        initial_value = tf.cast(tf.ones([17] * dim) * rank, dtype)
                        if not hvd._executing_eagerly():
                            if use_resource:
                                var = resource_variable_ops.ResourceVariable(initial_value, name=f"dim_{dim}_var")
                            else:
                                var = tf_ops_variables.RefVariable(initial_value, name=f"dim_{dim}_var")
                            init = tf.compat.v1.global_variables_initializer()
                            self.evaluate(init)
                        else:
                            assert use_resource
                            var = self.tfe.Variable(initial_value, name=f"dim_{dim}_var")
                        root_tensor = tf.ones([17] * dim) * root_rank
                        variables.append(var)
                        root_tensors.append(root_tensor)

                    broadcasted_tensors = hvd.broadcast_(variables, root_rank)
                    for broadcasted_tensor, var, root_tensor in zip(broadcasted_tensors, variables, root_tensors):
                        self.assertEqual(var.dtype.base_dtype, dtype)
                        self.assertEqual(broadcasted_tensor.dtype.base_dtype, dtype)
                        np.testing.assert_array_equal(self.evaluate(broadcasted_tensor), self.evaluate(var),
                                                      err_msg="broadcasted_var and var may not differ, actually they should have the same underlying buffer")
                        self.assertTrue(
                            self.evaluate(tf.reduce_all(tf.equal(
                                tf.cast(root_tensor, tf.int32), tf.cast(broadcasted_tensor, tf.int32)))),
                            "Inplace hvd.broadcast_ produces incorrect broadcasted variable value")

    def test_horovod_broadcast_cpu_process_sets(self):
        """Test that the broadcast correctly broadcasts 1D, 2D, 3D tensors on CPU

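Note: the assertions in these tests compare each worker's broadcast result against the root rank's tensor, so they are only meaningful when launched with more than one Horovod process (for example via horovodrun); with a single process the in-place broadcast is effectively a no-op.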