Asynchronous Samplers

Asynchronous sampling-optimization mode requires separate sampler classes, which closely mirror the options available for the other samplers. In asynchronous mode, the sampler runs in a separate process forked from the main (training) process; parallel asynchronous samplers fork additional worker processes.
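The forked-process pattern can be sketched with a toy example. Here threads stand in for the forked processes, and all names (double_buffer, go, done, etc.) are illustrative rather than rlpyt's actual synchronization objects: while the sampler fills one half of a double buffer, the trainer is free to optimize on the other half.

```python
import threading

# Toy sketch of asynchronous sampling-optimization.  A "sampler" thread
# (standing in for the forked sampler process) fills one slot of a
# double buffer each iteration, while the trainer would optimize on the
# other slot.  All names here are illustrative.
double_buffer = [None, None]            # two preallocated batch slots
ctrl = {"itr": 0, "db_idx": 0, "quit": False}
go, done = threading.Event(), threading.Event()

def sampler():
    while True:
        go.wait()
        go.clear()
        if ctrl["quit"]:
            return
        j = ctrl["db_idx"]
        double_buffer[j] = [f"batch_for_itr_{ctrl['itr']}"]  # collect samples
        done.set()

def train(n_itr=4):
    ctrl["quit"] = False
    t = threading.Thread(target=sampler)
    t.start()
    seen = []
    for itr in range(n_itr):
        ctrl["itr"], ctrl["db_idx"] = itr, itr % 2
        go.set()                   # sampler fills buffer itr % 2 ...
        done.wait()                # ... while the trainer would optimize
        done.clear()               #     on the other buffer here
        seen.append(double_buffer[itr % 2][0])
    ctrl["quit"] = True
    go.set()
    t.join()
    return seen
```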

Base Components

class rlpyt.samplers.async_.base.AsyncSamplerMixin

Mixin class defining master runner initialization method for all asynchronous samplers.

async_initialize(agent, bootstrap_value=False, traj_info_kwargs=None, seed=None)

Instantiates an example environment and uses it to initialize the agent (on shared memory). Pre-allocates a double buffer for sample batches, and returns that buffer along with example data (e.g. observation, action, etc.).
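Pre-allocation of a double buffer can be illustrated with a small sketch (field names and shapes here are illustrative, not rlpyt's actual buffer layout): storage is created once with fixed shapes and then written in place, so one slot can be filled while the other is read.

```python
# Toy pre-allocation of a double buffer: fixed-shape storage created
# once up front, then written in place each iteration.  Field names and
# shapes are illustrative, not rlpyt's actual buffer layout.
def build_double_buffer(batch_T, batch_B):
    def one_buffer():
        return {
            "observation": [[0.0] * batch_B for _ in range(batch_T)],
            "action": [[0] * batch_B for _ in range(batch_T)],
            "reward": [[0.0] * batch_B for _ in range(batch_T)],
        }
    return [one_buffer(), one_buffer()]

db = build_double_buffer(batch_T=5, batch_B=2)
db[0]["reward"][0][1] = 1.0   # a writer fills slot 0 in place ...
# ... while a reader consumes slot 1 (db[1]) untouched.
```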

class rlpyt.samplers.async_.base.AsyncParallelSamplerMixin

Bases: rlpyt.samplers.async_.base.AsyncSamplerMixin

Mixin class defining methods for the asynchronous sampler main process (which is forked from the overall master process).

obtain_samples(itr, db_idx)

Communicates to workers which batch buffer to use, and signals them to start collection. Waits until workers finish, and then retrieves completed trajectory-info objects from the workers and returns them in a list.
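The signal-and-gather handshake can be sketched as follows, with threads standing in for worker processes and barriers standing in for rlpyt's actual synchronization primitives; all names are illustrative.

```python
import threading

# Sketch of the obtain_samples() handshake: the main sampler process
# tells workers which double-buffer slot to fill, releases them, waits
# for completion, then gathers each worker's trajectory infos.
# Barriers and names here are illustrative stand-ins.
n_workers = 3
ctrl = {"db_idx": 0}
start_barrier = threading.Barrier(n_workers + 1)
done_barrier = threading.Barrier(n_workers + 1)
traj_lists = [[] for _ in range(n_workers)]

def worker(rank):
    start_barrier.wait()                 # wait for the start signal
    j = ctrl["db_idx"]                   # read which buffer to fill
    traj_lists[rank].append({"rank": rank, "db_idx": j, "Return": 1.0})
    done_barrier.wait()                  # report completion

def obtain_samples(itr, db_idx):
    ctrl["db_idx"] = db_idx
    start_barrier.wait()                 # release the workers
    done_barrier.wait()                  # block until all finish
    infos = [ti for lst in traj_lists for ti in lst]
    for lst in traj_lists:
        lst.clear()
    return infos

threads = [threading.Thread(target=worker, args=(r,)) for r in range(n_workers)]
for t in threads:
    t.start()
completed = obtain_samples(itr=0, db_idx=1)
for t in threads:
    t.join()
```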

Serial

class rlpyt.samplers.async_.serial_sampler.AsyncSerialSampler(*args, CollectorCls=<class 'rlpyt.samplers.async_.collectors.DbCpuResetCollector'>, eval_CollectorCls=<class 'rlpyt.samplers.serial.collectors.SerialEvalCollector'>, **kwargs)

Bases: rlpyt.samplers.async_.base.AsyncSamplerMixin, rlpyt.samplers.base.BaseSampler

Sampler which runs asynchronously in a Python process forked from the master (training) process, but with no further parallelism.

initialize(affinity)

Initialization inside the main sampler process. Sets the process hardware affinity, creates the specified number of environment instances, and instantiates the collector with them. If applicable, does the same for evaluation environment instances. Moves the agent to device (possibly GPU), calls the agent's async_cpu() initialization, and starts up the collector.

obtain_samples(itr, db_idx)

First calls the agent to retrieve new parameter values from the training process's agent. Then passes the double-buffer index to the collector and collects a training sample batch. Returns a list of completed trajectory-info objects.

evaluate_agent(itr)

First calls the agent to retrieve new parameter values from the training process's agent, then collects an evaluation batch and returns the completed trajectory-info objects.

CPU-Agent

class rlpyt.samplers.async_.cpu_sampler.AsyncCpuSampler(*args, CollectorCls=<class 'rlpyt.samplers.async_.collectors.DbCpuResetCollector'>, eval_CollectorCls=<class 'rlpyt.samplers.parallel.cpu.collectors.CpuEvalCollector'>, **kwargs)

Bases: rlpyt.samplers.async_.base.AsyncParallelSamplerMixin, rlpyt.samplers.parallel.base.ParallelSamplerBase

Parallel sampler for agent action-selection on CPU, for use with the asynchronous runner. The master (training) process will have forked the main sampler process, which here forks sampler workers from itself, and otherwise runs similarly to the CpuSampler.

initialize(affinity)

Runs inside the main sampler process. Sets the process hardware affinity and calls the agent's async_cpu() initialization, then proceeds with the usual parallel sampler initialization.

obtain_samples(itr, db_idx)

Calls the agent to retrieve new parameter values from the training process, then proceeds with base async parallel method.

evaluate_agent(itr)

Calls the agent to retrieve new parameter values from the training process, then proceeds with base async parallel method.

GPU-Agent

Main Class

class rlpyt.samplers.async_.gpu_sampler.AsyncGpuSampler(*args, CollectorCls=<class 'rlpyt.samplers.async_.collectors.DbGpuResetCollector'>, eval_CollectorCls=<class 'rlpyt.samplers.parallel.gpu.collectors.GpuEvalCollector'>, **kwargs)

Bases: rlpyt.samplers.async_.action_server.AsyncActionServer, rlpyt.samplers.async_.gpu_sampler.AsyncGpuSamplerBase

Component Definitions

class rlpyt.samplers.async_.gpu_sampler.AsyncGpuSamplerBase(*args, CollectorCls=<class 'rlpyt.samplers.async_.collectors.DbGpuResetCollector'>, eval_CollectorCls=<class 'rlpyt.samplers.parallel.gpu.collectors.GpuEvalCollector'>, **kwargs)

Bases: rlpyt.samplers.async_.base.AsyncParallelSamplerMixin, rlpyt.samplers.parallel.base.ParallelSamplerBase

Main definitions for the asynchronous parallel sampler using GPU(s) for action selection. The main sampler process (forked from the overall master) forks action-server processes, one per GPU to be used, and the action-server process(es) fork their own parallel CPU workers. The same sampler object is used in the main sampler process and in the action-server process(es), but for different methods; comments in the code label which methods run where (an easier way to pass arguments along).

initialize(affinity)

Initialization inside the main sampler process. Builds one level of parallel synchronization objects, and forks action-server processes, one per GPU to be used.

action_server_process(rank, env_ranks, double_buffer_slice, affinity, seed, n_envs_list)

Target method used for forking action-server process(es) from the main sampler process. Because the forked process inherits the sampler object from the sampler process, arguments can more easily be passed to the environment worker processes, which are forked from here.

Assigns hardware affinity, forks the parallel worker processes, and moves the agent model to device. Then enters an infinite loop: waits for signals from the main sampler process to collect training samples or perform evaluation, and serves actions during collection. On every loop, calls the agent to retrieve new parameter values from the training process, which are communicated through shared CPU memory.
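The serving loop during collection can be sketched as follows: environment workers submit observations, and the server computes a whole batch of actions at once before handing each one back. This toy version uses queues and threads; rlpyt's real implementation uses shared-memory arrays and lighter-weight synchronization, and the "policy" here is a trivial stand-in.

```python
import queue
import threading

# Sketch of an action server: environment workers submit observations,
# the server computes actions for the whole batch at once (here a
# trivial "policy" that doubles the observation), and hands each action
# back.  Queues, threads, and names are illustrative stand-ins.
n_envs = 4
obs_q = queue.Queue()
act_qs = [queue.Queue() for _ in range(n_envs)]

def action_server(n_steps):
    for _ in range(n_steps):
        batch = [obs_q.get() for _ in range(n_envs)]  # gather all obs
        batch.sort()                                  # stable order by rank
        for rank, obs in batch:
            act_qs[rank].put(obs * 2)                 # toy batched policy

def env_worker(rank, n_steps, out):
    obs = rank
    for _ in range(n_steps):
        obs_q.put((rank, obs))
        act = act_qs[rank].get()   # blocks until the server responds
        obs = act                  # toy environment step
    out[rank] = obs

out = [None] * n_envs
server = threading.Thread(target=action_server, args=(3,))
workers = [threading.Thread(target=env_worker, args=(r, 3, out))
           for r in range(n_envs)]
server.start()
for w in workers:
    w.start()
for w in workers:
    w.join()
server.join()
# final observations: [0, 8, 16, 24]
```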

class rlpyt.samplers.async_.action_server.AsyncActionServer

Bases: rlpyt.samplers.parallel.gpu.action_server.ActionServer

serve_actions_evaluation(itr)

Similar to the normal action server, but with different signaling logic for ending evaluation early: receives the signal from the main sampler process and passes it along to its workers.

GPU-Agent, Alternating Workers

Main Classes

class rlpyt.samplers.async_.alternating_sampler.AsyncAlternatingSampler(*args, **kwargs)

Bases: rlpyt.samplers.async_.action_server.AsyncAlternatingActionServer, rlpyt.samplers.async_.alternating_sampler.AsyncAlternatingSamplerBase

class rlpyt.samplers.async_.alternating_sampler.AsyncNoOverlapAlternatingSampler(*args, **kwargs)

Bases: rlpyt.samplers.async_.action_server.AsyncNoOverlapAlternatingActionServer, rlpyt.samplers.async_.alternating_sampler.AsyncAlternatingSamplerBase

Component Definitions

class rlpyt.samplers.async_.alternating_sampler.AsyncAlternatingSamplerBase(*args, **kwargs)

Bases: rlpyt.samplers.async_.gpu_sampler.AsyncGpuSamplerBase

Defines several methods to extend the asynchronous GPU sampler to use two alternating sets of environment workers.
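The alternating scheme can be sketched as simple bookkeeping over env ranks (illustrative only): ranks are split into two halves, and action serving alternates between the halves so that one half can step its environments while the other half's actions are being computed.

```python
# Sketch of the two alternating environment-worker sets: env ranks are
# split into halves, and action serving alternates between them so one
# half steps its environments while the other half's actions are
# computed.  Illustrative bookkeeping only, not rlpyt's actual code.
def split_alternating(n_envs):
    half = n_envs // 2
    return [list(range(half)), list(range(half, n_envs))]

def collection_order(n_envs, n_steps):
    groups = split_alternating(n_envs)
    order = []
    for _ in range(n_steps):
        for alt in (0, 1):
            order.extend(groups[alt])   # half `alt` receives actions now
    return order

# With 4 envs and 2 steps, serving alternates halves:
# [0, 1,  2, 3,  0, 1,  2, 3]
```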

class rlpyt.samplers.async_.action_server.AsyncAlternatingActionServer

Bases: rlpyt.samplers.parallel.gpu.action_server.AlternatingActionServer

serve_actions_evaluation(itr)

Similar to the normal action server, but with different signaling logic for ending evaluation early: receives the signal from the main sampler process and passes it along to its workers.

class rlpyt.samplers.async_.action_server.AsyncNoOverlapAlternatingActionServer

Bases: rlpyt.samplers.parallel.gpu.action_server.NoOverlapAlternatingActionServer

Not tested; synchronization corner cases may be faulty.

serve_actions_evaluation(itr)

Similar to the normal action server, but with different signaling logic for ending evaluation early: receives the signal from the main sampler process and passes it along to its workers.