Module pearl.utils.instantiations.environments.sparse_reward_environment

This file contains an environment that simulates sparse rewards. It also contains a history summarization module that needs to be used together with it when defining a PearlAgent.

The setup is as follows: a 2d box environment in which the agent is initialized at the center of a rectangular arena, and there is a target - a 2d point, randomly generated in the arena. The agent gets reward 0 only when it gets close enough to the target; otherwise the reward is -1.

There are 2 versions in this file: one for a discrete action space, and one for a continuous action space.

Expand source code
"""
This file contains an environment that simulates sparse rewards.
It also contains a history summarization module that needs to be used together
with it when defining a PearlAgent.

The setup is as follows:
a 2d box environment, where the agent is initialized at the center of a rectangular arena,
and there is a target - a 2d point, randomly generated in the arena.
The agent gets reward 0 only when it gets close enough to the target; otherwise the reward is -1.

There are 2 versions in this file:
- one for a discrete action space
- one for a continuous action space
"""
import math
import random
from abc import abstractmethod
from typing import List, Optional, Tuple

import torch

from pearl.api.action import Action
from pearl.api.action_result import ActionResult
from pearl.api.action_space import ActionSpace

from pearl.api.environment import Environment
from pearl.utils.instantiations.spaces.discrete_action import DiscreteActionSpace

class SparseRewardEnvironment(Environment):
    """
    Base class for a 2d arena in which an agent has to reach a random goal.

    The arena spans [0, length] along x and [0, height] along y. `reset`
    places the agent at the center of the arena and samples a goal uniformly
    inside it. The goal counts as reached once the Euclidean distance between
    agent and goal is strictly smaller than `reward_distance`.

    Subclasses implement `step`.
    """

    def __init__(
        self,
        length: float,
        height: float,
        max_episode_duration: int = 500,
        reward_distance: float = 1,
    ) -> None:
        """
        Args:
            length: extent of the arena along the x axis.
            height: extent of the arena along the y axis.
            max_episode_duration: stored as `_max_episode_duration`; not read
                in this class (`ContinuousSparseRewardEnvironment.step` uses
                it as the step limit of an episode).
            reward_distance: distance to the goal below which the goal counts
                as reached.
        """
        self._length = length
        self._height = height
        self._max_episode_duration = max_episode_duration
        # reset will initialize following
        self._agent_position: Optional[Tuple[float, float]] = None
        self._goal: Optional[Tuple[float, float]] = None
        self._step_count = 0
        self._reward_distance = reward_distance

    @abstractmethod
    def step(self, action: Action) -> ActionResult:
        """Apply `action` and return the resulting ActionResult."""
        pass

    def reset(self, seed: Optional[int] = None) -> Tuple[torch.Tensor, ActionSpace]:
        """
        Start a new episode.

        The agent is moved to the center of the arena, a new goal is drawn
        uniformly from the arena and the step counter is set to 0.
        `seed` is accepted but not used by this implementation.

        Returns:
            A tuple of the observation tensor
            (agent_x, agent_y, goal_x, goal_y) and the action space.
        """

        # reset (x, y)
        self._agent_position = (self._length / 2, self._height / 2)
        self._goal = (random.uniform(0, self._length), random.uniform(0, self._height))
        self._step_count = 0
        assert self._agent_position is not None
        assert (goal := self._goal) is not None
        return (
            torch.tensor(list(self._agent_position) + list(goal)),
            self.action_space,
        )

    def _update_position(self, delta: Tuple[float, float]) -> None:
        """
        This API is to update and clip and ensure agent always stay in map
        """
        delta_x, delta_y = delta
        assert self._agent_position is not None
        x, y = self._agent_position
        # Clamp each coordinate to [0, length] / [0, height].
        self._agent_position = (
            max(min(x + delta_x, self._length), 0),
            max(min(y + delta_y, self._height), 0),
        )

    def _check_win(self) -> bool:
        """
        Return:
            True if reached goal
            False if not reached goal
        """
        assert self._agent_position is not None
        assert self._goal is not None
        if math.dist(self._agent_position, self._goal) < self._reward_distance:
            return True
        return False

class ContinuousSparseRewardEnvironment(SparseRewardEnvironment):
    """
    Given action vector (x, y)
    agent position is updated accordingly
    """

    def step(self, action: Action) -> ActionResult:
        """
        Move the agent by `action` and report the outcome.

        Args:
            action: tensor whose first two entries are the displacement
                along x and y.

        Returns:
            ActionResult whose observation is the tensor
            (agent_x, agent_y, goal_x, goal_y), whose reward is 0 if the goal
            was reached and -1 otherwise, and which is terminated when the
            goal was reached or the step limit was hit.
        """
        assert isinstance(action, torch.Tensor)
        self._update_position((action[0].item(), action[1].item()))

        has_win = self._check_win()
        self._step_count += 1
        terminated = has_win or self._step_count >= self._max_episode_duration
        assert self._agent_position is not None
        assert (goal := self._goal) is not None
        return ActionResult(
            observation=torch.tensor(list(self._agent_position) + list(goal)),
            reward=0 if has_win else -1,
            terminated=terminated,
            truncated=False,
        )

    @property
    def action_space(self) -> ActionSpace:
        # pyre-fixme[7]: Expected `ActionSpace` but got `None`.
        # FIXME: does this really do not have an action space?
        return None

class DiscreteSparseRewardEnvironment(ContinuousSparseRewardEnvironment):
    """
    Given action count N, action index will be 0,...,N-1
    For action n, position will be changed by:
    x +=  cos(360/N * n) * step_size
    y +=  sin(360/N * n) * step_size
    """

    # FIXME: This environment mixes the concepts of action index and action feature.
    def __init__(
        self,
        length: float,
        height: float,
        step_size: float = 0.01,
        action_count: int = 4,
        max_episode_duration: int = 500,
        reward_distance: Optional[float] = None,
    ) -> None:
        """
        Args:
            length: extent of the arena along the x axis.
            height: extent of the arena along the y axis.
            step_size: distance the agent moves per step.
            action_count: number of equally spaced movement directions.
            max_episode_duration: forwarded to the parent constructor.
            reward_distance: distance to the goal below which the goal counts
                as reached; defaults to `step_size` when None.
        """
        super(DiscreteSparseRewardEnvironment, self).__init__(
            length,
            height,
            max_episode_duration,
            reward_distance if reward_distance is not None else step_size,
        )
        self._step_size = step_size
        self._action_count = action_count
        # One displacement vector per action index; the angle is computed in
        # radians (2 * pi / N * i), matching the 360/N * n of the docstring.
        self._actions: List[torch.Tensor] = [
            torch.tensor(
                [
                    math.cos(2 * math.pi / self._action_count * i),
                    math.sin(2 * math.pi / self._action_count * i),
                ]
            )
            * self._step_size
            for i in range(action_count)
        ]

    def step(self, action: Action) -> ActionResult:
        """
        Translate the action index into its displacement vector and delegate
        to the continuous `step` of the parent class.
        """
        assert action < self._action_count and action >= 0
        return super(DiscreteSparseRewardEnvironment, self).step(
            self._actions[int(action.item())]
        )

    @property
    def action_space(self) -> DiscreteActionSpace:
        """Action space holding one single-element index tensor per action."""
        return DiscreteActionSpace(
            actions=[torch.tensor([i]) for i in range(self._action_count)]
        )


class ContinuousSparseRewardEnvironment (length: float, height: float, max_episode_duration: int = 500, reward_distance: float = 1)

Given an action vector (x, y), the agent position is updated accordingly.

Expand source code
class ContinuousSparseRewardEnvironment(SparseRewardEnvironment):
    """
    Given action vector (x, y)
    agent position is updated accordingly
    """

    def step(self, action: Action) -> ActionResult:
        """
        Move the agent by `action` and report the outcome.

        Args:
            action: tensor whose first two entries are the displacement
                along x and y.

        Returns:
            ActionResult whose observation is the tensor
            (agent_x, agent_y, goal_x, goal_y), whose reward is 0 if the goal
            was reached and -1 otherwise, and which is terminated when the
            goal was reached or the step limit was hit.
        """
        assert isinstance(action, torch.Tensor)
        self._update_position((action[0].item(), action[1].item()))

        has_win = self._check_win()
        self._step_count += 1
        terminated = has_win or self._step_count >= self._max_episode_duration
        assert self._agent_position is not None
        assert (goal := self._goal) is not None
        return ActionResult(
            observation=torch.tensor(list(self._agent_position) + list(goal)),
            reward=0 if has_win else -1,
            terminated=terminated,
            truncated=False,
        )

    @property
    def action_space(self) -> ActionSpace:
        # pyre-fixme[7]: Expected `ActionSpace` but got `None`.
        # FIXME: does this really do not have an action space?
        return None



Inherited members

class DiscreteSparseRewardEnvironment (length: float, height: float, step_size: float = 0.01, action_count: int = 4, max_episode_duration: int = 500, reward_distance: Optional[float] = None)

Given action count N, the action index will be 0,…,N-1. For action n, the position will be changed by: x += cos(360/N * n) * step_size; y += sin(360/N * n) * step_size.

Expand source code
class DiscreteSparseRewardEnvironment(ContinuousSparseRewardEnvironment):
    """
    Given action count N, action index will be 0,...,N-1
    For action n, position will be changed by:
    x +=  cos(360/N * n) * step_size
    y +=  sin(360/N * n) * step_size
    """

    # FIXME: This environment mixes the concepts of action index and action feature.
    def __init__(
        self,
        length: float,
        height: float,
        step_size: float = 0.01,
        action_count: int = 4,
        max_episode_duration: int = 500,
        reward_distance: Optional[float] = None,
    ) -> None:
        """
        Args:
            length: extent of the arena along the x axis.
            height: extent of the arena along the y axis.
            step_size: distance the agent moves per step.
            action_count: number of equally spaced movement directions.
            max_episode_duration: forwarded to the parent constructor.
            reward_distance: distance to the goal below which the goal counts
                as reached; defaults to `step_size` when None.
        """
        super(DiscreteSparseRewardEnvironment, self).__init__(
            length,
            height,
            max_episode_duration,
            reward_distance if reward_distance is not None else step_size,
        )
        self._step_size = step_size
        self._action_count = action_count
        # One displacement vector per action index; the angle is computed in
        # radians (2 * pi / N * i), matching the 360/N * n of the docstring.
        self._actions: List[torch.Tensor] = [
            torch.tensor(
                [
                    math.cos(2 * math.pi / self._action_count * i),
                    math.sin(2 * math.pi / self._action_count * i),
                ]
            )
            * self._step_size
            for i in range(action_count)
        ]

    def step(self, action: Action) -> ActionResult:
        """
        Translate the action index into its displacement vector and delegate
        to the continuous `step` of the parent class.
        """
        assert action < self._action_count and action >= 0
        return super(DiscreteSparseRewardEnvironment, self).step(
            self._actions[int(action.item())]
        )

    @property
    def action_space(self) -> DiscreteActionSpace:
        """Action space holding one single-element index tensor per action."""
        return DiscreteActionSpace(
            actions=[torch.tensor([i]) for i in range(self._action_count)]
        )


Inherited members

class SparseRewardEnvironment (length: float, height: float, max_episode_duration: int = 500, reward_distance: float = 1)

An abstract interface for environments. An Environment is an object that the agent interacts with and provides the agent with observations in the form of an ActionResult object. This interface follows the design of environments in Gymnasium.

Expand source code
class SparseRewardEnvironment(Environment):
    """
    Base class for a 2d arena in which an agent has to reach a random goal.

    The arena spans [0, length] along x and [0, height] along y. `reset`
    places the agent at the center of the arena and samples a goal uniformly
    inside it. The goal counts as reached once the Euclidean distance between
    agent and goal is strictly smaller than `reward_distance`.

    Subclasses implement `step`.
    """

    def __init__(
        self,
        length: float,
        height: float,
        max_episode_duration: int = 500,
        reward_distance: float = 1,
    ) -> None:
        """
        Args:
            length: extent of the arena along the x axis.
            height: extent of the arena along the y axis.
            max_episode_duration: stored as `_max_episode_duration`; not read
                in this class (`ContinuousSparseRewardEnvironment.step` uses
                it as the step limit of an episode).
            reward_distance: distance to the goal below which the goal counts
                as reached.
        """
        self._length = length
        self._height = height
        self._max_episode_duration = max_episode_duration
        # reset will initialize following
        self._agent_position: Optional[Tuple[float, float]] = None
        self._goal: Optional[Tuple[float, float]] = None
        self._step_count = 0
        self._reward_distance = reward_distance

    @abstractmethod
    def step(self, action: Action) -> ActionResult:
        """Apply `action` and return the resulting ActionResult."""
        pass

    def reset(self, seed: Optional[int] = None) -> Tuple[torch.Tensor, ActionSpace]:
        """
        Start a new episode.

        The agent is moved to the center of the arena, a new goal is drawn
        uniformly from the arena and the step counter is set to 0.
        `seed` is accepted but not used by this implementation.

        Returns:
            A tuple of the observation tensor
            (agent_x, agent_y, goal_x, goal_y) and the action space.
        """

        # reset (x, y)
        self._agent_position = (self._length / 2, self._height / 2)
        self._goal = (random.uniform(0, self._length), random.uniform(0, self._height))
        self._step_count = 0
        assert self._agent_position is not None
        assert (goal := self._goal) is not None
        return (
            torch.tensor(list(self._agent_position) + list(goal)),
            self.action_space,
        )

    def _update_position(self, delta: Tuple[float, float]) -> None:
        """
        This API is to update and clip and ensure agent always stay in map
        """
        delta_x, delta_y = delta
        assert self._agent_position is not None
        x, y = self._agent_position
        # Clamp each coordinate to [0, length] / [0, height].
        self._agent_position = (
            max(min(x + delta_x, self._length), 0),
            max(min(y + delta_y, self._height), 0),
        )

    def _check_win(self) -> bool:
        """
        Return:
            True if reached goal
            False if not reached goal
        """
        assert self._agent_position is not None
        assert self._goal is not None
        if math.dist(self._agent_position, self._goal) < self._reward_distance:
            return True
        return False



Inherited members