Module pearl.user_envs.wrappers.partial_observability
Expand source code
# pyre-ignore-all-errors
from abc import abstractmethod
import numpy as np
try:
import gymnasium as gym
except ModuleNotFoundError:
print("gymnasium module not found.")
class PartialObservableWrapper(gym.Wrapper):
    """Base wrapper that passes every observation through :meth:`observation`.

    A step counter is stored on the wrapped environment as
    ``number_of_steps``. Subclasses read it, together with
    ``time_between_two_valid_obs``, inside :meth:`observation` to decide
    what to expose at the current step.

    Args:
        env: The environment to wrap.
        time_between_two_valid_obs: Stored unchanged on the wrapper for
            subclasses to use; this base class does not interpret it.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env)
        # The counter lives on the wrapped env (not on the wrapper) because
        # subclasses read it back as ``self.env.number_of_steps``.
        self.env.number_of_steps = 0
        self.time_between_two_valid_obs = time_between_two_valid_obs

    def reset(self, **kwargs):
        """Reset the wrapped env and return ``(transformed observation, info)``.

        The step counter is restarted so that every episode begins at step 0
        instead of inheriting the counter value left by the previous episode.
        """
        observation, info = self.env.reset(**kwargs)
        # Restart the counter *before* transforming the observation, so the
        # initial observation is computed for step 0.
        self.env.number_of_steps = 0
        return self.observation(observation), info

    def step(self, action):
        """Step the wrapped env and transform the returned observation.

        Returns the wrapped env's five-element step result with the
        observation replaced by ``self.observation(observation)``.
        """
        observation, reward, done, truncated, info = self.env.step(action)
        # Increment first: ``observation`` sees the post-step counter value.
        self.env.number_of_steps += 1
        return self.observation(observation), reward, done, truncated, info

    @abstractmethod
    def observation(self, observation):
        """Map a raw observation to the one exposed to the agent.

        Must be overridden by subclasses.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError
class CartPolePartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the CartPole environment partially observable.

    The exposed observation has three components: elements 0 and 2 of the
    underlying observation, followed by a validity flag. On steps where
    ``number_of_steps`` is not a multiple of ``time_between_two_valid_obs``
    the whole observation, flag included, is zero.

    Args:
        env: The environment to wrap. Its unwrapped base environment must
            define ``x_threshold`` and ``theta_threshold_radians``.
        time_between_two_valid_obs: Period, in steps, between two
            non-zero observations.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env, time_between_two_valid_obs)
        # Read the thresholds from the unwrapped base env rather than from
        # the wrapper itself: ``env`` may already be wrapped, and attribute
        # lookups on a wrapper are not guaranteed to be forwarded.
        base_env = self.unwrapped
        high = np.array(
            [base_env.x_threshold * 2, base_env.theta_threshold_radians * 2, 1.0],
            dtype=np.float32,
        )
        self.observation_space = gym.spaces.Box(-high, high, dtype=np.float32)

    def observation(self, observation):
        """Return ``[obs[0], obs[2], 1.0]`` on valid steps, zeros otherwise."""
        if self.env.number_of_steps % self.time_between_two_valid_obs != 0:
            return np.zeros(3, dtype=np.float32)
        # Presumably cart position and pole angle, matching the bounds built
        # from x_threshold and theta_threshold_radians - confirm against env.
        return np.array([observation[0], observation[2], 1.0], dtype=np.float32)
class AcrobotPartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the Acrobot environment partially observable.

    Exposes the first four components of the underlying observation plus a
    trailing validity flag; every component is bounded by ``[-1, 1]`` in the
    declared observation space. On steps where ``number_of_steps`` is not a
    multiple of ``time_between_two_valid_obs`` an all-zero vector is
    returned instead.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env, time_between_two_valid_obs)
        bound = np.ones(5, dtype=np.float32)
        self.observation_space = gym.spaces.Box(low=-bound, high=bound, dtype=np.float32)

    def observation(self, observation):
        """Return the four leading components plus ``1.0``, or zeros when masked."""
        is_valid_step = self.env.number_of_steps % self.time_between_two_valid_obs == 0
        if not is_valid_step:
            return np.zeros(5, dtype=np.float32)
        visible = [observation[i] for i in range(4)]
        visible.append(1.0)  # validity flag
        return np.array(visible, dtype=np.float32)
class PendulumPartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the Pendulum environment partially observable.

    Exposes the first two components of the underlying observation plus a
    trailing validity flag, each bounded by ``[-1, 1]`` in the declared
    observation space. On steps where ``number_of_steps`` is not a multiple
    of ``time_between_two_valid_obs`` an all-zero vector is returned.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env, time_between_two_valid_obs)
        bound = np.ones(3, dtype=np.float32)
        self.observation_space = gym.spaces.Box(low=-bound, high=bound, dtype=np.float32)

    def observation(self, observation):
        """Return ``[obs[0], obs[1], 1.0]`` on valid steps, zeros otherwise."""
        is_valid_step = self.env.number_of_steps % self.time_between_two_valid_obs == 0
        if not is_valid_step:
            return np.zeros(3, dtype=np.float32)
        first, second = observation[0], observation[1]
        return np.array([first, second, 1.0], dtype=np.float32)
class MountainCarPartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the MountainCar environment partially observable.

    The exposed observation is element 0 of the underlying observation
    followed by a validity flag. On steps where ``number_of_steps`` is not a
    multiple of ``time_between_two_valid_obs`` both entries are zero.

    Args:
        env: The environment to wrap. Its unwrapped base environment must
            define ``min_position`` and ``max_position``.
        time_between_two_valid_obs: Period, in steps, between two
            non-zero observations.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env, time_between_two_valid_obs)
        # ``env`` may itself be wrapped, so read the track limits from the
        # unwrapped base env instead of relying on attribute forwarding.
        base_env = self.env.unwrapped
        # The track is not symmetric around zero: bound the first component
        # below by ``min_position`` rather than by ``-max_position``, so that
        # every reachable value lies inside the declared space.
        low = np.array([base_env.min_position, -1.0], dtype=np.float32)
        high = np.array([base_env.max_position, 1.0], dtype=np.float32)
        self.observation_space = gym.spaces.Box(low=low, high=high, dtype=np.float32)

    def observation(self, observation):
        """Return ``[obs[0], 1.0]`` on valid steps, zeros otherwise."""
        if self.env.number_of_steps % self.time_between_two_valid_obs != 0:
            return np.zeros(2, dtype=np.float32)
        return np.array([observation[0], 1.0], dtype=np.float32)
class PuckWorldPartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the PuckWorld environment partially observable.

    At each step only one consecutive pair of components of the underlying
    observation is exposed. The pair index cycles through 0..3 with the step
    counter and is appended as the third component of the result.
    """

    def __init__(self, env):
        super().__init__(env)
        # The first two components are unbounded; the third is the pair index.
        high = np.array([np.inf, np.inf, 3], dtype=np.float32)
        self.observation_space = gym.spaces.Box(low=-high, high=high, dtype=np.float32)

    def observation(self, observation):
        """Return ``[obs[2k], obs[2k + 1], k]`` with ``k = number_of_steps % 4``."""
        pair_index = self.env.number_of_steps % 4
        offset = 2 * pair_index
        visible = [observation[offset], observation[offset + 1], pair_index]
        return np.array(visible, dtype=np.float32)
Classes
class AcrobotPartialObservableWrapper (env, time_between_two_valid_obs=1)
-
Observation wrapper that makes the Acrobot environment partially observable.
Wraps an environment to allow a modular transformation of the :meth:`step` and :meth:`reset` methods.
Args
env
- The environment to wrap
Expand source code
class AcrobotPartialObservableWrapper(PartialObservableWrapper): r"""Observation wrapper that make Acrobat environment partial observable.""" def __init__(self, env, time_between_two_valid_obs=1): super(AcrobotPartialObservableWrapper, self).__init__( env, time_between_two_valid_obs ) high = np.array([1.0, 1.0, 1.0, 1.0, 1.0], dtype=np.float32) self.observation_space = gym.spaces.Box(low=-high, high=high, dtype=np.float32) def observation(self, observation): if self.env.number_of_steps % self.time_between_two_valid_obs != 0: return np.zeros(5, dtype=np.float32) else: return np.array( [observation[0], observation[1], observation[2], observation[3], 1.0], dtype=np.float32, )
Ancestors
- PartialObservableWrapper
- gymnasium.core.Wrapper
- gymnasium.core.Env
- typing.Generic
Methods
def observation(self, observation)
-
Expand source code
def observation(self, observation): if self.env.number_of_steps % self.time_between_two_valid_obs != 0: return np.zeros(5, dtype=np.float32) else: return np.array( [observation[0], observation[1], observation[2], observation[3], 1.0], dtype=np.float32, )
Inherited members
class CartPolePartialObservableWrapper (env, time_between_two_valid_obs=1)
-
Observation wrapper that makes the CartPole environment partially observable.
Wraps an environment to allow a modular transformation of the :meth:`step` and :meth:`reset` methods.
Args
env
- The environment to wrap
Expand source code
class CartPolePartialObservableWrapper(PartialObservableWrapper): r"""Observation wrapper that make CartPole environment partial observable.""" def __init__(self, env, time_between_two_valid_obs=1): super(CartPolePartialObservableWrapper, self).__init__( env, time_between_two_valid_obs ) high = np.array( [self.x_threshold * 2, self.theta_threshold_radians * 2, 1.0], dtype=np.float32, ) self.observation_space = gym.spaces.Box(-high, high, dtype=np.float32) def observation(self, observation): if self.env.number_of_steps % self.time_between_two_valid_obs != 0: return np.zeros(3, dtype=np.float32) else: return np.array([observation[0], observation[2], 1.0], dtype=np.float32)
Ancestors
- PartialObservableWrapper
- gymnasium.core.Wrapper
- gymnasium.core.Env
- typing.Generic
Methods
def observation(self, observation)
-
Expand source code
def observation(self, observation): if self.env.number_of_steps % self.time_between_two_valid_obs != 0: return np.zeros(3, dtype=np.float32) else: return np.array([observation[0], observation[2], 1.0], dtype=np.float32)
Inherited members
class MountainCarPartialObservableWrapper (env, time_between_two_valid_obs=1)
-
Observation wrapper that makes the MountainCar environment partially observable.
Wraps an environment to allow a modular transformation of the :meth:`step` and :meth:`reset` methods.
Args
env
- The environment to wrap
Expand source code
class MountainCarPartialObservableWrapper(PartialObservableWrapper): r"""Observation wrapper that make MountainCar environment partial observable.""" def __init__(self, env, time_between_two_valid_obs=1): super(MountainCarPartialObservableWrapper, self).__init__( env, time_between_two_valid_obs ) high = np.array([self.env.max_position, 1.0], dtype=np.float32) self.observation_space = gym.spaces.Box(low=-high, high=high, dtype=np.float32) def observation(self, observation): if self.env.number_of_steps % self.time_between_two_valid_obs != 0: return np.zeros(2, dtype=np.float32) else: return np.array([observation[0], 1.0], dtype=np.float32)
Ancestors
- PartialObservableWrapper
- gymnasium.core.Wrapper
- gymnasium.core.Env
- typing.Generic
Methods
def observation(self, observation)
-
Expand source code
def observation(self, observation): if self.env.number_of_steps % self.time_between_two_valid_obs != 0: return np.zeros(2, dtype=np.float32) else: return np.array([observation[0], 1.0], dtype=np.float32)
Inherited members
class PartialObservableWrapper (env, time_between_two_valid_obs=1)
-
Wraps a :class:`gymnasium.Env` to allow a modular transformation of the :meth:`step` and :meth:`reset` methods.
This class is the base class of all wrappers to change the behavior of the underlying environment. Wrappers that inherit from this class can modify the :attr:`action_space`, :attr:`observation_space`, :attr:`reward_range` and :attr:`metadata` attributes, without changing the underlying environment's attributes. Moreover, the behavior of the :meth:`step` and :meth:`reset` methods can be changed by these wrappers.
Some attributes (:attr:`spec`, :attr:`render_mode`, :attr:`np_random`) will point back to the wrapper's environment (i.e. to the corresponding attributes of :attr:`env`).
Note
If you inherit from :class:`Wrapper`, don't forget to call `super().__init__(env)`.
Wraps an environment to allow a modular transformation of the :meth:`step` and :meth:`reset` methods.
Args
env
- The environment to wrap
Expand source code
class PartialObservableWrapper(gym.Wrapper): def __init__(self, env, time_between_two_valid_obs=1): super(PartialObservableWrapper, self).__init__(env) self.env.number_of_steps = 0 self.time_between_two_valid_obs = time_between_two_valid_obs def reset(self, **kwargs): observation, info = self.env.reset(**kwargs) return self.observation(observation), info def step(self, action): observation, reward, done, truncated, info = self.env.step(action) self.env.number_of_steps += 1 return self.observation(observation), reward, done, truncated, info @abstractmethod def observation(self, observation): raise NotImplementedError
Ancestors
- gymnasium.core.Wrapper
- gymnasium.core.Env
- typing.Generic
Subclasses
- AcrobotPartialObservableWrapper
- CartPolePartialObservableWrapper
- MountainCarPartialObservableWrapper
- PendulumPartialObservableWrapper
- PuckWorldPartialObservableWrapper
Methods
def observation(self, observation)
-
Expand source code
@abstractmethod def observation(self, observation): raise NotImplementedError
def reset(self, **kwargs)
-
Uses the :meth:`reset` of the :attr:`env` that can be overwritten to change the returned data.
Expand source code
def reset(self, **kwargs): observation, info = self.env.reset(**kwargs) return self.observation(observation), info
def step(self, action)
-
Uses the :meth:`step` of the :attr:`env` that can be overwritten to change the returned data.
Expand source code
def step(self, action): observation, reward, done, truncated, info = self.env.step(action) self.env.number_of_steps += 1 return self.observation(observation), reward, done, truncated, info
class PendulumPartialObservableWrapper (env, time_between_two_valid_obs=1)
-
Observation wrapper that makes the Pendulum environment partially observable.
Wraps an environment to allow a modular transformation of the :meth:`step` and :meth:`reset` methods.
Args
env
- The environment to wrap
Expand source code
class PendulumPartialObservableWrapper(PartialObservableWrapper): r"""Observation wrapper that make Pendulum environment partial observable.""" def __init__(self, env, time_between_two_valid_obs=1): super(PendulumPartialObservableWrapper, self).__init__( env, time_between_two_valid_obs ) high = np.array([1.0, 1.0, 1.0], dtype=np.float32) self.observation_space = gym.spaces.Box(low=-high, high=high, dtype=np.float32) def observation(self, observation): if self.env.number_of_steps % self.time_between_two_valid_obs != 0: return np.zeros(3, dtype=np.float32) else: return np.array([observation[0], observation[1], 1.0], dtype=np.float32)
Ancestors
- PartialObservableWrapper
- gymnasium.core.Wrapper
- gymnasium.core.Env
- typing.Generic
Methods
def observation(self, observation)
-
Expand source code
def observation(self, observation): if self.env.number_of_steps % self.time_between_two_valid_obs != 0: return np.zeros(3, dtype=np.float32) else: return np.array([observation[0], observation[1], 1.0], dtype=np.float32)
Inherited members
class PuckWorldPartialObservableWrapper (env)
-
Observation wrapper that makes the PuckWorld environment partially observable.
Wraps an environment to allow a modular transformation of the :meth:`step` and :meth:`reset` methods.
Args
env
- The environment to wrap
Expand source code
class PuckWorldPartialObservableWrapper(PartialObservableWrapper): r"""Observation wrapper that make PuckWorld environment partial observable.""" def __init__(self, env): super(PuckWorldPartialObservableWrapper, self).__init__(env) high = np.array([np.inf, np.inf, 3], dtype=np.float32) self.observation_space = gym.spaces.Box(low=-high, high=high, dtype=np.float32) def observation(self, observation): idx = self.env.number_of_steps % 4 return np.array( [ observation[2 * idx], observation[2 * idx + 1], idx, ], dtype=np.float32, )
Ancestors
- PartialObservableWrapper
- gymnasium.core.Wrapper
- gymnasium.core.Env
- typing.Generic
Methods
def observation(self, observation)
-
Expand source code
def observation(self, observation): idx = self.env.number_of_steps % 4 return np.array( [ observation[2 * idx], observation[2 * idx + 1], idx, ], dtype=np.float32, )
Inherited members