Module pearl.user_envs.wrappers.partial_observability

Expand source code
# pyre-ignore-all-errors
from abc import abstractmethod

import numpy as np

try:
    import gymnasium as gym
except ModuleNotFoundError:
    print("gymnasium module not found.")


class PartialObservableWrapper(gym.Wrapper):
    """Base wrapper that filters every observation through :meth:`observation`.

    A step counter is kept on the wrapped environment as ``number_of_steps``
    and is incremented once per call to :meth:`step`. Subclasses implement
    :meth:`observation` to decide what is exposed to the agent.

    Args:
        env: The environment to wrap.
        time_between_two_valid_obs: Period stored on the wrapper for use by
            subclasses' :meth:`observation` implementations.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env)
        self.time_between_two_valid_obs = time_between_two_valid_obs
        # The counter is attached to the wrapped env, not to the wrapper.
        self.env.number_of_steps = 0

    def reset(self, **kwargs):
        """Reset the wrapped env and return ``(filtered observation, info)``."""
        # NOTE(review): ``number_of_steps`` is not zeroed here, so the counter
        # keeps running across episodes -- confirm this is intended.
        raw_observation, info = self.env.reset(**kwargs)
        return self.observation(raw_observation), info

    def step(self, action):
        """Step the wrapped env, bump the counter, then filter the observation."""
        raw_observation, reward, done, truncated, info = self.env.step(action)
        # Increment first so ``observation`` sees the updated step count.
        self.env.number_of_steps += 1
        filtered = self.observation(raw_observation)
        return filtered, reward, done, truncated, info

    @abstractmethod
    def observation(self, observation):
        """Map a raw observation to the one exposed to the agent."""
        raise NotImplementedError()


class CartPolePartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the CartPole environment partially observable.

    Only entries 0 and 2 of the wrapped observation are exposed (presumably
    cart position and pole angle -- confirm against the wrapped env), followed
    by a validity flag. When the step counter is not a multiple of
    ``time_between_two_valid_obs`` an all-zero vector is returned instead.

    Args:
        env: The CartPole environment to wrap; its base environment must
            provide ``x_threshold`` and ``theta_threshold_radians``.
        time_between_two_valid_obs: Number of steps between two valid
            (non-zero) observations.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env, time_between_two_valid_obs)
        # Read the thresholds from the base environment explicitly: implicit
        # attribute forwarding through ``gym.Wrapper`` (``self.x_threshold``)
        # was deprecated and then removed in Gymnasium 1.0.
        base_env = self.env.unwrapped
        high = np.array(
            [base_env.x_threshold * 2, base_env.theta_threshold_radians * 2, 1.0],
            dtype=np.float32,
        )

        self.observation_space = gym.spaces.Box(-high, high, dtype=np.float32)

    def observation(self, observation):
        """Return ``[obs[0], obs[2], 1.0]`` on valid steps, zeros otherwise."""
        if self.env.number_of_steps % self.time_between_two_valid_obs == 0:
            # The trailing 1.0 marks the observation as valid.
            return np.array([observation[0], observation[2], 1.0], dtype=np.float32)
        return np.zeros(3, dtype=np.float32)


class AcrobotPartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the Acrobot environment partially observable.

    The first four entries of the wrapped observation are exposed, followed by
    a validity flag. When the step counter is not a multiple of
    ``time_between_two_valid_obs`` an all-zero vector is returned instead.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env, time_between_two_valid_obs)
        # Four exposed entries plus the validity flag, each bounded by [-1, 1].
        bound = np.ones(5, dtype=np.float32)
        self.observation_space = gym.spaces.Box(
            low=-bound, high=bound, dtype=np.float32
        )

    def observation(self, observation):
        """Return the first four entries plus a 1.0 flag, or zeros if masked."""
        if self.env.number_of_steps % self.time_between_two_valid_obs == 0:
            visible = [observation[i] for i in range(4)]
            return np.array(visible + [1.0], dtype=np.float32)
        return np.zeros(5, dtype=np.float32)


class PendulumPartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the Pendulum environment partially observable.

    The first two entries of the wrapped observation are exposed, followed by
    a validity flag. When the step counter is not a multiple of
    ``time_between_two_valid_obs`` an all-zero vector is returned instead.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env, time_between_two_valid_obs)
        # Two exposed entries plus the validity flag, each bounded by [-1, 1].
        bound = np.ones(3, dtype=np.float32)
        self.observation_space = gym.spaces.Box(
            low=-bound, high=bound, dtype=np.float32
        )

    def observation(self, observation):
        """Return ``[obs[0], obs[1], 1.0]`` on valid steps, zeros otherwise."""
        if self.env.number_of_steps % self.time_between_two_valid_obs == 0:
            return np.array([observation[0], observation[1], 1.0], dtype=np.float32)
        return np.zeros(3, dtype=np.float32)


class MountainCarPartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the MountainCar environment partially observable.

    Only entry 0 of the wrapped observation is exposed (presumably the car
    position -- confirm against the wrapped env), followed by a validity
    flag. When the step counter is not a multiple of
    ``time_between_two_valid_obs`` an all-zero vector is returned instead.

    Args:
        env: The MountainCar environment to wrap; its base environment must
            provide ``max_position`` (and normally ``min_position``).
        time_between_two_valid_obs: Number of steps between two valid
            (non-zero) observations.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env, time_between_two_valid_obs)
        # Read the limits from the base environment explicitly: attribute
        # forwarding through intermediate wrappers was removed in Gymnasium 1.0.
        base_env = self.env.unwrapped
        max_position = base_env.max_position
        # The position range is not symmetric, so ``-max_position`` would leave
        # valid positions outside the declared space. Fall back to the old
        # symmetric bound only if the env does not expose ``min_position``.
        min_position = getattr(base_env, "min_position", -max_position)
        # The flag keeps the symmetric [-1, 1] bound used by sibling wrappers.
        low = np.array([min_position, -1.0], dtype=np.float32)
        high = np.array([max_position, 1.0], dtype=np.float32)
        self.observation_space = gym.spaces.Box(low=low, high=high, dtype=np.float32)

    def observation(self, observation):
        """Return ``[obs[0], 1.0]`` on valid steps, zeros otherwise."""
        if self.env.number_of_steps % self.time_between_two_valid_obs == 0:
            return np.array([observation[0], 1.0], dtype=np.float32)
        return np.zeros(2, dtype=np.float32)


class PuckWorldPartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the PuckWorld environment partially observable.

    On every step one consecutive pair of entries of the wrapped observation
    is exposed, together with the index of that pair. The index cycles through
    0..3 with the step counter.
    """

    def __init__(self, env):
        super().__init__(env)
        # Two unbounded entries plus the pair index, whose largest value is 3.
        bound = np.array([np.inf, np.inf, 3], dtype=np.float32)
        self.observation_space = gym.spaces.Box(
            low=-bound, high=bound, dtype=np.float32
        )

    def observation(self, observation):
        """Return ``[obs[2k], obs[2k + 1], k]`` with ``k = number_of_steps % 4``."""
        pair_index = self.env.number_of_steps % 4
        offset = 2 * pair_index
        exposed = [observation[offset], observation[offset + 1], pair_index]
        return np.array(exposed, dtype=np.float32)

Classes

class AcrobotPartialObservableWrapper (env, time_between_two_valid_obs=1)

Observation wrapper that makes the Acrobot environment partially observable.

Wraps an environment to allow a modular transformation of the :meth:step and :meth:reset methods.

Args

env
The environment to wrap
Expand source code
class AcrobotPartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the Acrobot environment partially observable.

    The first four entries of the wrapped observation are exposed, followed by
    a validity flag. When the step counter is not a multiple of
    ``time_between_two_valid_obs`` an all-zero vector is returned instead.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env, time_between_two_valid_obs)
        # Four exposed entries plus the validity flag, each bounded by [-1, 1].
        bound = np.ones(5, dtype=np.float32)
        self.observation_space = gym.spaces.Box(
            low=-bound, high=bound, dtype=np.float32
        )

    def observation(self, observation):
        """Return the first four entries plus a 1.0 flag, or zeros if masked."""
        if self.env.number_of_steps % self.time_between_two_valid_obs == 0:
            visible = [observation[i] for i in range(4)]
            return np.array(visible + [1.0], dtype=np.float32)
        return np.zeros(5, dtype=np.float32)

Ancestors

Methods

def observation(self, observation)
Expand source code
def observation(self, observation):
    """Return the first four entries plus a 1.0 flag, or zeros if masked."""
    if self.env.number_of_steps % self.time_between_two_valid_obs == 0:
        visible = [observation[i] for i in range(4)]
        return np.array(visible + [1.0], dtype=np.float32)
    return np.zeros(5, dtype=np.float32)

Inherited members

class CartPolePartialObservableWrapper (env, time_between_two_valid_obs=1)

Observation wrapper that makes the CartPole environment partially observable.

Wraps an environment to allow a modular transformation of the :meth:step and :meth:reset methods.

Args

env
The environment to wrap
Expand source code
class CartPolePartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the CartPole environment partially observable.

    Only entries 0 and 2 of the wrapped observation are exposed (presumably
    cart position and pole angle -- confirm against the wrapped env), followed
    by a validity flag. When the step counter is not a multiple of
    ``time_between_two_valid_obs`` an all-zero vector is returned instead.

    Args:
        env: The CartPole environment to wrap; its base environment must
            provide ``x_threshold`` and ``theta_threshold_radians``.
        time_between_two_valid_obs: Number of steps between two valid
            (non-zero) observations.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env, time_between_two_valid_obs)
        # Read the thresholds from the base environment explicitly: implicit
        # attribute forwarding through ``gym.Wrapper`` (``self.x_threshold``)
        # was deprecated and then removed in Gymnasium 1.0.
        base_env = self.env.unwrapped
        high = np.array(
            [base_env.x_threshold * 2, base_env.theta_threshold_radians * 2, 1.0],
            dtype=np.float32,
        )

        self.observation_space = gym.spaces.Box(-high, high, dtype=np.float32)

    def observation(self, observation):
        """Return ``[obs[0], obs[2], 1.0]`` on valid steps, zeros otherwise."""
        if self.env.number_of_steps % self.time_between_two_valid_obs == 0:
            # The trailing 1.0 marks the observation as valid.
            return np.array([observation[0], observation[2], 1.0], dtype=np.float32)
        return np.zeros(3, dtype=np.float32)

Ancestors

Methods

def observation(self, observation)
Expand source code
def observation(self, observation):
    """Return ``[obs[0], obs[2], 1.0]`` on valid steps, zeros otherwise."""
    if self.env.number_of_steps % self.time_between_two_valid_obs == 0:
        # The trailing 1.0 marks the observation as valid.
        return np.array([observation[0], observation[2], 1.0], dtype=np.float32)
    return np.zeros(3, dtype=np.float32)

Inherited members

class MountainCarPartialObservableWrapper (env, time_between_two_valid_obs=1)

Observation wrapper that makes the MountainCar environment partially observable.

Wraps an environment to allow a modular transformation of the :meth:step and :meth:reset methods.

Args

env
The environment to wrap
Expand source code
class MountainCarPartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the MountainCar environment partially observable.

    Only entry 0 of the wrapped observation is exposed (presumably the car
    position -- confirm against the wrapped env), followed by a validity
    flag. When the step counter is not a multiple of
    ``time_between_two_valid_obs`` an all-zero vector is returned instead.

    Args:
        env: The MountainCar environment to wrap; its base environment must
            provide ``max_position`` (and normally ``min_position``).
        time_between_two_valid_obs: Number of steps between two valid
            (non-zero) observations.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env, time_between_two_valid_obs)
        # Read the limits from the base environment explicitly: attribute
        # forwarding through intermediate wrappers was removed in Gymnasium 1.0.
        base_env = self.env.unwrapped
        max_position = base_env.max_position
        # The position range is not symmetric, so ``-max_position`` would leave
        # valid positions outside the declared space. Fall back to the old
        # symmetric bound only if the env does not expose ``min_position``.
        min_position = getattr(base_env, "min_position", -max_position)
        # The flag keeps the symmetric [-1, 1] bound used by sibling wrappers.
        low = np.array([min_position, -1.0], dtype=np.float32)
        high = np.array([max_position, 1.0], dtype=np.float32)
        self.observation_space = gym.spaces.Box(low=low, high=high, dtype=np.float32)

    def observation(self, observation):
        """Return ``[obs[0], 1.0]`` on valid steps, zeros otherwise."""
        if self.env.number_of_steps % self.time_between_two_valid_obs == 0:
            return np.array([observation[0], 1.0], dtype=np.float32)
        return np.zeros(2, dtype=np.float32)

Ancestors

Methods

def observation(self, observation)
Expand source code
def observation(self, observation):
    """Return ``[obs[0], 1.0]`` on valid steps, zeros otherwise."""
    if self.env.number_of_steps % self.time_between_two_valid_obs == 0:
        return np.array([observation[0], 1.0], dtype=np.float32)
    return np.zeros(2, dtype=np.float32)

Inherited members

class PartialObservableWrapper (env, time_between_two_valid_obs=1)

Wraps a :class:gymnasium.Env to allow a modular transformation of the :meth:step and :meth:reset methods.

This class is the base class of all wrappers to change the behavior of the underlying environment. Wrappers that inherit from this class can modify the :attr:action_space, :attr:observation_space, :attr:reward_range and :attr:metadata attributes, without changing the underlying environment's attributes. Moreover, the behavior of the :meth:step and :meth:reset methods can be changed by these wrappers.

Some attributes (:attr:spec, :attr:render_mode, :attr:np_random) will point back to the wrapper's environment (i.e. to the corresponding attributes of :attr:env).

Note

If you inherit from :class:Wrapper, don't forget to call super().__init__(env)

Wraps an environment to allow a modular transformation of the :meth:step and :meth:reset methods.

Args

env
The environment to wrap
Expand source code
class PartialObservableWrapper(gym.Wrapper):
    """Base wrapper that filters every observation through :meth:`observation`.

    A step counter is kept on the wrapped environment as ``number_of_steps``
    and is incremented once per call to :meth:`step`. Subclasses implement
    :meth:`observation` to decide what is exposed to the agent.

    Args:
        env: The environment to wrap.
        time_between_two_valid_obs: Period stored on the wrapper for use by
            subclasses' :meth:`observation` implementations.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env)
        self.time_between_two_valid_obs = time_between_two_valid_obs
        # The counter is attached to the wrapped env, not to the wrapper.
        self.env.number_of_steps = 0

    def reset(self, **kwargs):
        """Reset the wrapped env and return ``(filtered observation, info)``."""
        # NOTE(review): ``number_of_steps`` is not zeroed here, so the counter
        # keeps running across episodes -- confirm this is intended.
        raw_observation, info = self.env.reset(**kwargs)
        return self.observation(raw_observation), info

    def step(self, action):
        """Step the wrapped env, bump the counter, then filter the observation."""
        raw_observation, reward, done, truncated, info = self.env.step(action)
        # Increment first so ``observation`` sees the updated step count.
        self.env.number_of_steps += 1
        filtered = self.observation(raw_observation)
        return filtered, reward, done, truncated, info

    @abstractmethod
    def observation(self, observation):
        """Map a raw observation to the one exposed to the agent."""
        raise NotImplementedError()

Ancestors

  • gymnasium.core.Wrapper
  • gymnasium.core.Env
  • typing.Generic

Subclasses

Methods

def observation(self, observation)
Expand source code
@abstractmethod
def observation(self, observation):
    """Map a raw observation to the one exposed to the agent."""
    raise NotImplementedError()
def reset(self, **kwargs)

Uses the :meth:reset of the :attr:env that can be overwritten to change the returned data.

Expand source code
def reset(self, **kwargs):
    """Reset the wrapped env and return ``(filtered observation, info)``."""
    raw_observation, info = self.env.reset(**kwargs)
    filtered = self.observation(raw_observation)
    return filtered, info
def step(self, action)

Uses the :meth:step of the :attr:env that can be overwritten to change the returned data.

Expand source code
def step(self, action):
    """Step the wrapped env, bump the counter, then filter the observation."""
    raw_observation, reward, done, truncated, info = self.env.step(action)
    # Increment first so ``observation`` sees the updated step count.
    self.env.number_of_steps += 1
    filtered = self.observation(raw_observation)
    return filtered, reward, done, truncated, info
class PendulumPartialObservableWrapper (env, time_between_two_valid_obs=1)

Observation wrapper that makes the Pendulum environment partially observable.

Wraps an environment to allow a modular transformation of the :meth:step and :meth:reset methods.

Args

env
The environment to wrap
Expand source code
class PendulumPartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the Pendulum environment partially observable.

    The first two entries of the wrapped observation are exposed, followed by
    a validity flag. When the step counter is not a multiple of
    ``time_between_two_valid_obs`` an all-zero vector is returned instead.
    """

    def __init__(self, env, time_between_two_valid_obs=1):
        super().__init__(env, time_between_two_valid_obs)
        # Two exposed entries plus the validity flag, each bounded by [-1, 1].
        bound = np.ones(3, dtype=np.float32)
        self.observation_space = gym.spaces.Box(
            low=-bound, high=bound, dtype=np.float32
        )

    def observation(self, observation):
        """Return ``[obs[0], obs[1], 1.0]`` on valid steps, zeros otherwise."""
        if self.env.number_of_steps % self.time_between_two_valid_obs == 0:
            return np.array([observation[0], observation[1], 1.0], dtype=np.float32)
        return np.zeros(3, dtype=np.float32)

Ancestors

Methods

def observation(self, observation)
Expand source code
def observation(self, observation):
    """Return ``[obs[0], obs[1], 1.0]`` on valid steps, zeros otherwise."""
    if self.env.number_of_steps % self.time_between_two_valid_obs == 0:
        return np.array([observation[0], observation[1], 1.0], dtype=np.float32)
    return np.zeros(3, dtype=np.float32)

Inherited members

class PuckWorldPartialObservableWrapper (env)

Observation wrapper that makes the PuckWorld environment partially observable.

Wraps an environment to allow a modular transformation of the :meth:step and :meth:reset methods.

Args

env
The environment to wrap
Expand source code
class PuckWorldPartialObservableWrapper(PartialObservableWrapper):
    r"""Observation wrapper that makes the PuckWorld environment partially observable.

    On every step one consecutive pair of entries of the wrapped observation
    is exposed, together with the index of that pair. The index cycles through
    0..3 with the step counter.
    """

    def __init__(self, env):
        super().__init__(env)
        # Two unbounded entries plus the pair index, whose largest value is 3.
        bound = np.array([np.inf, np.inf, 3], dtype=np.float32)
        self.observation_space = gym.spaces.Box(
            low=-bound, high=bound, dtype=np.float32
        )

    def observation(self, observation):
        """Return ``[obs[2k], obs[2k + 1], k]`` with ``k = number_of_steps % 4``."""
        pair_index = self.env.number_of_steps % 4
        offset = 2 * pair_index
        exposed = [observation[offset], observation[offset + 1], pair_index]
        return np.array(exposed, dtype=np.float32)

Ancestors

Methods

def observation(self, observation)
Expand source code
def observation(self, observation):
    """Return ``[obs[2k], obs[2k + 1], k]`` with ``k = number_of_steps % 4``."""
    pair_index = self.env.number_of_steps % 4
    offset = 2 * pair_index
    exposed = [observation[offset], observation[offset + 1], pair_index]
    return np.array(exposed, dtype=np.float32)

Inherited members