robomimic.utils package#

Submodules#

robomimic.utils.dataset module#

This file contains Dataset classes that are used by torch dataloaders to fetch batches from hdf5 files.

class robomimic.utils.dataset.SequenceDataset(hdf5_path, obs_keys, dataset_keys, frame_stack=1, seq_length=1, pad_frame_stack=True, pad_seq_length=True, get_pad_mask=False, goal_mode=None, hdf5_cache_mode=None, hdf5_use_swmr=True, hdf5_normalize_obs=False, filter_by_attribute=None, load_next_obs=True)#

Bases: torch.utils.data.dataset.Dataset

close_and_delete_hdf5_handle()#

Maybe close the file handle.

get_dataset_for_ep(ep, key)#

Helper utility to get a dataset for a specific demonstration. Takes into account whether the dataset has been loaded into memory.

get_dataset_sampler()#

Return instance of torch.utils.data.Sampler or None. Allows for dataset to define custom sampling logic, such as re-weighting the probability of samples being drawn. See the train function in scripts/train.py, and torch DataLoader documentation, for more info.

get_dataset_sequence_from_demo(demo_id, index_in_demo, keys, seq_length=1)#

Extract a (sub)sequence of dataset items from a demo given the @keys of the items (e.g., states, actions).

Parameters
  • demo_id (str) – id of the demo, e.g., demo_0

  • index_in_demo (int) – beginning index of the sequence wrt the demo

  • keys (tuple) – list of keys to extract

  • seq_length (int) – sequence length to extract. Seq gets post-pended with repeated items if out of range

Returns

a dictionary of extracted items.

get_item(index)#

Main implementation of getitem when not using cache.

get_obs_normalization_stats()#

Returns dictionary of mean and std for each observation key if using observation normalization, otherwise None.

Returns

a dictionary for observation normalization. This maps observation keys to dicts with a “mean” and “std” of shape (1, …) where … is the default shape for the observation.

Return type

obs_normalization_stats (dict)

get_obs_sequence_from_demo(demo_id, index_in_demo, keys, num_frames_to_stack=0, seq_length=1, prefix='obs')#

Extract a (sub)sequence of observation items from a demo given the @keys of the items.

Parameters
  • demo_id (str) – id of the demo, e.g., demo_0

  • index_in_demo (int) – beginning index of the sequence wrt the demo

  • keys (tuple) – list of keys to extract

  • num_frames_to_stack (int) – number of frames to stack. Seq gets prepended with repeated items if out of range

  • seq_length (int) – sequence length to extract. Seq gets post-pended with repeated items if out of range

  • prefix (str) – one of “obs”, “next_obs”

Returns

a dictionary of extracted items.

get_sequence_from_demo(demo_id, index_in_demo, keys, num_frames_to_stack=0, seq_length=1)#

Extract a (sub)sequence of data items from a demo given the @keys of the items.

Parameters
  • demo_id (str) – id of the demo, e.g., demo_0

  • index_in_demo (int) – beginning index of the sequence wrt the demo

  • keys (tuple) – list of keys to extract

  • num_frames_to_stack (int) – number of frames to stack. Seq gets prepended with repeated items if out of range

  • seq_length (int) – sequence length to extract. Seq gets post-pended with repeated items if out of range

Returns

a dictionary of extracted items.
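The padding behavior described for @num_frames_to_stack and @seq_length can be sketched with plain index arithmetic. This is a minimal illustration, not the library's implementation: the helper name padded_indices and its demo_length argument are hypothetical, and it assumes that out-of-range positions simply repeat the first or last item of the demo.

```python
def padded_indices(index_in_demo, demo_length, num_frames_to_stack=0, seq_length=1):
    """Return the in-demo indices for one sample, plus a pad mask.

    Hypothetical helper: positions before the start of the demo repeat
    index 0, positions past the end repeat the last index.
    """
    start = index_in_demo - num_frames_to_stack
    end = index_in_demo + seq_length  # exclusive
    indices, pad_mask = [], []
    for t in range(start, end):
        clamped = min(max(t, 0), demo_length - 1)
        indices.append(clamped)
        pad_mask.append(t == clamped)  # True for real items, False for padding
    return indices, pad_mask
```

For a demo of length 5, asking for two stacked frames and a sequence of length 2 at index 0 yields indices that repeat the first item at the front, while asking for a sequence of length 3 at index 4 repeats the last item at the back.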

get_trajectory_at_index(index)#

Method provided as a utility to get an entire trajectory, given the corresponding @index.

property hdf5_file#

This property allows for a lazy hdf5 file open.

hdf5_file_opened()#

Convenient context manager to open the file on entering the scope and then close it on leaving.

load_dataset_in_memory(demo_list, hdf5_file, obs_keys, dataset_keys, load_next_obs)#

Loads the hdf5 dataset into memory, preserving the structure of the file. Note that this differs from self.getitem_cache, which, if active, actually caches the outputs of the getitem operation.

Parameters
  • demo_list (list) – list of demo keys, e.g., ‘demo_0’

  • hdf5_file (h5py.File) – file handle to the hdf5 dataset.

  • obs_keys (list, tuple) – observation keys to fetch, e.g., ‘images’

  • dataset_keys (list, tuple) – dataset keys to fetch, e.g., ‘actions’

  • load_next_obs (bool) – whether to load next_obs from the dataset

Returns

dictionary of loaded data.

Return type

all_data (dict)

load_demo_info(filter_by_attribute=None, demos=None)#
Parameters
  • filter_by_attribute (str) – if provided, use the provided filter key to select a subset of demonstration trajectories to load

  • demos (list) – list of demonstration keys to load from the hdf5 file. If omitted, all demos in the file (or under the @filter_by_attribute filter key) are used.

normalize_obs()#

Computes a dataset-wide mean and standard deviation for the observations (per dimension and per obs key) and returns it.

robomimic.utils.env_utils module#

This file contains several utility functions for working with environment wrappers provided by the repository, and with environment metadata saved in dataset files.

robomimic.utils.env_utils.check_env_type(type_to_check, env_meta=None, env_type=None, env=None)#

Checks whether the passed env_meta, env_type, or env is of type @type_to_check. Type corresponds to EB.EnvType.

Parameters
  • type_to_check (int) – type to check equality against

  • env_meta (dict) –

    environment metadata, which should be loaded from demonstration hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see @FileUtils.env_from_checkpoint). Contains 3 keys:

    ’env_name’

    name of environment

    ’type’

    type of environment, should be a value in EB.EnvType

    ’env_kwargs’

    dictionary of keyword arguments to pass to environment constructor

  • env_type (int) – the type of environment, which determines the env class that will be instantiated. Should be a value in EB.EnvType.

  • env (instance of EB.EnvBase) – environment instance

robomimic.utils.env_utils.create_env(env_type, env_name, render=False, render_offscreen=False, use_image_obs=False, **kwargs)#

Create environment.

Parameters
  • env_type (int) – the type of environment, which determines the env class that will be instantiated. Should be a value in EB.EnvType.

  • env_name (str) – name of environment

  • render (bool) – if True, environment supports on-screen rendering

  • render_offscreen (bool) – if True, environment supports off-screen rendering. This is forced to be True if @use_image_obs is True.

  • use_image_obs (bool) – if True, environment is expected to render rgb image observations on every env.step call. Set this to False for efficiency reasons, if image observations are not required.

robomimic.utils.env_utils.create_env_for_data_processing(env_meta, camera_names, camera_height, camera_width, reward_shaping)#

Creates environment for processing dataset observations and rewards.

Parameters
  • env_meta (dict) –

    environment metadata, which should be loaded from demonstration hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see @FileUtils.env_from_checkpoint). Contains 3 keys:

    ’env_name’

    name of environment

    ’type’

    type of environment, should be a value in EB.EnvType

    ’env_kwargs’

    dictionary of keyword arguments to pass to environment constructor

  • camera_names (list of str) – list of camera names that correspond to image observations

  • camera_height (int) – camera height for all cameras

  • camera_width (int) – camera width for all cameras

  • reward_shaping (bool) – if True, use shaped environment rewards, else use sparse task completion rewards

robomimic.utils.env_utils.create_env_from_metadata(env_meta, env_name=None, render=False, render_offscreen=False, use_image_obs=False)#

Create environment.

Parameters
  • env_meta (dict) –

    environment metadata, which should be loaded from demonstration hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see @FileUtils.env_from_checkpoint). Contains 3 keys:

    ’env_name’

    name of environment

    ’type’

    type of environment, should be a value in EB.EnvType

    ’env_kwargs’

    dictionary of keyword arguments to pass to environment constructor

  • env_name (str) – name of environment. Only needs to be provided if making a different environment from the one in @env_meta.

  • render (bool) – if True, environment supports on-screen rendering

  • render_offscreen (bool) – if True, environment supports off-screen rendering. This is forced to be True if @use_image_obs is True.

  • use_image_obs (bool) – if True, environment is expected to render rgb image observations on every env.step call. Set this to False for efficiency reasons, if image observations are not required.

robomimic.utils.env_utils.get_env_class(env_meta=None, env_type=None, env=None)#

Return env class from either env_meta, env_type, or env. Note the use of lazy imports - this ensures that modules are only imported when the corresponding env type is requested. This can be useful in practice. For example, a training run that only requires access to gym environments should not need to import robosuite.

Parameters
  • env_meta (dict) –

    environment metadata, which should be loaded from demonstration hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see @FileUtils.env_from_checkpoint). Contains 3 keys:

    ’env_name’

    name of environment

    ’type’

    type of environment, should be a value in EB.EnvType

    ’env_kwargs’

    dictionary of keyword arguments to pass to environment constructor

  • env_type (int) – the type of environment, which determines the env class that will be instantiated. Should be a value in EB.EnvType.

  • env (instance of EB.EnvBase) – environment instance

robomimic.utils.env_utils.get_env_type(env_meta=None, env_type=None, env=None)#

Helper function to get env_type from a variety of inputs.

Parameters
  • env_meta (dict) –

    environment metadata, which should be loaded from demonstration hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see @FileUtils.env_from_checkpoint). Contains 3 keys:

    ’env_name’

    name of environment

    ’type’

    type of environment, should be a value in EB.EnvType

    ’env_kwargs’

    dictionary of keyword arguments to pass to environment constructor

  • env_type (int) – the type of environment, which determines the env class that will be instantiated. Should be a value in EB.EnvType.

  • env (instance of EB.EnvBase) – environment instance

robomimic.utils.env_utils.is_robosuite_env(env_meta=None, env_type=None, env=None)#

Determines whether the environment is a robosuite environment. Accepts either env_meta, env_type, or env.

robomimic.utils.file_utils module#

A collection of utility functions for working with files, such as reading metadata from demonstration datasets, loading model checkpoints, or downloading dataset files.

class robomimic.utils.file_utils.DownloadProgressBar(*_, **__)#

Bases: tqdm.std.tqdm

update_to(b=1, bsize=1, tsize=None)#
robomimic.utils.file_utils.algo_name_from_checkpoint(ckpt_path=None, ckpt_dict=None)#

Return algorithm name that was used to train a checkpoint or loaded model dictionary.

Parameters
  • ckpt_path (str) – Path to checkpoint file. Only needed if not providing @ckpt_dict.

  • ckpt_dict (dict) – Loaded model checkpoint dictionary. Only needed if not providing @ckpt_path.

Returns

algorithm name

ckpt_dict (dict): loaded checkpoint dictionary (convenient to avoid re-loading checkpoint from disk multiple times)

Return type

algo_name (str)

robomimic.utils.file_utils.config_from_checkpoint(algo_name=None, ckpt_path=None, ckpt_dict=None, verbose=False)#

Helper function to restore config from a checkpoint file or loaded model dictionary.

Parameters
  • algo_name (str) – Algorithm name.

  • ckpt_path (str) – Path to checkpoint file. Only needed if not providing @ckpt_dict.

  • ckpt_dict (dict) – Loaded model checkpoint dictionary. Only needed if not providing @ckpt_path.

  • verbose (bool) – if True, include print statements

Returns

Raw loaded configuration, without properties replaced.

ckpt_dict (dict): loaded checkpoint dictionary (convenient to avoid re-loading checkpoint from disk multiple times)

Return type

config (dict)

robomimic.utils.file_utils.create_hdf5_filter_key(hdf5_path, demo_keys, key_name)#

Creates a new hdf5 filter key in hdf5 file @hdf5_path with name @key_name that corresponds to the demonstrations @demo_keys. Filter keys are generally useful to create named subsets of the demonstrations in an hdf5, making it easy to train, test, or report statistics on a subset of the trajectories in a file.

Returns the list of episode lengths that correspond to the filtering.

Parameters
  • hdf5_path (str) – path to hdf5 file

  • demo_keys ([str]) – list of demonstration keys which should correspond to this filter key. For example, [“demo_0”, “demo_1”].

  • key_name (str) – name of filter key to create

Returns

list of episode lengths that correspond to each demonstration in the new filter key

Return type

ep_lengths ([int])

robomimic.utils.file_utils.download_url(url, download_dir, check_overwrite=True)#

First checks that @url is reachable, then downloads the file at that url into the directory specified by @download_dir. Prints a progress bar during the download using tqdm.

Modified from https://github.com/tqdm/tqdm#hooks-and-callbacks, and https://stackoverflow.com/a/53877507.

Parameters
  • url (str) – url string

  • download_dir (str) – path to directory where file should be downloaded

  • check_overwrite (bool) – if True, will sanity check the download fpath to make sure a file of that name doesn’t already exist there

robomimic.utils.file_utils.env_from_checkpoint(ckpt_path=None, ckpt_dict=None, env_name=None, render=False, render_offscreen=False, verbose=False)#

Creates an environment using the metadata saved in a checkpoint.

Parameters
  • ckpt_path (str) – Path to checkpoint file. Only needed if not providing @ckpt_dict.

  • ckpt_dict (dict) – Loaded model checkpoint dictionary. Only needed if not providing @ckpt_path.

  • env_name (str) – if provided, override environment name saved in checkpoint

  • render (bool) – if True, environment supports on-screen rendering

  • render_offscreen (bool) – if True, environment supports off-screen rendering. This is forced to be True if saved model uses image observations.

Returns

environment created using checkpoint

ckpt_dict (dict): loaded checkpoint dictionary (convenient to avoid re-loading checkpoint from disk multiple times)

Return type

env (EnvBase instance)

robomimic.utils.file_utils.get_env_metadata_from_dataset(dataset_path)#

Retrieves env metadata from dataset.

Parameters

dataset_path (str) – path to dataset

Returns

environment metadata. Contains 3 keys:

’env_name’

name of environment

’type’

type of environment, should be a value in EB.EnvType

’env_kwargs’

dictionary of keyword arguments to pass to environment constructor

Return type

env_meta (dict)

robomimic.utils.file_utils.get_shape_metadata_from_dataset(dataset_path, all_obs_keys=None, verbose=False)#

Retrieves shape metadata from dataset.

Parameters
  • dataset_path (str) – path to dataset

  • all_obs_keys (list) – list of all modalities used by the model. If not provided, all modalities present in the file are used.

  • verbose (bool) – if True, include print statements

Returns

shape metadata. Contains the following keys:

’ac_dim’

action space dimension

’all_shapes’

dictionary that maps observation key string to shape

’all_obs_keys’

list of all observation modalities used

’use_images’

bool, whether or not image modalities are present

Return type

shape_meta (dict)

robomimic.utils.file_utils.load_dict_from_checkpoint(ckpt_path)#

Load checkpoint dictionary from a checkpoint file.

Parameters

ckpt_path (str) – Path to checkpoint file.

Returns

Loaded checkpoint dictionary.

Return type

ckpt_dict (dict)

robomimic.utils.file_utils.maybe_dict_from_checkpoint(ckpt_path=None, ckpt_dict=None)#

Utility function for the common use case where either a ckpt path or a ckpt_dict is provided. This is a no-op if ckpt_dict is not None, otherwise it loads the model dict from the ckpt path.

Parameters
  • ckpt_path (str) – Path to checkpoint file. Only needed if not providing @ckpt_dict.

  • ckpt_dict (dict) – Loaded model checkpoint dictionary. Only needed if not providing @ckpt_path.

Returns

Loaded checkpoint dictionary.

Return type

ckpt_dict (dict)

robomimic.utils.file_utils.policy_from_checkpoint(device=None, ckpt_path=None, ckpt_dict=None, verbose=False)#

This function restores a trained policy from a checkpoint file or loaded model dictionary.

Parameters
  • device (torch.device) – if provided, put model on this device

  • ckpt_path (str) – Path to checkpoint file. Only needed if not providing @ckpt_dict.

  • ckpt_dict (dict) – Loaded model checkpoint dictionary. Only needed if not providing @ckpt_path.

  • verbose (bool) – if True, include print statements

Returns

instance of Algo that has the saved weights from the checkpoint file, and also acts as a policy that can easily interact with an environment in a training loop

ckpt_dict (dict): loaded checkpoint dictionary (convenient to avoid re-loading checkpoint from disk multiple times)

Return type

model (RolloutPolicy)

robomimic.utils.file_utils.update_config(cfg)#

Updates the config for backwards-compatibility if it uses outdated configurations.

See https://github.com/ARISE-Initiative/robomimic/releases/tag/v0.2.0 for more info.

Parameters

cfg (dict) – Raw dictionary of config values

robomimic.utils.file_utils.url_is_alive(url)#

Checks that a given URL is reachable. From https://gist.github.com/dehowell/884204.

Parameters

url (str) – url string

Returns

True if url is reachable, False otherwise

Return type

is_alive (bool)

robomimic.utils.hyperparam_utils module#

A collection of utility functions and classes for generating config jsons for hyperparameter sweeps.

class robomimic.utils.hyperparam_utils.ConfigGenerator(base_config_file, script_file)#

Bases: object

Useful class to keep track of hyperparameters to sweep, and to generate the json configs for each experiment run.

add_param(key, name, group, values, value_names=None)#

Add parameter to the hyperparameter sweep.

Parameters
  • key (str) – location of parameter in the config, using hierarchical key format (ex. train/data = config.train.data)

  • name (str) – name, as it will appear in the experiment name

  • group (int) – group id - parameters with the same ID have their values swept together

  • values (list) – list of values to sweep over for this parameter

  • value_names ([str]) – if provided, strings to use in experiment name for each value, instead of the parameter value. This is helpful for parameters that may have long or large values (for example, dataset path).
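The @group semantics can be sketched as follows, assuming that "swept together" means values within one group are paired up by position and that different groups are combined with a Cartesian product. The function expand_sweep and its (key, group, values) tuple format are hypothetical and only illustrate that reading.

```python
import itertools


def expand_sweep(params):
    """Expand a sweep given as a list of (key, group, values) tuples.

    Assumption: value lists in the same group are zipped together, and
    the resulting per-group choices are crossed between groups.
    """
    groups = {}
    for key, group, values in params:
        groups.setdefault(group, []).append((key, values))

    per_group = []
    for group in sorted(groups):
        entries = groups[group]
        lengths = {len(values) for _, values in entries}
        if len(lengths) != 1:
            raise ValueError(f"group {group} has value lists of different lengths")
        keys = [key for key, _ in entries]
        # Pair up the i-th value of every key in this group.
        combos = [dict(zip(keys, vals)) for vals in zip(*(v for _, v in entries))]
        per_group.append(combos)

    runs = []
    for choice in itertools.product(*per_group):
        run = {}
        for part in choice:
            run.update(part)
        runs.append(run)
    return runs
```

Under this reading, two parameters in group 1 with two values each and one parameter in group 2 with three values produce 2 x 3 = 6 runs rather than 2 x 2 x 3 = 12.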

generate()#

Generates json configs for the hyperparameter sweep using attributes @self.parameters, @self.base_config_file, and @self.script_file, all of which should have first been set externally by calling @add_param, @set_base_config_file, and @set_script_file.

robomimic.utils.hyperparam_utils.get_value_for_key(dic, k)#

Get value for nested dictionary with levels denoted by “/” or “.”. For example, if @k is “a/b”, then this function returns @dic[“a”][“b”].

Parameters
  • dic (dict) – a nested dictionary

  • k (str) – a single string meant to index several levels down into the nested dictionary, where levels can be denoted by “/” or by “.”.

Returns

the nested dictionary value for the provided key

Return type

val

robomimic.utils.hyperparam_utils.load_json(json_file, verbose=True)#

Simple utility function to load a json file as a dict.

Parameters
  • json_file (str) – path to json file to load

  • verbose (bool) – if True, pretty print the loaded json dictionary

Returns

json dictionary

Return type

config (dict)

robomimic.utils.hyperparam_utils.save_json(config, json_file)#

Simple utility function to save a dictionary to a json file on disk.

Parameters
  • config (dict) – dictionary to save

  • json_file (str) – path to json file to write

robomimic.utils.hyperparam_utils.set_value_for_key(dic, k, v)#

Set value for hierarchical dictionary with levels denoted by “/” or “.”.

Parameters
  • dic (dict) – a nested dictionary

  • k (str) – a single string meant to index several levels down into the nested dictionary, where levels can be denoted by “/” or by “.”.

  • v – the value to set at the provided key
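The hierarchical key format used by get_value_for_key and set_value_for_key can be illustrated with a short sketch. The helpers below (split_key, get_nested, set_nested) are hypothetical stand-ins written for this example, and they assume that no individual dictionary key itself contains "/" or ".".

```python
def split_key(k):
    """Split a hierarchical key on either "/" or "."."""
    return k.replace(".", "/").split("/")


def get_nested(dic, k):
    """Walk down the nested dictionary one level per key component."""
    val = dic
    for level in split_key(k):
        val = val[level]
    return val


def set_nested(dic, k, v):
    """Walk to the parent of the final component, then assign."""
    *parents, last = split_key(k)
    node = dic
    for level in parents:
        node = node[level]
    node[last] = v
```

With config = {"train": {"data": "a.hdf5"}}, get_nested(config, "train/data") and get_nested(config, "train.data") both return "a.hdf5".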

robomimic.utils.log_utils module#

This file contains utility classes and functions for logging to stdout, stderr, and to tensorboard.

class robomimic.utils.log_utils.DataLogger(log_dir, log_tb=True)#

Bases: object

Logging class to log metrics to tensorboard and/or retrieve running statistics about logged data.

close()#

Run before terminating to make sure all logs are flushed.

get_stats(k)#

Computes running statistics for a particular key.

Parameters

k (str) – key string

Returns

dictionary of statistics

Return type

stats (dict)

record(k, v, epoch, data_type='scalar', log_stats=False)#

Record data with logger.

Parameters
  • k (str) – key string

  • v (float or image) – value to store

  • epoch – current epoch number

  • data_type (str) – the type of data. either ‘scalar’ or ‘image’

  • log_stats (bool) – whether to store the mean/max/min/std for all data logged so far with key k

class robomimic.utils.log_utils.PrintLogger(log_file)#

Bases: object

This class redirects print statements to both console and a file.

flush()#
write(message)#
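A class that exposes write and flush can stand in for sys.stdout, which is how print output can be sent to two places at once. The sketch below shows that general pattern under stated assumptions; TeeLogger, its stream argument, and its close method are hypothetical and are not taken from robomimic.

```python
import sys


class TeeLogger:
    """Hypothetical stand-in: forwards every write to a stream and a log file."""

    def __init__(self, log_file, stream=None):
        # Default to the real terminal stream if none is given.
        self.stream = sys.stdout if stream is None else stream
        self.log_file = open(log_file, "a")

    def write(self, message):
        self.stream.write(message)
        self.log_file.write(message)
        self.log_file.flush()  # keep the file current if the process dies

    def flush(self):
        self.stream.flush()
        self.log_file.flush()

    def close(self):
        self.log_file.close()
```

Assigning an instance to sys.stdout (for example sys.stdout = TeeLogger("log.txt")) would make subsequent print calls reach both the console and the file.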
class robomimic.utils.log_utils.custom_tqdm(*_, **__)#

Bases: tqdm.std.tqdm

Small extension to tqdm to make a few changes from default behavior. By default tqdm writes to stderr. Instead, we change it to write to stdout.

robomimic.utils.log_utils.silence_stdout()#

This contextmanager will redirect stdout so that nothing is printed to the terminal. Taken from the link below:

https://stackoverflow.com/questions/6735917/redirecting-stdout-to-nothing-in-python
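One way to build such a context manager with only the standard library is sketched below. It is an illustration of the idea, not the library's code; the name quiet_stdout is hypothetical.

```python
import contextlib
import os


@contextlib.contextmanager
def quiet_stdout():
    """Hypothetical equivalent: discard everything printed inside the block."""
    with open(os.devnull, "w") as devnull:
        with contextlib.redirect_stdout(devnull):
            yield
```

Anything printed inside a "with quiet_stdout():" block is dropped, and normal printing resumes once the block exits.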

robomimic.utils.loss_utils module#

This file contains a collection of useful loss functions for use with torch tensors.

robomimic.utils.loss_utils.KLD_0_1_loss(mu, logvar)#

KL divergence loss. Computes D_KL( N(mu, sigma) || N(0, 1) ). Note that this function averages across the batch dimension (B), but sums across the feature dimension (D).

Parameters
  • mu (torch.Tensor) – mean tensor of shape (B, D)

  • logvar (torch.Tensor) – logvar tensor of shape (B, D)

Returns

KL divergence loss between the input gaussian distribution and N(0, 1)

Return type

loss (torch.Tensor)
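The reduction described above (sum over D, mean over B) can be written out with the standard closed form for the KL divergence between a diagonal Gaussian and N(0, 1). This is a pure-Python sketch on nested lists rather than torch tensors, and the name kld_standard_normal is hypothetical.

```python
import math


def kld_standard_normal(mu, logvar):
    """mu, logvar: lists of B rows, each holding D floats."""
    per_sample = []
    for mu_row, logvar_row in zip(mu, logvar):
        # Closed-form KL against N(0, 1), summed over the D dimensions.
        kl = -0.5 * sum(
            1.0 + lv - m * m - math.exp(lv) for m, lv in zip(mu_row, logvar_row)
        )
        per_sample.append(kl)
    # Average over the batch dimension.
    return sum(per_sample) / len(per_sample)
```

A distribution that already equals N(0, 1) (zero mean, zero log-variance) gives a loss of zero.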

robomimic.utils.loss_utils.KLD_gaussian_loss(mu_1, logvar_1, mu_2, logvar_2)#

KL divergence loss between two Gaussian distributions. This function computes the average loss across the batch.

Parameters
  • mu_1 (torch.Tensor) – first means tensor of shape (B, D)

  • logvar_1 (torch.Tensor) – first logvars tensor of shape (B, D)

  • mu_2 (torch.Tensor) – second means tensor of shape (B, D)

  • logvar_2 (torch.Tensor) – second logvars tensor of shape (B, D)

Returns

KL divergence loss between the two gaussian distributions

Return type

loss (torch.Tensor)
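For reference, the closed-form KL divergence between two diagonal Gaussians can be sketched in pure Python. The direction D_KL( N_1 || N_2 ) and the sum over the feature dimension are assumptions here, since the description above does not state them; kld_gaussian is a hypothetical name.

```python
import math


def kld_gaussian(mu_1, logvar_1, mu_2, logvar_2):
    """All arguments: lists of B rows, each holding D floats.

    Assumption: computes KL(N_1 || N_2), summed over D and averaged over B.
    """
    per_sample = []
    for m1_row, lv1_row, m2_row, lv2_row in zip(mu_1, logvar_1, mu_2, logvar_2):
        kl = 0.0
        for m1, lv1, m2, lv2 in zip(m1_row, lv1_row, m2_row, lv2_row):
            # Per-dimension closed form for two univariate Gaussians.
            kl += 0.5 * (lv2 - lv1 + (math.exp(lv1) + (m1 - m2) ** 2) / math.exp(lv2) - 1.0)
        per_sample.append(kl)
    return sum(per_sample) / len(per_sample)
```

When the second distribution is N(0, 1), this expression reduces to the standard-normal case, and it is zero when both distributions are identical.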

robomimic.utils.loss_utils.cosine_loss(preds, labels)#

Cosine loss between two tensors.

Parameters
  • preds (torch.Tensor) – torch tensor

  • labels (torch.Tensor) – torch tensor

Returns

cosine loss

Return type

loss (torch.Tensor)

robomimic.utils.loss_utils.log_mean_exp(x, dim)#

Compute the log(mean(exp(x), dim)) in a numerically stable manner. Adapted from CS 236 at Stanford.

Parameters
  • x (torch.Tensor) – a tensor

  • dim (int) – dimension along which mean is computed

Returns

log(mean(exp(x), dim))

Return type

y (torch.Tensor)

robomimic.utils.loss_utils.log_normal(x, m, v)#

Log probability of tensor x under diagonal multivariate normal with mean m and variance v. The last dimension of the tensors is treated as the dimension of the Gaussian distribution - all other dimensions are treated as independent Gaussians. Adapted from CS 236 at Stanford.

Parameters
  • x (torch.Tensor) – tensor with shape (B, …, D)

  • m (torch.Tensor) – means tensor with shape (B, …, D) or (1, …, D)

  • v (torch.Tensor) – variances tensor with shape (B, …, D) or (1, …, D)

Returns

log probabilities of shape (B, …)

Return type

log_prob (torch.Tensor)
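For a single point, the log probability under a diagonal Gaussian is a sum of independent univariate log densities over the last dimension. The sketch below shows that computation on plain lists; log_normal_diag is a hypothetical name and the code does not handle batching or broadcasting.

```python
import math


def log_normal_diag(x, m, v):
    """Log-density of one D-dimensional point under a diagonal Gaussian.

    x, m, v: lists of D floats (point, means, variances).
    """
    return sum(
        -0.5 * (math.log(2.0 * math.pi * v_i) + (x_i - m_i) ** 2 / v_i)
        for x_i, m_i, v_i in zip(x, m, v)
    )
```

At the mean of a unit-variance Gaussian, each dimension contributes -0.5 * log(2 * pi).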

robomimic.utils.loss_utils.log_normal_mixture(x, m, v, w=None, log_w=None)#

Log probability of tensor x under a uniform mixture of Gaussians. Adapted from CS 236 at Stanford.

Parameters
  • x (torch.Tensor) – tensor with shape (B, D)

  • m (torch.Tensor) – means tensor with shape (B, M, D) or (1, M, D), where M is number of mixture components

  • v (torch.Tensor) – variances tensor with shape (B, M, D) or (1, M, D) where M is number of mixture components

  • w (torch.Tensor) – weights tensor - if provided, should be shape (B, M) or (1, M)

  • log_w (torch.Tensor) – log-weights tensor - if provided, should be shape (B, M) or (1, M)

Returns

log probabilities of shape (B,)

Return type

log_prob (torch.Tensor)

robomimic.utils.loss_utils.log_sum_exp(x, dim=0)#

Compute the log(sum(exp(x), dim)) in a numerically stable manner. Adapted from CS 236 at Stanford.

Parameters
  • x (torch.Tensor) – a tensor

  • dim (int) – dimension along which sum is computed

Returns

log(sum(exp(x), dim))

Return type

y (torch.Tensor)
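The usual way to make these reductions numerically stable is to subtract the maximum before exponentiating. The following sketch shows that trick on a flat list of floats; it is an illustration of the technique rather than the library's code, and it ignores the @dim argument.

```python
import math


def log_sum_exp(xs):
    """Stable log(sum(exp(x))) over a flat list of floats."""
    x_max = max(xs)
    # After shifting by the max, every exponent is <= 0, so exp cannot overflow.
    return x_max + math.log(sum(math.exp(x - x_max) for x in xs))


def log_mean_exp(xs):
    """Stable log(mean(exp(x))): log-sum-exp minus log of the count."""
    return log_sum_exp(xs) - math.log(len(xs))
```

With inputs such as [1000.0, 1000.0], a direct math.exp(1000.0) overflows, while the shifted version returns a finite value.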

robomimic.utils.loss_utils.project_values_onto_atoms(values, probabilities, atoms)#

Project the categorical distribution given by @probabilities on the grid of values given by @values onto a grid of values given by @atoms. This is useful when computing a bellman backup where the backed up values from the original grid will not be in the original support, requiring L2 projection.

Each value in @values has a corresponding probability in @probabilities - this probability mass is shifted to the closest neighboring grid points in @atoms in proportion. For example, if the value in question is 0.2, and the neighboring atoms are 0 and 1, then 0.8 of the probability weight goes to atom 0 and 0.2 of the probability weight will go to 1.

Adapted from https://github.com/deepmind/acme/blob/master/acme/tf/losses/distributional.py#L42

Parameters
  • values – value grid to project, of shape (batch_size, n_atoms)

  • probabilities – probabilities for categorical distribution on @values, shape (batch_size, n_atoms)

  • atoms – value grid to project onto, of shape (n_atoms,) or (1, n_atoms)

Returns

new probability vectors that correspond to the L2 projection of the categorical distribution onto @atoms
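The proportional split between neighboring atoms can be sketched for a single, unbatched distribution. The function below is a hypothetical illustration: it assumes @atoms is sorted in ascending order with at least two entries, and that values outside the support are clipped onto the end atoms.

```python
def project_onto_atoms(values, probabilities, atoms):
    """Project one categorical distribution onto a sorted grid of atoms."""
    projected = [0.0] * len(atoms)
    for value, prob in zip(values, probabilities):
        # Assumption: out-of-range values are clipped to the end atoms.
        value = min(max(value, atoms[0]), atoms[-1])
        for i in range(len(atoms) - 1):
            lo, hi = atoms[i], atoms[i + 1]
            if lo <= value <= hi:
                # Mass is split in proportion to closeness to each neighbor.
                weight_hi = (value - lo) / (hi - lo)
                projected[i] += prob * (1.0 - weight_hi)
                projected[i + 1] += prob * weight_hi
                break
    return projected
```

For the example above, a value of 0.2 with atoms 0 and 1 sends 0.8 of its probability to atom 0 and 0.2 to atom 1, and the total probability mass is preserved.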

robomimic.utils.macros module#

Set of global variables shared across robomimic

robomimic.utils.obs_utils module#

A collection of utilities for working with observation dictionaries and different kinds of modalities such as images.

class robomimic.utils.obs_utils.DepthModality#

Bases: robomimic.utils.obs_utils.Modality

Modality for depth observations

name = 'depth'#
class robomimic.utils.obs_utils.ImageModality#

Bases: robomimic.utils.obs_utils.Modality

Modality for RGB image observations

name = 'rgb'#
class robomimic.utils.obs_utils.LowDimModality#

Bases: robomimic.utils.obs_utils.Modality

Modality for low dimensional observations

name = 'low_dim'#
class robomimic.utils.obs_utils.Modality#

Bases: object

Observation Modality class to encapsulate necessary functions needed to process observations of this modality

classmethod add_keys(keys)#

Adds the observation @keys associated with this modality to the current set of keys.

Parameters

keys (list or set) – observation keys to add to associate with this modality

keys = {}#
name = None#
classmethod process_obs(obs)#

Prepares an observation @obs of this modality for network input.

Parameters

obs (np.array or torch.Tensor) – raw observation, which may include a leading batch dimension

Returns

processed observation

Return type

np.array or torch.Tensor

classmethod process_obs_from_dict(obs_dict, inplace=True)#

Receives a dictionary of keyword mapped observations @obs_dict, and processes the observations with keys corresponding to this modality. A copy will be made of the received dictionary unless @inplace is True.

Parameters
  • obs_dict (dict) – Dictionary mapping observation keys to observations

  • inplace (bool) – If True, will modify @obs_dict in place, otherwise, will create a copy

Returns

observation dictionary with processed observations corresponding to this modality

Return type

dict

classmethod set_keys(keys)#

Sets the observation keys associated with this modality.

Parameters

keys (list or set) – observation keys to associate with this modality

classmethod set_obs_processor(processor=None)#

Sets the processor for this observation modality. If @processor is set to None, then the obs processor will use the default one (self.process_obs(…)). Otherwise, @processor should be a function to process this corresponding observation modality.

Parameters

processor (function or None) – If not None, should be a function that takes in either a np.array or torch.Tensor and outputs the processed array / tensor. If None, will reset to the default processor (self.process_obs(…))

classmethod set_obs_unprocessor(unprocessor=None)#

Sets the unprocessor for this observation modality. If @unprocessor is set to None, then the obs unprocessor will use the default one (self.unprocess_obs(…)). Otherwise, @unprocessor should be a function to unprocess this corresponding observation modality.

Parameters

unprocessor (function or None) – If not None, should be a function that takes in either a np.array or torch.Tensor and outputs the unprocessed array / tensor. If None, will reset to the default unprocessor (self.unprocess_obs(…))

classmethod unprocess_obs(obs)#

Prepares an observation @obs of this modality for deployment.

Parameters

obs (np.array or torch.Tensor) – processed observation, which may include a leading batch dimension

Returns

unprocessed observation

Return type

np.array or torch.Tensor

class robomimic.utils.obs_utils.ObservationKeyToModalityDict#

Bases: dict

Custom dictionary class with the sole additional purpose of automatically registering new “keys” at runtime without breaking. This is mainly for backwards compatibility, where certain keys such as “latent”, “actions”, etc. are used automatically by certain models (e.g.: VAEs) but were never specified by the user externally in their config. Thus, this dictionary will automatically handle those keys by implicitly associating them with the low_dim modality.
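The idea of a dictionary that registers unknown keys on first access can be sketched with Python's __missing__ hook. This is one possible way to get the described behavior, not necessarily how the class is implemented; DefaultingModalityDict is a hypothetical name.

```python
class DefaultingModalityDict(dict):
    """Hypothetical stand-in: unknown keys are registered as "low_dim" on first lookup."""

    def __missing__(self, key):
        # Called by dict.__getitem__ only when the key is absent.
        self[key] = "low_dim"
        return self[key]
```

Looking up a key that was never registered, such as "latent", returns "low_dim" and leaves the key stored in the dictionary, while explicitly registered keys keep their modality.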

class robomimic.utils.obs_utils.ScanModality#

Bases: robomimic.utils.obs_utils.Modality

Modality for scan observations

name = 'scan'#
robomimic.utils.obs_utils.batch_image_chw_to_hwc(im)#

Inverse of channel swap in @batch_image_hwc_to_chw.

Parameters

im (np.array or torch.Tensor) – image of shape (batch, channel, height, width) or (channel, height, width)

Returns

image of shape (batch, height, width, channel) or (height, width, channel)

Return type

im (np.array or torch.Tensor)

robomimic.utils.obs_utils.batch_image_hwc_to_chw(im)#

Channel swap for images - useful for preparing images for torch training.

Parameters

im (np.array or torch.Tensor) – image of shape (batch, height, width, channel) or (height, width, channel)

Returns

image of shape (batch, channel, height, width) or (channel, height, width)

Return type

im (np.array or torch.Tensor)

robomimic.utils.obs_utils.center_crop(im, t_h, t_w)#

Takes a center crop of an image.

Parameters
  • im (np.array or torch.Tensor) – image of shape (…, height, width, channel)

  • t_h (int) – height of crop

  • t_w (int) – width of crop

Returns

center cropped image

Return type

im (np.array or torch.Tensor)
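The offset arithmetic behind a center crop can be shown on a plain 2D grid. The sketch below works on nested lists rather than arrays or tensors and assumes that any odd leftover margin is rounded down on the top and left; center_crop_2d is a hypothetical name.

```python
def center_crop_2d(image, t_h, t_w):
    """Crop a t_h x t_w window from the middle of an H x W nested list."""
    h, w = len(image), len(image[0])
    if t_h > h or t_w > w:
        raise ValueError("crop size must not exceed image size")
    # Assumption: leftover margin is split evenly, rounding down at top/left.
    top = (h - t_h) // 2
    left = (w - t_w) // 2
    return [row[left:left + t_w] for row in image[top:top + t_h]]
```

Cropping a 4 x 4 grid to 2 x 2 keeps rows 1 to 2 and columns 1 to 2.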

robomimic.utils.obs_utils.crop_image_from_indices(images, crop_indices, crop_height, crop_width)#

Crops images at the locations specified by @crop_indices. Crops will be taken across all channels.

Parameters
  • images (torch.Tensor) – batch of images of shape […, C, H, W]

  • crop_indices (torch.Tensor) – batch of indices of shape […, N, 2] where N is the number of crops to take per image and each entry corresponds to the pixel height and width of where to take the crop. Note that the indices can also be of shape […, 2] if only 1 crop should be taken per image. Leading dimensions must be consistent with @images argument. Each index specifies the top left of the crop. Values must be in range [0, H - CH - 1] x [0, W - CW - 1] where H and W are the height and width of @images and CH and CW are @crop_height and @crop_width.

  • crop_height (int) – height of crop to take

  • crop_width (int) – width of crop to take

Returns

cropped images of shape […, C, @crop_height, @crop_width]

Return type

crops (torch.Tensor)

robomimic.utils.obs_utils.get_processed_shape(obs_modality, input_shape)#

Given observation modality @obs_modality and expected inputs of shape @input_shape (excluding batch dimension), return the expected processed observation shape resulting from process_{obs_modality}.

Parameters
  • obs_modality (str) – Observation modality to use (e.g.: low_dim, rgb, depth, etc…)

  • input_shape (list of int) – Expected input dimensions, excluding the batch dimension

Returns

expected processed input shape

Return type

list of int

robomimic.utils.obs_utils.has_modality(modality, obs_keys)#

Returns True if @modality is present in the list of observation keys @obs_keys.

Parameters
  • modality (str) – modality to check for, e.g.: rgb, depth, etc.

  • obs_keys (list) – list of observation keys

robomimic.utils.obs_utils.initialize_default_obs_encoder(obs_encoder_config)#

Initializes the default observation encoder kwarg information to be used by all networks if no values are manually specified at runtime.

Parameters

obs_encoder_config (Config) – Observation encoder config to use. Should be equivalent to config.observation.encoder

robomimic.utils.obs_utils.initialize_obs_modality_mapping_from_dict(modality_mapping)#

This function is an alternative to @initialize_obs_utils_with_obs_specs that allows modalities to be set manually. NOTE: Only one of these should be called at runtime, not both! (Note that all training scripts that use a config automatically handle obs modality mapping, so using this function is usually unnecessary.)

Parameters

modality_mapping (dict) – Maps modality string names (e.g.: rgb, low_dim, etc.) to a list of observation keys that should belong to that modality

robomimic.utils.obs_utils.initialize_obs_utils_with_config(config)#

Utility function to parse config and call @initialize_obs_utils_with_obs_specs and @initialize_default_obs_encoder with the correct arguments.

Parameters

config (BaseConfig instance) – config object

robomimic.utils.obs_utils.initialize_obs_utils_with_obs_specs(obs_modality_specs)#

This function should be called before using any observation key-specific functions in this file, in order to make sure that all utility functions are aware of the observation modalities (e.g. which ones are low-dimensional, which ones are rgb, etc.).

It constructs two dictionaries: (1) one that maps each observation modality (e.g. low_dim, rgb) to a list of observation keys under that modality, and (2) the inverse, which maps specific observation keys to their corresponding observation modality.

Input should be a nested dictionary (or list of such dicts) with the following structure:

    obs_variant (str):
        obs_modality (str): observation keys (list)
        …

Example

    {
        "obs": {
            "low_dim": ["robot0_eef_pos", "robot0_eef_quat"],
            "rgb": ["agentview_image", "robot0_eye_in_hand"],
        },
        "goal": {
            "low_dim": ["robot0_eef_pos"],
            "rgb": ["agentview_image"],
        },
    }

In the example, raw observations consist of low-dim and rgb modalities, with the robot end effector pose under low-dim, and the agentview and wrist camera images under rgb, while goal observations also consist of low-dim and rgb modalities, with a subset of the raw observation keys per modality.

Parameters

obs_modality_specs (dict or list) – A nested dictionary (see docstring above for an example) or a list of nested dictionaries. Accepting a list as input makes it convenient for situations where multiple modules may each have their own modality spec.
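To make the two mappings concrete, here is a minimal pure-Python sketch that builds them from a spec shaped like the example. The function build_modality_mappings is hypothetical; the real function stores its results in module-level state, which this sketch does not attempt to model.

```python
def build_modality_mappings(obs_modality_specs):
    """Return (modality -> keys, key -> modality) from one spec or a list of specs."""
    if isinstance(obs_modality_specs, dict):
        obs_modality_specs = [obs_modality_specs]

    modality_to_keys = {}
    key_to_modality = {}
    for spec in obs_modality_specs:
        for obs_variant, modalities in spec.items():     # e.g. "obs", "goal"
            for modality, keys in modalities.items():    # e.g. "low_dim", "rgb"
                bucket = modality_to_keys.setdefault(modality, [])
                for key in keys:
                    if key not in bucket:
                        bucket.append(key)
                    # assumption of this sketch: a key belongs to exactly one modality
                    previous = key_to_modality.setdefault(key, modality)
                    if previous != modality:
                        raise ValueError(f"{key} mapped to both {previous} and {modality}")
    return modality_to_keys, key_to_modality

specs = {
    "obs": {
        "low_dim": ["robot0_eef_pos", "robot0_eef_quat"],
        "rgb": ["agentview_image", "robot0_eye_in_hand"],
    },
    "goal": {
        "low_dim": ["robot0_eef_pos"],
        "rgb": ["agentview_image"],
    },
}
modality_to_keys, key_to_modality = build_modality_mappings(specs)
```

Keys shared between the "obs" and "goal" variants are recorded once per modality, so the goal entries add nothing new here.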

robomimic.utils.obs_utils.key_is_obs_modality(key, obs_modality)#

Check if observation key corresponds to modality @obs_modality.

Parameters
  • key (str) – obs key name to check

  • obs_modality (str) – observation modality - e.g.: “low_dim”, “rgb”

robomimic.utils.obs_utils.normalize_obs(obs_dict, obs_normalization_stats)#

Normalize observations using the provided “mean” and “std” entries for each observation key. The observation dictionary will be modified in-place.

Parameters
  • obs_dict (dict) – dictionary mapping observation key to np.array or torch.Tensor. Leading batch dimensions are optional.

  • obs_normalization_stats (dict) – this should map observation keys to dicts with a “mean” and “std” of shape (1, …) where … is the default shape for the observation.

Returns

obs dict with normalized observation arrays

Return type

obs_dict (dict)
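The normalization itself is a per-key (x - mean) / std. The sketch below shows it with NumPy for batched inputs only; the function name is hypothetical, and the handling of unbatched inputs described above is deliberately left out.

```python
import numpy as np

def normalize_obs_sketch(obs_dict, obs_normalization_stats):
    """Normalize each batched observation in place using its (1, ...) mean and std."""
    for key in obs_dict:
        mean = obs_normalization_stats[key]["mean"]   # shape (1, ...)
        std = obs_normalization_stats[key]["std"]     # shape (1, ...)
        # the leading size-1 axis broadcasts against the batch dimension
        obs_dict[key] = (obs_dict[key] - mean) / std
    return obs_dict

obs = {"robot0_eef_pos": np.array([[1.0, 2.0, 3.0], [3.0, 4.0, 5.0]])}
stats = {
    "robot0_eef_pos": {
        "mean": np.array([[2.0, 3.0, 4.0]]),
        "std": np.array([[2.0, 2.0, 2.0]]),
    }
}
normalized = normalize_obs_sketch(obs, stats)
```

As in the description above, the input dictionary is modified and then returned, so normalized and obs refer to the same object.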

robomimic.utils.obs_utils.obs_encoder_kwargs_from_config(obs_encoder_config)#

Generate a set of args used to create visual backbones for networks from the observation encoder config.

Parameters

obs_encoder_config (Config) – Config object containing relevant encoder information. Should be equivalent to config.observation.encoder

Returns

Processed encoder kwargs

Return type

dict

robomimic.utils.obs_utils.process_frame(frame, channel_dim, scale)#

Given frame fetched from dataset, process for network input. Converts array to float (from uint8), normalizes pixels from range [0, @scale] to [0, 1], and channel swaps from (H, W, C) to (C, H, W).

Parameters
  • frame (np.array or torch.Tensor) – frame array

  • channel_dim (int) – Number of channels to sanity check for

  • scale (float) – Value to normalize inputs by

Returns

processed frame

Return type

processed_frame (np.array or torch.Tensor)

robomimic.utils.obs_utils.process_obs(obs, obs_modality=None, obs_key=None)#

Process observation @obs corresponding to @obs_modality modality (or implicitly inferred from @obs_key) to prepare for network input.

Note that either obs_modality OR obs_key must be specified!

If both are specified, obs_key will override obs_modality

Parameters
  • obs (np.array or torch.Tensor) – Observation to process. Leading batch dimension is optional

  • obs_modality (str) – Observation modality (e.g.: depth, image, low_dim, etc.)

  • obs_key (str) – Name of observation from which to infer @obs_modality

Returns

processed observation

Return type

processed_obs (np.array or torch.Tensor)

robomimic.utils.obs_utils.process_obs_dict(obs_dict)#

Process observations in observation dictionary to prepare for network input.

Parameters

obs_dict (dict) – dictionary mapping observation keys to np.array or torch.Tensor. Leading batch dimensions are optional.

Returns

dictionary where observation keys have been processed by their corresponding processors

Return type

new_dict (dict)

robomimic.utils.obs_utils.register_encoder_core(target_class)#
robomimic.utils.obs_utils.register_obs_key(target_class)#
robomimic.utils.obs_utils.register_randomizer(target_class)#
robomimic.utils.obs_utils.repeat_and_stack_observation(obs_dict, n)#

Given an observation dictionary and a desired repeat value @n, this function will return a new observation dictionary where each modality is repeated @n times and the copies are stacked in the first dimension.

For example, if a batch of 3 observations comes in, and n is 2, the output will look like [ob1; ob1; ob2; ob2; ob3; ob3] in each modality.

Parameters
  • obs_dict (dict) – dictionary mapping observation key to np.array or torch.Tensor. Leading batch dimensions are optional.

  • n (int) – number to repeat by

Returns

repeated obs dict

Return type

repeat_obs_dict (dict)
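The [ob1; ob1; ob2; ob2; ob3; ob3] ordering corresponds to repeating each element along the first dimension. A minimal NumPy sketch follows, under the assumption that every entry is an np.ndarray with a leading batch dimension (the function name is hypothetical).

```python
import numpy as np

def repeat_and_stack_sketch(obs_dict, n):
    """Repeat every observation n times along axis 0, keeping copies adjacent."""
    return {key: np.repeat(value, n, axis=0) for key, value in obs_dict.items()}

obs = {"low_dim": np.array([[1.0], [2.0], [3.0]])}   # batch of 3 observations
repeated = repeat_and_stack_sketch(obs, 2)           # batch of 6 observations
```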

robomimic.utils.obs_utils.sample_random_image_crops(images, crop_height, crop_width, num_crops, pos_enc=False)#

For each image, randomly sample @num_crops crops of size (@crop_height, @crop_width), from @images.

Parameters
  • images (torch.Tensor) – batch of images of shape […, C, H, W]

  • crop_height (int) – height of crop to take

  • crop_width (int) – width of crop to take

  • num_crops (int) – number of crops to sample

  • pos_enc (bool) – if True, also add 2 channels to the outputs that gives a spatial encoding of the original source pixel locations. This means that the output crops will contain information about where in the source image it was sampled from.

Returns

crops of shape (…, @num_crops, C, @crop_height, @crop_width) if @pos_enc is False, otherwise (…, @num_crops, C + 2, @crop_height, @crop_width)

crop_inds (torch.Tensor): sampled crop indices of shape (…, N, 2)

Return type

crops (torch.Tensor)

robomimic.utils.obs_utils.unprocess_frame(frame, channel_dim, scale)#

Given frame prepared for network input, prepare for saving to dataset. Inverse of @process_frame.

Parameters
  • frame (np.array or torch.Tensor) – frame array

  • channel_dim (int) – What channel dimension should be (used for sanity check)

  • scale (float) – Scaling factor to apply during denormalization

Returns

frame passed through inverse operation of @process_frame

Return type

unprocessed_frame (np.array or torch.Tensor)
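The relationship between @process_frame and @unprocess_frame can be illustrated with a NumPy round trip. Both function names below are hypothetical, only np.ndarray inputs are handled, and the unprocessed result is left as float rather than cast back to uint8 (an assumption of this sketch).

```python
import numpy as np

def process_frame_sketch(frame, channel_dim, scale):
    """uint8 (..., H, W, C) frame -> float (..., C, H, W) frame in [0, 1]."""
    assert frame.shape[-1] == channel_dim, "unexpected number of channels"
    frame = frame.astype(np.float32) / scale
    return np.moveaxis(frame, -1, -3)

def unprocess_frame_sketch(frame, channel_dim, scale):
    """float (..., C, H, W) frame -> (..., H, W, C) frame in [0, scale]."""
    assert frame.shape[-3] == channel_dim, "unexpected number of channels"
    frame = np.moveaxis(frame, -3, -1)
    return frame * scale

raw = np.random.randint(0, 256, size=(84, 84, 3), dtype=np.uint8)
processed = process_frame_sketch(raw, channel_dim=3, scale=255.0)
restored = unprocess_frame_sketch(processed, channel_dim=3, scale=255.0)
```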

robomimic.utils.obs_utils.unprocess_obs(obs, obs_modality=None, obs_key=None)#

Prepare observation @obs corresponding to @obs_modality modality (or implicitly inferred from @obs_key) for deployment.

Note that either obs_modality OR obs_key must be specified!

If both are specified, obs_key will override obs_modality

Parameters
  • obs (np.array or torch.Tensor) – Observation to unprocess. Leading batch dimension is optional

  • obs_modality (str) – Observation modality (e.g.: depth, image, low_dim, etc.)

  • obs_key (str) – Name of observation from which to infer @obs_modality

Returns

unprocessed observation

Return type

unprocessed_obs (np.array or torch.Tensor)

robomimic.utils.obs_utils.unprocess_obs_dict(obs_dict)#

Prepare processed observation dictionary for saving to dataset. Inverse of @process_obs_dict.

Parameters

obs_dict (dict) – dictionary mapping observation keys to np.array or torch.Tensor. Leading batch dimensions are optional.

Returns

dictionary where observation keys have been unprocessed by their respective unprocessor methods

Return type

new_dict (dict)

robomimic.utils.python_utils module#

Set of general purpose utility functions for easier interfacing with Python API

robomimic.utils.python_utils.extract_class_init_kwargs_from_dict(cls, dic, copy=False, verbose=False)#

Helper function to return a dictionary of key-values that specifically correspond to @cls class’s __init__ constructor method, from @dic which may or may not contain additional, irrelevant kwargs.

Note that @dic may possibly be missing certain kwargs as specified by cls.__init__. No error will be raised.

Parameters
  • cls (object) – Class from which to grab __init__ kwargs that will be used as filtering keys for @dic

  • dic (dict) – Dictionary containing multiple key-values

  • copy (bool) – If True, will deepcopy all values corresponding to the specified @keys

  • verbose (bool) – If True (or if macro DEBUG is True), then will print out mismatched keys

Returns

Extracted subset dictionary possibly containing only the specified keys from cls.__init__ and their corresponding values

Return type

dict
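One way to picture this filtering is with the standard-library inspect module. The sketch below is not the robomimic implementation: init_kwarg_names, extract_init_kwargs, and the Encoder class are hypothetical, and the verbose reporting of mismatched keys is omitted.

```python
import inspect
from copy import deepcopy

def init_kwarg_names(cls):
    """List the parameter names of cls.__init__, excluding "self"."""
    params = inspect.signature(cls.__init__).parameters
    return [name for name in params if name != "self"]

def extract_init_kwargs(cls, dic, copy=False):
    """Keep only the entries of dic that cls.__init__ accepts; missing ones are skipped."""
    names = init_kwarg_names(cls)
    return {k: (deepcopy(dic[k]) if copy else dic[k]) for k in names if k in dic}

class Encoder:  # hypothetical class used only for illustration
    def __init__(self, input_dim, hidden_dim=64):
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim

raw_kwargs = {"input_dim": 10, "dropout": 0.1}   # "dropout" is irrelevant to Encoder
kwargs = extract_init_kwargs(Encoder, raw_kwargs)
encoder = Encoder(**kwargs)
```

Here "dropout" is dropped because Encoder does not accept it, and "hidden_dim" is absent from the result because the input dictionary never supplied it.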

robomimic.utils.python_utils.extract_subset_dict(dic, keys, copy=False)#

Helper function to extract a subset of dictionary key-values from a current dictionary. Optionally (deep)copies the values extracted from the original @dic if @copy is True.

Parameters
  • dic (dict) – Dictionary containing multiple key-values

  • keys (Iterable) – Specific keys to extract from @dic. If the key doesn’t exist in @dic, then the key is skipped

  • copy (bool) – If True, will deepcopy all values corresponding to the specified @keys

Returns

Extracted subset dictionary containing only the specified @keys and their corresponding values

Return type

dict

robomimic.utils.python_utils.get_class_init_kwargs(cls)#

Helper function to return a list of all valid keyword arguments (excluding “self”) for the given @cls class.

Parameters

cls (object) – Class from which to grab __init__ kwargs

Returns

All keyword arguments (excluding “self”) specified by @cls __init__ constructor method

Return type

list

robomimic.utils.tensor_utils module#

A collection of utilities for working with nested tensor structures consisting of numpy arrays and torch tensors.

robomimic.utils.tensor_utils.assert_size_at_dim(x, size, dim, msg)#

Ensure that arrays and tensors in nested dictionary or list or tuple have size @size in dim @dim.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • size (int) – size that tensors should have at @dim

  • dim (int) – dimension to check

  • msg (str) – text to display if assertion fails

robomimic.utils.tensor_utils.assert_size_at_dim_single(x, size, dim, msg)#

Ensure that array or tensor @x has size @size in dim @dim.

Parameters
  • x (np.ndarray or torch.Tensor) – input array or tensor

  • size (int) – size that tensors should have at @dim

  • dim (int) – dimension to check

  • msg (str) – text to display if assertion fails

robomimic.utils.tensor_utils.clone(x)#

Clones all torch tensors and numpy arrays in nested dictionary or list or tuple and returns a new nested structure.

Parameters

x (dict or list or tuple) – a possibly nested dictionary or list or tuple

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.contiguous(x)#

Makes all torch tensors and numpy arrays contiguous in nested dictionary or list or tuple and returns a new nested structure.

Parameters

x (dict or list or tuple) – a possibly nested dictionary or list or tuple

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.detach(x)#

Detaches all torch tensors in nested dictionary or list or tuple and returns a new nested structure.

Parameters

x (dict or list or tuple) – a possibly nested dictionary or list or tuple

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.expand_at(x, size, dim)#

Expand all tensors in nested dictionary or list or tuple at a single dimension @dim by @size.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • size (int) – size to expand

  • dim (int) – dimension to expand

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.expand_at_single(x, size, dim)#

Expand a tensor at a single dimension @dim by @size

Parameters
  • x (torch.Tensor) – input tensor

  • size (int) – size to expand

  • dim (int) – dimension to expand

Returns

expanded tensor

Return type

y (torch.Tensor)

robomimic.utils.tensor_utils.flatten(x, begin_axis=1)#

Flatten all tensors in nested dictionary or list or tuple, from @begin_axis onwards.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • begin_axis (int) – which axis to flatten from

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.flatten_nested_dict_list(d, parent_key='', sep='_', item_key='')#

Flatten a nested dict or list to a list.

For example, given a dict

    {
        a: 1
        b: {
            c: 2
        }
        c: 3
    }

the function would return [(a, 1), (b_c, 2), (c, 3)]

Parameters
  • d (dict, list) – a nested dict or list to be flattened

  • parent_key (str) – recursion helper

  • sep (str) – separator for nesting keys

  • item_key (str) – recursion helper

Returns

a list of (key, value) tuples

Return type

list
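The flattening can be sketched as a short recursion in pure Python. The function flatten_nested below is a hypothetical simplification: it drops the @item_key recursion helper, uses list positions as key parts, and assumes all dictionary keys are strings.

```python
def flatten_nested(d, parent_key="", sep="_"):
    """Flatten a nested dict/list into a list of (joined_key, value) tuples."""
    if isinstance(d, dict):
        children = d.items()
    elif isinstance(d, (list, tuple)):
        # list positions become key components, e.g. "x_0", "x_1"
        children = ((str(i), v) for i, v in enumerate(d))
    else:
        return [(parent_key, d)]   # leaf value

    items = []
    for key, value in children:
        new_key = parent_key + sep + key if parent_key else key
        items.extend(flatten_nested(value, new_key, sep))
    return items

flat = flatten_nested({"a": 1, "b": {"c": 2}, "c": 3})
```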

robomimic.utils.tensor_utils.flatten_single(x, begin_axis=1)#

Flatten a tensor in all dimensions from @begin_axis onwards.

Parameters
  • x (torch.Tensor) – tensor to flatten

  • begin_axis (int) – which axis to flatten from

Returns

flattened tensor

Return type

y (torch.Tensor)

robomimic.utils.tensor_utils.gather_along_dim_with_dim(x, target_dim, source_dim, indices)#

Apply @gather_along_dim_with_dim_single to all tensors in a nested dictionary or list or tuple.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • target_dim (int) – dimension to gather values along

  • source_dim (int) – dimension to hold constant and use for gathering values from the other dimensions

  • indices (torch.Tensor) – flat index tensor with same shape as tensor @x along @source_dim

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.gather_along_dim_with_dim_single(x, target_dim, source_dim, indices)#

This function indexes out a target dimension of a tensor in a structured way, by allowing a different value to be selected for each member of a flat index tensor (@indices) corresponding to a source dimension. This can be interpreted as moving along the source dimension, using the corresponding index value in @indices to select values for all other dimensions outside of the source and target dimensions. A common use case is to gather values in target dimension 1 for each batch member (source dimension 0).

Parameters
  • x (torch.Tensor) – tensor to gather values for

  • target_dim (int) – dimension to gather values along

  • source_dim (int) – dimension to hold constant and use for gathering values from the other dimensions

  • indices (torch.Tensor) – flat index tensor with same shape as tensor @x along @source_dim

Returns

gathered tensor, with dimension @target_dim indexed out

Return type

y (torch.Tensor)

robomimic.utils.tensor_utils.gather_sequence(seq, indices)#

Given a nested dictionary or list or tuple, gathers an element from each sequence of the batch for tensors with leading dimensions [B, T, …].

Parameters
  • seq (dict or list or tuple) – a possibly nested dictionary or list or tuple with tensors of leading dimensions [B, T, …]

  • indices (torch.Tensor) – tensor indices of shape [B]

Returns

new nested dict-list-tuple with tensors of shape [B, …]

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.gather_sequence_single(seq, indices)#

Given a tensor with leading dimensions [B, T, …], gather an element from each sequence in the batch given an index for each sequence.

Parameters
  • seq (torch.Tensor) – tensor with leading dimensions [B, T, …]

  • indices (torch.Tensor) – tensor indices of shape [B]

Returns

indexed tensor of shape [B, …]

Return type

y (torch.Tensor)
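The effect of gathering one time step per batch member is easy to see with integer-array indexing. The sketch below uses NumPy instead of torch purely for illustration and is not the library's implementation.

```python
import numpy as np

seq = np.arange(2 * 3 * 4).reshape(2, 3, 4)   # [B=2, T=3, D=4]
indices = np.array([2, 0])                    # one time index per batch member

# pair batch member b with time step indices[b]
gathered = seq[np.arange(seq.shape[0]), indices]   # shape [B, D]
```

The first row of gathered is seq[0, 2] and the second is seq[1, 0], so the time dimension is indexed out.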

robomimic.utils.tensor_utils.get_shape(x)#

Get all shapes of arrays and tensors in nested dictionary or list or tuple.

Parameters

x (dict or list or tuple) – a possibly nested dictionary or list or tuple

Returns

new nested dict-list-tuple that contains each array or tensor’s shape

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.index_at_time(x, ind)#

Indexes all torch tensors and numpy arrays in dimension 1 with index @ind in nested dictionary or list or tuple and returns a new nested structure.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • ind (int) – index

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.join_dimensions(x, begin_axis, end_axis)#

Joins all dimensions between dimensions (@begin_axis, @end_axis) into a flat dimension, for all tensors in nested dictionary or list or tuple.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • begin_axis (int) – begin dimension

  • end_axis (int) – end dimension

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)
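For a single array, joining a range of dimensions amounts to a reshape. The NumPy sketch below assumes that both @begin_axis and @end_axis are inclusive, which the text above does not state explicitly; the function name is hypothetical.

```python
import numpy as np

def join_dimensions_sketch(x, begin_axis, end_axis):
    """Collapse axes begin_axis..end_axis (assumed inclusive) into one flat axis."""
    shape = x.shape
    new_shape = shape[:begin_axis] + (-1,) + shape[end_axis + 1:]
    return x.reshape(new_shape)

x = np.arange(2 * 3 * 4 * 5).reshape(2, 3, 4, 5)
joined = join_dimensions_sketch(x, 1, 2)   # (2, 3, 4, 5) -> (2, 12, 5)
```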

robomimic.utils.tensor_utils.list_of_flat_dict_to_dict_of_list(list_of_dict)#

Helper function to go from a list of flat dictionaries to a dictionary of lists. By “flat” we mean that none of the values are dictionaries, but are numpy arrays, floats, etc.

Parameters

list_of_dict (list) – list of flat dictionaries

Returns

dictionary of lists

Return type

dict_of_list (dict)
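This conversion is a common step when aggregating per-step or per-rollout logs. A minimal pure-Python sketch of the idea follows; the function name is hypothetical and the sample log entries are made up for illustration.

```python
def list_of_dicts_to_dict_of_lists(list_of_dict):
    """Collect the values of each key across a list of flat dictionaries."""
    dict_of_list = {}
    for entry in list_of_dict:
        for key, value in entry.items():
            dict_of_list.setdefault(key, []).append(value)
    return dict_of_list

logs = [
    {"loss": 0.9, "step": 1},
    {"loss": 0.5, "step": 2},
    {"loss": 0.3, "step": 3},
]
merged = list_of_dicts_to_dict_of_lists(logs)
```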

robomimic.utils.tensor_utils.map_ndarray(x, func)#

Apply function @func to np.ndarray objects in a nested dictionary or list or tuple.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • func (function) – function to apply to each array

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.map_tensor(x, func)#

Apply function @func to torch.Tensor objects in a nested dictionary or list or tuple.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • func (function) – function to apply to each tensor

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.map_tensor_ndarray(x, tensor_func, ndarray_func)#

Apply function @tensor_func to torch.Tensor objects and @ndarray_func to np.ndarray objects in a nested dictionary or list or tuple.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • tensor_func (function) – function to apply to each tensor

  • ndarray_func (function) – function to apply to each array

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.named_reduce(x, reduction, dim)#

Reduces all tensors in nested dictionary or list or tuple at a dimension using a named reduction function.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • reduction (str) – one of [“sum”, “max”, “mean”, “flatten”]

  • dim (int) – dimension to be reduced (or begin axis for flatten)

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.named_reduce_single(x, reduction, dim)#

Reduce tensor at a dimension by named reduction functions.

Parameters
  • x (torch.Tensor) – tensor to be reduced

  • reduction (str) – one of [“sum”, “max”, “mean”, “flatten”]

  • dim (int) – dimension to be reduced (or begin axis for flatten)

Returns

reduced tensor

Return type

y (torch.Tensor)

robomimic.utils.tensor_utils.pad_sequence(seq, padding, batched=False, pad_same=True, pad_values=None)#

Pad a nested dictionary or list or tuple of sequence tensors in the time dimension (dimension 1).

Parameters
  • seq (dict or list or tuple) – a possibly nested dictionary or list or tuple with tensors of leading dimensions [B, T, …]

  • padding (tuple) – begin and end padding, e.g. [1, 1] pads both begin and end of the sequence by 1

  • batched (bool) – if sequence has the batch dimension

  • pad_same (bool) – whether to pad by duplicating

  • pad_values (scalar or (ndarray, Tensor)) – values to be padded if not pad_same

Returns

padded sequence (dict or list or tuple)

robomimic.utils.tensor_utils.pad_sequence_single(seq, padding, batched=False, pad_same=True, pad_values=None)#

Pad input tensor or array @seq in the time dimension (dimension 1).

Parameters
  • seq (np.ndarray or torch.Tensor) – sequence to be padded

  • padding (tuple) – begin and end padding, e.g. [1, 1] pads both begin and end of the sequence by 1

  • batched (bool) – if sequence has the batch dimension

  • pad_same (bool) – whether to pad by duplicating

  • pad_values (scalar or (ndarray, Tensor)) – values to be padded if not pad_same

Returns

padded sequence (np.ndarray or torch.Tensor)
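The padding behavior can be sketched with NumPy as follows. This is an illustration under stated assumptions, not the library code: the time axis is taken to be 1 when @batched is True and 0 otherwise, "duplicating" is taken to mean repeating the first and last elements, and pad_value is a hypothetical scalar stand-in for @pad_values.

```python
import numpy as np

def pad_sequence_sketch(seq, padding, batched=False, pad_same=True, pad_value=0.0):
    """Pad an array along its time axis at the beginning and end."""
    time_axis = 1 if batched else 0          # assumption of this sketch
    begin, end = padding
    first = np.take(seq, [0], axis=time_axis)    # first time step, axis kept
    last = np.take(seq, [-1], axis=time_axis)    # last time step, axis kept
    if not pad_same:
        first = np.full_like(first, pad_value)
        last = np.full_like(last, pad_value)
    pieces = [
        np.repeat(first, begin, axis=time_axis),
        seq,
        np.repeat(last, end, axis=time_axis),
    ]
    return np.concatenate(pieces, axis=time_axis)

seq = np.array([[1.0, 1.0], [2.0, 2.0], [3.0, 3.0]])      # [T=3, D=2]
padded_same = pad_sequence_sketch(seq, (2, 1))             # [T=6, D=2]
padded_zero = pad_sequence_sketch(seq, (2, 1), pad_same=False)
```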

robomimic.utils.tensor_utils.recursive_dict_list_tuple_apply(x, type_func_dict)#

Recursively apply functions to a nested dictionary or list or tuple, given a dictionary of {data_type: function_to_apply}.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • type_func_dict (dict) – a mapping from data types to the functions to be applied for each data type.

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)
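Most helpers in this module follow the same pattern: walk the nested structure and dispatch on the type of each leaf. A simplified pure-Python sketch of that pattern is shown below; recursive_apply is a hypothetical name, and raising on unregistered leaf types is an assumption of this sketch.

```python
def recursive_apply(x, type_func_dict):
    """Apply type-specific functions to the leaves of a nested dict/list/tuple."""
    if isinstance(x, dict):
        return {k: recursive_apply(v, type_func_dict) for k, v in x.items()}
    if isinstance(x, (list, tuple)):
        # rebuild the same container type around the transformed children
        return type(x)(recursive_apply(v, type_func_dict) for v in x)
    for data_type, func in type_func_dict.items():
        if isinstance(x, data_type):
            return func(x)
    raise NotImplementedError(f"no function registered for {type(x)}")

nested = {"a": 1, "b": [2.5, (3, "x")]}
result = recursive_apply(
    nested,
    {int: lambda v: v * 2, float: lambda v: -v, str: str.upper},
)
```

A new structure is returned, so nested itself is left unchanged.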

robomimic.utils.tensor_utils.repeat_by_expand_at(x, repeats, dim)#

Repeat a dimension by combining expand and reshape operations.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • repeats (int) – number of times to repeat the target dimension

  • dim (int) – dimension to repeat on

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.reshape_dimensions(x, begin_axis, end_axis, target_dims)#

Reshape selected dimensions for all tensors in nested dictionary or list or tuple to a target dimension.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • begin_axis (int) – begin dimension

  • end_axis (int) – end dimension

  • target_dims (tuple or list) – target shape for the range of dimensions (@begin_axis, @end_axis)

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.reshape_dimensions_single(x, begin_axis, end_axis, target_dims)#

Reshape selected dimensions in a tensor to a target dimension.

Parameters
  • x (torch.Tensor) – tensor to reshape

  • begin_axis (int) – begin dimension

  • end_axis (int) – end dimension

  • target_dims (tuple or list) – target shape for the range of dimensions (@begin_axis, @end_axis)

Returns

reshaped tensor

Return type

y (torch.Tensor)

robomimic.utils.tensor_utils.time_distributed(inputs, op, activation=None, inputs_as_kwargs=False, inputs_as_args=False, **kwargs)#

Apply function @op to all tensors in nested dictionary or list or tuple @inputs in both the batch (B) and time (T) dimension, where the tensors are expected to have shape [B, T, …]. Will do this by reshaping tensors to [B * T, …], passing through the op, and then reshaping outputs to [B, T, …].

Parameters
  • inputs (list or tuple or dict) – a possibly nested dictionary or list or tuple with tensors of leading dimensions [B, T, …]

  • op – a layer op that accepts inputs

  • activation – activation to apply at the output

  • inputs_as_kwargs (bool) – whether to feed input as a kwargs dict to the op

  • inputs_as_args (bool) – whether to feed input as an args list to the op

  • kwargs (dict) – other kwargs to supply to the op

Returns

new nested dict-list-tuple with tensors of leading dimension [B, T].

Return type

outputs (dict or list or tuple)
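The reshape, apply, reshape sequence described above can be sketched for a single array with NumPy. The function time_distributed_sketch and the toy linear op are hypothetical, and nested inputs, activations, and the kwargs options are left out.

```python
import numpy as np

def time_distributed_sketch(x, op):
    """Apply op over merged batch and time dimensions of an array shaped [B, T, ...]."""
    batch, time = x.shape[:2]
    merged = x.reshape(batch * time, *x.shape[2:])    # [B * T, ...]
    out = op(merged)                                  # op only sees a flat batch
    return out.reshape(batch, time, *out.shape[1:])   # back to [B, T, ...]

weights = np.ones((5, 2))

def linear_op(flat):
    """Toy layer mapping [N, 5] -> [N, 2]."""
    return flat @ weights

inputs = np.random.rand(4, 10, 5)                      # [B=4, T=10, D=5]
outputs = time_distributed_sketch(inputs, linear_op)   # [B=4, T=10, D=2]
```

Because the op never sees the time dimension, the result at each [b, t] matches what the op would produce for that single time step.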

robomimic.utils.tensor_utils.to_batch(x)#

Introduces a leading batch dimension of 1 for all torch tensors and numpy arrays in nested dictionary or list or tuple and returns a new nested structure.

Parameters

x (dict or list or tuple) – a possibly nested dictionary or list or tuple

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.to_device(x, device)#

Sends all torch tensors in nested dictionary or list or tuple to device @device, and returns a new nested structure.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • device (torch.device) – device to send tensors to

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.to_float(x)#

Converts all torch tensors and numpy arrays in nested dictionary or list or tuple to float type entries, and returns a new nested structure.

Parameters

x (dict or list or tuple) – a possibly nested dictionary or list or tuple

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.to_list(x)#

Converts all torch tensors and numpy arrays in nested dictionary or list or tuple to a list, and returns a new nested structure. Useful for json encoding.

Parameters

x (dict or list or tuple) – a possibly nested dictionary or list or tuple

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.to_numpy(x)#

Converts all torch tensors in nested dictionary or list or tuple to numpy (and leaves existing numpy arrays as-is), and returns a new nested structure.

Parameters

x (dict or list or tuple) – a possibly nested dictionary or list or tuple

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.to_one_hot(tensor, num_class)#

Convert all tensors in nested dictionary or list or tuple to one-hot representation, assuming a certain number of total class labels.

Parameters
  • tensor (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • num_class (int) – number of classes

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.to_one_hot_single(tensor, num_class)#

Convert tensor to one-hot representation, assuming a certain number of total class labels.

Parameters
  • tensor (torch.Tensor) – tensor containing integer labels

  • num_class (int) – number of classes

Returns

tensor containing one-hot representation of labels

Return type

x (torch.Tensor)
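A one-hot conversion adds a trailing dimension of size @num_class. The NumPy sketch below illustrates the resulting shape and values; it is not the torch-based implementation, and the function name is hypothetical.

```python
import numpy as np

def to_one_hot_sketch(labels, num_class):
    """Map integer labels of any shape to one-hot vectors along a new last axis."""
    # row i of the identity matrix is the one-hot vector for class i
    return np.eye(num_class, dtype=np.float32)[labels]

labels = np.array([[0, 2], [1, 1]])
one_hot = to_one_hot_sketch(labels, 3)   # shape (2, 2, 3)
```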

robomimic.utils.tensor_utils.to_sequence(x)#

Introduces a time dimension of 1 at dimension 1 for all torch tensors and numpy arrays in nested dictionary or list or tuple and returns a new nested structure.

Parameters

x (dict or list or tuple) – a possibly nested dictionary or list or tuple

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.to_tensor(x)#

Converts all numpy arrays in nested dictionary or list or tuple to torch tensors (and leaves existing torch Tensors as-is), and returns a new nested structure.

Parameters

x (dict or list or tuple) – a possibly nested dictionary or list or tuple

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.to_torch(x, device)#

Converts all numpy arrays and torch tensors in nested dictionary or list or tuple to torch tensors on device @device and returns a new nested structure.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • device (torch.device) – device to send tensors to

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.to_uint8(x)#

Converts all torch tensors and numpy arrays in nested dictionary or list or tuple to uint8 type entries, and returns a new nested structure.

Parameters

x (dict or list or tuple) – a possibly nested dictionary or list or tuple

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.unsqueeze(x, dim)#

Adds dimension of size 1 at dimension @dim in all torch tensors and numpy arrays in nested dictionary or list or tuple and returns a new nested structure.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • dim (int) – dimension

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.tensor_utils.unsqueeze_expand_at(x, size, dim)#

Unsqueeze and expand a tensor at a dimension @dim by @size.

Parameters
  • x (dict or list or tuple) – a possibly nested dictionary or list or tuple

  • size (int) – size to expand

  • dim (int) – dimension to unsqueeze and expand

Returns

new nested dict-list-tuple

Return type

y (dict or list or tuple)

robomimic.utils.test_utils module#

Utilities for testing algorithm implementations - used mainly by scripts in tests directory.

robomimic.utils.test_utils.checkpoint_path_from_test_run()#

Helper function that gets the path of a model checkpoint after a test training run is finished.

robomimic.utils.test_utils.config_from_modifier(base_config, config_modifier)#

Helper function to load a base config, modify it using the passed @config_modifier function, and finalize it for training.

Parameters
  • base_config (BaseConfig instance) – starting config object that is loaded (to change algorithm config defaults), and then modified with @config_modifier

  • config_modifier (function) – function that takes a config object as input, and modifies it
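As a rough illustration of what a @config_modifier might look like, the sketch below uses a `SimpleNamespace` as a stand-in for a config object. The field names (`train.num_epochs`, `train.batch_size`) and the choice to return the config are assumptions made for the example, not a statement about the real config classes.

```python
from types import SimpleNamespace


def make_stand_in_config():
    # Hypothetical stand-in for a config object; the fields are illustrative only.
    return SimpleNamespace(train=SimpleNamespace(num_epochs=100, batch_size=64))


def short_run_modifier(config):
    # Shrink the run so that a test finishes quickly.
    config.train.num_epochs = 2
    config.train.batch_size = 8
    # Returning the config is an assumption; the modifier also edits it in place.
    return config


config = short_run_modifier(make_stand_in_config())
```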

robomimic.utils.test_utils.example_dataset_path()#

Path to dataset to use for testing and example purposes. It should exist under the tests/assets directory, and will be downloaded from a server if it does not exist.

robomimic.utils.test_utils.example_momart_dataset_path()#

Path to momart dataset to use for testing and example purposes. It should exist under the tests/assets directory, and will be downloaded from a server if it does not exist.

robomimic.utils.test_utils.get_base_config(algo_name)#

Base config for testing algorithms.

Parameters

algo_name (str) – name of algorithm - loads the corresponding json from the config templates directory

robomimic.utils.test_utils.maybe_remove_dir(dir_to_remove)#

Remove directory if it exists.

Parameters

dir_to_remove (str) – path to directory to remove

robomimic.utils.test_utils.maybe_remove_file(file_to_remove)#

Remove file if it exists.

Parameters

file_to_remove (str) – path to file to remove

robomimic.utils.test_utils.temp_dataset_path()#

Defines default dataset path to write to for testing.

robomimic.utils.test_utils.temp_model_dir_path()#

Path to a temporary model directory to write to for testing and example purposes.

robomimic.utils.test_utils.temp_video_path()#

Defines default video path to write to for testing.

robomimic.utils.test_utils.test_eval_agent_from_checkpoint(ckpt_path, device)#

Test loading a model from checkpoint and running a rollout with the trained agent for a small number of steps.

Parameters
  • ckpt_path (str) – path to a checkpoint pth file

  • device (torch.device) – torch device

robomimic.utils.test_utils.test_run(base_config, config_modifier)#

Takes a base_config and config_modifier (function that modifies a passed Config object) and runs training as a test. It also takes the trained checkpoint, tries to load the policy and environment from the checkpoint, and run an evaluation rollout. Returns a string that is colored green if the run finished successfully without any issues, and colored red if an error occurred. If an error occurs, the traceback is included in the string.

Parameters
  • base_config (BaseConfig instance) – starting config object that is loaded (to change algorithm config defaults), and then modified with @config_modifier

  • config_modifier (function) – function that takes a config object as input, and modifies it

Returns

a green “passed!” string, or a red “failed with error” string that contains the traceback

Return type

ret (str)

robomimic.utils.torch_utils module#

This file contains some PyTorch utilities.

robomimic.utils.torch_utils.backprop_for_loss(net, optim, loss, max_grad_norm=None, retain_graph=False)#

Backpropagate loss and update parameters for network @net.

Parameters
  • net (torch.nn.Module) – network to update

  • optim (torch.optim.Optimizer) – optimizer to use

  • loss (torch.Tensor) – loss to use for backpropagation

  • max_grad_norm (float) – if provided, used to clip gradients

  • retain_graph (bool) – if True, graph is not freed after backward call

Returns

average gradient norms from backpropagation

Return type

grad_norms (float)

class robomimic.utils.torch_utils.dummy_context_mgr#

Bases: object

A dummy context manager - useful for having conditional scopes (such as @maybe_no_grad). Nothing happens in this scope.

robomimic.utils.torch_utils.get_torch_device(try_to_use_cuda)#

Return torch device. If using cuda (GPU), will also set cudnn.benchmark to True to optimize CNNs.

Parameters

try_to_use_cuda (bool) – if True and cuda is available, will use GPU

Returns

device to use for models

Return type

device (torch.device)

robomimic.utils.torch_utils.hard_update(source, target)#

Hard update @target parameters to match @source.

Parameters
  • source (torch.nn.Module) – source network to provide parameters

  • target (torch.nn.Module) – target network to update parameters for

robomimic.utils.torch_utils.lr_scheduler_from_optim_params(net_optim_params, net, optimizer)#

Helper function to return a LRScheduler from the optim_params section of the config for a particular network. Returns None if a scheduler is not needed.

Parameters
  • net_optim_params (Config) – optim_params part of algo_config corresponding to @net. This determines whether a learning rate scheduler is created.

  • net (torch.nn.Module) – module whose parameters this optimizer will be responsible for

  • optimizer (torch.optim.Optimizer) – optimizer for this net

Returns

learning rate scheduler

Return type

lr_scheduler (torch.optim.lr_scheduler or None)

robomimic.utils.torch_utils.maybe_no_grad(no_grad)#

Parameters

no_grad (bool) – if True, the returned context will be torch.no_grad(), otherwise it will be a dummy context
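The conditional-scope pattern described here can be sketched with the standard library alone. In the example below, `RecordingContext` is a hypothetical stand-in for a real context such as torch.no_grad(), and `maybe_context` is an illustrative helper rather than the library function; `contextlib.nullcontext` plays the role of the dummy context.

```python
import contextlib


class RecordingContext:
    """Stand-in for a real context manager; records whether it is currently active."""

    def __init__(self):
        self.active = False
        self.entered = 0

    def __enter__(self):
        self.active = True
        self.entered += 1
        return self

    def __exit__(self, exc_type, exc, tb):
        self.active = False
        return False  # do not swallow exceptions


def maybe_context(enabled, real_context):
    # Return the real context when enabled, otherwise a context that does nothing.
    return real_context if enabled else contextlib.nullcontext()


ctx = RecordingContext()
with maybe_context(True, ctx):
    inside_enabled = ctx.active
with maybe_context(False, ctx):
    inside_disabled = ctx.active
```

Either way the caller writes a single `with` block, so the body does not need to be duplicated for the two cases.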

robomimic.utils.torch_utils.optimizer_from_optim_params(net_optim_params, net)#

Helper function to return a torch Optimizer from the optim_params section of the config for a particular network.

Parameters
  • net_optim_params (Config) – optim_params part of algo_config corresponding to @net. This determines the optimizer that is created.

  • net (torch.nn.Module) – module whose parameters this optimizer will be responsible for

Returns

optimizer

Return type

optimizer (torch.optim.Optimizer)

robomimic.utils.torch_utils.reparameterize(mu, logvar)#

Reparameterize for the backpropagation of z instead of q. This makes it so that we can backpropagate through the sampling of z from our encoder when feeding the sampled variable to the decoder.

(See “The reparameterization trick” section of https://arxiv.org/abs/1312.6114)

Parameters
  • mu (torch.Tensor) – batch of means from the encoder distribution

  • logvar (torch.Tensor) – batch of log variances from the encoder distribution

Returns

batch of sampled latents from the encoder distribution that support backpropagation

Return type

z (torch.Tensor)
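The standard form of the trick from the referenced paper is z = mu + sigma * eps, with sigma = exp(0.5 * logvar) and eps drawn from a unit Gaussian. The sketch below works on plain Python lists to show the arithmetic only; `reparameterize_sketch` is a hypothetical name, and the library function operates on torch tensors and may differ in details.

```python
import math
import random


def reparameterize_sketch(mu, logvar, rng):
    """Return z = mu + sigma * eps elementwise, with sigma = exp(0.5 * logvar)."""
    z = []
    for m, lv in zip(mu, logvar):
        eps = rng.gauss(0.0, 1.0)   # noise drawn independently of mu and logvar
        sigma = math.exp(0.5 * lv)  # log variance -> standard deviation
        z.append(m + sigma * eps)
    return z


rng = random.Random(0)
z = reparameterize_sketch([0.0, 1.5], [0.0, -200.0], rng)
```

Because the randomness enters only through eps, z is a deterministic, differentiable function of mu and logvar once eps is fixed, which is what allows gradients to flow back to the encoder.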

robomimic.utils.torch_utils.soft_update(source, target, tau)#

Soft update from the parameters of a @source torch module to a @target torch module with strength @tau. The update follows target = target * (1 - tau) + source * tau.

Parameters
  • source (torch.nn.Module) – source network to push target network parameters towards

  • target (torch.nn.Module) – target network to update

  • tau (float) – update strength; tau = 0 leaves @target unchanged and tau = 1 copies @source
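The update rule target = target * (1 - tau) + source * tau can be checked on plain numbers. The sketch below applies it to lists of floats that stand in for network parameters; `soft_update_sketch` is a hypothetical name used only for this example.

```python
def soft_update_sketch(source, target, tau):
    """Blend source values into target in place: t <- t * (1 - tau) + s * tau."""
    for i, (s, t) in enumerate(zip(source, target)):
        target[i] = t * (1.0 - tau) + s * tau


source = [4.0, -2.0]
target = [0.0, 2.0]
soft_update_sketch(source, target, tau=0.25)

# With tau = 1.0 the rule reduces to a hard copy of the source values.
hard_target = [0.0, 2.0]
soft_update_sketch(source, hard_target, tau=1.0)
```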

robomimic.utils.train_utils module#

This file contains several utility functions used to define the main training loop. It mainly consists of functions to assist with logging, rollouts, and the @run_epoch function, which is the core training logic for models in this repository.

robomimic.utils.train_utils.dataset_factory(config, obs_keys, filter_by_attribute=None, dataset_path=None)#

Create a SequenceDataset instance to pass to a torch DataLoader.

Parameters
  • config (BaseConfig instance) – config object

  • obs_keys (list) – list of observation modalities that are required for training (this will inform the dataloader on what modalities to load)

  • filter_by_attribute (str) – if provided, use the provided filter key to select a subset of demonstration trajectories to load

  • dataset_path (str) – if provided, the SequenceDataset instance should load data from this dataset path. Defaults to config.train.data.

Returns

dataset object

Return type

dataset (SequenceDataset instance)

robomimic.utils.train_utils.get_exp_dir(config, auto_remove_exp_dir=False)#

Create experiment directory from config. If an identical experiment directory exists and @auto_remove_exp_dir is False (default), the function will prompt the user on whether to remove and replace it, or keep the existing one and add a new subdirectory with the new timestamp for the current run.

Parameters
  • config (BaseConfig instance) – config object

  • auto_remove_exp_dir (bool) – if True, automatically remove the existing experiment folder if it exists at the same path.

Returns
  • log_dir (str) – path to created log directory (sub-folder in experiment directory)

  • output_dir (str) – path to created models directory (sub-folder in experiment directory) to store model checkpoints

  • video_dir (str) – path to video directory (sub-folder in experiment directory) to store rollout videos

robomimic.utils.train_utils.is_every_n_steps(interval, current_step, skip_zero=False)#

Convenience function to check whether current_step is at the interval. Returns True if current_step % interval == 0 and asserts a few corner cases (e.g., interval <= 0).

Parameters
  • interval (int) – target interval

  • current_step (int) – current step

  • skip_zero (bool) – whether to skip 0 (return False at 0)

Returns

whether current_step is at the interval

Return type

is_at_interval (bool)
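A minimal sketch of the check described above, written from the description rather than from the library source. `at_interval` is a hypothetical name, and the exact set of corner cases asserted by the real function may differ.

```python
def at_interval(interval, current_step, skip_zero=False):
    # Reject non-positive or non-integer intervals, as the description suggests.
    assert isinstance(interval, int) and interval > 0, "interval must be a positive int"
    if skip_zero and current_step == 0:
        return False
    return current_step % interval == 0


hits = [step for step in range(7) if at_interval(3, step)]
hits_skip_zero = [step for step in range(7) if at_interval(3, step, skip_zero=True)]
```

Step 0 is a multiple of every interval, so @skip_zero is what prevents an action such as checkpointing from firing before any training has happened.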

robomimic.utils.train_utils.load_data_for_training(config, obs_keys)#

Data loading at the start of an algorithm.

Parameters
  • config (BaseConfig instance) – config object

  • obs_keys (list) – list of observation modalities that are required for training (this will inform the dataloader on what modalities to load)

Returns
  • train_dataset (SequenceDataset instance) – train dataset object

  • valid_dataset (SequenceDataset instance) – validation dataset object (only if using validation)

robomimic.utils.train_utils.rollout_with_stats(policy, envs, horizon, use_goals=False, num_episodes=None, render=False, video_dir=None, video_path=None, epoch=None, video_skip=5, terminate_on_success=False, verbose=False)#

A helper function used in the train loop to conduct evaluation rollouts per environment and summarize the results.

Can specify @video_dir (to dump a video per environment) or @video_path (to dump a single video for all environments).

Parameters
  • policy (RolloutPolicy instance) – policy to use for rollouts.

  • envs (dict) – dictionary that maps env_name (str) to EnvBase instance. The policy will be rolled out in each env.

  • horizon (int) – maximum number of steps to roll the agent out for

  • use_goals (bool) – if True, agent is goal-conditioned, so provide goal observations from env

  • num_episodes (int) – number of rollout episodes per environment

  • render (bool) – if True, render the rollout to the screen

  • video_dir (str) – if not None, dump rollout videos to this directory (one per environment)

  • video_path (str) – if not None, dump a single rollout video for all environments

  • epoch (int) – epoch number (used for video naming)

  • video_skip (int) – how often to write video frame

  • terminate_on_success (bool) – if True, terminate episode early as soon as a success is encountered

  • verbose (bool) – if True, print results of each rollout

Returns
  • all_rollout_logs (dict) – dictionary of rollout statistics (e.g. return, success rate, …) averaged across all rollouts

  • video_paths (dict) – path to rollout videos for each environment

robomimic.utils.train_utils.run_epoch(model, data_loader, epoch, validate=False, num_steps=None)#

Run an epoch of training or validation.

Parameters
  • model (Algo instance) – model to train

  • data_loader (DataLoader instance) – data loader that will be used to serve batches of data to the model

  • epoch (int) – epoch number

  • validate (bool) – whether this is a training epoch or validation epoch. This tells the model whether to do gradient steps or purely do forward passes.

  • num_steps (int) – if provided, this epoch lasts for a fixed number of batches (gradient steps), otherwise the epoch is a complete pass through the training dataset

Returns

dictionary of logged training metrics averaged across all batches

Return type

step_log_all (dict)
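The shape of an epoch as described above (process batches, collect per-step metrics, average them) can be sketched without a real model. In this example `run_epoch_sketch` and `fake_step` are hypothetical, batches are plain lists, and wrapping around to the first batch when @num_steps exceeds one pass is an assumption of the sketch.

```python
def run_epoch_sketch(step_fn, batches, num_steps=None):
    """Call step_fn on each batch and average the returned metric dicts."""
    if num_steps is None:
        num_steps = len(batches)  # one complete pass through the data
    totals = {}
    for i in range(num_steps):
        batch = batches[i % len(batches)]  # wrap around if num_steps exceeds one pass
        for key, value in step_fn(batch).items():
            totals[key] = totals.get(key, 0.0) + value
    # Assumes every step reports the same metric keys.
    return {key: total / num_steps for key, total in totals.items()}


def fake_step(batch):
    # Hypothetical stand-in for a model update that reports a loss.
    return {"loss": sum(batch) / len(batch)}


batches = [[1.0, 3.0], [2.0, 6.0], [5.0, 5.0]]
full_pass = run_epoch_sketch(fake_step, batches)
two_steps = run_epoch_sketch(fake_step, batches, num_steps=2)
```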

robomimic.utils.train_utils.run_rollout(policy, env, horizon, use_goals=False, render=False, video_writer=None, video_skip=5, terminate_on_success=False)#

Runs a rollout in an environment with the current network parameters.

Parameters
  • policy (RolloutPolicy instance) – policy to use for rollouts.

  • env (EnvBase instance) – environment to use for rollouts.

  • horizon (int) – maximum number of steps to roll the agent out for

  • use_goals (bool) – if True, agent is goal-conditioned, so provide goal observations from env

  • render (bool) – if True, render the rollout to the screen

  • video_writer (imageio Writer instance) – if not None, use video writer object to append frames at rate given by @video_skip

  • video_skip (int) – how often to write video frame

  • terminate_on_success (bool) – if True, terminate episode early as soon as a success is encountered

Returns

dictionary containing return, success rate, etc.

Return type

results (dict)

robomimic.utils.train_utils.save_model(model, config, env_meta, shape_meta, ckpt_path, obs_normalization_stats=None)#

Save model to a torch pth file.

Parameters
  • model (Algo instance) – model to save

  • config (BaseConfig instance) – config to save

  • env_meta (dict) – env metadata for this training run

  • shape_meta (dict) – shape metadata for this training run

  • ckpt_path (str) – writes model checkpoint to this path

  • obs_normalization_stats (dict) – optionally pass a dictionary for observation normalization. This should map observation keys to dicts with a “mean” and “std” of shape (1, …) where … is the default shape for the observation.

robomimic.utils.train_utils.should_save_from_rollout_logs(all_rollout_logs, best_return, best_success_rate, epoch_ckpt_name, save_on_best_rollout_return, save_on_best_rollout_success_rate)#

Helper function used during training to determine whether checkpoints and videos should be saved. It will modify input attributes appropriately (such as updating the best returns and success rates seen and modifying the epoch ckpt name), and returns a dict with the updated statistics.

Parameters
  • all_rollout_logs (dict) – dictionary of rollout results that should be consistent with the output of @rollout_with_stats

  • best_return (dict) – dictionary that stores the best average rollout return seen so far during training, for each environment

  • best_success_rate (dict) – dictionary that stores the best average success rate seen so far during training, for each environment

  • epoch_ckpt_name (str) – what to name the checkpoint file - this name might be modified by this function

  • save_on_best_rollout_return (bool) – if True, should save checkpoints that achieve a new best rollout return

  • save_on_best_rollout_success_rate (bool) – if True, should save checkpoints that achieve a new best rollout success rate

Returns

dictionary that contains updated input attributes @best_return, @best_success_rate, @epoch_ckpt_name, along with two additional attributes @should_save_ckpt (True if should save this checkpoint), and @ckpt_reason (string that contains the reason for saving the checkpoint)

Return type

save_info (dict)
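The bookkeeping described above can be sketched as follows. This is an illustration under stated assumptions, not the library implementation: the per-environment log keys `"Return"` and `"Success_Rate"`, the suffixes appended to the checkpoint name, and the reason strings are all hypothetical.

```python
def should_save_sketch(all_rollout_logs, best_return, best_success_rate,
                       epoch_ckpt_name, save_on_best_rollout_return,
                       save_on_best_rollout_success_rate):
    should_save_ckpt = False
    ckpt_reason = None
    for env_name, logs in all_rollout_logs.items():
        # Track the best return seen so far for this environment.
        if logs["Return"] > best_return[env_name]:
            best_return[env_name] = logs["Return"]
            if save_on_best_rollout_return:
                epoch_ckpt_name += "_{}_return_{}".format(env_name, logs["Return"])
                should_save_ckpt = True
                ckpt_reason = "return"
        # Track the best success rate seen so far for this environment.
        if logs["Success_Rate"] > best_success_rate[env_name]:
            best_success_rate[env_name] = logs["Success_Rate"]
            if save_on_best_rollout_success_rate:
                epoch_ckpt_name += "_{}_success_{}".format(env_name, logs["Success_Rate"])
                should_save_ckpt = True
                ckpt_reason = "success"
    return dict(best_return=best_return, best_success_rate=best_success_rate,
                epoch_ckpt_name=epoch_ckpt_name, should_save_ckpt=should_save_ckpt,
                ckpt_reason=ckpt_reason)


info = should_save_sketch(
    all_rollout_logs={"Lift": {"Return": 0.5, "Success_Rate": 0.5}},
    best_return={"Lift": 0.75},
    best_success_rate={"Lift": 0.25},
    epoch_ckpt_name="model_epoch_10",
    save_on_best_rollout_return=True,
    save_on_best_rollout_success_rate=True,
)
```

In this run the return does not beat the previous best, but the success rate does, so only the success-rate branch marks the checkpoint for saving.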

robomimic.utils.vis_utils module#

This file contains utility functions for visualizing image observations in the training pipeline. These functions can be a useful debugging tool.

robomimic.utils.vis_utils.image_tensor_to_disk(image, fname)#

Writes an image tensor to disk. Any leading batch dimensions are indexed out with the first element.

Parameters
  • image (torch.Tensor) – image of shape […, C, H, W]. All leading dimensions will be indexed out with the first element

  • fname (str) – path to save image to

robomimic.utils.vis_utils.image_tensor_to_numpy(image)#

Converts processed image tensors to numpy so that they can be saved to disk or video. A useful utility function for visualizing images in the middle of training.

Parameters

image (torch.Tensor) – images of shape […, C, H, W]

Returns

converted images of shape […, H, W, C] and type uint8

Return type

image (np.array)

robomimic.utils.vis_utils.image_to_disk(image, fname)#

Writes an image to disk.

Parameters
  • image (np.array) – image of shape [H, W, 3]

  • fname (str) – path to save image to

Module contents#