rl8.policies package

Module contents

Definitions related to the union of models and action distributions.

This is the main definition used by training algorithms for sampling from models and action distributions. Use this interface when deploying a policy or model so that the action distribution is always paired with the model and the transformations required for model inference are handled properly.

class rl8.policies.GenericPolicyBase

Bases: Generic[_Model]

The base policy, defining the interfaces for the union of a feedforward model and an action distribution.

All policies inherit from this base class.
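The generic-base pattern described above can be sketched as follows. This is a minimal illustration, not rl8's implementation: every name in it (ToyPolicyBase, ToyFeedforwardModel, ToyPolicy, ModelT) is hypothetical, and the save method here only returns the path instead of pickling anything.

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

# Hypothetical type variable standing in for the model type parameter.
ModelT = TypeVar("ModelT")


class ToyPolicyBase(ABC, Generic[ModelT]):
    """Hypothetical base that pairs a model with a distribution class."""

    def __init__(self, model: ModelT, distribution_cls: type) -> None:
        self.model = model
        self.distribution_cls = distribution_cls

    @abstractmethod
    def save(self, path: str) -> str:
        """Persist the policy; each subclass decides how."""


class ToyFeedforwardModel:
    """Stand-in for a feedforward model."""


class ToyPolicy(ToyPolicyBase[ToyFeedforwardModel]):
    """Concrete policy specialized on the feedforward stand-in."""

    def save(self, path: str) -> str:
        # A real implementation would write to `path`; this sketch
        # only returns it.
        return path


policy = ToyPolicy(ToyFeedforwardModel(), distribution_cls=dict)
```

Parameterizing the base on the model type lets a type checker see that the model attribute of a feedforward policy and of a recurrent policy have different types, while both share one abstract interface.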

distribution_cls: type[rl8.distributions.Distribution]

Underlying policy action distribution that’s parameterized by features produced by GenericPolicyBase.model.

model: _Model

Underlying policy model that processes environment observations into a value function approximation and into features to be consumed by an action distribution for action sampling.

property action_spec: TensorSpec

Return the action spec used for constructing the model.

property device: str | device

Return the device the policy’s model is on.

property observation_spec: TensorSpec

Return the observation spec used for constructing the model.

abstract save(path: str | PathLike[str], /) → PythonModel

Save the policy by cloud pickling it to path and returning the interface used for deploying it with MLflow.

This method is only defined to expose a common interface between different algorithms. It is not the only way to save a policy, nor is it the recommended way.

to(device: str | device, /) → Self

Move the policy and its attributes to device.

class rl8.policies.MLflowPolicyModel

Bases: PythonModel

An MLflow Python model implementation of a feedforward policy.

This is not the only way to define an MLflow interface for a feedforward policy, nor is it the recommended way to deploy or serve a trained policy with MLflow. It is a minimal, generic implementation of an MLflow Python model for feedforward policies, provided as a convenience. Most policy deployment use cases will probably be satisfied by it. For use cases it doesn’t cover, write your own implementation, as this one will likely not see further development beyond bug fixes.

Besides being minimal and in “maintenance mode”, this implementation doesn’t support every kind of policy model that can be defined with rl8. It supports many observation spaces but not all action spaces. Action spaces are limited to (flattened) 1D spaces; spaces with more than one dimension may work, but actions will likely be stored inconsistently in the output dataframe.

Examples

A minimal example of training a policy, saving it with MLflow, and then reloading it for inference using this interface.

>>> from tempfile import TemporaryDirectory
...
... import mlflow
...
... from rl8 import Trainer
... from rl8.env import DiscreteDummyEnv
... # Create the trainer and step it once for the heck of it.
... trainer = Trainer(DiscreteDummyEnv)
... trainer.step()
... # Create a temporary directory for storing model artifacts
... # and the actual MLflow model. This'll get cleaned up
... # once the context ends.
... with TemporaryDirectory() as tmp:
...     # This is where you set options specific to your
...     # use case. At a bare minimum, the policy's
...     # artifact (the policy pickle file) is specified,
...     # but you may want to add code files, data files,
...     # dependencies/requirements, etc.
...     mlflow.pyfunc.save_model(
...         f"{tmp}/model",
...         python_model=trainer.algorithm.policy.save(f"{tmp}/policy.pkl"),
...         artifacts={"policy": f"{tmp}/policy.pkl"},
...     )
...     model = mlflow.pyfunc.load_model(f"{tmp}/model")
...     # We cheat here a bit and use the environment's spec
...     # to generate a valid input example. These are usually
...     # constructed by some other service.
...     obs = DiscreteDummyEnv(1).observation_spec.rand([1, 1]).cpu().numpy()
...     model.predict({"obs": obs})

load_context(context: PythonModelContext) → None

Loads the saved policy on model instantiation.

predict(context: PythonModelContext, model_input: dict[str, Any]) → DataFrame

Sample from the underlying policy using model_input as input.

Parameters:
  • context – Python model context that’s unused for this implementation.

  • model_input – Policy model input (or observation). The observation space is expected to be a 1D vector or a composite spec that maps strings to tensor specs; the policy model is expected to ingest a tensordict and handle all the input preprocessing (such as tensor concatenation) on its own. The model input (or observation) is expected to match the policy model’s observation space within an "obs" key and is expected to be of shape [B, T, ...] for each tensor within the observation, where B is the batch dimension and T is the time or sequence dimension. The underlying policy handles reshaping of the model input for batch inference, and the policy’s outputs will be of shape [B * T, ...] such that the batch and time dimensions are flattened into the first dimension. Thus, the index of the output dataframe from this method corresponds to indices of the flattened first dimension.

Returns:

A dataframe with B * T rows containing sampled actions, log probabilities of sampling those actions, and value estimates. B is the model input’s batch dimension, and T is the model input’s time or sequence dimension.
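As a sketch of the row-index convention described above, a flattened row index can be mapped back to its (batch, time) pair. This assumes the [B, T] dimensions are flattened in row-major (batch-major) order, which is how a standard reshape behaves but is not stated explicitly here. The helper name unflatten_index is hypothetical and not part of rl8.

```python
def unflatten_index(row: int, num_timesteps: int) -> tuple[int, int]:
    """Map a flattened row index to (batch index, time index).

    Assumes row-major flattening of [B, T] into [B * T], that is,
    row = b * T + t. This ordering is an assumption of the sketch.
    """
    if num_timesteps <= 0:
        raise ValueError("num_timesteps must be positive")
    if row < 0:
        raise ValueError("row must be non-negative")
    return divmod(row, num_timesteps)


# With B = 2 sequences of T = 3 steps, the dataframe has 6 rows.
B, T = 2, 3
pairs = [unflatten_index(row, T) for row in range(B * T)]
```

Under that assumption, rows 0 through T - 1 belong to the first batch element, rows T through 2T - 1 to the second, and so on.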

class rl8.policies.MLflowRecurrentPolicyModel

Bases: PythonModel

An MLflow Python model implementation of a recurrent policy.

This is not the only way to define an MLflow interface for a recurrent policy, nor is it the recommended way to deploy or serve a trained policy with MLflow. It is a minimal, generic implementation of an MLflow Python model for recurrent policies, provided as a convenience. Most policy deployment use cases will probably be satisfied by it. For use cases it doesn’t cover, write your own implementation, as this one will likely not see further development beyond bug fixes.

Besides being minimal and in “maintenance mode”, this implementation doesn’t support every kind of policy model that can be defined with rl8. It supports many observation spaces but not all action spaces. Action spaces are limited to (flattened) 1D spaces; spaces with more than one dimension may work, but actions will likely be stored inconsistently in the output dataframe.

Examples

A minimal example of training a policy, saving it with MLflow, and then reloading it for inference using this interface.

>>> from tempfile import TemporaryDirectory
...
... import mlflow
...
... from rl8 import RecurrentTrainer
... from rl8.env import DiscreteDummyEnv
... # Create the trainer and step it once for the heck of it.
... trainer = RecurrentTrainer(DiscreteDummyEnv)
... trainer.step()
... # Create a temporary directory for storing model artifacts
... # and the actual MLflow model. This'll get cleaned up
... # once the context ends.
... with TemporaryDirectory() as tmp:
...     # This is where you set options specific to your
...     # use case. At a bare minimum, the policy's
...     # artifact (the policy pickle file) is specified,
...     # but you may want to add code files, data files,
...     # dependencies/requirements, etc.
...     mlflow.pyfunc.save_model(
...         f"{tmp}/model",
...         python_model=trainer.algorithm.policy.save(f"{tmp}/policy.pkl"),
...         artifacts={"policy": f"{tmp}/policy.pkl"},
...     )
...     model = mlflow.pyfunc.load_model(f"{tmp}/model")
...     # We cheat here a bit and use the environment's spec
...     # to generate a valid input example. These are usually
...     # constructed by some other service.
...     obs = DiscreteDummyEnv(1).observation_spec.rand([1, 1]).cpu().numpy()
...     model.predict({"obs": obs})

load_context(context: PythonModelContext) → None

Loads the saved policy on model instantiation.

predict(context: PythonModelContext, model_input: dict[str, Any]) → list[pandas.core.frame.DataFrame]

Sample from the underlying policy using model_input as input.

Parameters:
  • context – Python model context that’s unused for this implementation.

  • model_input – Policy model input (or observation). The observation space is expected to be a composite spec that maps strings to tensor specs; the policy model is expected to ingest a tensordict and handle all the input preprocessing (such as tensor concatenation) on its own. The model input (or observation) is expected to match the policy model’s observation space and should contain the recurrent model’s recurrent state (unless a new recurrent state is to be instantiated). The model inputs are expected to be of shape [B, T, ...] for each tensor within the observation, where B is the batch dimension and T is the time or sequence dimension, while the model recurrent states are expected to be of shape [B, 1, ...]. The underlying policy handles reshaping of the model input for batch inference, and the policy’s outputs will be of shape [B * T, ...] such that the batch and time dimensions are flattened into the first dimension. Thus, the index of the output dataframe from this method corresponds to indices of the flattened first dimension. The output dataframe will also contain the updated recurrent states for just the final timestep. These recurrent states are repeated along the time dimension to allow storing of recurrent states within the same dataframe as the model outputs.

Returns:

Two dataframes: the first with B * T rows containing sampled actions, log probabilities of sampling those actions, and value estimates; the second with B rows containing updated recurrent model states. B is the model input’s batch dimension, and T is the model input’s time or sequence dimension.

class rl8.policies.Policy(observation_spec: TensorSpec, action_spec: TensorSpec, /, *, model: None | Model = None, model_cls: None | ModelFactory = None, model_config: None | dict[str, Any] = None, distribution_cls: None | type[rl8.distributions.Distribution] = None, device: str | device = 'cpu')

Bases: GenericPolicyBase[Model]

The union of a feedforward model and an action distribution.

Parameters:
  • observation_spec – Spec defining observations from the environment and inputs to the model’s forward pass.

  • action_spec – Spec defining the action distribution’s outputs and the inputs to the environment.

  • model – Model instance to use. Mutually exclusive with model_cls.

  • model_cls – Model class or class factory to use.

  • model_config – Model class args.

  • distribution_cls – Action distribution class.

  • device – Device the policy resides on.

sample(batch: TensorDict, /, *, kind: Literal['last', 'all'] = 'last', deterministic: bool = False, inplace: bool = False, requires_grad: bool = False, return_actions: bool = True, return_logp: bool = False, return_values: bool = False, return_views: bool = False) → TensorDict

Use batch to sample from the policy, sampling actions from the model and optionally sampling additional values often used for training and analysis.

Parameters:
  • batch – Batch to feed into the policy’s underlying model. Expected to be of size [B, T, ...] where B is the batch dimension, and T is the time or sequence dimension. B is typically the number of parallel environments being sampled for during massively parallel training, and T is typically the number of time steps or observations sampled from the environments. The B and T dimensions are typically combined into one dimension during batch preprocessing according to the model’s view requirements.

  • kind

    String indicating the type of sample to perform. The model’s view requirements handle preprocessing slightly differently depending on the value. Options include:

    • "last": Sample from batch using only the samples necessary to sample for the most recent observations within the batch’s T dimension.

    • "all": Sample from batch using all observations within the batch’s T dimension.

  • deterministic – Whether to sample from the policy deterministically (the actions are always the same for the same inputs) or stochastically (there is a randomness to the policy’s actions).

  • inplace – Whether to store policy outputs in the given batch tensordict. Otherwise, create a separate tensordict that will only contain policy outputs.

  • requires_grad – Whether to enable gradients for the underlying model during forward passes. This should only be enabled during a training loop or when requiring gradients for explainability or other analysis reasons.

  • return_actions – Whether to sample the policy’s action distribution and return the sampled actions.

  • return_logp – Whether to return the log probability of taking the sampled actions. Often enabled during a training loop for aggregating training data a bit more efficiently.

  • return_values – Whether to return the value function approximation for the given observations. Often enabled during a training loop for aggregating training data a bit more efficiently.

  • return_views – Whether to return the observation view requirements in the output batch. Even if this flag is enabled, new views are only returned if the views are not already present in the output batch (i.e., if inplace is True and the views are already in the batch, then the returned batch will just contain the original views).

Returns:

A tensordict containing AT LEAST actions sampled from the policy of batch size [B * T, ...] where B is the input’s batch dimension, and T is the time or sequence dimension.
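The difference between deterministic and stochastic sampling, and what a returned log probability means, can be sketched for a categorical distribution in plain Python. This is an illustration only: the helper names sample_action and log_softmax are hypothetical, and the sketch assumes that deterministic sampling picks the most likely action, which may differ from how a given rl8 distribution defines it.

```python
import math
import random


def log_softmax(logits: list[float]) -> list[float]:
    """Numerically stable log-softmax over a list of logits."""
    peak = max(logits)
    log_norm = peak + math.log(sum(math.exp(x - peak) for x in logits))
    return [x - log_norm for x in logits]


def sample_action(logits, *, deterministic=False, rng=None):
    """Return (action, logp) for a categorical distribution.

    Assumption: deterministic sampling returns the most likely action.
    """
    logps = log_softmax(logits)
    if deterministic:
        # Same inputs always give the same action.
        action = max(range(len(logits)), key=logits.__getitem__)
    else:
        # Draw an action in proportion to its probability.
        rng = rng or random.Random()
        probs = [math.exp(lp) for lp in logps]
        action = rng.choices(range(len(logits)), weights=probs, k=1)[0]
    return action, logps[action]


logits = [0.1, 2.0, -1.0]
greedy_action, greedy_logp = sample_action(logits, deterministic=True)
random_action, random_logp = sample_action(logits, rng=random.Random(0))
```

The log probability is returned alongside the action because training algorithms that need it would otherwise have to recompute it from the same distribution parameters.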

save(path: str | PathLike[str], /) → MLflowPolicyModel

Save the policy by cloud pickling it to path and returning the interface used for deploying it with MLflow.

This method is only defined to expose a common interface between different algorithms. It is not the only way to save a policy, nor is it the recommended way.

class rl8.policies.RecurrentPolicy(observation_spec: TensorSpec, action_spec: TensorSpec, /, *, model: None | RecurrentModel = None, model_cls: None | RecurrentModelFactory = None, model_config: None | dict[str, Any] = None, distribution_cls: None | type[rl8.distributions.Distribution] = None, device: str | device = 'cpu')

Bases: GenericPolicyBase[RecurrentModel]

The union of a recurrent model and an action distribution.

Parameters:
  • observation_spec – Spec defining observations from the environment and inputs to the model’s forward pass.

  • action_spec – Spec defining the action distribution’s outputs and the inputs to the environment.

  • model – Model instance to use. Mutually exclusive with model_cls.

  • model_cls – Model class or class factory to use.

  • model_config – Model class args.

  • distribution_cls – Action distribution class.

  • device – Device the policy resides on.

init_states(n: int, /) → TensorDict

Return new recurrent states for the policy’s model.

sample(batch: TensorDict, /, states: None | TensorDict = None, *, deterministic: bool = False, inplace: bool = False, requires_grad: bool = False, return_actions: bool = True, return_logp: bool = False, return_values: bool = False) → tuple[tensordict._td.TensorDict, tensordict._td.TensorDict]

Use batch and states to sample from the policy, sampling actions from the model and optionally sampling additional values often used for training and analysis.

Parameters:
  • batch – Batch to feed into the policy’s underlying model. Expected to be of size [B, T, ...] where B is the batch dimension, and T is the time or sequence dimension. B is typically the number of parallel environments being sampled for during massively parallel training, and T is typically the number of time steps or observations sampled from the environments.

  • states – States to feed into the policy’s underlying model. Expected to be of size [B, T, ...] where B is the batch dimension, and T is the time or sequence dimension. B is typically the number of parallel environments being sampled for during massively parallel training, and T is typically the number of time steps or observations sampled from the environments.

  • deterministic – Whether to sample from the policy deterministically (the actions are always the same for the same inputs) or stochastically (there is a randomness to the policy’s actions).

  • inplace – Whether to store policy outputs in the given batch tensordict. Otherwise, create a separate tensordict that will only contain policy outputs.

  • requires_grad – Whether to enable gradients for the underlying model during forward passes. This should only be enabled during a training loop or when requiring gradients for explainability or other analysis reasons.

  • return_actions – Whether to sample the policy’s action distribution and return the sampled actions.

  • return_logp – Whether to return the log probability of taking the sampled actions. Often enabled during a training loop for aggregating training data a bit more efficiently.

  • return_values – Whether to return the value function approximation for the given observations. Often enabled during a training loop for aggregating training data a bit more efficiently.

Returns:

A tensordict containing AT LEAST actions sampled from the policy and a tensordict containing updated recurrent states. The returned recurrent states will only have shape [B, ...] WITHOUT a time dimension T since only the last recurrent state of the series should be returned. Other returned values will have batch size [B * T, ...] where B is the input’s batch dimension, and T is the time or sequence dimension.
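The shape contract described above (per-step outputs flattened to B * T entries, recurrent states returned only for the last step) can be sketched with a toy recurrent cell in plain Python. Nothing here is rl8's implementation: toy_init_states and unroll are hypothetical stand-ins, nested lists replace tensordicts, and the cell is a simple moving average chosen only to make the state carry-over visible.

```python
from typing import Optional


def toy_init_states(n: int) -> list[float]:
    """Return one zero-valued recurrent state per batch element."""
    return [0.0] * n


def unroll(batch: list[list[float]], states: Optional[list[float]] = None):
    """Run a toy recurrent cell over a [B, T] batch.

    Returns (outputs, final_states). outputs has B * T entries with
    batch and time flattened batch-major; final_states has B entries
    because only the state after the last timestep is kept.
    """
    if states is None:
        states = toy_init_states(len(batch))
    outputs = []
    final_states = []
    for sequence, state in zip(batch, states):
        for obs in sequence:
            # Toy cell: exponential moving average of observations.
            state = 0.5 * state + 0.5 * obs
            outputs.append(state)
        final_states.append(state)
    return outputs, final_states


batch = [[1.0, 2.0, 3.0, 4.0], [0.0, 0.0, 8.0, 8.0]]  # B = 2, T = 4
full_outputs, full_states = unroll(batch)

# Same sequences in two chunks, carrying states between calls.
first_half = [seq[:2] for seq in batch]
second_half = [seq[2:] for seq in batch]
_, carried = unroll(first_half)
_, chunked_states = unroll(second_half, carried)
```

Feeding the returned states into the next call gives the same final states as processing the whole sequence at once, which is why only the last state of the series needs to be returned.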

save(path: str | PathLike[str], /) → MLflowRecurrentPolicyModel

Save the policy by cloud pickling it to path and returning the interface used for deploying it with MLflow.

This method is only defined to expose a common interface between different algorithms. It is not the only way to save a policy, nor is it the recommended way.

property state_spec: CompositeSpec

Return the state spec of the policy’s model, which defines recurrent state dimensions.