Skip to content

covered

Covered

Bases: RelativeObjectState, BooleanState

Source code in omnigibson/object_states/covered.py
class Covered(RelativeObjectState, BooleanState):
    """
    Boolean object state, relative to a particle system, describing whether this object is covered by
    particles from that system.

    Two families of systems are handled:
        - VisualParticleSystem: the object counts as covered when the number of particles in this object's
          attachment group is at least m.VISUAL_PARTICLE_THRESHOLD
        - PhysicalParticleSystem: the object counts as covered when the number of particles reported by the
          ContactParticles state is at least m.PHYSICAL_PARTICLE_THRESHOLD
    """

    def __init__(self, obj):
        """
        Args:
            obj: Object that owns this state; forwarded unchanged to the parent constructors
        """
        # Run super first
        super().__init__(obj)

        # Name of this object's visual particle attachment group; filled in by _initialize()
        self._visual_particle_group = None
        # NOTE(review): only ever assigned here within this class -- confirm whether it is still needed
        self._n_initial_visual_particles = None

    @staticmethod
    def get_dependencies():
        """
        Returns:
            list: Dependencies of RelativeObjectState, extended with AABB and ContactParticles
        """
        # AABB needed for sampling visual particles on an object;
        # ContactParticles is queried when handling physical particle systems
        return RelativeObjectState.get_dependencies() + [AABB, ContactParticles]

    def remove(self):
        """
        Cleans up this state by destroying this object's attachment groups. Does nothing if this state was
        never initialized.
        """
        # The group name is only assigned in _initialize(), so there is nothing to clear before that
        if self._initialized:
            self._clear_attachment_groups()

    def _clear_attachment_groups(self):
        """
        Utility function to destroy all corresponding attachment groups for this object
        """
        for system in VisualParticleSystem.get_active_systems().values():
            if self._visual_particle_group in system.groups:
                system.remove_attachment_group(self._visual_particle_group)

    def _initialize(self):
        """
        Runs the parent initialization and then caches the name of this object's visual particle group.
        """
        super()._initialize()
        # Grab group name
        self._visual_particle_group = VisualParticleSystem.get_group_name(obj=self.obj)

    def _get_value(self, system):
        """
        Checks whether this object is currently covered by particles from @system.

        Args:
            system (type): Particle system class to check. Must be a subclass of either VisualParticleSystem
                or PhysicalParticleSystem

        Returns:
            bool: True if the relevant particle count reaches the corresponding threshold, else False

        Raises:
            ValueError: If @system is neither a VisualParticleSystem nor a PhysicalParticleSystem subclass
        """
        # Value is false by default
        value = False
        # First, we check what type of system
        # Currently, we support VisualParticleSystems and PhysicalParticleSystems
        if issubclass(system, VisualParticleSystem):
            # No attachment group for this object means it cannot be covered by this system
            if self._visual_particle_group in system.groups:
                # We check whether the current number of particles assigned to the group is greater than the threshold
                value = system.num_group_particles(group=self._visual_particle_group) >= m.VISUAL_PARTICLE_THRESHOLD
        elif issubclass(system, PhysicalParticleSystem):
            # We only check if we have particle instancers currently
            if len(system.particle_instancers) > 0:
                # We've already cached particle contacts, so we merely search through them to see if any particles are
                # touching the object and are visible (the non-visible ones are considered already "removed").
                # Built-in sum keeps the count a plain int, so the comparison below yields a native bool
                n_near_particles = sum(
                    len(idxs) for idxs in self.obj.states[ContactParticles].get_value(system).values()
                )
                # Heuristic: If the number of near particles is above the threshold, we consider this covered
                value = n_near_particles >= m.PHYSICAL_PARTICLE_THRESHOLD
        else:
            raise ValueError(f"Invalid system {system} received for getting Covered state! "
                             f"Currently, only VisualParticleSystems and PhysicalParticleSystems are supported.")

        return value

    def _set_value(self, system, new_value):
        """
        Covers or uncovers this object with particles from @system. Nothing is generated or removed if the
        current value already matches @new_value.

        Args:
            system (type): Particle system class to use. Must be a subclass of either VisualParticleSystem
                or PhysicalParticleSystem
            new_value (bool): True to generate particles on this object, False to remove them

        Returns:
            Result reported by the system's particle generation call when particles are generated; True in
                every other case

        Raises:
            ValueError: If @system is neither a VisualParticleSystem nor a PhysicalParticleSystem subclass
        """
        # Default success value is True
        success = True
        # First, we check what type of system
        # Currently, we support VisualParticleSystems and PhysicalParticleSystems
        if issubclass(system, VisualParticleSystem):
            # Create the group if it doesn't exist already
            if self._visual_particle_group not in system.groups:
                system.create_attachment_group(obj=self.obj)

            # Check current state and only do something if we're changing state
            if self.get_value(system) != new_value:
                if new_value:
                    # Generate particles
                    success = system.generate_group_particles_on_object(
                        group=self._visual_particle_group,
                        max_samples=m.MAX_VISUAL_PARTICLES,
                        min_samples_for_success=m.VISUAL_PARTICLE_THRESHOLD,
                    )
                else:
                    # We remove all of this group's particles
                    system.remove_all_group_particles(group=self._visual_particle_group)

        elif issubclass(system, PhysicalParticleSystem):
            # Check current state and only do something if we're changing state
            if self.get_value(system) != new_value:
                if new_value:
                    # Sample particles on top of the object
                    success = system.generate_particles_on_object(
                        obj=self.obj,
                        max_samples=m.MAX_PHYSICAL_PARTICLES,
                        min_samples_for_success=m.PHYSICAL_PARTICLE_THRESHOLD,
                    )
                else:
                    # We delete all particles touching this object
                    for inst, particle_idxs in self.obj.states[ContactParticles].get_value(system).items():
                        inst.remove_particles(idxs=list(particle_idxs))

        else:
            raise ValueError(f"Invalid system {system} received for setting Covered state! "
                             f"Currently, only VisualParticleSystems and PhysicalParticleSystems are supported.")

        return success