
eval

Evaluator

Evaluator class for running and evaluating policies on behavior tasks. This class manages the setup, execution, and evaluation of policy rollouts in an OmniGibson environment, tracking metrics such as the number of trials, the number of successes, and the total time. It supports loading environments, robots, policies, and metrics, and provides methods for stepping through the environment, resetting state, and handling video output and logging.
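
The snippet below is a minimal usage sketch, assuming a Hydra/OmegaConf config object (cfg) that provides the keys the constructor and load_env read (task.name, robot.type, robot.controllers, model, policy_name, env_wrapper, partial_scene_load, max_steps); the config file name and instance id are illustrative.

from omegaconf import OmegaConf

from omnigibson.learning.eval import Evaluator  # import path inferred from the "Source code in" note below

cfg = OmegaConf.load("eval_config.yaml")  # hypothetical config file, normally assembled by Hydra from the repo's configs

# The context manager installs a SIGINT handler on entry; on exit it logs trial statistics,
# flushes any open video writer, closes the environment, and shuts down OmniGibson.
with Evaluator(cfg) as evaluator:
    evaluator.load_task_instance(instance_id=0)  # instance id is illustrative
    evaluator.reset()
    terminated = truncated = False
    while not (terminated or truncated):
        terminated, truncated = evaluator.step()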

Source code in OmniGibson/omnigibson/learning/eval.py
class Evaluator:
    """
    Evaluator class for running and evaluating policies on behavior tasks.
    This class manages the setup, execution, and evaluation of policy rollouts in an OmniGibson environment,
    tracking metrics such as the number of trials, the number of successes, and the total time. It supports
    loading environments, robots, policies, and metrics, and provides methods for stepping through the
    environment, resetting state, and handling video output and logging.
    """

    def __init__(self, cfg: DictConfig) -> None:
        self.cfg = cfg

        # record the total number of trials, the number of successful trials, and the total trial time
        self.n_trials = 0
        self.n_success_trials = 0
        self.total_time = 0
        self.robot_action = dict()

        self.env = self.load_env(env_wrapper=self.cfg.env_wrapper)
        self.policy = self.load_policy()
        self.robot = self.load_robot()
        self.metrics = self.load_metrics()

        self.reset()
        # manually reset environment episode number
        self.env._current_episode = 0
        self._video_writer = None

    def load_env(self, env_wrapper: DictConfig) -> EnvironmentWrapper:
        """
        Read the environment config file and create the environment.
        The config file is located in the configs/envs directory.
        """
        # Disable a subset of transition rules for data collection
        for rule in DISABLED_TRANSITION_RULES:
            rule.ENABLED = False
        # Load config file
        available_tasks = load_available_tasks()
        task_name = self.cfg.task.name
        assert task_name in available_tasks, f"Got invalid task name: {task_name}"
        # Now, get human stats of the task
        task_idx = TASK_NAMES_TO_INDICES[task_name]
        self.human_stats = {
            "length": [],
            "distance_traveled": [],
            "left_eef_displacement": [],
            "right_eef_displacement": [],
        }
        with open(os.path.join(gm.DATA_PATH, "2025-challenge-task-instances", "metadata", "episodes.jsonl"), "r") as f:
            episodes = [json.loads(line) for line in f]
        for episode in episodes:
            if episode["episode_index"] // 1e4 == task_idx:
                for k in self.human_stats.keys():
                    self.human_stats[k].append(episode[k])
        # take a mean
        for k in self.human_stats.keys():
            self.human_stats[k] = sum(self.human_stats[k]) / len(self.human_stats[k])

        # Load the seed instance by default
        task_cfg = available_tasks[task_name][0]
        robot_type = self.cfg.robot.type
        assert robot_type == "R1Pro", f"Got invalid robot type: {robot_type}, only R1Pro is supported."
        cfg = generate_basic_environment_config(task_name=task_name, task_cfg=task_cfg)
        if self.cfg.partial_scene_load:
            relevant_rooms = get_task_relevant_room_types(activity_name=task_name)
            relevant_rooms = augment_rooms(relevant_rooms, task_cfg["scene_model"], task_name)
            cfg["scene"]["load_room_types"] = relevant_rooms

        cfg["robots"] = [
            generate_robot_config(
                task_name=task_name,
                task_cfg=task_cfg,
            )
        ]
        # Update observation modalities
        cfg["robots"][0]["obs_modalities"] = ["proprio", "rgb"]
        cfg["robots"][0]["proprio_obs"] = list(PROPRIOCEPTION_INDICES["R1Pro"].keys())
        if self.cfg.robot.controllers is not None:
            cfg["robots"][0]["controller_config"].update(self.cfg.robot.controllers)
        if self.cfg.max_steps is None:
            logger.info(
                f"Setting timeout to be 2x the average length of human demos: {int(self.human_stats['length'] * 2)}"
            )
            cfg["task"]["termination_config"]["max_steps"] = int(self.human_stats["length"] * 2)
        else:
            logger.info(f"Setting timeout to be {self.cfg.max_steps} steps through config.")
            cfg["task"]["termination_config"]["max_steps"] = self.cfg.max_steps
        cfg["task"]["include_obs"] = False
        env = og.Environment(configs=cfg)
        # instantiate env wrapper
        env = instantiate(env_wrapper, env=env)
        return env

    def load_robot(self) -> BaseRobot:
        """
        Loads and returns the robot instance from the environment.
        Returns:
            BaseRobot: The robot instance loaded from the environment.
        """
        robot = self.env.scene.object_registry("name", "robot_r1")
        return robot

    def load_policy(self) -> Any:
        """
        Loads and returns the policy instance.
        """
        policy = instantiate(self.cfg.model)
        logger.info("")
        logger.info("=" * 50)
        logger.info(f"Loaded policy: {self.cfg.policy_name}")
        logger.info("=" * 50)
        logger.info("")
        return policy

    def load_metrics(self) -> List[MetricBase]:
        """
        Load agent and task metrics.
        """
        return [AgentMetric(self.human_stats), TaskMetric(self.human_stats)]

    def step(self) -> Tuple[bool, bool]:
        """
        Performs a single step of the task by executing the policy, interacting with the environment,
        processing observations, updating metrics, and tracking trial success.

        Returns:
            Tuple[bool, bool]:
                - terminated (bool): Whether the episode has terminated (i.e., reached a terminal state).
                - truncated (bool): Whether the episode was truncated (i.e., stopped due to a time limit or other constraint).

        Workflow:
            1. Computes the next action using the policy based on the current observation.
            2. Steps the environment with the computed action and retrieves the next observation,
               termination and truncation flags, and additional info.
            3. If the episode has ended (terminated or truncated), increments the trial counter and
               updates the count of successful trials if the task was completed successfully.
            4. Preprocesses the new observation.
            5. Invokes step callbacks for all registered metrics to update their state.
            6. Returns the termination and truncation status.
        """
        self.robot_action = self.policy.forward(obs=self.obs)

        obs, _, terminated, truncated, info = self.env.step(self.robot_action, n_render_iterations=1)

        # process obs
        self.obs = self._preprocess_obs(obs)

        if terminated or truncated:
            self.n_trials += 1
            if info["done"]["success"]:
                self.n_success_trials += 1

        for metric in self.metrics:
            metric.step_callback(self.env)
        return terminated, truncated

    @property
    def video_writer(self) -> Tuple[Container, Stream]:
        """
        Returns the video writer for the current evaluation step.
        """
        return self._video_writer

    @video_writer.setter
    def video_writer(self, video_writer: Tuple[Container, Stream]) -> None:
        if self._video_writer is not None:
            (container, stream) = self._video_writer
            # Flush any remaining packets
            for packet in stream.encode():
                container.mux(packet)
            # Close the container
            container.close()
        self._video_writer = video_writer

    def load_task_instance(self, instance_id: int, test_hidden: bool = False) -> None:
        """
        Loads the configuration for a specific task instance.

        Args:
            instance_id (int): The ID of the task instance to load.
            test_hidden (bool): [Internal use only] Whether to load the hidden test instance.
        """
        scene_model = self.env.task.scene_name
        tro_filename = self.env.task.get_cached_activity_scene_filename(
            scene_model=scene_model,
            activity_name=self.env.task.activity_name,
            activity_definition_id=self.env.task.activity_definition_id,
            activity_instance_id=instance_id,
        )
        if test_hidden:
            tro_file_path = os.path.join(
                gm.DATA_PATH,
                "2025-challenge-test-instances",
                self.env.task.activity_name,
                f"{tro_filename}-tro_state.json",
            )
        else:
            tro_file_path = os.path.join(
                get_task_instance_path(scene_model),
                f"json/{scene_model}_task_{self.env.task.activity_name}_instances/{tro_filename}-tro_state.json",
            )
        with open(tro_file_path, "r") as f:
            tro_state = recursively_convert_to_torch(json.load(f))
        for tro_key, tro_state in tro_state.items():
            if tro_key == "robot_poses":
                presampled_robot_poses = tro_state
                robot_pos = presampled_robot_poses[self.robot.model_name][0]["position"]
                robot_quat = presampled_robot_poses[self.robot.model_name][0]["orientation"]
                self.robot.set_position_orientation(robot_pos, robot_quat)
                # Write robot poses to scene metadata
                self.env.scene.write_task_metadata(key=tro_key, data=tro_state)
            else:
                self.env.task.object_scope[tro_key].load_state(tro_state, serialized=False)

        # Try to ensure that all task-relevant objects are stable
        # They should already be stable from the sampled instance, but there is some issue where loading the state
        # causes some jitter (maybe for small mass / thin objects?)
        for _ in range(25):
            og.sim.step_physics()
            for entity in self.env.task.object_scope.values():
                if not entity.is_system and entity.exists:
                    entity.keep_still()

        self.env.scene.update_initial_file()
        self.env.scene.reset()

    def _preprocess_obs(self, obs: dict) -> dict:
        """
        Preprocess the observation dictionary before passing it to the policy.
        Args:
            obs (dict): The observation dictionary to preprocess.

        Returns:
            dict: The preprocessed observation dictionary.
        """
        obs = flatten_obs_dict(obs)
        base_pose = self.robot.get_position_orientation()
        cam_rel_poses = []
        # The first time we query the camera parameters, they will be all zeros.
        # In that case, we use camera.get_position_orientation() instead.
        # The reason we do not use camera.get_position_orientation() by default is that it always returns the most recent camera poses.
        # However, since OmniGibson rendering is somewhat "async", it takes >= 3 render calls per step to actually get up-to-date camera renderings.
        # Since we use n_render_iterations=1 for speed, we need the camera poses that correspond to the rendered frames rather than the most up-to-date ones.
        # Thus, we use the camera parameters, which are guaranteed to be in sync with the visual observations.
        for camera_name in ROBOT_CAMERA_NAMES["R1Pro"].values():
            camera = self.robot.sensors[camera_name.split("::")[1]]
            direct_cam_pose = camera.camera_parameters["cameraViewTransform"]
            if np.allclose(direct_cam_pose, np.zeros(16)):
                cam_rel_poses.append(
                    th.cat(T.relative_pose_transform(*(camera.get_position_orientation()), *base_pose))
                )
            else:
                cam_pose = T.mat2pose(th.tensor(np.linalg.inv(np.reshape(direct_cam_pose, [4, 4]).T), dtype=th.float32))
                cam_rel_poses.append(th.cat(T.relative_pose_transform(*cam_pose, *base_pose)))
        obs["robot_r1::cam_rel_poses"] = th.cat(cam_rel_poses, axis=-1)
        # append task id to obs
        obs["task_id"] = th.tensor([TASK_NAMES_TO_INDICES[self.cfg.task.name]], dtype=th.int64)
        return obs

    def _write_video(self) -> None:
        """
        Write the current robot observations to video.
        """
        if ROBOT_CAMERA_NAMES["R1Pro"]["head"] + "::rgb" not in self.obs:
            return
        # concatenate obs
        left_wrist_rgb = cv2.resize(
            self.obs[ROBOT_CAMERA_NAMES["R1Pro"]["left_wrist"] + "::rgb"].numpy(),
            (224, 224),
        )
        right_wrist_rgb = cv2.resize(
            self.obs[ROBOT_CAMERA_NAMES["R1Pro"]["right_wrist"] + "::rgb"].numpy(),
            (224, 224),
        )
        head_rgb = cv2.resize(
            self.obs[ROBOT_CAMERA_NAMES["R1Pro"]["head"] + "::rgb"].numpy(),
            (448, 448),
        )
        write_video(
            np.expand_dims(np.hstack([np.vstack([left_wrist_rgb, right_wrist_rgb]), head_rgb]), 0),
            video_writer=self.video_writer,
            batch_size=1,
            mode="rgb",
        )

    def reset(self) -> None:
        """
        Reset the environment, policy, and compute metrics.
        """
        self.obs = self._preprocess_obs(self.env.reset()[0])
        # run metric start callbacks
        for metric in self.metrics:
            metric.start_callback(self.env)
        self.policy.reset()
        self.n_success_trials, self.n_trials = 0, 0

    def __enter__(self):
        signal(SIGINT, self._sigint_handler)
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        logger.info("")
        logger.info("=" * 50)
        logger.info(f"Total success trials: {self.n_success_trials}")
        logger.info(f"Total trials: {self.n_trials}")
        if self.n_trials > 0:
            logger.info(f"Success rate: {self.n_success_trials / self.n_trials}")
        logger.info("=" * 50)
        logger.info("")
        if exc_type is not None:
            traceback.print_exception(exc_type, exc_value, exc_tb)
        self.video_writer = None
        self.env.close()
        og.shutdown()

    def _sigint_handler(self, signal_received, frame):
        logger.warning("SIGINT or CTRL-C detected.\n")
        self.__exit__(None, None, None)
        sys.exit(0)

video_writer property writable

Returns the video writer for the current evaluation step.
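
A hedged sketch of wiring up the writer with PyAV, assuming PyAV is the library behind the (Container, Stream) tuple; the output path, codec, and frame rate are illustrative, and the frame size is derived from the 448x448 head view stacked next to the two 224x224 wrist views in _write_video.

import av  # PyAV, assumed to provide the Container/Stream types used here

container = av.open("rollout.mp4", mode="w")        # illustrative output path
stream = container.add_stream("libx264", rate=30)   # codec and frame rate are assumptions
stream.width, stream.height = 672, 448              # 448x448 head view + 224x224 wrist views stacked vertically
stream.pix_fmt = "yuv420p"
evaluator.video_writer = (container, stream)

# ... run rollouts; frames are written to this writer through the evaluator's _write_video helper ...

# Assigning a new writer (or None) makes the setter flush and close the previous container.
evaluator.video_writer = None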

load_env(env_wrapper)

Read the environment config file and create the environment. The config file is located in the configs/envs directory.
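
The env_wrapper argument is instantiated via Hydra with the freshly created environment passed as env. A hedged sketch of what such a config fragment might look like; the _target_ path is purely hypothetical, and the real defaults live under configs/envs in the repo.

from omegaconf import OmegaConf

# Hypothetical wrapper config; load_env calls instantiate(env_wrapper, env=env) on it.
env_wrapper_cfg = OmegaConf.create(
    {
        "_target_": "my_project.wrappers.MyEnvWrapper",  # hypothetical class implementing EnvironmentWrapper
        # any extra keyword arguments the wrapper accepts go here
    }
)
# Passed to the Evaluator as cfg.env_wrapper.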

Source code in OmniGibson/omnigibson/learning/eval.py
def load_env(self, env_wrapper: DictConfig) -> EnvironmentWrapper:
    """
    Read the environment config file and create the environment.
    The config file is located in the configs/envs directory.
    """
    # Disable a subset of transition rules for data collection
    for rule in DISABLED_TRANSITION_RULES:
        rule.ENABLED = False
    # Load config file
    available_tasks = load_available_tasks()
    task_name = self.cfg.task.name
    assert task_name in available_tasks, f"Got invalid task name: {task_name}"
    # Now, get human stats of the task
    task_idx = TASK_NAMES_TO_INDICES[task_name]
    self.human_stats = {
        "length": [],
        "distance_traveled": [],
        "left_eef_displacement": [],
        "right_eef_displacement": [],
    }
    with open(os.path.join(gm.DATA_PATH, "2025-challenge-task-instances", "metadata", "episodes.jsonl"), "r") as f:
        episodes = [json.loads(line) for line in f]
    for episode in episodes:
        if episode["episode_index"] // 1e4 == task_idx:
            for k in self.human_stats.keys():
                self.human_stats[k].append(episode[k])
    # take a mean
    for k in self.human_stats.keys():
        self.human_stats[k] = sum(self.human_stats[k]) / len(self.human_stats[k])

    # Load the seed instance by default
    task_cfg = available_tasks[task_name][0]
    robot_type = self.cfg.robot.type
    assert robot_type == "R1Pro", f"Got invalid robot type: {robot_type}, only R1Pro is supported."
    cfg = generate_basic_environment_config(task_name=task_name, task_cfg=task_cfg)
    if self.cfg.partial_scene_load:
        relevant_rooms = get_task_relevant_room_types(activity_name=task_name)
        relevant_rooms = augment_rooms(relevant_rooms, task_cfg["scene_model"], task_name)
        cfg["scene"]["load_room_types"] = relevant_rooms

    cfg["robots"] = [
        generate_robot_config(
            task_name=task_name,
            task_cfg=task_cfg,
        )
    ]
    # Update observation modalities
    cfg["robots"][0]["obs_modalities"] = ["proprio", "rgb"]
    cfg["robots"][0]["proprio_obs"] = list(PROPRIOCEPTION_INDICES["R1Pro"].keys())
    if self.cfg.robot.controllers is not None:
        cfg["robots"][0]["controller_config"].update(self.cfg.robot.controllers)
    if self.cfg.max_steps is None:
        logger.info(
            f"Setting timeout to be 2x the average length of human demos: {int(self.human_stats['length'] * 2)}"
        )
        cfg["task"]["termination_config"]["max_steps"] = int(self.human_stats["length"] * 2)
    else:
        logger.info(f"Setting timeout to be {self.cfg.max_steps} steps through config.")
        cfg["task"]["termination_config"]["max_steps"] = self.cfg.max_steps
    cfg["task"]["include_obs"] = False
    env = og.Environment(configs=cfg)
    # instantiate env wrapper
    env = instantiate(env_wrapper, env=env)
    return env

load_metrics()

Load agent and task metrics.
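
Both metrics follow a callback interface driven by reset() and step(): start_callback(env) at the start of an episode and step_callback(env) after every environment step. Below is a hedged sketch of a custom metric in the same shape; the MetricBase import path and any additional abstract methods it defines are assumptions.

class EpisodeLengthMetric(MetricBase):  # MetricBase import path assumed
    """Counts environment steps per episode; purely illustrative."""

    def __init__(self):
        self.num_steps = 0

    def start_callback(self, env):
        # Called by Evaluator.reset() at the start of an episode.
        self.num_steps = 0

    def step_callback(self, env):
        # Called by Evaluator.step() after every environment step.
        self.num_steps += 1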

Source code in OmniGibson/omnigibson/learning/eval.py
def load_metrics(self) -> List[MetricBase]:
    """
    Load agent and task metrics.
    """
    return [AgentMetric(self.human_stats), TaskMetric(self.human_stats)]

load_policy()

Loads and returns the policy instance.
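
The policy is instantiated via Hydra from cfg.model, and the evaluator only relies on it exposing forward(obs=...) (returning a robot action) and reset(). A hedged stub conforming to that interface; the class name and action dimensionality are illustrative, not the real R1Pro values.

import torch as th

class ZeroActionPolicy:
    """Does nothing useful; shows the interface Evaluator.step() and reset() expect."""

    def __init__(self, action_dim: int = 23):  # action_dim is an assumption
        self.action_dim = action_dim

    def forward(self, obs: dict) -> th.Tensor:
        return th.zeros(self.action_dim)

    def reset(self) -> None:
        pass

A matching cfg.model entry would point its _target_ at a class like this.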

Source code in OmniGibson/omnigibson/learning/eval.py
def load_policy(self) -> Any:
    """
    Loads and returns the policy instance.
    """
    policy = instantiate(self.cfg.model)
    logger.info("")
    logger.info("=" * 50)
    logger.info(f"Loaded policy: {self.cfg.policy_name}")
    logger.info("=" * 50)
    logger.info("")
    return policy

load_robot()

Loads and returns the robot instance from the environment.

Returns:

- BaseRobot: The robot instance loaded from the environment.

Source code in OmniGibson/omnigibson/learning/eval.py
def load_robot(self) -> BaseRobot:
    """
    Loads and returns the robot instance from the environment.
    Returns:
        BaseRobot: The robot instance loaded from the environment.
    """
    robot = self.env.scene.object_registry("name", "robot_r1")
    return robot

load_task_instance(instance_id, test_hidden=False)

Loads the configuration for a specific task instance.

Parameters:

- instance_id (int): The ID of the task instance to load. Required.
- test_hidden (bool): [Internal use only] Whether to load the hidden test instance. Defaults to False.
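
A hedged sketch of cycling through a few instances of the loaded task; the instance ids are illustrative, and note that reset() also zeroes the trial counters, so per-instance results should be read off before the next reset.

for instance_id in (0, 1, 2):  # illustrative ids; available ids depend on the task's sampled instances
    evaluator.load_task_instance(instance_id=instance_id)
    evaluator.reset()
    terminated = truncated = False
    while not (terminated or truncated):
        terminated, truncated = evaluator.step()
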
Source code in OmniGibson/omnigibson/learning/eval.py
def load_task_instance(self, instance_id: int, test_hidden: bool = False) -> None:
    """
    Loads the configuration for a specific task instance.

    Args:
        instance_id (int): The ID of the task instance to load.
        test_hidden (bool): [Internal use only] Whether to load the hidden test instance.
    """
    scene_model = self.env.task.scene_name
    tro_filename = self.env.task.get_cached_activity_scene_filename(
        scene_model=scene_model,
        activity_name=self.env.task.activity_name,
        activity_definition_id=self.env.task.activity_definition_id,
        activity_instance_id=instance_id,
    )
    if test_hidden:
        tro_file_path = os.path.join(
            gm.DATA_PATH,
            "2025-challenge-test-instances",
            self.env.task.activity_name,
            f"{tro_filename}-tro_state.json",
        )
    else:
        tro_file_path = os.path.join(
            get_task_instance_path(scene_model),
            f"json/{scene_model}_task_{self.env.task.activity_name}_instances/{tro_filename}-tro_state.json",
        )
    with open(tro_file_path, "r") as f:
        tro_state = recursively_convert_to_torch(json.load(f))
    for tro_key, tro_state in tro_state.items():
        if tro_key == "robot_poses":
            presampled_robot_poses = tro_state
            robot_pos = presampled_robot_poses[self.robot.model_name][0]["position"]
            robot_quat = presampled_robot_poses[self.robot.model_name][0]["orientation"]
            self.robot.set_position_orientation(robot_pos, robot_quat)
            # Write robot poses to scene metadata
            self.env.scene.write_task_metadata(key=tro_key, data=tro_state)
        else:
            self.env.task.object_scope[tro_key].load_state(tro_state, serialized=False)

    # Try to ensure that all task-relevant objects are stable
    # They should already be stable from the sampled instance, but there is some issue where loading the state
    # causes some jitter (maybe for small mass / thin objects?)
    for _ in range(25):
        og.sim.step_physics()
        for entity in self.env.task.object_scope.values():
            if not entity.is_system and entity.exists:
                entity.keep_still()

    self.env.scene.update_initial_file()
    self.env.scene.reset()

reset()

Reset the environment, policy, and compute metrics.

Source code in OmniGibson/omnigibson/learning/eval.py
def reset(self) -> None:
    """
    Reset the environment, policy, and compute metrics.
    """
    self.obs = self._preprocess_obs(self.env.reset()[0])
    # run metric start callbacks
    for metric in self.metrics:
        metric.start_callback(self.env)
    self.policy.reset()
    self.n_success_trials, self.n_trials = 0, 0

step()

Performs a single step of the task by executing the policy, interacting with the environment, processing observations, updating metrics, and tracking trial success.

Returns:

- Tuple[bool, bool]:
  - terminated (bool): Whether the episode has terminated (i.e., reached a terminal state).
  - truncated (bool): Whether the episode was truncated (i.e., stopped due to a time limit or other constraint).

Workflow:

1. Computes the next action using the policy based on the current observation.
2. Steps the environment with the computed action and retrieves the next observation, termination and truncation flags, and additional info.
3. If the episode has ended (terminated or truncated), increments the trial counter and updates the count of successful trials if the task was completed successfully.
4. Preprocesses the new observation.
5. Invokes step callbacks for all registered metrics to update their state.
6. Returns the termination and truncation status.
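
A minimal rollout loop around step(), assuming an evaluator constructed as in the overview sketch; it runs one episode and then reads the counters that step() maintains.

terminated = truncated = False
while not (terminated or truncated):
    terminated, truncated = evaluator.step()
print(f"successes so far: {evaluator.n_success_trials}/{evaluator.n_trials}")
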
Source code in OmniGibson/omnigibson/learning/eval.py
def step(self) -> Tuple[bool, bool]:
    """
    Performs a single step of the task by executing the policy, interacting with the environment,
    processing observations, updating metrics, and tracking trial success.

    Returns:
        Tuple[bool, bool]:
            - terminated (bool): Whether the episode has terminated (i.e., reached a terminal state).
            - truncated (bool): Whether the episode was truncated (i.e., stopped due to a time limit or other constraint).

    Workflow:
        1. Computes the next action using the policy based on the current observation.
        2. Steps the environment with the computed action and retrieves the next observation,
           termination and truncation flags, and additional info.
        3. If the episode has ended (terminated or truncated), increments the trial counter and
           updates the count of successful trials if the task was completed successfully.
        4. Preprocesses the new observation.
        5. Invokes step callbacks for all registered metrics to update their state.
        6. Returns the termination and truncation status.
    """
    self.robot_action = self.policy.forward(obs=self.obs)

    obs, _, terminated, truncated, info = self.env.step(self.robot_action, n_render_iterations=1)

    # process obs
    self.obs = self._preprocess_obs(obs)

    if terminated or truncated:
        self.n_trials += 1
        if info["done"]["success"]:
            self.n_success_trials += 1

    for metric in self.metrics:
        metric.step_callback(self.env)
    return terminated, truncated