Collectors

Collectors run the environment-agent interaction loop and record sampled data to the batch buffer. The serial sampler runs one collector, and in parallel samplers, each worker process runs one collector. Different collectors are needed for CPU-agent vs GPU-agent samplers.

In general, collectors execute an outer for loop over time steps and an inner for loop over environments, stepping each environment one at a time. At every step, all information (e.g. observation, env_info, etc.) is recorded to its place in the pre-allocated batch buffer. All information is also fed to the trajectory-info object of each environment, for tracking trajectory-wise measures.
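As a rough, hedged sketch of this structure (not rlpyt's exact implementation; the agent/env interfaces, buffer field names, and TrajInfo methods shown here are simplifying assumptions):

# Simplified sketch of the collection loop; samples is a pre-allocated
# buffer indexed [t, b], and all interfaces here are assumed, not exact.
def collect_batch_sketch(envs, agent, samples, traj_infos, batch_T,
                         observation, prev_action, prev_reward):
    for t in range(batch_T):                       # outer loop: time steps
        samples.env.observation[t] = observation   # record this step's input
        action, agent_info = agent.step(observation, prev_action, prev_reward)
        for b, env in enumerate(envs):             # inner loop: environments
            obs_b, reward_b, done_b, env_info_b = env.step(action[b])
            # Feed everything to the per-environment trajectory-info object.
            traj_infos[b].step(observation[b], action[b], reward_b, done_b,
                               agent_info, env_info_b)
            observation[b] = obs_b
            samples.env.reward[t, b] = reward_b
            samples.env.done[t, b] = done_b
        samples.agent.action[t] = action
        prev_action, prev_reward = action, samples.env.reward[t]
    return observation, prev_action, prev_reward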

Evaluation collectors only record trajectory-wise results.

Training Collectors

Base Components

class rlpyt.samplers.collectors.BaseCollector(rank, envs, samples_np, batch_T, TrajInfoCls, agent=None, sync=None, step_buffer_np=None, global_B=1, env_ranks=None)

Class that steps environments, possibly in a worker process.

start_envs()

e.g. calls reset() on every env.

start_agent()

In CPU collectors, calls agent.collector_initialize(), e.g. to set up vectorized epsilon-greedy exploration, and resets the agent.
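For instance, a vectorized epsilon-greedy setup might assign a different exploration epsilon to each environment based on its global rank; the log-spacing below is an assumption for illustration, not rlpyt's exact scheme:

import numpy as np

# Hypothetical per-environment epsilon assignment, as might be done inside
# agent.collector_initialize(); the spacing scheme is an assumption.
def make_vector_eps(global_B, env_ranks, eps_min=0.01, eps_max=1.0):
    all_eps = np.logspace(np.log10(eps_max), np.log10(eps_min), global_B)
    return all_eps[env_ranks]  # epsilons for this collector's environments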

collect_batch(agent_inputs, traj_infos)

Main data collection loop.

reset_if_needed(agent_inputs)

Resets the agent and/or environments as needed, when resets are performed between batches.

class rlpyt.samplers.collectors.DecorrelatingStartCollector(rank, envs, samples_np, batch_T, TrajInfoCls, agent=None, sync=None, step_buffer_np=None, global_B=1, env_ranks=None)

Bases: rlpyt.samplers.collectors.BaseCollector

Collector which can step all environments through a random number of random actions during startup, to decorrelate the states in training batches.

start_envs(max_decorrelation_steps=0)

Calls reset() on every environment instance, then steps each one through a random number of random actions, and returns the resulting agent_inputs buffer (observation, prev_action, prev_reward).
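A hedged sketch of the decorrelation step (interfaces simplified; a gym-style env.action_space.sample() is assumed):

import numpy as np

# Simplified sketch: reset each env, then step it through a random number
# of random actions; the real method also fills the agent_inputs buffer.
def decorrelate_envs(envs, max_decorrelation_steps=0):
    observations = []
    for env in envs:
        obs = env.reset()
        for _ in range(np.random.randint(0, max_decorrelation_steps + 1)):
            obs, reward, done, env_info = env.step(env.action_space.sample())
            if done:
                obs = env.reset()
        observations.append(obs)
    return observations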

CPU-Agent Collectors

class rlpyt.samplers.parallel.cpu.collectors.CpuResetCollector(rank, envs, samples_np, batch_T, TrajInfoCls, agent=None, sync=None, step_buffer_np=None, global_B=1, env_ranks=None)

Bases: rlpyt.samplers.collectors.DecorrelatingStartCollector

Collector which executes agent.step() in the sampling loop (i.e. for use in CPU or serial samplers).

It immediately resets any environment which finishes an episode, typically indicated by the environment returning done=True. However, the collector defers to the done signal only after checking for env_info["traj_done"], so that an RL episode can end without a call to env.reset() (e.g. as used for episodic lives in the Atari environment). The agent is reset based solely on done.
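A hedged sketch of this reset rule (interfaces simplified; the getattr fallback illustrates deferring to done when env_info carries no traj_done field):

# Assumed interfaces: env_info is namedtuple-like, traj_info.terminate()
# finalizes the trajectory record, agent.reset_one() resets one env's state.
def handle_done_sketch(b, env, agent, obs, done, env_info,
                       traj_info, completed_infos):
    traj_done = getattr(env_info, "traj_done", done)
    if traj_done:
        completed_infos.append(traj_info.terminate(obs))
        obs = env.reset()        # full trajectory over: reset the environment
    if done:
        agent.reset_one(idx=b)   # the agent resets on every done
    return obs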

class rlpyt.samplers.parallel.cpu.collectors.CpuWaitResetCollector(*args, **kwargs)

Bases: rlpyt.samplers.collectors.DecorrelatingStartCollector

Collector which executes agent.step() in the sampling loop.

It waits until after the batch has been collected to reset any environments whose episodes have completed; a done environment is bypassed for the remaining time steps, and zeros are recorded into the batch buffer.

Waiting to reset can be beneficial for two reasons. One is for training recurrent agents; PyTorch’s built-in LSTM cannot reset in the middle of a training sequence, so any samples in a batch after a reset would be ignored and the beginning of new episodes would be missed in training. The other reason is if the environment’s reset function is very slow compared to its step function; it can be faster overall to leave invalid samples after a reset, and perform the environment resets in the workers while the master process is training the agent (this was true for massively parallelized Atari).
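A hedged sketch of this pattern (buffer layout and names are assumptions): within the batch, finished environments are skipped and zeros are left in the buffer; the actual resets happen afterward, between batches.

# Within-batch handling for a wait-reset collector (illustrative only).
def step_envs_wait_reset(t, envs, action, samples, need_reset):
    for b, env in enumerate(envs):
        if need_reset[b]:
            samples.env.reward[t, b] = 0.0   # leave zeros for a done env
            samples.env.done[t, b] = True
            continue
        obs, reward, done, env_info = env.step(action[b])
        samples.env.reward[t, b] = reward
        samples.env.done[t, b] = done
        if done:
            need_reset[b] = True   # defer env.reset() to between batches

# Between batches, reset whatever finished during the last batch.
def reset_between_batches(envs, agent, need_reset, observation):
    for b, env in enumerate(envs):
        if need_reset[b]:
            observation[b] = env.reset()
            agent.reset_one(idx=b)
            need_reset[b] = False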

GPU-Agent Collectors

class rlpyt.samplers.parallel.gpu.collectors.GpuResetCollector(rank, envs, samples_np, batch_T, TrajInfoCls, agent=None, sync=None, step_buffer_np=None, global_B=1, env_ranks=None)

Bases: rlpyt.samplers.collectors.DecorrelatingStartCollector

Collector which communicates observations to an action-server, which in turn provides the agent's actions (i.e. for use in GPU samplers).

Environment reset logic is the same as in CpuResetCollector.
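A hedged illustration of the worker side of this handshake, assuming a shared-memory step buffer and a pair of semaphores in sync (the actual rlpyt synchronization primitives may differ):

# Illustrative pattern only: the worker signals that observations are ready,
# waits for the action-server to write actions, then steps its environments.
def gpu_collector_step(envs, step_buf, sync):
    sync.obs_ready.release()        # observations already written to step_buf
    sync.act_ready.acquire()        # block until the server provides actions
    for b, env in enumerate(envs):
        obs, reward, done, env_info = env.step(step_buf.action[b])
        step_buf.observation[b] = obs   # input for the server's next agent.step()
        step_buf.reward[b] = reward
        step_buf.done[b] = done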

class rlpyt.samplers.parallel.gpu.collectors.GpuWaitResetCollector(*args, **kwargs)

Bases: rlpyt.samplers.collectors.DecorrelatingStartCollector

Collector which communicates observations to an action-server, which in turn provides the agent's actions (i.e. for use in GPU samplers).

Environment reset logic is the same as in CpuWaitResetCollector.

Evaluation Collectors

class rlpyt.samplers.collectors.BaseEvalCollector(rank, envs, TrajInfoCls, traj_infos_queue, max_T, agent=None, sync=None, step_buffer_np=None)

Collector for offline agent evaluation; it records only trajectory-wise results, not intermediate samples.

collect_evaluation()

Runs agent evaluation in the environments and returns completed trajectory infos.

class rlpyt.samplers.parallel.cpu.collectors.CpuEvalCollector(rank, envs, TrajInfoCls, traj_infos_queue, max_T, agent=None, sync=None, step_buffer_np=None)

Bases: rlpyt.samplers.collectors.BaseEvalCollector

Offline agent evaluation collector which calls agent.step() in the sampling loop. It immediately resets any environment which finishes a trajectory. Stops when the maximum number of time steps has been reached, or when signaled by the master process (i.e. when enough trajectories have completed).
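A hedged sketch of the evaluation loop under these assumptions (the agent is stepped per environment here for clarity, traj_infos_queue is a multiprocessing-style queue, and sync.stop_eval is a shared flag set by the master):

# Sketch of collect_evaluation(): no batch buffer; only completed
# trajectory infos are pushed to the queue.
def collect_evaluation_sketch(envs, agent, TrajInfoCls, traj_infos_queue,
                              max_T, sync):
    traj_infos = [TrajInfoCls() for _ in envs]
    observations = [env.reset() for env in envs]
    agent.reset()
    for t in range(max_T):
        for b, env in enumerate(envs):
            action, agent_info = agent.step(observations[b], None, None)
            obs, reward, done, env_info = env.step(action)
            traj_infos[b].step(observations[b], action, reward, done,
                               agent_info, env_info)
            if done:
                traj_infos_queue.put(traj_infos[b].terminate(obs))
                traj_infos[b] = TrajInfoCls()
                obs = env.reset()
                agent.reset_one(idx=b)
            observations[b] = obs
        if sync.stop_eval.value:    # master signals enough trajectories done
            break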

class rlpyt.samplers.parallel.gpu.collectors.GpuEvalCollector(rank, envs, TrajInfoCls, traj_infos_queue, max_T, agent=None, sync=None, step_buffer_np=None)

Bases: rlpyt.samplers.collectors.BaseEvalCollector

Offline agent evaluation collector which communicates observations to an action-server, which in turn provides the agent’s actions.