submission_the_north_star

TheNorthStarWrapper

Bases: EnvironmentWrapper

Parameters:

Name    Type           Description                  Default
env     Environment    The environment to wrap.     required

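A minimal usage sketch, assuming the standard OmniGibson Environment constructor; cfg is a placeholder for whatever config the evaluation script builds, and the import path follows the source location listed below.

import omnigibson as og
from omnigibson.learning.wrappers.challenge_submissions.submission_the_north_star import TheNorthStarWrapper

# cfg is a placeholder for the evaluation config
env = og.Environment(configs=cfg)
# Wrapping reconfigures the robot cameras and reloads the observation space
env = TheNorthStarWrapper(env)
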
Source code in OmniGibson/omnigibson/learning/wrappers/challenge_submissions/submission_the_north_star.py
class TheNorthStarWrapper(EnvironmentWrapper):
    """
    Args:
        env (og.Environment): The environment to wrap.
    """

    def step(self, robot_action: th.Tensor, n_render_iterations: int) -> Tuple[dict, float, bool, bool, dict]:
        if self.action_buffer is None or self.current_action_idx >= self.horizon:
            # We either just reset or exhausted the previous action buffer
            self.obs_buffer = []
            self.horizon = robot_action.shape[0]
            self.action_buffer = robot_action
            self.current_action_idx = 0
        current_action = self.action_buffer[self.current_action_idx]

        obs, reward, terminated, truncated, info = self.env.step(current_action, n_render_iterations=n_render_iterations)

        # process obs
        if self.current_action_idx + 1 == self.horizon:
            # We exhausted the action buffer, need to get new action from policy
            # Extra observations used by the nav module; they can be disabled for efficiency
            obs["past_obs"] = self.obs_buffer
            obs["robot_rel_lin_vel"] = self.env.robots[0].get_relative_linear_velocity()
            obs["robot_rel_ang_vel"] = self.env.robots[0].get_relative_angular_velocity()
            obs["seg_id_map"] = pickle.dumps(VisionSensor.INSTANCE_ID_REGISTRY)
            obs["need_new_action"] = True
        else:
            # store obs for future policy input
            pobs = self.process_obs_for_policy(obs)
            for topic in [
                "robot_r1::robot_r1:left_realsense_link:Camera:0::seg_semantic",
                "robot_r1::robot_r1:left_realsense_link:Camera:0::seg_instance_id",
                "robot_r1::robot_r1:left_realsense_link:Camera:0::rgb",
                "robot_r1::robot_r1:left_realsense_link:Camera:0::depth_linear",
                "robot_r1::robot_r1:right_realsense_link:Camera:0::seg_semantic",
                "robot_r1::robot_r1:right_realsense_link:Camera:0::seg_instance_id",
                "robot_r1::robot_r1:right_realsense_link:Camera:0::rgb",
                "robot_r1::robot_r1:right_realsense_link:Camera:0::depth_linear",
                # 'robot_r1::robot_r1:zed_link:Camera:0::seg_semantic', 'robot_r1::robot_r1:zed_link:Camera:0::seg_instance_id', 'robot_r1::robot_r1:zed_link:Camera:0::rgb', 'robot_r1::robot_r1:zed_link:Camera:0::depth_linear', 'robot_r1::proprio', 'robot_r1::cam_rel_poses' \
            ]:
                pobs.pop(topic)
            if (self.current_action_idx % (self.horizon / 2)) != (self.horizon / 2 - 1):
                for topic_head_imgs in [
                    "robot_r1::robot_r1:zed_link:Camera:0::seg_semantic",
                    "robot_r1::robot_r1:zed_link:Camera:0::seg_instance_id",
                    "robot_r1::robot_r1:zed_link:Camera:0::rgb",
                    "robot_r1::robot_r1:zed_link:Camera:0::depth_linear",
                ]:
                    pobs.pop(topic_head_imgs)
            self.obs_buffer.append(pobs)
            obs = {"need_new_action": False}

        # Increment action index
        self.current_action_idx += 1

        return obs, reward, terminated, truncated, info

    def process_obs_for_policy(self, obs: dict) -> dict:
        """
        Preprocess the observation dictionary before passing it to the policy.
        Args:
            obs (dict): The observation dictionary to preprocess.

        Returns:
            dict: The preprocessed observation dictionary.
        """
        obs = flatten_obs_dict(obs)
        base_pose = self.env.robots[0].get_position_orientation()
        cam_rel_poses = []
        # The first time we query the camera parameters, they come back as all zeros;
        # in that case we fall back to camera.get_position_orientation() instead.
        # We do not use camera.get_position_orientation() by default because it always returns the most recent camera pose.
        # However, OmniGibson rendering is somewhat "async": it takes >= 3 render calls per step for the camera renderings to become up to date.
        # Since we use n_render_iterations=1 for speed, we need the camera poses that correspond to the rendered images, not the most up-to-date ones.
        # The camera parameters are guaranteed to be in sync with the visual observations, so we use those.
        for camera_name in ROBOT_CAMERA_NAMES["R1Pro"].values():
            camera = self.env.robots[0].sensors[camera_name.split("::")[1]]
            direct_cam_pose = camera.camera_parameters["cameraViewTransform"]
            if np.allclose(direct_cam_pose, np.zeros(16)):
                cam_rel_poses.append(
                    th.cat(T.relative_pose_transform(*(camera.get_position_orientation()), *base_pose))
                )
            else:
                cam_pose = T.mat2pose(th.tensor(np.linalg.inv(np.reshape(direct_cam_pose, [4, 4]).T), dtype=th.float32))
                cam_rel_poses.append(th.cat(T.relative_pose_transform(*cam_pose, *base_pose)))
        obs["robot_r1::cam_rel_poses"] = th.cat(cam_rel_poses, axis=-1)
        obs["seg_id_map"] = pickle.dumps(VisionSensor.INSTANCE_ID_REGISTRY)
        obs["robot_rel_lin_vel"] = self.env.robots[0].get_relative_linear_velocity()
        obs["robot_rel_ang_vel"] = self.env.robots[0].get_relative_angular_velocity()
        return obs

    def __init__(self, env: Environment):
        super().__init__(env=env)
        self.cfg = env.config
        # Note that eval.py only sets the rgb modality; here we add more modalities (depth and segmentation)
        # We also change the camera resolutions and head-camera aperture to match those used during data collection
        robot = env.robots[0]
        # Update robot sensors:
        for camera_id, camera_name in ROBOT_CAMERA_NAMES["R1Pro"].items():
            sensor_name = camera_name.split("::")[1]
            if camera_id == "head":
                robot.sensors[sensor_name].horizontal_aperture = 40.0
                robot.sensors[sensor_name].image_height = HEAD_RESOLUTION[0]
                robot.sensors[sensor_name].image_width = HEAD_RESOLUTION[1]
            else:
                robot.sensors[sensor_name].image_height = WRIST_RESOLUTION[0]
                robot.sensors[sensor_name].image_width = WRIST_RESOLUTION[1]
            # add depth and segmentation
            robot.sensors[sensor_name].add_modality("depth_linear")
            robot.sensors[sensor_name].add_modality("seg_semantic")
            robot.sensors[sensor_name].add_modality("seg_instance_id")
        # reload observation space
        env.load_observation_space()
        logger.info("Reloaded observation space!")

        # Initialize action buffer and related variables
        self.action_buffer = None
        self.horizon = None
        self.current_action_idx = None
        self.obs_buffer = None

    def reset(self):
        self.action_buffer = None
        self.horizon = None
        self.current_action_idx = None
        self.obs_buffer = None
        return super().reset()
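
The wrapper expects the policy to emit action chunks and replays them one action per environment step, so a driver loop only needs to query the policy when need_new_action is set. A hedged sketch of such a loop, assuming a gymnasium-style (obs, info) reset return; the policy object and its get_action method are hypothetical placeholders, not part of this submission:

obs, info = env.reset()
terminated = truncated = False
# Hypothetical policy returning an action chunk of shape (horizon, action_dim)
action_chunk = policy.get_action(obs)
while not (terminated or truncated):
    obs, reward, terminated, truncated, info = env.step(action_chunk, n_render_iterations=1)
    if obs.get("need_new_action", False):
        # The chunk is exhausted: obs now carries past_obs, base velocities, and the
        # pickled instance-ID map, so query the policy for the next chunk
        action_chunk = policy.get_action(obs)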

process_obs_for_policy(obs)

Preprocess the observation dictionary before passing it to the policy.

Parameters:

Name    Type    Description
obs     dict    The observation dictionary to preprocess.

Returns:

Type    Description
dict    The preprocessed observation dictionary.

Source code in OmniGibson/omnigibson/learning/wrappers/challenge_submissions/submission_the_north_star.py
def process_obs_for_policy(self, obs: dict) -> dict:
    """
    Preprocess the observation dictionary before passing it to the policy.
    Args:
        obs (dict): The observation dictionary to preprocess.

    Returns:
        dict: The preprocessed observation dictionary.
    """
    obs = flatten_obs_dict(obs)
    base_pose = self.env.robots[0].get_position_orientation()
    cam_rel_poses = []
    # The first time we query the camera parameters, they come back as all zeros;
    # in that case we fall back to camera.get_position_orientation() instead.
    # We do not use camera.get_position_orientation() by default because it always returns the most recent camera pose.
    # However, OmniGibson rendering is somewhat "async": it takes >= 3 render calls per step for the camera renderings to become up to date.
    # Since we use n_render_iterations=1 for speed, we need the camera poses that correspond to the rendered images, not the most up-to-date ones.
    # The camera parameters are guaranteed to be in sync with the visual observations, so we use those.
    for camera_name in ROBOT_CAMERA_NAMES["R1Pro"].values():
        camera = self.env.robots[0].sensors[camera_name.split("::")[1]]
        direct_cam_pose = camera.camera_parameters["cameraViewTransform"]
        if np.allclose(direct_cam_pose, np.zeros(16)):
            cam_rel_poses.append(
                th.cat(T.relative_pose_transform(*(camera.get_position_orientation()), *base_pose))
            )
        else:
            cam_pose = T.mat2pose(th.tensor(np.linalg.inv(np.reshape(direct_cam_pose, [4, 4]).T), dtype=th.float32))
            cam_rel_poses.append(th.cat(T.relative_pose_transform(*cam_pose, *base_pose)))
    obs["robot_r1::cam_rel_poses"] = th.cat(cam_rel_poses, axis=-1)
    obs["seg_id_map"] = pickle.dumps(VisionSensor.INSTANCE_ID_REGISTRY)
    obs["robot_rel_lin_vel"] = self.env.robots[0].get_relative_linear_velocity()
    obs["robot_rel_ang_vel"] = self.env.robots[0].get_relative_angular_velocity()
    return obs
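
For reference, a hedged sketch of the camera-pose handling above: cameraViewTransform is treated as a row-major flattened 4x4 world-to-camera view matrix, so the camera's world pose is its inverse (the transpose handles the row/column-major layout). The helper name below is illustrative only, and T is assumed to be omnigibson.utils.transform_utils, matching the module above.

import numpy as np
import torch as th
import omnigibson.utils.transform_utils as T  # assumed import for T, matching the module above


def view_transform_to_pose(view_transform_flat):
    # Illustrative helper: invert the (transposed) 4x4 view matrix to get the
    # camera-to-world transform, then convert it to a (position, quaternion) pose
    view = np.reshape(view_transform_flat, (4, 4)).T
    cam_to_world = np.linalg.inv(view)
    return T.mat2pose(th.tensor(cam_to_world, dtype=th.float32))


# Usage mirroring process_obs_for_policy (camera and base_pose come from the robot):
# pos, quat = view_transform_to_pose(camera.camera_parameters["cameraViewTransform"])
# cam_rel_pose = th.cat(T.relative_pose_transform(pos, quat, *base_pose))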