task_base

BaseTask

Bases: GymObservable, Registerable

Base Task class. Task-specific reset_scene, reset_agent, step methods are implemented in subclasses

Parameters:

termination_config (None or dict, default: None)
    Keyword-mapped configuration to use to generate termination conditions. This should be specific to
    the task class. Default is None, which corresponds to a default config being used. Note that any
    keyword required by a specific task class but not specified in the config will automatically be
    filled in with the default config. See cls.default_termination_config for default values used

reward_config (None or dict, default: None)
    Keyword-mapped configuration to use to generate reward functions. This should be specific to the
    task class. Default is None, which corresponds to a default config being used. Note that any
    keyword required by a specific task class but not specified in the config will automatically be
    filled in with the default config. See cls.default_reward_config for default values used
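
A note on how the two configs merge (illustrated with a hypothetical MyNavigationTask subclass and a made-up max_steps key): keys you pass override the corresponding defaults, keys you omit keep their default values, and any key absent from the class defaults trips the assertion in the constructor:

# Hypothetical subclass and keys -- purely illustrative
task = MyNavigationTask(
    termination_config={"max_steps": 500},  # overrides only this default termination kwarg
    reward_config=None,                     # None -> fall back entirely on default_reward_config
)

# A key not present in default_termination_config raises an AssertionError:
#   MyNavigationTask(termination_config={"max_stepz": 500})
#   -> "Got unknown termination config keys inputted: {'max_stepz'}"
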
Source code in omnigibson/tasks/task_base.py
class BaseTask(GymObservable, Registerable, metaclass=ABCMeta):
    """
    Base Task class.
    Task-specific reset_scene, reset_agent, step methods are implemented in subclasses

    Args:
        termination_config (None or dict): Keyword-mapped configuration to use to generate termination conditions. This
            should be specific to the task class. Default is None, which corresponds to a default config being used.
            Note that any keyword required by a specific task class but not specified in the config will automatically
            be filled in with the default config. See cls.default_termination_config for default values used
        reward_config (None or dict): Keyword-mapped configuration to use to generate reward functions. This should be
            specific to the task class. Default is None, which corresponds to a default config being used. Note that
            any keyword required by a specific task class but not specified in the config will automatically be filled
            in with the default config. See cls.default_reward_config for default values used
    """
    def __init__(self, termination_config=None, reward_config=None):
        # Make sure configs are dictionaries
        termination_config = dict() if termination_config is None else termination_config
        reward_config = dict() if reward_config is None else reward_config

        # Sanity check termination and reward conditions -- any keys found in the inputted config but NOT
        # found in the default config should raise an error
        unknown_termination_keys = set(termination_config.keys()) - set(self.default_termination_config.keys())
        assert len(unknown_termination_keys) == 0, \
            f"Got unknown termination config keys inputted: {unknown_termination_keys}"
        unknown_reward_keys = set(reward_config.keys()) - set(self.default_reward_config.keys())
        assert len(unknown_reward_keys) == 0, f"Got unknown reward config keys inputted: {unknown_reward_keys}"

        # Combine with defaults and store internally
        self._termination_config = self.default_termination_config
        self._termination_config.update(termination_config)
        self._reward_config = self.default_reward_config
        self._reward_config.update(reward_config)

        # Generate reward and termination functions
        self._termination_conditions = self._create_termination_conditions()
        self._reward_functions = self._create_reward_functions()

        # Store other internal vars that will be populated at runtime
        self._loaded = False
        self._reward = None
        self._done = None
        self._info = None
        self._low_dim_obs_dim = None

        # Run super init
        super().__init__()

    @abstractmethod
    def _load(self, env):
        """
        Load this task. Should be implemented by subclass. Can include functionality, e.g.: loading dynamic objects
        into the environment
        """
        raise NotImplementedError()

    @abstractmethod
    def _load_non_low_dim_observation_space(self):
        """
        Loads any non-low dim observation spaces for this task.

        Returns:
            dict: Keyword-mapped observation space for this object mapping non low dim task observation name to
                observation space
        """
        raise NotImplementedError()

    def _load_observation_space(self):
        # Create the non low dim obs space
        obs_space = self._load_non_low_dim_observation_space()

        # Create the low dim obs space and add to the main obs space dict -- make sure we're flattening low dim obs
        obs_space["low_dim"] = self._build_obs_box_space(shape=(self._low_dim_obs_dim,), low=-np.inf, high=np.inf, dtype=np.float64)

        return obs_space

    def load(self, env):
        """
        Load this task
        """
        # Make sure the scene is of the correct type!
        assert any([issubclass(env.scene.__class__, valid_cls) for valid_cls in self.valid_scene_types]), \
            f"Got incompatible scene type {env.scene.__class__.__name__} for task {self.__class__.__name__}! " \
            f"Scene class must be a subclass of at least one of: " \
            f"{[cls_type.__name__ for cls_type in self.valid_scene_types]}"

        # Run internal method
        self._load(env=env)

        # We're now initialized
        self._loaded = True

    @abstractmethod
    def _create_termination_conditions(self):
        """
        Creates the termination functions in the environment

        Returns:
            dict of BaseTerminationCondition: Termination functions created for this task
        """
        raise NotImplementedError()

    @abstractmethod
    def _create_reward_functions(self):
        """
        Creates the reward functions in the environment

        Returns:
            dict of BaseRewardFunction: Reward functions created for this task
        """
        raise NotImplementedError()

    def _reset_scene(self, env):
        """
        Task-specific scene reset. Default is the normal scene reset

        Args:
            env (Environment): environment instance
        """
        env.scene.reset()

    def _reset_agent(self, env):
        """
        Task-specific agent reset

        Args:
            env (Environment): environment instance
        """
        # Default is no-op
        pass

    def _reset_variables(self, env):
        """
        Task-specific internal variable reset

        Args:
            env (Environment): environment instance
        """
        # By default, reset reward, done, and info
        self._reward = None
        self._done = None
        self._info = None

    def reset(self, env):
        """
        Resets this task in the environment

        Args:
            env (Environment): environment instance to reset
        """
        # Reset the scene, agent, and variables
        self._reset_scene(env)
        self._reset_agent(env)
        self._reset_variables(env)

        # Also reset all termination conditions and reward functions
        for termination_condition in self._termination_conditions.values():
            termination_condition.reset(self, env)
        for reward_function in self._reward_functions.values():
            reward_function.reset(self, env)

        # Fill in low dim obs dim so we can use this to create the observation space later
        self._low_dim_obs_dim = len(self.get_obs(env=env, flatten_low_dim=True)["low_dim"])

    def _step_termination(self, env, action, info=None):
        """
        Step and aggregate termination conditions

        Args:
            env (Environment): Environment instance
            action (n-array): 1D flattened array of actions executed by all agents in the environment
            info (None or dict): Any info to return

        Returns:
            2-tuple:
                - bool: whether the task is done at the current timestep (i.e.: whether any termination condition was triggered)
                - dict: any information passed through this function or generated by this function
        """
        # Get all dones and successes from individual termination conditions
        dones = []
        successes = []
        for termination_condition in self._termination_conditions.values():
            d, s = termination_condition.step(self, env, action)
            dones.append(d)
            successes.append(s)
        # Any True found corresponds to a done / success
        done = sum(dones) > 0
        success = sum(successes) > 0

        # Populate info
        info = dict() if info is None else info
        info["success"] = success
        return done, info

    def _step_reward(self, env, action, info=None):
        """
        Step and aggregate reward functions

        Args:
            env (Environment): Environment instance
            action (n-array): 1D flattened array of actions executed by all agents in the environment
            info (None or dict): Any info to return

        Returns:
            2-tuple:
                - float: aggregated reward at the current timestep
                - dict: any information passed through this function or generated by this function
        """
        # Make sure info is a dict
        total_info = dict() if info is None else info
        # We'll also store individual reward split as well
        breakdown_dict = dict()
        # Aggregate rewards over all reward functions
        total_reward = 0.0
        for reward_name, reward_function in self._reward_functions.items():
            reward, reward_info = reward_function.step(self, env, action)
            total_reward += reward
            breakdown_dict[reward_name] = reward
            total_info[reward_name] = reward_info

        # Store breakdown dict
        total_info["reward_breakdown"] = breakdown_dict

        return total_reward, total_info

    @abstractmethod
    def _get_obs(self, env):
        """
        Get task-specific observation

        Args:
            env (Environment): Environment instance

        Returns:
            2-tuple:
                - dict: Keyword-mapped low dimensional observations from this task
                - dict: All other keyword-mapped observations from this task
        """
        raise NotImplementedError()

    def _flatten_low_dim_obs(self, obs):
        """
        Flattens dictionary containing low-dimensional observations @obs and converts it from a dictionary into a
        1D numpy array

        Args:
            obs (dict): Low-dim observation dictionary where each value is a 1D array

        Returns:
            n-array: 1D-numpy array of flattened low-dim observations
        """
        # By default, we simply concatenate all values in our obs dict
        return np.concatenate([ob for ob in obs.values()]) if len(obs.values()) > 0 else np.array([])

    def get_obs(self, env, flatten_low_dim=True):
        """
        Get observations for this task

        Args:
            env (Environment): Environment instance
            flatten_low_dim (bool): Whether to flatten low-dimensional observations into a single 1D array
        """

        # Grab obs internally
        low_dim_obs, obs = self._get_obs(env=env)

        # Possibly flatten low dim and add to main observation dictionary
        obs["low_dim"] = self._flatten_low_dim_obs(obs=low_dim_obs) if flatten_low_dim else low_dim_obs

        return obs

    def step(self, env, action):
        """
        Perform task-specific step for every timestep

        Args:
            env (Environment): Environment instance
            action (n-array): 1D flattened array of actions executed by all agents in the environment

        Returns:
            3-tuple:
                - float: reward calculated after this step
                - bool: whether task is done or not
                - dict: nested dictionary of reward- and done-related info
        """
        # Make sure we're initialized
        assert self._loaded, "Task must be loaded using load() before calling step()!"

        # We calculate termination conditions first and then rewards
        # (since some rewards can rely on termination conditions to update)
        done, done_info = self._step_termination(env=env, action=action)
        reward, reward_info = self._step_reward(env=env, action=action)

        # Update the internal state of this task
        self._reward = reward
        self._done = done
        self._info = {
            "reward": reward_info,
            "done": done_info,
        }

        return self._reward, self._done, deepcopy(self._info)

    @property
    def name(self):
        """
        Returns:
            str: Name of this task. Defaults to class name
        """
        return self.__class__.__name__

    @property
    def reward(self):
        """
        Returns:
            float: Current reward for this task
        """
        assert self._reward is not None, "At least one step() must occur before reward can be calculated!"
        return self._reward

    @property
    def done(self):
        """
        Returns:
            bool: Whether this task is done or not
        """
        assert self._done is not None, "At least one step() must occur before done can be calculated!"
        return self._done

    @property
    def info(self):
        """
        Returns:
            dict: Nested dictionary of information for this task, including reward- and done-specific information
        """
        assert self._info is not None, "At least one step() must occur before info can be calculated!"
        return self._info

    @classproperty
    def valid_scene_types(cls):
        """
        Returns:
            set of Scene: Scene type(s) that are valid (i.e.: compatible) with this specific task. This will be
                used to sanity check the task + scene combination at runtime
        """
        raise NotImplementedError()

    @classproperty
    def default_reward_config(cls):
        """
        Returns:
            dict: Default reward configuration for this class. Should include any kwargs necessary for
                any of the reward classes generated in self._create_reward_functions(). Note: this default config
                should be fully verbose -- any keys inputted in the constructor but NOT found in this default config
                will raise an error!
        """
        raise NotImplementedError()

    @classproperty
    def default_termination_config(cls):
        """
        Returns:
            dict: Default termination configuration for this class. Should include any kwargs necessary for
                any of the termination classes generated in self._create_termination_conditions(). Note: this default config
                should be fully verbose -- any keys inputted in the constructor but NOT found in this default config
                will raise an error!
        """
        raise NotImplementedError()

    @classproperty
    def _do_not_register_classes(cls):
        # Don't register this class since it's an abstract template
        classes = super()._do_not_register_classes
        classes.add("BaseTask")
        return classes

    @classproperty
    def _cls_registry(cls):
        # Global registry
        global REGISTERED_TASKS
        return REGISTERED_TASKS

done property

Returns:

bool: Whether this task is done or not

info property

Returns:

dict: Nested dictionary of information for this task, including reward- and done-specific information

name property

Returns:

str: Name of this task. Defaults to class name

reward property

Returns:

float: Current reward for this task

default_reward_config()

Returns:

dict: Default reward configuration for this class. Should include any kwargs necessary for any of the reward classes generated in self._create_reward_functions(). Note: this default config should be fully verbose -- any keys inputted in the constructor but NOT found in this default config will raise an error!

Source code in omnigibson/tasks/task_base.py
@classproperty
def default_reward_config(cls):
    """
    Returns:
        dict: Default reward configuration for this class. Should include any kwargs necessary for
            any of the reward classes generated in self._create_reward_functions(). Note: this default config
            should be fully verbose -- any keys inputted in the constructor but NOT found in this default config
            will raise an error!
    """
    raise NotImplementedError()
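
A subclass satisfies this by returning a fully verbose dict containing every reward kwarg it later reads in _create_reward_functions(); a minimal sketch with made-up keys:

@classproperty
def default_reward_config(cls):
    # Hypothetical keys -- each one must be consumed when building the reward
    # functions, and together they bound what callers may pass as reward_config
    return {
        "r_potential": 1.0,    # weight on a distance-potential shaping term
        "r_collision": -0.1,   # penalty applied when the robot is in collision
    }
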

default_termination_config()

Returns:

dict: Default termination configuration for this class. Should include any kwargs necessary for any of the termination classes generated in self._create_termination_conditions(). Note: this default config should be fully verbose -- any keys inputted in the constructor but NOT found in this default config will raise an error!

Source code in omnigibson/tasks/task_base.py
@classproperty
def default_termination_config(cls):
    """
    Returns:
        dict: Default termination configuration for this class. Should include any kwargs necessary for
            any of the termination classes generated in self._create_termination_conditions(). Note: this default config
            should be fully verbose -- any keys inputted in the constructor but NOT found in this default config
            will raise an error!
    """
    raise NotImplementedError()
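
The termination side works the same way: the defaults must name every kwarg consumed in _create_termination_conditions(); a minimal sketch with assumed keys:

@classproperty
def default_termination_config(cls):
    # Hypothetical keys consumed when instantiating the termination conditions,
    # e.g. a Timeout(max_steps=...) condition and a collision budget
    return {
        "max_steps": 500,
        "max_collisions": 500,
    }
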

load(env)

Load this task

Source code in omnigibson/tasks/task_base.py
def load(self, env):
    """
    Load this task
    """
    # Make sure the scene is of the correct type!
    assert any([issubclass(env.scene.__class__, valid_cls) for valid_cls in self.valid_scene_types]), \
        f"Got incompatible scene type {env.scene.__class__.__name__} for task {self.__class__.__name__}! " \
        f"Scene class must be a subclass of at least one of: " \
        f"{[cls_type.__name__ for cls_type in self.valid_scene_types]}"

    # Run internal method
    self._load(env=env)

    # We're now initialized
    self._loaded = True
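
load() is normally driven by the environment rather than called by hand; conceptually the call order looks like the following (a sketch of the flow reusing the DummyReachTask example above, not the actual Environment implementation):

# Illustrative wiring -- the environment performs these calls internally
task = DummyReachTask(termination_config={"max_steps": 200})

task.load(env)    # asserts env.scene is one of task.valid_scene_types, then runs _load(env)
task.reset(env)   # must run at least once so _low_dim_obs_dim (and thus the obs space) can be built
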

reset(env)

Resets this task in the environment

Parameters:

env (Environment, required)
    environment instance to reset
Source code in omnigibson/tasks/task_base.py
def reset(self, env):
    """
    Resets this task in the environment

    Args:
        env (Environment): environment instance to reset
    """
    # Reset the scene, agent, and variables
    self._reset_scene(env)
    self._reset_agent(env)
    self._reset_variables(env)

    # Also reset all termination conditions and reward functions
    for termination_condition in self._termination_conditions.values():
        termination_condition.reset(self, env)
    for reward_function in self._reward_functions.values():
        reward_function.reset(self, env)

    # Fill in low dim obs dim so we can use this to create the observation space later
    self._low_dim_obs_dim = len(self.get_obs(env=env, flatten_low_dim=True)["low_dim"])
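
The scene and variable portions of this reset rarely need changing, but most tasks customize the agent portion by overriding _reset_agent(). A minimal sketch, assuming env.robots and set_position_orientation() are available (as they are on OmniGibson robots):

def _reset_agent(self, env):
    # Illustrative: drop the (single) robot at a fixed start pose.
    # A real task would typically sample a collision-free pose instead.
    robot = env.robots[0]
    robot.set_position_orientation(position=[0.0, 0.0, 0.05], orientation=[0.0, 0.0, 0.0, 1.0])
    robot.reset()
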

step(env, action)

Perform task-specific step for every timestep

Parameters:

env (Environment, required)
    Environment instance

action (n-array, required)
    1D flattened array of actions executed by all agents in the environment

Returns:

3-tuple:
    - float: reward calculated after this step
    - bool: whether task is done or not
    - dict: nested dictionary of reward- and done-related info

Source code in omnigibson/tasks/task_base.py
def step(self, env, action):
    """
    Perform task-specific step for every timestep

    Args:
        env (Environment): Environment instance
        action (n-array): 1D flattened array of actions executed by all agents in the environment

    Returns:
        3-tuple:
            - float: reward calculated after this step
            - bool: whether task is done or not
            - dict: nested dictionary of reward- and done-related info
    """
    # Make sure we're initialized
    assert self._loaded, "Task must be loaded using load() before calling step()!"

    # We calculate termination conditions first and then rewards
    # (since some rewards can rely on termination conditions to update)
    done, done_info = self._step_termination(env=env, action=action)
    reward, reward_info = self._step_reward(env=env, action=action)

    # Update the internal state of this task
    self._reward = reward
    self._done = done
    self._info = {
        "reward": reward_info,
        "done": done_info,
    }

    return self._reward, self._done, deepcopy(self._info)
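
In a typical rollout the environment invokes this once per control step, right after applying the action; a condensed sketch of that interaction (illustrative, not the actual Environment source):

# Roughly what the surrounding environment does with the task each control step,
# assuming `task`, `env`, and `action` already exist
obs = task.get_obs(env=env)                              # task observations, incl. flattened "low_dim"
reward, done, info = task.step(env=env, action=action)   # terminations evaluated first, then rewards
if done:
    # info["done"]["success"] distinguishes a successful termination from failure/timeout
    episode_successful = info["done"]["success"]
    task.reset(env)
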

valid_scene_types()

Returns:

set of Scene: Scene type(s) that are valid (i.e.: compatible) with this specific task. This will be used to sanity check the task + scene combination at runtime

Source code in omnigibson/tasks/task_base.py
@classproperty
def valid_scene_types(cls):
    """
    Returns:
        set of Scene: Scene type(s) that are valid (i.e.: compatible) with this specific task. This will be
            used to sanity check the task + scene combination at runtime
    """
    raise NotImplementedError()
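
To restrict a task to particular scene types, a subclass simply returns the relevant classes; a sketch assuming the InteractiveTraversableScene import path:

from omnigibson.scenes.interactive_traversable_scene import InteractiveTraversableScene  # assumed path

@classproperty
def valid_scene_types(cls):
    # Any env.scene whose class subclasses one of these passes the check in load()
    return {InteractiveTraversableScene}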