Skip to content

attachment

Attached

Bases: RelativeObjectState, BooleanState, ContactSubscribedStateMixin

Handles attachment between two rigid objects, by creating a fixed joint between self (child) and other (parent) At any given moment, an object can only be attached to at most one other object.

There are three types of attachment

STICKY: unidirectional, attach if in contact. SYMMETRIC: bidirectional, attach if in contact AND the other object has SYMMETRIC type with the same attachment_category (e.g. "magnet"). MALE/FEMALE: bidirectional, attach if in contact AND the other object has the opposite end (male / female) with the same attachment_category (e.g. "usb")

Subclasses ContactSubscribedStateMixin on_contact_found function attempts to attach two objects when a CONTACT_FOUND event happens

Source code in object_states/attachment.py
class Attached(RelativeObjectState, BooleanState, ContactSubscribedStateMixin):
    """
        Handles attachment between two rigid objects, by creating a fixed joint between self (child) and other (parent)
        At any given moment, an object can only be attached to at most one other object.
        There are three types of attachment:
            STICKY: unidirectional, attach if in contact.
            SYMMETRIC: bidirectional, attach if in contact AND the other object has SYMMETRIC type
                with the same attachment_category (e.g. "magnet").
            MALE/FEMALE: bidirectional, attach if in contact AND the other object has the opposite end (male / female)
                with the same attachment_category (e.g. "usb")

        Subclasses ContactSubscribedStateMixin
        on_contact_found function attempts to attach two objects when a CONTACT_FOUND event happens
    """

    @staticmethod
    def get_dependencies():
        return RelativeObjectState.get_dependencies() + [ContactBodies]

    def __init__(self, obj, attachment_type=AttachmentType.STICKY, attachment_category=None):
        super(Attached, self).__init__(obj)
        self.attachment_type = attachment_type
        self.attachment_category = attachment_category

        self.attached_obj = None

    # Attempts to attach two objects when a CONTACT_FOUND event happens
    def on_contact(self, other, contact_header, contact_data):
        if contact_header.type == ContactEventType.CONTACT_FOUND:
            self.set_value(other, True)

    def _set_value(self, other, new_value):
        # Attempt to attach
        if new_value:
            if self.attached_obj == other:
                # Already attached to this object. Do nothing.
                return True
            elif self.attached_obj is None:
                # If the attachment type and category match, and they are in contact, they should attach
                if self._can_attach(other) and check_collision(self.obj, other):
                    self._attach(other)

                    # Non-sticky attachment is bidirectional
                    if self.attachment_type != AttachmentType.STICKY:
                        other.states[Attached].attached_obj = self.obj

                    return True
                else:
                    logging.warning(f"Trying to attach object {self.obj.name} to object {other.name}, "
                                    f"but they have attachment type/category mismatch or they are not in contact.")
                    return False
            else:
                logging.warning(f"Trying to attach object {self.obj.name} to object {other.name}, "
                                f"but it is already attached to object {self.attached_obj.name}. Try detaching first.")
                return False

        # Attempt to detach
        else:
            if self.attached_obj == other:
                self._detach()

                # Non-sticky attachment is bidirectional
                if self.attachment_type != AttachmentType.STICKY:
                    other.states[Attached].attached_obj = None

                # Wake up objects so that passive forces like gravity can be applied.
                self.obj.wake()
                other.wake()

            return True

    def _get_value(self, other):
        return other == self.attached_obj

    def _can_attach(self, other):
        """
        Returns:
            bool: True if self is sticky or self and other have matching attachment type and category
        """
        if self.attachment_type == AttachmentType.STICKY:
            return True
        else:
            if Attached not in other.states or other.states[Attached].attachment_category != self.attachment_category:
                return False
            elif self.attachment_type == AttachmentType.SYMMETRIC:
                return other.states[Attached].attachment_type == AttachmentType.SYMMETRIC
            elif self.attachment_type == AttachmentType.MALE:
                return other.states[Attached].attachment_type == AttachmentType.FEMALE
            else:
                return other.states[Attached].attachment_type == AttachmentType.MALE

    def _attach(self, other):
        """
        Creates a fixed joint between self.obj and other (where other is the parent and self.obj is the child)
        """
        self.attached_obj = other

        attached_joint_path = f"{other.prim_path}/attachment_joint"
        attached_joint = create_joint(
            prim_path=attached_joint_path,
            joint_type="FixedJoint",
            body0=f"{other.prim_path}/base_link",
            body1=f"{self.obj.prim_path}/base_link",
        )

        # Set the local pose of the attachment joint.
        parent_pos, parent_quat = other.get_position_orientation()
        child_pos, child_quat = self.obj.get_position_orientation()

        # The child frame aligns with the joint frame.
        # Compute the transformation of the child frame in the parent frame
        rel_pos, rel_quat = T.relative_pose_transform(child_pos, child_quat, parent_pos, parent_quat)
        # The child frame position in the parent frame needs to be divided by the parent's scale
        rel_pos /= other.scale
        rel_quat = rel_quat[[3, 0, 1, 2]]

        attached_joint.GetAttribute("physics:localPos0").Set(Gf.Vec3f(*rel_pos))
        attached_joint.GetAttribute("physics:localRot0").Set(Gf.Quatf(*rel_quat))
        attached_joint.GetAttribute("physics:localPos1").Set(Gf.Vec3f(0.0, 0.0, 0.0))
        attached_joint.GetAttribute("physics:localRot1").Set(Gf.Quatf(1.0, 0.0, 0.0, 0.0))

    def _detach(self):
        """
        Removes the attachment joint
        """
        attached_joint_path = f"{self.attached_obj.prim_path}/attachment_joint"
        self._simulator.stage.RemovePrim(attached_joint_path)
        self.attached_obj = None

    @property
    def settable(self):
        return True

    @property
    def state_size(self):
        return 1

    def _dump_state(self):
        return OrderedDict(attached_obj_uuid=-1 if self.attached_obj is None else self.attached_obj.uuid)

    def _load_state(self, state):
        uuid = state["attached_obj_uuid"]
        attached_obj = None if uuid == -1 else og.sim.scene.object_registry("uuid", uuid)

        if self.attached_obj != attached_obj:
            # If it's currently attached to something, detach.
            if self.attached_obj is not None:
                self._detach()

            # If the loaded state requires new attachment, attach.
            if attached_obj is not None:
                self._attach(attached_obj)

    def _serialize(self, state):
        return np.array([state["attached_obj_uuid"]], dtype=float)

    def _deserialize(self, state):
        return OrderedDict(attached_obj_uuid=int(state[0])), 1