Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow custom spaces in VectorEnv #2038

Merged
merged 6 commits into from Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions gym/error.py
Expand Up @@ -165,3 +165,10 @@ class ClosedEnvironmentError(Exception):
Trying to call `reset`, or `step`, while the environment is closed.
"""
pass

class CustomSpaceError(Exception):
"""
The space is a custom gym.Space instance, and is not supported by
`AsyncVectorEnv` with `shared_memory=True`.
"""
pass
23 changes: 16 additions & 7 deletions gym/vector/async_vector_env.py
Expand Up @@ -8,7 +8,7 @@
from gym import logger
from gym.vector.vector_env import VectorEnv
from gym.error import (AlreadyPendingCallError, NoAsyncCallError,
ClosedEnvironmentError)
ClosedEnvironmentError, CustomSpaceError)
from gym.vector.utils import (create_shared_memory, create_empty_array,
write_to_shared_memory, read_from_shared_memory,
concatenate, CloudpickleWrapper, clear_mpi_env_vars)
Expand Down Expand Up @@ -88,10 +88,18 @@ def __init__(self, env_fns, observation_space=None, action_space=None,
observation_space=observation_space, action_space=action_space)

if self.shared_memory:
_obs_buffer = create_shared_memory(self.single_observation_space,
n=self.num_envs, ctx=ctx)
self.observations = read_from_shared_memory(_obs_buffer,
self.single_observation_space, n=self.num_envs)
try:
_obs_buffer = create_shared_memory(self.single_observation_space,
n=self.num_envs, ctx=ctx)
self.observations = read_from_shared_memory(_obs_buffer,
self.single_observation_space, n=self.num_envs)
except CustomSpaceError:
raise ValueError('Using `shared_memory=True` in `AsyncVectorEnv` '
'is incompatible with non-standard Gym observation spaces '
'(i.e. custom spaces inheriting from `gym.Space`), and is '
'only compatible with default Gym spaces (e.g. `Box`, '
'`Tuple`, `Dict`) for batching. Set `shared_memory=False` '
'if you use custom observation spaces.')
else:
_obs_buffer = None
self.observations = create_empty_array(
Expand Down Expand Up @@ -176,7 +184,8 @@ def reset_wait(self, timeout=None):
self._state = AsyncState.DEFAULT

if not self.shared_memory:
concatenate(results, self.observations, self.single_observation_space)
self.observations = concatenate(results, self.observations,
self.single_observation_space)

return deepcopy(self.observations) if self.copy else self.observations

Expand Down Expand Up @@ -235,7 +244,7 @@ def step_wait(self, timeout=None):
observations_list, rewards, dones, infos = zip(*results)

if not self.shared_memory:
concatenate(observations_list, self.observations,
self.observations = concatenate(observations_list, self.observations,
self.single_observation_space)

return (deepcopy(self.observations) if self.copy else self.observations,
Expand Down
8 changes: 5 additions & 3 deletions gym/vector/sync_vector_env.py
Expand Up @@ -63,9 +63,10 @@ def reset_wait(self):
for env in self.envs:
observation = env.reset()
observations.append(observation)
concatenate(observations, self.observations, self.single_observation_space)
self.observations = concatenate(observations, self.observations,
self.single_observation_space)

return np.copy(self.observations) if self.copy else self.observations
return deepcopy(self.observations) if self.copy else self.observations

def step_async(self, actions):
self._actions = actions
Expand All @@ -78,7 +79,8 @@ def step_wait(self):
observation = env.reset()
observations.append(observation)
infos.append(info)
concatenate(observations, self.observations, self.single_observation_space)
self.observations = concatenate(observations, self.observations,
self.single_observation_space)

return (deepcopy(self.observations) if self.copy else self.observations,
np.copy(self._rewards), np.copy(self._dones), infos)
Expand Down
33 changes: 31 additions & 2 deletions gym/vector/tests/test_async_vector_env.py
Expand Up @@ -2,10 +2,11 @@
import numpy as np

from multiprocessing import TimeoutError
from gym.spaces import Box
from gym.spaces import Box, Tuple
from gym.error import (AlreadyPendingCallError, NoAsyncCallError,
ClosedEnvironmentError)
from gym.vector.tests.utils import make_env, make_slow_env
from gym.vector.tests.utils import (CustomSpace, make_env,
make_slow_env, make_custom_space_env)

from gym.vector.async_vector_env import AsyncVectorEnv

Expand Down Expand Up @@ -194,3 +195,31 @@ def test_check_observations_async_vector_env(shared_memory):
with pytest.raises(RuntimeError):
env = AsyncVectorEnv(env_fns, shared_memory=shared_memory)
env.close(terminate=True)


def test_custom_space_async_vector_env():
env_fns = [make_custom_space_env(i) for i in range(4)]
try:
env = AsyncVectorEnv(env_fns, shared_memory=False)
reset_observations = env.reset()
actions = ('action-2', 'action-3', 'action-5', 'action-7')
step_observations, rewards, dones, _ = env.step(actions)
finally:
env.close()

assert isinstance(env.single_observation_space, CustomSpace)
assert isinstance(env.observation_space, Tuple)

assert isinstance(reset_observations, tuple)
assert reset_observations == ('reset', 'reset', 'reset', 'reset')

assert isinstance(step_observations, tuple)
assert step_observations == ('step(action-2)', 'step(action-3)',
'step(action-5)', 'step(action-7)')


def test_custom_space_async_vector_env_shared_memory():
env_fns = [make_custom_space_env(i) for i in range(4)]
with pytest.raises(ValueError):
env = AsyncVectorEnv(env_fns, shared_memory=True)
env.close(terminate=True)
14 changes: 12 additions & 2 deletions gym/vector/tests/test_shared_memory.py
Expand Up @@ -7,9 +7,10 @@
from multiprocessing import Array, Process
from collections import OrderedDict

from gym.spaces import Tuple, Dict
from gym.spaces import Box, Tuple, Dict
from gym.error import CustomSpaceError
from gym.vector.utils.spaces import _BaseGymSpaces
from gym.vector.tests.utils import spaces
from gym.vector.tests.utils import spaces, custom_spaces

from gym.vector.utils.shared_memory import (create_shared_memory,
read_from_shared_memory, write_to_shared_memory)
Expand Down Expand Up @@ -64,6 +65,15 @@ def assert_nested_type(lhs, rhs, n):
assert_nested_type(shared_memory, expected_type, n=n)


@pytest.mark.parametrize('n', [1, 8])
@pytest.mark.parametrize('ctx', [None, 'fork', 'spawn'], ids=['default', 'fork', 'spawn'])
@pytest.mark.parametrize('space', custom_spaces)
def test_create_shared_memory_custom_space(n, ctx, space):
ctx = mp if (ctx is None) else mp.get_context(ctx)
with pytest.raises(CustomSpaceError):
shared_memory = create_shared_memory(space, n=n, ctx=ctx)


@pytest.mark.parametrize('space', spaces,
ids=[space.__class__.__name__ for space in spaces])
def test_write_to_shared_memory(space):
Expand Down
17 changes: 16 additions & 1 deletion gym/vector/tests/test_spaces.py
Expand Up @@ -2,7 +2,7 @@
import numpy as np

from gym.spaces import Box, MultiDiscrete, Tuple, Dict
from gym.vector.tests.utils import spaces
from gym.vector.tests.utils import spaces, custom_spaces, CustomSpace

from gym.vector.utils.spaces import _BaseGymSpaces, batch_space

Expand Down Expand Up @@ -32,8 +32,23 @@
})
]

expected_custom_batch_spaces_4 = [
Tuple((CustomSpace(), CustomSpace(), CustomSpace(), CustomSpace())),
Tuple((
Tuple((CustomSpace(), CustomSpace(), CustomSpace(), CustomSpace())),
Box(low=0, high=255, shape=(4,), dtype=np.uint8)
))
]

@pytest.mark.parametrize('space,expected_batch_space_4', list(zip(spaces,
expected_batch_spaces_4)), ids=[space.__class__.__name__ for space in spaces])
def test_batch_space(space, expected_batch_space_4):
batch_space_4 = batch_space(space, n=4)
assert batch_space_4 == expected_batch_space_4


@pytest.mark.parametrize('space,expected_batch_space_4', list(zip(custom_spaces,
expected_custom_batch_spaces_4)), ids=[space.__class__.__name__ for space in custom_spaces])
def test_batch_space_custom_space(space, expected_batch_space_4):
batch_space_4 = batch_space(space, n=4)
assert batch_space_4 == expected_batch_space_4
25 changes: 23 additions & 2 deletions gym/vector/tests/test_sync_vector_env.py
@@ -1,8 +1,8 @@
import pytest
import numpy as np

from gym.spaces import Box
from gym.vector.tests.utils import make_env
from gym.spaces import Box, Tuple
from gym.vector.tests.utils import CustomSpace, make_env, make_custom_space_env

from gym.vector.sync_vector_env import SyncVectorEnv

Expand Down Expand Up @@ -70,3 +70,24 @@ def test_check_observations_sync_vector_env():
with pytest.raises(RuntimeError):
env = SyncVectorEnv(env_fns)
env.close()


def test_custom_space_sync_vector_env():
env_fns = [make_custom_space_env(i) for i in range(4)]
try:
env = SyncVectorEnv(env_fns)
reset_observations = env.reset()
actions = ('action-2', 'action-3', 'action-5', 'action-7')
step_observations, rewards, dones, _ = env.step(actions)
finally:
env.close()

assert isinstance(env.single_observation_space, CustomSpace)
assert isinstance(env.observation_space, Tuple)

assert isinstance(reset_observations, tuple)
assert reset_observations == ('reset', 'reset', 'reset', 'reset')

assert isinstance(step_observations, tuple)
assert step_observations == ('step(action-2)', 'step(action-3)',
'step(action-5)', 'step(action-7)')
14 changes: 13 additions & 1 deletion gym/vector/tests/test_vector_env.py
@@ -1,10 +1,12 @@
import pytest
import numpy as np

from gym.vector.tests.utils import make_env
from gym.spaces import Tuple
from gym.vector.tests.utils import CustomSpace, make_env

from gym.vector.async_vector_env import AsyncVectorEnv
from gym.vector.sync_vector_env import SyncVectorEnv
from gym.vector.vector_env import VectorEnv

@pytest.mark.parametrize('shared_memory', [True, False])
def test_vector_env_equal(shared_memory):
Expand Down Expand Up @@ -41,3 +43,13 @@ def test_vector_env_equal(shared_memory):
finally:
async_env.close()
sync_env.close()


def test_custom_space_vector_env():
env = VectorEnv(4, CustomSpace(), CustomSpace())

assert isinstance(env.single_observation_space, CustomSpace)
assert isinstance(env.observation_space, Tuple)

assert isinstance(env.single_action_space, CustomSpace)
assert isinstance(env.action_space, Tuple)
31 changes: 31 additions & 0 deletions gym/vector/tests/utils.py
Expand Up @@ -47,6 +47,30 @@ def step(self, action):
reward, done = 0., False
return observation, reward, done, {}

class CustomSpace(gym.Space):
"""Minimal custom observation space."""
def __eq__(self, other):
return isinstance(other, CustomSpace)

custom_spaces = [
CustomSpace(),
Tuple((CustomSpace(), Box(low=0, high=255, shape=(), dtype=np.uint8)))
]

class CustomSpaceEnv(gym.Env):
def __init__(self):
super(CustomSpaceEnv, self).__init__()
self.observation_space = CustomSpace()
self.action_space = CustomSpace()

def reset(self):
return 'reset'

def step(self, action):
observation = 'step({0:s})'.format(action)
reward, done = 0., False
return observation, reward, done, {}

def make_env(env_name, seed):
def _make():
env = gym.make(env_name)
Expand All @@ -60,3 +84,10 @@ def _make():
env.seed(seed)
return env
return _make

def make_custom_space_env(seed):
def _make():
env = CustomSpaceEnv()
env.seed(seed)
return env
return _make
18 changes: 15 additions & 3 deletions gym/vector/utils/numpy_utils.py
@@ -1,6 +1,6 @@
import numpy as np

from gym.spaces import Tuple, Dict
from gym.spaces import Space, Tuple, Dict
from gym.vector.utils.spaces import _BaseGymSpaces
from collections import OrderedDict

Expand Down Expand Up @@ -42,8 +42,11 @@ def concatenate(items, out, space):
return concatenate_tuple(items, out, space)
elif isinstance(space, Dict):
return concatenate_dict(items, out, space)
elif isinstance(space, Space):
return concatenate_custom(items, out, space)
else:
raise NotImplementedError()
raise ValueError('Space of type `{0}` is not a valid `gym.Space` '
'instance.'.format(type(space)))

def concatenate_base(items, out, space):
return np.stack(items, axis=0, out=out)
Expand All @@ -56,6 +59,9 @@ def concatenate_dict(items, out, space):
return OrderedDict([(key, concatenate([item[key] for item in items],
out[key], subspace)) for (key, subspace) in space.spaces.items()])

def concatenate_custom(items, out, space):
return tuple(items)


def create_empty_array(space, n=1, fn=np.zeros):
"""Create an empty (possibly nested) numpy array.
Expand Down Expand Up @@ -96,8 +102,11 @@ def create_empty_array(space, n=1, fn=np.zeros):
return create_empty_array_tuple(space, n=n, fn=fn)
elif isinstance(space, Dict):
return create_empty_array_dict(space, n=n, fn=fn)
elif isinstance(space, Space):
return create_empty_array_custom(space, n=n, fn=fn)
else:
raise NotImplementedError()
raise ValueError('Space of type `{0}` is not a valid `gym.Space` '
'instance.'.format(type(space)))

def create_empty_array_base(space, n=1, fn=np.zeros):
shape = space.shape if (n is None) else (n,) + space.shape
Expand All @@ -110,3 +119,6 @@ def create_empty_array_tuple(space, n=1, fn=np.zeros):
def create_empty_array_dict(space, n=1, fn=np.zeros):
return OrderedDict([(key, create_empty_array(subspace, n=n, fn=fn))
for (key, subspace) in space.spaces.items()])

def create_empty_array_custom(space, n=1, fn=np.zeros):
return None
19 changes: 16 additions & 3 deletions gym/vector/utils/shared_memory.py
Expand Up @@ -5,6 +5,7 @@

from gym import logger
from gym.spaces import Tuple, Dict
from gym.error import CustomSpaceError
from gym.vector.utils.spaces import _BaseGymSpaces

__all__ = [
Expand Down Expand Up @@ -41,7 +42,11 @@ def create_shared_memory(space, n=1, ctx=mp):
elif isinstance(space, Dict):
return create_dict_shared_memory(space, n=n, ctx=ctx)
else:
raise NotImplementedError()
raise CustomSpaceError('Cannot create a shared memory for space with '
'type `{0}`. Shared memory only supports '
'default Gym spaces (e.g. `Box`, `Tuple`, '
'`Dict`, etc...), and does not support custom '
'Gym spaces.'.format(type(space)))

def create_base_shared_memory(space, n=1, ctx=mp):
dtype = space.dtype.char
Expand Down Expand Up @@ -92,7 +97,11 @@ def read_from_shared_memory(shared_memory, space, n=1):
elif isinstance(space, Dict):
return read_dict_from_shared_memory(shared_memory, space, n=n)
else:
raise NotImplementedError()
raise CustomSpaceError('Cannot read from a shared memory for space with '
'type `{0}`. Shared memory only supports '
'default Gym spaces (e.g. `Box`, `Tuple`, '
'`Dict`, etc...), and does not support custom '
'Gym spaces.'.format(type(space)))

def read_base_from_shared_memory(shared_memory, space, n=1):
return np.frombuffer(shared_memory.get_obj(),
Expand Down Expand Up @@ -136,7 +145,11 @@ def write_to_shared_memory(index, value, shared_memory, space):
elif isinstance(space, Dict):
write_dict_to_shared_memory(index, value, shared_memory, space)
else:
raise NotImplementedError()
raise CustomSpaceError('Cannot write to a shared memory for space with '
'type `{0}`. Shared memory only supports '
'default Gym spaces (e.g. `Box`, `Tuple`, '
'`Dict`, etc...), and does not support custom '
'Gym spaces.'.format(type(space)))

def write_base_to_shared_memory(index, value, shared_memory, space):
size = int(np.prod(space.shape))
Expand Down