Module pearl.replay_buffers.sequential_decision_making.on_policy_episodic_replay_buffer

Expand source code
from typing import List, Optional

from pearl.api.action import Action
from pearl.api.action_space import ActionSpace
from pearl.api.reward import Reward
from pearl.api.state import SubjectiveState
from pearl.replay_buffers.tensor_based_replay_buffer import TensorBasedReplayBuffer
from pearl.replay_buffers.transition import Transition


class OnPolicyEpisodicReplayBuffer(TensorBasedReplayBuffer):
    def __init__(self, capacity: int, discounted_factor: float = 1.0) -> None:
        super(OnPolicyEpisodicReplayBuffer, self).__init__(
            capacity=capacity,
            has_next_state=False,
            has_next_action=False,
            has_next_available_actions=False,
        )
        # Transitions are cached here and only pushed to `self.memory`
        # once the episode ends, so that cumulative discounted returns
        # can be computed. Currently designed for single-transition pushes.
        self.state_action_cache: List[Transition] = []
        self._discounted_factor = discounted_factor

    def push(
        self,
        state: SubjectiveState,
        action: Action,
        reward: Reward,
        next_state: Optional[SubjectiveState],
        curr_available_actions: ActionSpace,
        next_available_actions: ActionSpace,
        done: bool,
        max_number_actions: Optional[int] = None,
        cost: Optional[float] = None,
    ) -> None:
        (
            curr_available_actions_tensor_with_padding,
            curr_unavailable_actions_mask,
        ) = self._create_action_tensor_and_mask(
            max_number_actions, curr_available_actions
        )

        current_state = self._process_single_state(state)
        current_action = self._process_single_action(action)
        next_reward = self._process_single_reward(reward)
        self.state_action_cache.append(
            Transition(
                state=current_state,
                action=current_action,
                reward=next_reward,
                cum_reward=None,
                next_state=None,
                curr_available_actions=curr_available_actions_tensor_with_padding,
                curr_unavailable_actions_mask=curr_unavailable_actions_mask,
                next_available_actions=None,
                next_unavailable_actions_mask=None,
                done=self._process_single_done(done),
            ).to(self.device)
        )

        if done:
            # cumulative discounted return at time i:
            #   G_i = sum over j in [i, T] of self._discounted_factor^(j-i) * R_j
            # computed backwards via G_i = R_i + self._discounted_factor * G_{i+1}
            discounted_return = 0.0
            for i in range(len(self.state_action_cache) - 1, -1, -1):
                cum_reward = self.state_action_cache[i].reward + discounted_return
                self.state_action_cache[i].cum_reward = self._process_single_reward(
                    cum_reward
                )
                self.memory.append(self.state_action_cache[i])
                discounted_return = self._discounted_factor * cum_reward

            self.state_action_cache = []
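
The loop in the "if done" branch computes returns backwards in time. The following standalone sketch (not Pearl code; compute_discounted_returns is a hypothetical helper) runs the same recursion G_i = R_i + gamma * G_{i+1} on a plain list of rewards, with a worked example in the final comment.

from typing import List


def compute_discounted_returns(rewards: List[float], gamma: float) -> List[float]:
    # Backward recursion: G_i = R_i + gamma * G_{i+1}, with G_{T+1} = 0,
    # mirroring the "if done" branch of push above.
    returns = [0.0] * len(rewards)
    discounted_return = 0.0
    for i in range(len(rewards) - 1, -1, -1):
        cum_reward = rewards[i] + discounted_return
        returns[i] = cum_reward
        discounted_return = gamma * cum_reward
    return returns


# For rewards [1.0, 2.0, 3.0] and gamma = 0.5 this yields
# [1.0 + 0.5 * 3.5, 2.0 + 0.5 * 3.0, 3.0] = [2.75, 3.5, 3.0]
print(compute_discounted_returns([1.0, 2.0, 3.0], gamma=0.5))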

Classes

class OnPolicyEpisodicReplayBuffer (capacity: int, discounted_factor: float = 1.0)

Replay buffer for on-policy episodic methods. Transitions are cached until the end of an episode; when a transition with done=True is pushed, the cumulative discounted return (cum_reward) of every cached transition is computed and all of them are appended to memory.
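
A minimal usage sketch follows, assuming states and actions can be passed as torch tensors and that DiscreteActionSpace (import path taken from recent Pearl versions and possibly different in yours) is an acceptable ActionSpace; the exact input types accepted depend on TensorBasedReplayBuffer's processing helpers.

import torch

from pearl.replay_buffers.sequential_decision_making.on_policy_episodic_replay_buffer import (
    OnPolicyEpisodicReplayBuffer,
)
from pearl.utils.instantiations.spaces.discrete_action import DiscreteActionSpace

# Two discrete actions, represented as 1-element tensors (assumed encoding).
action_space = DiscreteActionSpace(actions=[torch.tensor([i]) for i in range(2)])
buffer = OnPolicyEpisodicReplayBuffer(capacity=1000, discounted_factor=0.99)

# Push a three-step episode; transitions stay in state_action_cache
# until done=True triggers the final push to memory.
for step in range(3):
    buffer.push(
        state=torch.zeros(4),             # assumed tensor-like SubjectiveState
        action=torch.tensor([step % 2]),  # assumed tensor-like Action
        reward=1.0,
        next_state=None,                  # not stored by this buffer
        curr_available_actions=action_space,
        next_available_actions=action_space,
        done=(step == 2),
        max_number_actions=2,
    )

# After done=True, all three transitions are in buffer.memory, each with
# cum_reward set to the discounted return from that step onward.
print(len(buffer.memory))  # 3

Because has_next_state, has_next_action, and has_next_available_actions are all False, each stored Transition keeps only the current state, action, reward, available-action tensor and mask, done flag, and the cum_reward filled in at episode end.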


Ancestors

pearl.replay_buffers.tensor_based_replay_buffer.TensorBasedReplayBuffer

Inherited members