Samplers

Several sampler classes are implemented for different parallelization schemes, with multiple environment instances running on CPU resources and agent forward passes happening on either CPU or GPU. The implemented samplers execute a fixed number of time-steps at each call to obtain_samples(), which returns a batch of data with leading dimensions [batch_T, batch_B].

Which sampler to use depends on the parallel resources available and on the relative cost of environment steps versus agent forward passes; no single scheme is fastest in every case, so it is worth trying several and comparing.

class rlpyt.samplers.base.BaseSampler(EnvCls, env_kwargs, batch_T, batch_B, CollectorCls, max_decorrelation_steps=100, TrajInfoCls=<class 'rlpyt.samplers.collections.TrajInfo'>, eval_n_envs=0, eval_CollectorCls=None, eval_env_kwargs=None, eval_max_steps=None, eval_max_trajectories=None)

Class which interfaces with the Runner, in the master process only.

Parameters:
  • EnvCls – class (or factory function) callable to instantiate an environment object
  • env_kwargs (dict) – keyword arguments passed to EnvCls() to instantiate
  • batch_T (int) – number of time-steps per sample batch
  • batch_B (int) – number of environment instances to run (in parallel), becomes second batch dimension
  • CollectorCls – callable to instantiate the collector, which manages agent-environment interaction loop
  • max_decorrelation_steps (int) – maximum number of random steps to take in each environment before training starts, to decorrelate the batch of environment states
  • TrajInfoCls – callable to instantiate object for tracking trajectory-wise info
  • eval_n_envs (int) – number of environment instances for agent evaluation (0 for no separate evaluation)
  • eval_CollectorCls – callable to instantiate evaluation collector
  • eval_env_kwargs – keyword arguments passed to EnvCls() for eval envs
  • eval_max_steps – max total number of steps (time * n_envs) per evaluation call
  • eval_max_trajectories – optional earlier cutoff for evaluation phase
initialize(*args, **kwargs)

Should instantiate all components, including setup of parallel processes if applicable.

obtain_samples(itr)

Execute agent-environment interactions and return data batch.

evaluate_agent(itr)

Run offline agent evaluation, if applicable.
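
To make the call sequence concrete, the sketch below shows how a Runner-style loop might drive any of these samplers. It is illustrative only: run_sampling and its arguments are hypothetical, the sampler, agent, and affinity objects are assumed to be constructed elsewhere (e.g. as in the SerialSampler example further below), and the examples return value follows the SerialSampler.initialize() description below.

    def run_sampling(sampler, agent, affinity, n_itr=100, eval_every=10, seed=0):
        # initialize() returns example data fields, usable e.g. to size a replay buffer.
        examples = sampler.initialize(agent=agent, affinity=affinity, seed=seed)
        for itr in range(n_itr):
            # One fixed batch of interaction; leading dims are [batch_T, batch_B].
            samples, traj_infos = sampler.obtain_samples(itr)
            # ... the algorithm's update on `samples` would go here ...
            if itr % eval_every == 0:
                eval_infos = sampler.evaluate_agent(itr)  # Offline evaluation, if configured.
        return examples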

Serial Sampler

class rlpyt.samplers.serial.sampler.SerialSampler(*args, CollectorCls=<class 'rlpyt.samplers.parallel.cpu.collectors.CpuResetCollector'>, eval_CollectorCls=<class 'rlpyt.samplers.serial.collectors.SerialEvalCollector'>, **kwargs)

Bases: rlpyt.samplers.base.BaseSampler

The simplest sampler; no parallelism, everything occurs in the same, master Python process. This can be easier for debugging (e.g. can use breakpoint() in the master process) and might be fast enough for experimentation purposes. Should be used with collectors which generate the agent’s actions internally, i.e. CPU-based collectors but not GPU-based ones.

initialize(agent, affinity=None, seed=None, bootstrap_value=False, traj_info_kwargs=None, rank=0, world_size=1)

Stores the input arguments. Instantiates the specified number of environment instances (batch_B). Initializes the agent, and pre-allocates a memory buffer to hold the samples collected in each batch. Applies traj_info_kwargs settings to the TrajInfoCls by direct class attribute assignment. Instantiates the Collector and, if applicable, the evaluation Collector.

Returns a structure of individual examples for data fields such as observation, action, etc., which can be used to allocate a replay buffer.

obtain_samples(itr)

Call the collector to execute a batch of agent-environment interactions. Return data in torch tensors, and a list of trajectory-info objects from episodes which ended.

evaluate_agent(itr)

Call the evaluation collector to execute agent-environment interactions.
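
A minimal construction sketch follows. Here make_env and its keyword arguments are hypothetical placeholders for any callable that returns an rlpyt-compatible environment; the remaining keyword arguments are the BaseSampler parameters documented above.

    from rlpyt.samplers.serial.sampler import SerialSampler

    sampler = SerialSampler(
        EnvCls=make_env,                    # Hypothetical factory returning an rlpyt env.
        env_kwargs=dict(level="easy"),      # Hypothetical kwargs forwarded to make_env().
        batch_T=5,                          # Time-steps per call to obtain_samples().
        batch_B=16,                         # Number of environment instances.
        max_decorrelation_steps=100,        # Random steps to decorrelate initial states.
        eval_n_envs=4,                      # Separate environments for offline evaluation.
        eval_env_kwargs=dict(level="easy"),
        eval_max_steps=int(1e4),            # Total eval budget: time * n_envs steps.
        eval_max_trajectories=50,           # Optional early cutoff for evaluation.
    )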

Parallel Samplers

class rlpyt.samplers.parallel.base.ParallelSamplerBase(EnvCls, env_kwargs, batch_T, batch_B, CollectorCls, max_decorrelation_steps=100, TrajInfoCls=<class 'rlpyt.samplers.collections.TrajInfo'>, eval_n_envs=0, eval_CollectorCls=None, eval_env_kwargs=None, eval_max_steps=None, eval_max_trajectories=None)

Bases: rlpyt.samplers.base.BaseSampler

Base class for samplers which use worker processes to run environment steps in parallel, across CPU resources.

initialize(agent, affinity, seed, bootstrap_value=False, traj_info_kwargs=None, world_size=1, rank=0, worker_process=None)

Creates an example instance of the environment for agent initialization (which may differ by sub-class) and to pre-allocate batch buffers, then deletes the environment instance. Batch buffers are allocated on shared memory, so that worker processes can read/write directly to them.

Computes the number of parallel processes based on the affinity argument. Forks worker processes, which instantiate their own environment and collector objects. Waits for the worker processes to complete all initialization (such as decorrelating environment states) before returning. Barriers and other parallel indicators are constructed to manage worker processes.

Warning

If doing offline agent evaluation, at least one evaluation environment instance is used per parallel worker, which might increase the total number of evaluation instances over what was requested. This may bias evaluation towards shorter episodes if the episode length is variable and the max number of evaluation steps divided over the number of eval environments (eval_max_steps / actual_eval_n_envs) is not large relative to the max episode length.
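
The arithmetic below illustrates this warning with made-up numbers; the exact rounding rule used internally may differ, but the effect of "at least one eval env per worker" is the same.

    import math

    eval_n_envs, n_workers = 6, 8
    eval_max_steps = 10_000

    per_worker = max(1, math.ceil(eval_n_envs / n_workers))  # At least one eval env per worker.
    actual_eval_n_envs = per_worker * n_workers               # 8 envs, although only 6 were requested.
    steps_per_env = eval_max_steps // actual_eval_n_envs      # 1250 steps of budget per env.
    # If episodes can approach 1250 steps, long episodes may not finish within the
    # budget, biasing the evaluation statistics toward shorter episodes.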

obtain_samples(itr)

Signal worker processes to collect samples, and wait until they finish. Workers will write directly to the pre-allocated samples buffer, which this method returns. Trajectory-info objects from completed trajectories are retrieved from workers through a parallel queue object and are also returned.

evaluate_agent(itr)

Signal worker processes to perform agent evaluation. If a max number of evaluation trajectories was specified, keep watch over the number of trajectories finished and signal an early end if the limit is reached. Return a list of trajectory-info objects from the completed episodes.

CPU-Agent

class rlpyt.samplers.parallel.cpu.sampler.CpuSampler(*args, CollectorCls=<class 'rlpyt.samplers.parallel.cpu.collectors.CpuResetCollector'>, eval_CollectorCls=<class 'rlpyt.samplers.parallel.cpu.collectors.CpuEvalCollector'>, **kwargs)

Parallel sampler for using the CPU resource of each worker to compute agent forward passes; for use with CPU-based collectors.

obtain_samples(itr)

First, have the agent sync shared memory; in case training uses a GPU, the agent needs to copy its (new) GPU model parameters to the shared-memory CPU model which all the workers use. Then call super class’s method.

evaluate_agent(itr)

Like in obtain_samples(), first sync agent shared memory.
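
The general PyTorch pattern behind "sync shared memory" is sketched below; this is not rlpyt's agent code, just an illustration of copying new (possibly GPU) parameters into a CPU model whose tensors live in shared memory, where worker processes read them.

    import torch
    import torch.nn as nn

    train_model = nn.Linear(4, 2)      # Master's training model (could live on GPU).
    shared_model = nn.Linear(4, 2)     # Workers' copy for CPU action selection.
    shared_model.share_memory()        # Move its parameters into shared memory.

    # In-place copy preserves the shared storage, so worker processes observe the
    # new values without any explicit message passing.
    with torch.no_grad():
        for dst, src in zip(shared_model.parameters(), train_model.parameters()):
            dst.copy_(src.detach().cpu())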

GPU-Agent

class rlpyt.samplers.parallel.gpu.sampler.GpuSamplerBase(*args, CollectorCls=<class 'rlpyt.samplers.parallel.gpu.collectors.GpuResetCollector'>, eval_CollectorCls=<class 'rlpyt.samplers.parallel.gpu.collectors.GpuEvalCollector'>, **kwargs)

Bases: rlpyt.samplers.parallel.base.ParallelSamplerBase

Base class for parallel samplers which use worker processes to execute environment steps on CPU resources but the master process to execute agent forward passes for action selection, presumably on GPU. Use GPU-based collector classes.

In addition to the usual batch buffer for data samples, allocates a step buffer over shared memory, which is used for communication with workers. The step buffer includes observations, which the workers write and the master reads, and actions, which the master writes and the workers read. (The step buffer has leading dimension [batch_B], for the number of parallel environments, and each worker gets its own slice along that dimension.) The step buffer object is held in both numpy array and torch tensor forms over the same memory; e.g. workers write to the numpy array form, and the agent is able to read the torch tensor form.
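
The dual numpy/torch view is a general shared-memory technique, sketched below (this is not rlpyt's buffer code): the same allocation is wrapped once as a numpy array for the workers and once as a torch tensor for the agent, so writes through either view are immediately visible through the other.

    import multiprocessing as mp
    import numpy as np
    import torch

    batch_B, obs_dim = 4, 3
    raw = mp.RawArray("f", batch_B * obs_dim)               # One shared-memory allocation.
    np_obs = np.frombuffer(raw, dtype=np.float32).reshape(batch_B, obs_dim)
    torch_obs = torch.from_numpy(np_obs)                    # Torch view over the same memory.

    np_obs[2] = 1.0       # A "worker" writes its slice of observations...
    print(torch_obs[2])   # ...and the "agent" reads it through the tensor view, no copy.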

The per-step protocol between the master and the workers is described under the action-server classes below.

obtain_samples(itr)

Signals workers to begin the environment step execution loop, and drops into the serve_actions() method to provide actions to workers based on the new observations at each step.

evaluate_agent(itr)

Signals workers to begin the agent evaluation loop, and drops into serve_actions_evaluation() to provide actions to workers at each step.

_agent_init(agent, env, global_B=1, env_ranks=None)

Initializes the agent without shared memory, because all agent functions (training and sampling) happen in the master process, presumably on GPU.

class rlpyt.samplers.parallel.gpu.action_server.ActionServer

Mixin class with methods for serving actions to worker processes which execute environment steps.

serve_actions(itr)

Called in master process during obtain_samples().

Performs the agent action-selection loop in concert with workers executing environment steps. Uses shared memory buffers to communicate agent/environment data at each time step. Uses semaphores for synchronization: one per worker, acquired by the master when that worker finishes writing the next step of observations, and one per worker, released by the master once it has written the next actions. Resets the agent one B-index at a time when the corresponding environment resets (i.e. the agent’s recurrent state, with leading dimension batch_B).

Also communicates agent_info to workers, which are responsible for recording all data into the batch buffer.

If requested, collects additional agent value estimation of final observation for bootstrapping (the one thing written to the batch buffer here).

Warning

If modifying this method, be careful to keep the semaphore logic correct and to make sure the semaphores drain properly. If a semaphore ends up with an extra release, synchronization can be lost silently, leading to wrong and confusing results.
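
A stripped-down version of this handshake is sketched below for one action server and a few workers; the names and structure are illustrative, not rlpyt's implementation, but the per-worker semaphore pairing is the same idea.

    import multiprocessing as mp
    import numpy as np


    def worker(rank, obs_buf, act_buf, obs_ready, act_ready, steps):
        for t in range(steps):
            obs_buf[rank] = rank + t     # "Step the env": write an observation to shared memory.
            obs_ready.release()          # Tell the master this worker's observation is ready.
            act_ready.acquire()          # Wait until the master has written the action.
            _ = act_buf[rank]            # Use the action on the next environment step.


    if __name__ == "__main__":
        n_workers, steps = 2, 3
        obs_buf = mp.Array("d", n_workers, lock=False)   # Shared step buffers (obs and actions).
        act_buf = mp.Array("d", n_workers, lock=False)
        obs_ready = [mp.Semaphore(0) for _ in range(n_workers)]
        act_ready = [mp.Semaphore(0) for _ in range(n_workers)]
        procs = [mp.Process(target=worker, args=(i, obs_buf, act_buf,
                                                 obs_ready[i], act_ready[i], steps))
                 for i in range(n_workers)]
        for p in procs:
            p.start()
        for t in range(steps):
            for sem in obs_ready:                # Wait for every worker's observation.
                sem.acquire()
            obs = np.frombuffer(obs_buf)         # Read all observations at once...
            act_buf[:] = obs                     # ...compute "actions" (here: identity) and write them.
            for sem in act_ready:                # Release workers to step their environments.
                sem.release()
        for p in procs:
            p.join()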

serve_actions_evaluation(itr)

Similar to serve_actions(). If a maximum number of eval trajectories was specified, keeps track of the number completed and terminates evaluation if the max is reached. Returns a list of completed trajectory-info objects.

class rlpyt.samplers.parallel.gpu.sampler.GpuSampler(*args, CollectorCls=<class 'rlpyt.samplers.parallel.gpu.collectors.GpuResetCollector'>, eval_CollectorCls=<class 'rlpyt.samplers.parallel.gpu.collectors.GpuEvalCollector'>, **kwargs)

Bases: rlpyt.samplers.parallel.gpu.action_server.ActionServer, rlpyt.samplers.parallel.gpu.sampler.GpuSamplerBase

GPU-Agent, Alternating Workers

class rlpyt.samplers.parallel.gpu.alternating_sampler.AlternatingSamplerBase(*args, **kwargs)

Twice the standard number of worker processes are forked, and they may share CPU resources in pairs. Environment instances are divided evenly among the two sets. While one set of workers steps their environments, the action-server process computes the actions for the other set of workers, which are paused until their new actions are ready (this pause happens in the GpuSampler). The two sets of workers alternate in this procedure, keeping the CPU maximally busy. The intention is to hide the time to compute actions from the critical path of execution, which can provide up to a 2x speed boost in theory, if the environment-step time and agent-step time were otherwise equal.

If the latency in computing and returning the agent’s action is longer than environment stepping, then this alternation might not be faster, because it calls agent action selection twice as many times.
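
A back-of-the-envelope timing model for that claim, with made-up numbers:

    env_time = 1.0       # Time for one worker set to step its environments.
    agent_time = 1.0     # Time for the action server to compute that set's actions.

    standard = env_time + agent_time         # Without alternation, stepping and inference serialize.
    alternating = max(env_time, agent_time)  # Ideally, inference for one set hides behind the other set's stepping.
    print(standard / alternating)            # -> 2.0 when the two times are equal, less otherwise.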

initialize(agent, *args, **kwargs)

Like the super class’s initialize(), but creates an additional set of synchronization and communication objects for the alternating workers.

class rlpyt.samplers.parallel.gpu.action_server.AlternatingActionServer

Mixin class for serving actions in the alternating GPU sampler. The synchronization format in this class allows the two worker groups to execute partially simultaneously: workers wait for their new actions to be ready before stepping, but do not wait for the other set of workers to finish stepping.

class rlpyt.samplers.parallel.gpu.action_server.NoOverlapAlternatingActionServer

Mixin class for serving actions in the alternating GPU sampler. The synchronization format in this class prevents the two worker groups from executing simultaneously: workers wait for their new actions to be ready before stepping, and also wait for the other set of workers to finish stepping.

Warning

The logic around the semaphores at the end of serve_actions_evaluation() may not be correct for all cases (see the TODO comment in the code).

class rlpyt.samplers.parallel.gpu.alternating_sampler.AlternatingSampler(*args, **kwargs)

Bases: rlpyt.samplers.parallel.gpu.action_server.AlternatingActionServer, rlpyt.samplers.parallel.gpu.alternating_sampler.AlternatingSamplerBase

class rlpyt.samplers.parallel.gpu.alternating_sampler.NoOverlapAlternatingSampler(*args, **kwargs)

Bases: rlpyt.samplers.parallel.gpu.action_server.NoOverlapAlternatingActionServer, rlpyt.samplers.parallel.gpu.alternating_sampler.AlternatingSamplerBase

Parallel Sampler Worker

The same function is used as the target for forking worker processes in all parallel samplers.

rlpyt.samplers.parallel.worker.sampling_process(common_kwargs, worker_kwargs)

Target function used for forking parallel worker processes in the samplers. After initialize_worker(), it creates the specified number of environment instances and gives them to the collector when instantiating it. It then calls the collector’s startup methods for the environments and agent. If applicable, it also instantiates evaluation environment instances and an evaluation collector.

It then enters an infinite loop, waiting for signals from the master to either collect training samples or run evaluation, until signaled to exit.
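
A schematic of that signal loop is sketched below with a single worker; the flag and barrier names are hypothetical, not rlpyt's actual control structures.

    import multiprocessing as mp


    def sampling_worker(quit_flag, do_eval, barrier_in, barrier_out):
        while True:
            barrier_in.wait()                  # Wait for the master's go signal.
            if quit_flag.value:                # Master requested shutdown.
                break
            if do_eval.value:
                print("worker: evaluating")    # Stand-in for the eval collector loop.
            else:
                print("worker: collecting")    # Stand-in for writing one batch of samples.
            barrier_out.wait()                 # Report completion back to the master.


    if __name__ == "__main__":
        quit_flag, do_eval = mp.Value("i", 0), mp.Value("i", 0)
        barrier_in, barrier_out = mp.Barrier(2), mp.Barrier(2)
        p = mp.Process(target=sampling_worker,
                       args=(quit_flag, do_eval, barrier_in, barrier_out))
        p.start()
        barrier_in.wait()                      # One "obtain_samples" iteration.
        barrier_out.wait()
        do_eval.value = 1
        barrier_in.wait()                      # One "evaluate_agent" iteration.
        barrier_out.wait()
        quit_flag.value = 1
        barrier_in.wait()                      # Release the worker so it sees the quit flag.
        p.join()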

rlpyt.samplers.parallel.worker.initialize_worker(rank, seed=None, cpu=None, torch_threads=None)

Assigns CPU affinity, sets the random seed, and sets the number of torch threads if needed to prevent MKL deadlock.
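
In practice this amounts to roughly the following sketch (illustrative, with hypothetical argument names, not rlpyt's exact code); CPU pinning via psutil works on Linux, which is the usual target platform.

    import numpy as np
    import psutil
    import torch


    def init_worker(rank, seed=None, cpus=None, torch_threads=1):
        if cpus is not None:
            psutil.Process().cpu_affinity(list(cpus))   # Pin this worker to its assigned cores.
        if seed is not None:
            np.random.seed(seed + rank)                 # Per-worker seeding.
            torch.manual_seed(seed + rank)
        torch.set_num_threads(torch_threads)            # Few threads per worker avoids MKL contention/deadlock.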