class Evaluator:
"""
    Evaluator class for running and evaluating policies on BEHAVIOR tasks.
    This class manages the setup, execution, and evaluation of policy rollouts in an OmniGibson environment,
    tracking metrics such as the number of trials, the number of successes, and the total trial time. It supports
    loading environments, robots, policies, and metrics, and provides methods for stepping through the environment,
    resetting state, and handling video output and logging.
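
    Example (illustrative sketch; assumes a Hydra-style DictConfig providing the fields referenced in
    __init__, e.g. cfg.task.name, cfg.robot.type, cfg.env_wrapper, and cfg.model):
        with Evaluator(cfg) as evaluator:
            evaluator.load_task_instance(instance_id=0)
            evaluator.reset()
            ...  # roll out episodes via evaluator.step(); see step() for a sketch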
"""
def __init__(self, cfg: DictConfig) -> None:
self.cfg = cfg
        # Track the total number of trials, the number of successful trials, and the total trial time
self.n_trials = 0
self.n_success_trials = 0
self.total_time = 0
        self.robot_action = {}
self.env = self.load_env(env_wrapper=self.cfg.env_wrapper)
self.policy = self.load_policy()
self.robot = self.load_robot()
self.metrics = self.load_metrics()
self.reset()
        # Manually reset the environment's episode counter
self.env._current_episode = 0
self._video_writer = None
def load_env(self, env_wrapper: DictConfig) -> EnvironmentWrapper:
"""
        Build the environment configuration for the selected task and create the wrapped environment.
"""
        # Disable a subset of transition rules, mirroring the data collection setup
for rule in DISABLED_TRANSITION_RULES:
rule.ENABLED = False
# Load config file
available_tasks = load_available_tasks()
task_name = self.cfg.task.name
assert task_name in available_tasks, f"Got invalid task name: {task_name}"
        # Gather human demonstration statistics for this task
task_idx = TASK_NAMES_TO_INDICES[task_name]
self.human_stats = {
"length": [],
"distance_traveled": [],
"left_eef_displacement": [],
"right_eef_displacement": [],
}
with open(os.path.join(gm.DATA_PATH, "2025-challenge-task-instances", "metadata", "episodes.jsonl"), "r") as f:
episodes = [json.loads(line) for line in f]
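        # Each JSONL record is assumed to look roughly like (field values are illustrative):
        #   {"episode_index": 30012, "length": 1450, "distance_traveled": 7.2,
        #    "left_eef_displacement": 3.1, "right_eef_displacement": 2.8}
        # where episode_index // 10000 recovers the task index.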
for episode in episodes:
if episode["episode_index"] // 1e4 == task_idx:
for k in self.human_stats.keys():
self.human_stats[k].append(episode[k])
        # Average each statistic over the matching episodes
for k in self.human_stats.keys():
self.human_stats[k] = sum(self.human_stats[k]) / len(self.human_stats[k])
# Load the seed instance by default
task_cfg = available_tasks[task_name][0]
robot_type = self.cfg.robot.type
assert robot_type == "R1Pro", f"Got invalid robot type: {robot_type}, only R1Pro is supported."
cfg = generate_basic_environment_config(task_name=task_name, task_cfg=task_cfg)
if self.cfg.partial_scene_load:
relevant_rooms = get_task_relevant_room_types(activity_name=task_name)
relevant_rooms = augment_rooms(relevant_rooms, task_cfg["scene_model"], task_name)
cfg["scene"]["load_room_types"] = relevant_rooms
cfg["robots"] = [
generate_robot_config(
task_name=task_name,
task_cfg=task_cfg,
)
]
# Update observation modalities
cfg["robots"][0]["obs_modalities"] = ["proprio", "rgb"]
cfg["robots"][0]["proprio_obs"] = list(PROPRIOCEPTION_INDICES["R1Pro"].keys())
if self.cfg.robot.controllers is not None:
cfg["robots"][0]["controller_config"].update(self.cfg.robot.controllers)
if self.cfg.max_steps is None:
logger.info(
f"Setting timeout to be 2x the average length of human demos: {int(self.human_stats['length'] * 2)}"
)
cfg["task"]["termination_config"]["max_steps"] = int(self.human_stats["length"] * 2)
else:
logger.info(f"Setting timeout to be {self.cfg.max_steps} steps through config.")
cfg["task"]["termination_config"]["max_steps"] = self.cfg.max_steps
cfg["task"]["include_obs"] = False
env = og.Environment(configs=cfg)
# instantiate env wrapper
env = instantiate(env_wrapper, env=env)
return env
def load_robot(self) -> BaseRobot:
"""
Loads and returns the robot instance from the environment.
Returns:
BaseRobot: The robot instance loaded from the environment.
"""
robot = self.env.scene.object_registry("name", "robot_r1")
return robot
def load_policy(self) -> Any:
"""
Loads and returns the policy instance.
"""
policy = instantiate(self.cfg.model)
logger.info("")
logger.info("=" * 50)
logger.info(f"Loaded policy: {self.cfg.policy_name}")
logger.info("=" * 50)
logger.info("")
return policy
def load_metrics(self) -> List[MetricBase]:
"""
Load agent and task metrics.
"""
return [AgentMetric(self.human_stats), TaskMetric(self.human_stats)]
def step(self) -> Tuple[bool, bool]:
"""
Performs a single step of the task by executing the policy, interacting with the environment,
processing observations, updating metrics, and tracking trial success.
Returns:
Tuple[bool, bool]:
- terminated (bool): Whether the episode has terminated (i.e., reached a terminal state).
- truncated (bool): Whether the episode was truncated (i.e., stopped due to a time limit or other constraint).
Workflow:
1. Computes the next action using the policy based on the current observation.
2. Steps the environment with the computed action and retrieves the next observation,
termination and truncation flags, and additional info.
3. If the episode has ended (terminated or truncated), increments the trial counter and
updates the count of successful trials if the task was completed successfully.
4. Preprocesses the new observation.
5. Invokes step callbacks for all registered metrics to update their state.
6. Returns the termination and truncation status.
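
        Example (illustrative rollout loop):
            terminated = truncated = False
            while not (terminated or truncated):
                terminated, truncated = evaluator.step()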
"""
self.robot_action = self.policy.forward(obs=self.obs)
obs, _, terminated, truncated, info = self.env.step(self.robot_action, n_render_iterations=1)
# process obs
self.obs = self._preprocess_obs(obs)
if terminated or truncated:
self.n_trials += 1
if info["done"]["success"]:
self.n_success_trials += 1
for metric in self.metrics:
metric.step_callback(self.env)
return terminated, truncated
@property
def video_writer(self) -> Tuple[Container, Stream]:
"""
Returns the video writer for the current evaluation step.
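
        Example of constructing one (illustrative sketch; assumes the PyAV package, with placeholder
        filename, codec, and frame rate; 672x448 matches the composite frame built in _write_video):
            import av
            container = av.open("rollout.mp4", mode="w")
            stream = container.add_stream("libx264", rate=30)
            stream.width, stream.height = 672, 448
            evaluator.video_writer = (container, stream)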
"""
return self._video_writer
@video_writer.setter
def video_writer(self, video_writer: Tuple[Container, Stream]) -> None:
if self._video_writer is not None:
(container, stream) = self._video_writer
# Flush any remaining packets
for packet in stream.encode():
container.mux(packet)
# Close the container
container.close()
self._video_writer = video_writer
def load_task_instance(self, instance_id: int, test_hidden: bool = False) -> None:
"""
Loads the configuration for a specific task instance.
Args:
instance_id (int): The ID of the task instance to load.
            test_hidden (bool): [Internal use only] Whether to load the hidden test instance.
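
        Example (illustrative):
            evaluator.load_task_instance(instance_id=1)  # load the second pre-sampled instance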
"""
scene_model = self.env.task.scene_name
tro_filename = self.env.task.get_cached_activity_scene_filename(
scene_model=scene_model,
activity_name=self.env.task.activity_name,
activity_definition_id=self.env.task.activity_definition_id,
activity_instance_id=instance_id,
)
if test_hidden:
tro_file_path = os.path.join(
gm.DATA_PATH,
"2025-challenge-test-instances",
self.env.task.activity_name,
f"{tro_filename}-tro_state.json",
)
else:
tro_file_path = os.path.join(
get_task_instance_path(scene_model),
f"json/{scene_model}_task_{self.env.task.activity_name}_instances/{tro_filename}-tro_state.json",
)
with open(tro_file_path, "r") as f:
tro_state = recursively_convert_to_torch(json.load(f))
        for tro_key, tro_value in tro_state.items():
            if tro_key == "robot_poses":
                presampled_robot_poses = tro_value
                robot_pos = presampled_robot_poses[self.robot.model_name][0]["position"]
                robot_quat = presampled_robot_poses[self.robot.model_name][0]["orientation"]
                self.robot.set_position_orientation(robot_pos, robot_quat)
                # Write robot poses to scene metadata
                self.env.scene.write_task_metadata(key=tro_key, data=tro_value)
            else:
                self.env.task.object_scope[tro_key].load_state(tro_value, serialized=False)
# Try to ensure that all task-relevant objects are stable
# They should already be stable from the sampled instance, but there is some issue where loading the state
# causes some jitter (maybe for small mass / thin objects?)
for _ in range(25):
og.sim.step_physics()
for entity in self.env.task.object_scope.values():
if not entity.is_system and entity.exists:
entity.keep_still()
self.env.scene.update_initial_file()
self.env.scene.reset()
def _preprocess_obs(self, obs: dict) -> dict:
"""
Preprocess the observation dictionary before passing it to the policy.
Args:
obs (dict): The observation dictionary to preprocess.
Returns:
dict: The preprocessed observation dictionary.
"""
obs = flatten_obs_dict(obs)
base_pose = self.robot.get_position_orientation()
cam_rel_poses = []
        # The first time we query the camera parameters, they come back as all zeros;
        # in that case, we fall back to camera.get_position_orientation().
        # We do not use camera.get_position_orientation() by default because it always returns the most
        # recent camera pose. OmniGibson rendering is somewhat "async": it takes >= 3 render calls per
        # step for the camera renderings to become up to date, but we render with n_render_iterations=1
        # for speed. We therefore need the camera pose that corresponds to the (slightly stale) visual
        # observations rather than the latest one, and the camera parameters are guaranteed to be in
        # sync with the visual observations.
for camera_name in ROBOT_CAMERA_NAMES["R1Pro"].values():
camera = self.robot.sensors[camera_name.split("::")[1]]
direct_cam_pose = camera.camera_parameters["cameraViewTransform"]
if np.allclose(direct_cam_pose, np.zeros(16)):
cam_rel_poses.append(
th.cat(T.relative_pose_transform(*(camera.get_position_orientation()), *base_pose))
)
else:
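                # cameraViewTransform holds the flattened 4x4 world-to-camera view matrix;
                # reshape(4, 4).T recovers it (the buffer appears to be stored transposed), and
                # inverting it gives the camera-to-world transform, which mat2pose converts
                # into a (position, quaternion) pose.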
cam_pose = T.mat2pose(th.tensor(np.linalg.inv(np.reshape(direct_cam_pose, [4, 4]).T), dtype=th.float32))
cam_rel_poses.append(th.cat(T.relative_pose_transform(*cam_pose, *base_pose)))
obs["robot_r1::cam_rel_poses"] = th.cat(cam_rel_poses, axis=-1)
# append task id to obs
obs["task_id"] = th.tensor([TASK_NAMES_TO_INDICES[self.cfg.task.name]], dtype=th.int64)
return obs
def _write_video(self) -> None:
"""
Write the current robot observations to video.
"""
if ROBOT_CAMERA_NAMES["R1Pro"]["head"] + "::rgb" not in self.obs:
return
# concatenate obs
left_wrist_rgb = cv2.resize(
self.obs[ROBOT_CAMERA_NAMES["R1Pro"]["left_wrist"] + "::rgb"].numpy(),
(224, 224),
)
right_wrist_rgb = cv2.resize(
self.obs[ROBOT_CAMERA_NAMES["R1Pro"]["right_wrist"] + "::rgb"].numpy(),
(224, 224),
)
head_rgb = cv2.resize(
self.obs[ROBOT_CAMERA_NAMES["R1Pro"]["head"] + "::rgb"].numpy(),
(448, 448),
)
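        # Composite frame layout (672x448): the two 224x224 wrist views are stacked vertically
        # on the left of the 448x448 head view:
        #   +-------------+----------------+
        #   | left wrist  |                |
        #   +-------------+   head view    |
        #   | right wrist |                |
        #   +-------------+----------------+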
write_video(
np.expand_dims(np.hstack([np.vstack([left_wrist_rgb, right_wrist_rgb]), head_rgb]), 0),
video_writer=self.video_writer,
batch_size=1,
mode="rgb",
)
def reset(self) -> None:
"""
        Reset the environment and policy, run metric start callbacks, and zero the trial counters.
"""
self.obs = self._preprocess_obs(self.env.reset()[0])
# run metric start callbacks
for metric in self.metrics:
metric.start_callback(self.env)
self.policy.reset()
self.n_success_trials, self.n_trials = 0, 0
def __enter__(self):
signal(SIGINT, self._sigint_handler)
return self
def __exit__(self, exc_type, exc_value, exc_tb):
logger.info("")
logger.info("=" * 50)
logger.info(f"Total success trials: {self.n_success_trials}")
logger.info(f"Total trials: {self.n_trials}")
if self.n_trials > 0:
logger.info(f"Success rate: {self.n_success_trials / self.n_trials}")
logger.info("=" * 50)
logger.info("")
if exc_type is not None:
traceback.print_exception(exc_type, exc_value, exc_tb)
self.video_writer = None
self.env.close()
og.shutdown()
def _sigint_handler(self, signal_received, frame):
logger.warning("SIGINT or CTRL-C detected.\n")
self.__exit__(None, None, None)
sys.exit(0)