Module pearl.utils.instantiations.environments.sparse_reward_environment
This file contains an environment for simulating sparse rewards. It also contains a history summarization module that needs to be used together with it when defining a PearlAgent.
The setup is as follows: a 2D box environment where the agent is initialized at the center of a square arena, and a target (a 2D point) is randomly generated in the arena. The agent receives a reward of 0 only when it gets close enough to the target; otherwise the reward is -1.
There are two versions in this file: one for a discrete action space and one for a continuous action space.
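The sketch below is a hypothetical usage example, not taken from the module: it instantiates the discrete variant with illustrative constructor arguments, resets it, and runs one episode with uniformly random action indices. Actions are passed as scalar tensors because step() calls action.item().

import random

import torch

from pearl.utils.instantiations.environments.sparse_reward_environment import (
    DiscreteSparseRewardEnvironment,
)

# Illustrative arguments; the defaults in the class signature differ.
env = DiscreteSparseRewardEnvironment(
    length=10.0,
    height=10.0,
    step_size=0.5,
    action_count=8,
    reward_distance=1.0,
)

observation, action_space = env.reset()
# The observation is a 4-dimensional tensor: [agent_x, agent_y, goal_x, goal_y].

episode_return = 0.0
done = False
while not done:
    # Sample a random action index and wrap it in a tensor,
    # since step() calls action.item() on it.
    action = torch.tensor(random.randrange(8))
    result = env.step(action)
    episode_return += result.reward
    done = result.terminated or result.truncated

# With sparse rewards, the return is minus the number of steps taken before
# reaching the goal, or -max_episode_duration if the goal is never reached.
print("episode return:", episode_return)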
Expand source code
"""
This file contains an environment to simulate sparse rewards.
It also contains a history summarization module that needs to be used together
with it when defining a PearlAgent.

The setup is as follows:
a 2D box environment where the agent is initialized at the center of a square arena,
and there is a target (a 2D point) randomly generated in the arena.
The agent gets a reward of 0 only when it gets close enough to the target;
otherwise the reward is -1.

There are two versions in this file:
- one for a discrete action space
- one for a continuous action space
"""
import math
import random
from abc import abstractmethod
from typing import List, Optional, Tuple
import torch
from pearl.api.action import Action
from pearl.api.action_result import ActionResult
from pearl.api.action_space import ActionSpace
from pearl.api.environment import Environment
from pearl.utils.instantiations.spaces.discrete_action import DiscreteActionSpace
class SparseRewardEnvironment(Environment):
    def __init__(
        self,
        length: float,
        height: float,
        max_episode_duration: int = 500,
        reward_distance: float = 1,
    ) -> None:
        self._length = length
        self._height = height
        self._max_episode_duration = max_episode_duration
        # reset() will initialize the following
        self._agent_position: Optional[Tuple[float, float]] = None
        self._goal: Optional[Tuple[float, float]] = None
        self._step_count = 0
        self._reward_distance = reward_distance

    @abstractmethod
    def step(self, action: Action) -> ActionResult:
        pass

    def reset(self, seed: Optional[int] = None) -> Tuple[torch.Tensor, ActionSpace]:
        # Place the agent at the center of the arena and sample a new goal.
        self._agent_position = (self._length / 2, self._height / 2)
        self._goal = (random.uniform(0, self._length), random.uniform(0, self._height))
        self._step_count = 0
        assert self._agent_position is not None
        assert (goal := self._goal) is not None
        return (
            torch.tensor(list(self._agent_position) + list(goal)),
            self.action_space,
        )

    def _update_position(self, delta: Tuple[float, float]) -> None:
        """
        Update the agent position by delta and clip it so the agent always stays on the map.
        """
        delta_x, delta_y = delta
        assert self._agent_position is not None
        x, y = self._agent_position
        self._agent_position = (
            max(min(x + delta_x, self._length), 0),
            max(min(y + delta_y, self._height), 0),
        )

    def _check_win(self) -> bool:
        """
        Return:
            True if the agent has reached the goal,
            False otherwise.
        """
        assert self._agent_position is not None
        assert self._goal is not None
        return math.dist(self._agent_position, self._goal) < self._reward_distance


class ContinuousSparseRewardEnvironment(SparseRewardEnvironment):
    """
    Given an action vector (x, y),
    the agent position is updated accordingly.
    """

    def step(self, action: Action) -> ActionResult:
        assert isinstance(action, torch.Tensor)
        self._update_position((action[0].item(), action[1].item()))

        has_win = self._check_win()
        self._step_count += 1
        terminated = has_win or self._step_count >= self._max_episode_duration
        assert self._agent_position is not None
        assert (goal := self._goal) is not None
        return ActionResult(
            observation=torch.tensor(list(self._agent_position) + list(goal)),
            reward=0 if has_win else -1,
            terminated=terminated,
            truncated=False,
        )

    @property
    def action_space(self) -> ActionSpace:
        # pyre-fixme[7]: Expected `ActionSpace` but got `None`.
        # FIXME: does this really not have an action space?
        return None


class DiscreteSparseRewardEnvironment(ContinuousSparseRewardEnvironment):
    """
    Given an action count N, the action index will be 0, ..., N-1.
    For action n, the position is changed by:
        x += cos(2 * pi / N * n) * step_size
        y += sin(2 * pi / N * n) * step_size
    """

    # FIXME: This environment mixes the concepts of action index and action feature.
    def __init__(
        self,
        length: float,
        height: float,
        step_size: float = 0.01,
        action_count: int = 4,
        max_episode_duration: int = 500,
        reward_distance: Optional[float] = None,
    ) -> None:
        super(DiscreteSparseRewardEnvironment, self).__init__(
            length,
            height,
            max_episode_duration,
            reward_distance if reward_distance is not None else step_size,
        )
        self._step_size = step_size
        self._action_count = action_count
        self._actions: List[torch.Tensor] = [
            torch.tensor(
                [
                    math.cos(2 * math.pi / self._action_count * i),
                    math.sin(2 * math.pi / self._action_count * i),
                ]
            )
            * self._step_size
            for i in range(action_count)
        ]

    def step(self, action: Action) -> ActionResult:
        assert action < self._action_count and action >= 0
        return super(DiscreteSparseRewardEnvironment, self).step(
            self._actions[int(action.item())]
        )

    @property
    def action_space(self) -> DiscreteActionSpace:
        return DiscreteActionSpace(
            actions=[torch.tensor([i]) for i in range(self._action_count)]
        )
Classes
class ContinuousSparseRewardEnvironment (length: float, height: float, max_episode_duration: int = 500, reward_distance: float = 1)
Given an action vector (x, y), the agent position is updated accordingly.
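A minimal usage sketch for the continuous variant (the constructor values are assumed for illustration): the action is a raw (dx, dy) displacement tensor chosen by the caller, since action_space currently returns None.

import torch

from pearl.utils.instantiations.environments.sparse_reward_environment import (
    ContinuousSparseRewardEnvironment,
)

env = ContinuousSparseRewardEnvironment(length=5.0, height=5.0, reward_distance=0.5)
observation, _ = env.reset()  # [agent_x, agent_y, goal_x, goal_y]

# Move a small step straight toward the goal; _update_position clips the
# resulting position so the agent stays inside the arena.
direction = observation[2:] - observation[:2]
action = 0.1 * direction / torch.linalg.norm(direction)
result = env.step(action)
print(result.observation, result.reward, result.terminated)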
Ancestors
- SparseRewardEnvironment
- Environment
- abc.ABC
Subclasses
- DiscreteSparseRewardEnvironment
Inherited members
class DiscreteSparseRewardEnvironment (length: float, height: float, step_size: float = 0.01, action_count: int = 4, max_episode_duration: int = 500, reward_distance: Optional[float] = None)
Given an action count N, the action index will be 0, …, N-1. For action n, the position is changed by:
x += cos(2 * pi / N * n) * step_size
y += sin(2 * pi / N * n) * step_size
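An illustrative check of the action geometry (the argument values are assumed for the example): with action_count=4 and step_size=1.0, the four displacement vectors point along +x, +y, -x and -y, up to floating-point error in cos/sin. The private _actions list is read here purely for illustration.

from pearl.utils.instantiations.environments.sparse_reward_environment import (
    DiscreteSparseRewardEnvironment,
)

env = DiscreteSparseRewardEnvironment(
    length=10.0, height=10.0, step_size=1.0, action_count=4
)
for i, delta in enumerate(env._actions):
    # i = 0, 1, 2, 3 correspond (approximately) to steps of
    # (1, 0), (0, 1), (-1, 0) and (0, -1), respectively.
    print(i, delta)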
Ancestors
- ContinuousSparseRewardEnvironment
- SparseRewardEnvironment
- Environment
- abc.ABC
Inherited members
class SparseRewardEnvironment (length: float, height: float, max_episode_duration: int = 500, reward_distance: float = 1)
An abstract interface for environments. An Environment is an object that the agent interacts with and that provides the agent with observations in the form of an ActionResult object. This interface follows the design of environments in Gymnasium.
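A hypothetical sketch, not part of the library, of a concrete subclass. It shows what an implementation of the abstract step() must provide and reuses the _update_position clipping and _check_win distance test; the action_space property is added here only on the assumption that the Environment interface requires one.

import torch

from pearl.api.action import Action
from pearl.api.action_result import ActionResult
from pearl.utils.instantiations.environments.sparse_reward_environment import (
    SparseRewardEnvironment,
)
from pearl.utils.instantiations.spaces.discrete_action import DiscreteActionSpace


class RightStepEnvironment(SparseRewardEnvironment):
    """Toy subclass: every action moves the agent 0.1 units to the right."""

    def step(self, action: Action) -> ActionResult:
        self._update_position((0.1, 0.0))  # clipped so the agent stays in the arena
        has_win = self._check_win()
        self._step_count += 1
        return ActionResult(
            observation=torch.tensor(list(self._agent_position) + list(self._goal)),
            reward=0 if has_win else -1,
            terminated=has_win or self._step_count >= self._max_episode_duration,
            truncated=False,
        )

    @property
    def action_space(self) -> DiscreteActionSpace:
        # A single dummy action; defined only so reset() can return a space.
        return DiscreteActionSpace(actions=[torch.tensor([0])])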
Ancestors
- Environment
- abc.ABC
Subclasses
- ContinuousSparseRewardEnvironment
Inherited members