Module pearl.utils.scripts.benchmark_parallelization

The code in this file provides a way to run multiple Pearl experiments in different processes using torch.multiprocessing. Outputs are saved in the folder ~/pearl_execution/outputs/. To run the code, enter the pearl directory and run ./utils/scripts/meta_only/run_pearl.sh utils/scripts/benchmark_parallelization.py (make sure conda and the related packages have been installed with ./utils/scripts/meta_only/setup_conda_pearl_on_devserver.sh).
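
Each experiment is described by a dictionary (the imported names such as benchmark_cartpole_v1_part_1 are lists of such dictionaries). The sketch below shows the keys read by evaluate, evaluate_single, and generate_one_plot in the source; the environment name, method name, learner class, and argument values are hypothetical placeholders, not entries taken from benchmark_config.

my_experiments = [
    {
        "exp_name": "my_benchmark",        # prefix of the saved plot files
        "env_name": "CartPole-v1",         # resolved via get_env
        "num_runs": 3,                     # one process per run per method
        "num_episodes": 500,               # or "num_steps" for step-based runs
        "record_period": 10,               # spacing of recorded metrics (plot x-axis)
        "print_every_x_episodes": 10,      # or "print_every_x_steps"
        "device_id": -1,                   # forwarded to PearlAgent via agent_args
        "methods": [
            {
                "name": "DQN",
                "policy_learner": SomePolicyLearner,   # hypothetical placeholder class
                "policy_learner_args": {"hidden_dims": [64, 64]},
                # Optional keys handled in evaluate_single:
                # "exploration_module" / "exploration_module_args",
                # "replay_buffer" / "replay_buffer_args",
                # "safety_module" / "safety_module_args",
                # "action_representation_module" / "action_representation_module_args",
                # "history_summarization_module" / "history_summarization_module_args",
                # "network_module" / "network_args" (DuelingDQN, BootstrappedDQN),
                # "actor_network_type" (methods whose name contains "dynamic").
            },
        ],
    },
]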

Source code
#!/usr/bin/env fbpython
# (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.
# pyre-ignore-all-errors

"""
The code in this file provides a way to run multiple pearl experiments in different
processes using torch.multiprocessing.
Outputs of the code are saved in the folder ~/pearl_execution/outputs/.
To run the code, enter the pearl directory, then run
./utils/scripts/meta_only/run_pearl.sh utils/scripts/benchmark_parallelization.py
(make sure conda and related packages have been installed with
./utils/scripts/meta_only/setup_conda_pearl_on_devserver.sh)
"""
import os
import warnings
from typing import List

import matplotlib.pyplot as plt
import numpy as np
import torch.multiprocessing as mp
from pearl.pearl_agent import PearlAgent
from pearl.utils.functional_utils.experimentation.set_seed import set_seed

from pearl.utils.functional_utils.train_and_eval.online_learning import online_learning
from pearl.utils.scripts.benchmark_config import (
    benchmark_acrobot_v1_part_1,
    benchmark_acrobot_v1_part_2,
    benchmark_ant_v4,
    benchmark_cartpole_v1_part_1,
    benchmark_cartpole_v1_part_2,
    benchmark_halfcheetah_v4,
    benchmark_hopper_v4,
    benchmark_walker2d_v4,
    generate_rctd3_ant,
    generate_rctd3_half_cheetah_v1,
    generate_rctd3_hopper,
    generate_rctd3_walker,
    get_env,
    num_runs,
    print_every_x_steps,
    rctd3_ant_part_1,
    rctd3_ant_part_2,
    rctd3_ant_part_3,
    rctd3_ant_part_4,
    rctd3_half_cheetah_v1_part_1,
    rctd3_half_cheetah_v1_part_2,
    rctd3_hopper_part_1,
    rctd3_hopper_part_2,
    rctd3_walker_part_1,
    rctd3_walker_part_2,
    test_dynamic_action_space,
)

warnings.filterwarnings("ignore")


def run(experiments) -> None:
    """Assign one run to one process."""
    assert len(experiments) > 0
    all_processes = []

    for e in experiments:
        evaluate(e, all_processes)

    for p in all_processes:
        p.start()
    for p in all_processes:
        p.join()


def evaluate(experiment, all_processes: List[mp.Process]) -> None:
    """Running multiple methods and multiple runs in the given gym environment."""
    env_name = experiment["env_name"]
    num_runs = experiment["num_runs"]
    num_episodes = experiment.get("num_episodes")
    num_steps = experiment.get("num_steps")
    record_period = experiment["record_period"]
    print_every_x_episodes = experiment.get("print_every_x_episodes")
    print_every_x_steps = experiment.get("print_every_x_steps")
    methods = experiment["methods"]
    processes = []
    for method in methods:
        method["agent_args"] = {"device_id": experiment["device_id"]}
        for run_idx in range(num_runs):
            p = mp.Process(
                target=evaluate_single,
                args=(
                    env_name,
                    method,
                    run_idx,
                    num_episodes,
                    num_steps,
                    print_every_x_episodes,
                    print_every_x_steps,
                    record_period,
                ),
            )
            processes.append(p)

    all_processes.extend(processes)


def evaluate_single(
    env_name,
    method,
    run_idx,
    num_episodes,
    num_steps,
    print_every_x_episodes,
    print_every_x_steps,
    record_period,
):
    """Performing one run of experiment."""
    set_seed(run_idx)
    policy_learner = method["policy_learner"]
    policy_learner_args = method["policy_learner_args"]
    agent_args = method["agent_args"]
    env = get_env(env_name)
    policy_learner_args["state_dim"] = env.observation_space.shape[0]

    if "exploration_module" in method and "exploration_module_args" in method:
        policy_learner_args["exploration_module"] = method["exploration_module"](
            **method["exploration_module_args"]
        )
    if "replay_buffer" in method and "replay_buffer_args" in method:
        agent_args["replay_buffer"] = method["replay_buffer"](
            **method["replay_buffer_args"]
        )
    if "safety_module" in method and "safety_module_args" in method:
        agent_args["safety_module"] = method["safety_module"](
            **method["safety_module_args"]
        )
    if (
        "action_representation_module" in method
        and "action_representation_module_args" in method
    ):
        if (
            method["action_representation_module"].__name__
            == "OneHotActionTensorRepresentationModule"
        ):
            method["action_representation_module_args"][
                "max_number_actions"
            ] = env.action_space.n
        policy_learner_args["action_representation_module"] = method[
            "action_representation_module"
        ](**method["action_representation_module_args"])

    if (
        "history_summarization_module" in method
        and "history_summarization_module_args" in method
    ):
        if (
            method["history_summarization_module"].__name__
            == "StackHistorySummarizationModule"
        ):
            policy_learner_args["state_dim"] = (
                env.observation_space.shape[0] + env.action_space.n
            ) * method["history_summarization_module_args"]["history_length"]
        elif (
            method["history_summarization_module"].__name__
            == "LSTMHistorySummarizationModule"
        ):
            method["history_summarization_module_args"][
                "observation_dim"
            ] = env.observation_space.shape[0]
            method["history_summarization_module_args"][
                "action_dim"
            ] = env.action_space.n
            policy_learner_args["state_dim"] = method[
                "history_summarization_module_args"
            ]["hidden_dim"]

        agent_args["history_summarization_module"] = method[
            "history_summarization_module"
        ](**method["history_summarization_module_args"])

    if method["name"] == "DuelingDQN":  # only for Dueling DQN
        assert "network_module" in method and "network_args" in method
        policy_learner_args["network_instance"] = method["network_module"](
            state_dim=env.observation_space.shape[0],
            action_dim=env.action_space.n,
            **method["network_args"],
        )
    if method["name"] == "BootstrappedDQN":  # only for Bootstrapped DQN
        assert "network_module" in method and "network_args" in method
        policy_learner_args["q_ensemble_network"] = method["network_module"](
            state_dim=env.observation_space.shape[0],
            action_dim=env.action_space.n,
            **method["network_args"],
        )
        del policy_learner_args["state_dim"]

    if "dynamic" in method["name"]:
        policy_learner_args["actor_network_type"] = method["actor_network_type"]

    policy_learner_args["action_space"] = env.action_space
    agent = PearlAgent(
        policy_learner=policy_learner(
            **policy_learner_args,
        ),
        **agent_args,
    )
    method_name = method["name"]
    print(f"Run #{run_idx + 1} for {method_name} in {env_name}")
    if (
        method["name"] == "REINFORCE" or method["name"] == "PPO"
    ):  # REINFORCE and PPO only perform learning at the end of each episode
        learn_after_episode = True
    else:
        learn_after_episode = False

    info = online_learning(
        agent,
        env,
        number_of_episodes=num_episodes,
        number_of_steps=num_steps,
        print_every_x_episodes=print_every_x_episodes,
        print_every_x_steps=print_every_x_steps,
        learn_after_episode=learn_after_episode,
        seed=run_idx,
        record_period=record_period,
    )
    dir = f"outputs/{env_name}/{method_name}"
    os.makedirs(dir, exist_ok=True)
    for key in info:
        np.save(dir + f"/{run_idx}_{key}.npy", info[key])


def generate_plots(experiments, attributes) -> None:
    for e in experiments:
        generate_one_plot(e, attributes)


def generate_one_plot(experiment, attributes):
    """Generating learning curves for all tested methods in one environment."""
    env_name = experiment["env_name"]
    exp_name = experiment["exp_name"]
    num_runs = experiment["num_runs"]
    record_period = experiment["record_period"]
    methods = experiment["methods"]
    for attr in attributes:
        for method in methods:
            data = []
            for run in range(num_runs):
                try:
                    d = np.load(f"outputs/{env_name}/{method['name']}/{run}_{attr}.npy")
                except FileNotFoundError:
                    print(
                        f"File not found for outputs/{env_name}/{method['name']}/{run}_{attr}.npy"
                    )
                    continue
                data.append(d)
            data = np.array(data)
            mean = data.mean(axis=0)
            std_error = data.std(axis=0) / np.sqrt(num_runs)
            x_list = record_period * np.arange(mean.shape[0])
            if "num_steps" in experiment:
                plt.plot(x_list, mean, label=method["name"])
                plt.fill_between(x_list, mean - std_error, mean + std_error, alpha=0.2)
            else:
                plt.plot(x_list, mean, label=method["name"])
                plt.fill_between(
                    x_list,
                    mean - std_error,
                    mean + std_error,
                    alpha=0.2,
                )
        plt.title(env_name)
        if "num_steps" in experiment:
            plt.xlabel("Steps")
        else:
            plt.xlabel("Episodes")
        plt.ylabel(attr)
        plt.legend()
        plt.savefig(f"outputs/{exp_name}_{env_name}_{attr}.png")
        plt.close()


if __name__ == "__main__":
    # run(benchmark_cartpole_v1_part_1)
    # generate_plots(benchmark_cartpole_v1_part_1, ["return"])
    # run(benchmark_cartpole_v1_part_2)
    # generate_plots(benchmark_cartpole_v1_part_2, ["return"])
    # run(benchmark_acrobot_v1_part_1)
    # generate_plots(benchmark_acrobot_v1_part_1, ["return"])
    # run(benchmark_acrobot_v1_part_2)
    # generate_plots(benchmark_acrobot_v1_part_2, ["return"])
    # run(benchmark_halfcheetah_v4)
    # generate_plots(benchmark_halfcheetah_v4, ["return"])
    # run(benchmark_ant_v4)
    # generate_plots(benchmark_ant_v4, ["return"])
    # run(benchmark_hopper_v4)
    # generate_plots(benchmark_hopper_v4, ["return"])
    # run(benchmark_walker2d_v4)
    # generate_plots(benchmark_walker2d_v4, ["return"])
    # test rctd3
    # run(rctd3_ant_part_1)
    # run(rctd3_ant_part_2)
    # generate_plots(generate_rctd3_ant, ["return", "return_cost"])
    # test dynamic action spaces
    run(test_dynamic_action_space)
    generate_plots(test_dynamic_action_space, ["return"])
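
The __main__ block selects which benchmark to execute; uncommenting a run/generate_plots pair switches to a different configuration. Each run stores its metrics as NumPy arrays under outputs/{env_name}/{method_name}/ (one {run_idx}_{key}.npy file per recorded quantity), and generate_plots writes the aggregated figures to outputs/{exp_name}_{env_name}_{attr}.png. A minimal sketch of loading a saved array back, where the environment and method names are hypothetical placeholders:

import numpy as np

# "CartPole-v1" and "DQN" stand in for whatever env_name and method name
# the experiment actually used; "return" is one of the recorded attributes.
returns_run0 = np.load("outputs/CartPole-v1/DQN/0_return.npy")
print(returns_run0.shape)  # entries correspond to record_period intervals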

Functions

def evaluate(experiment, all_processes: List[multiprocessing.context.Process]) -> None

Running multiple methods and multiple runs in the given gym environment.

def evaluate_single(env_name, method, run_idx, num_episodes, num_steps, print_every_x_episodes, print_every_x_steps, record_period)

Performing one run of an experiment.

def generate_one_plot(experiment, attributes)

Generating learning curves for all tested methods in one environment.

def generate_plots(experiments, attributes) -> None

Generating learning curves for each experiment in the given list by calling generate_one_plot.

def run(experiments) -> None

Assign one run to one process.
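
run is the entry point used in the __main__ block. A minimal usage sketch, mirroring the commented-out examples there (benchmark_cartpole_v1_part_1 is one of the configurations imported from pearl.utils.scripts.benchmark_config):

run(benchmark_cartpole_v1_part_1)
generate_plots(benchmark_cartpole_v1_part_1, ["return"])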
