Skip to content

covered

Covered

Bases: RelativeObjectState, BooleanState

Source code in object_states/covered.py
class Covered(RelativeObjectState, BooleanState):
    """
    Boolean object state describing whether this object is covered by particles of a given particle system.

    The state is queried / set relative to a particle system class, which must be a subclass of either
    VisualParticleSystem or FluidSystem. For visual particle systems, this state tracks an attachment group per
    system and the number of particles that were generated the last time the state was set to True. For fluid
    systems, the state relies on the system's cached object-particle contacts.
    """

    def __init__(self, obj):
        # Run super first
        super().__init__(obj)

        # Set internal values -- both are populated in _initialize() as dicts keyed by system element name
        self._visual_particle_groups = None
        self._n_initial_visual_particles = None

    @staticmethod
    def get_dependencies():
        """
        Returns:
            list: The dependencies of RelativeObjectState, plus AABB
        """
        # AABB needed for sampling visual particles on an object
        return RelativeObjectState.get_dependencies() + [AABB]

    def _initialize(self):
        """
        Creates one attachment group per visual particle system for this object, and resets the recorded initial
        particle count of every visual particle system to 0.
        """
        self._visual_particle_groups = OrderedDict()
        self._n_initial_visual_particles = OrderedDict()
        for system in get_visual_particle_systems().values():
            name = get_element_name_from_system(system)
            # Create the visual particle group for this system
            self._visual_particle_groups[name] = system.create_attachment_group(obj=self.obj)
            # Default initial particles is 0
            self._n_initial_visual_particles[name] = 0

    def _get_value(self, system):
        """
        Checks whether this object is currently covered by particles from @system.

        Args:
            system (type): Particle system class to check against. Must be a subclass of VisualParticleSystem
                or FluidSystem

        Returns:
            bool: Whether this object is considered covered by @system

        Raises:
            ValueError: If @system is neither a VisualParticleSystem nor a FluidSystem subclass
        """
        # Currently, we support VisualParticleSystems and FluidSystems
        if issubclass(system, VisualParticleSystem):
            # We check whether the current number of particles assigned to the group is greater than the threshold
            name = get_element_name_from_system(system)
            n_current = system.num_group_particles(group=self._visual_particle_groups[name])
            return n_current > m.VISUAL_PARTICLE_THRESHOLD * self._n_initial_visual_particles[name]

        if issubclass(system, FluidSystem):
            # We only check if we have particle instancers currently; otherwise the value is False
            if len(system.particle_instancers) == 0:
                return False

            # We've already cached particle contacts, so we merely search through them to see if any particles are
            # touching the object and are visible (the non-visible ones are considered already "removed")
            n_near_particles = 0
            for instancer, particle_idxs in system.state_cache["obj_particle_contacts"][self.obj].items():
                # Force an integer dtype: an empty collection would otherwise yield a float array, which numpy
                # refuses to use as an index
                idxs = np.array(list(particle_idxs), dtype=int)
                n_near_particles += np.sum(instancer.particle_visibilities[idxs])
            # Heuristic: If the number of near particles is above the threshold, we consider this covered
            return n_near_particles >= m.FLUID_THRESHOLD

        raise ValueError(f"Invalid system {system} received for getting Covered state! "
                         f"Currently, only VisualParticleSystems and FluidSystems are supported.")

    def _set_value(self, system, new_value):
        """
        Sets whether this object is covered by particles from @system. Nothing is done if the current value
        already matches @new_value.

        Args:
            system (type): Particle system class to set this state for. Must be a subclass of
                VisualParticleSystem or FluidSystem
            new_value (bool): Whether the object should be covered by @system

        Returns:
            bool: True, unless generating visual particles on the object was attempted and reported failure

        Raises:
            ValueError: If @system is neither a VisualParticleSystem nor a FluidSystem subclass
        """
        # Default success value is True
        success = True
        # Currently, we support VisualParticleSystems and FluidSystems
        if issubclass(system, VisualParticleSystem):
            # Check current state and only do something if we're changing state
            if self.get_value(system) != new_value:
                name = get_element_name_from_system(system)
                group = self._visual_particle_groups[name]
                if new_value:
                    # Generate particles
                    success = system.generate_group_particles_on_object(group=group)
                    # If we succeeded with generating particles (new_value = True), store additional info
                    if success:
                        # Store how many particles there are now -- this is the "maximum" number possible
                        self._n_initial_visual_particles[name] = system.num_group_particles(group=group)
                else:
                    # We remove all of this group's particles
                    system.remove_all_group_particles(group=group)

        elif issubclass(system, FluidSystem):
            # Check current state and only do something if we're changing state
            if self.get_value(system) != new_value:
                if new_value:
                    # Sample particles on top of the object
                    system.generate_particle_instancer_on_object(obj=self.obj, max_samples=m.MAX_FLUID_PARTICLES)
                else:
                    # We hide all particles within range to be garbage collected by fluid system
                    for inst, particle_idxs in system.state_cache["obj_particle_contacts"][self.obj].items():
                        # Force an integer dtype so that an empty collection remains a valid (empty) index array
                        indices = np.array(list(particle_idxs), dtype=int)
                        # Read, modify, then re-assign so the instancer's property setter receives the update
                        current_visibilities = inst.particle_visibilities
                        current_visibilities[indices] = 0
                        inst.particle_visibilities = current_visibilities

        else:
            raise ValueError(f"Invalid system {system} received for setting Covered state! "
                             f"Currently, only VisualParticleSystems and FluidSystems are supported.")

        return success

    @property
    def state_size(self):
        """
        Returns:
            int: Size of this state's serialized form -- one value per visual particle system
        """
        # We have a single value for every visual particle system
        return len(get_visual_particle_systems())

    @classproperty
    def supported_systems(self):
        """
        Returns:
            list: All systems used in this state, ordered deterministically
        """
        return list(get_visual_particle_systems().values()) + list(get_fluid_systems().values())

    def _dump_state(self):
        """
        Returns:
            OrderedDict: Maps "<system name>_initial_visual_particles" to the recorded initial number of particles
                for every visual particle system
        """
        # For fluid systems, we don't need to dump state, because the fluid systems themselves handle all state dumping
        # related to fluids
        # For every visual particle system, add the initial number of particles
        state = OrderedDict()
        for system in get_visual_particle_systems().values():
            name = get_element_name_from_system(system)
            state[f"{name}_initial_visual_particles"] = self._n_initial_visual_particles[name]

        return state

    def _load_state(self, state):
        """
        Restores the recorded initial number of particles for every visual particle system.

        Args:
            state (dict): State in the same format produced by _dump_state()
        """
        # For fluid systems, we don't need to load state, because the fluid systems themselves handle all state loading
        # related to fluids
        # For every visual particle system, set the initial number of particles
        for system in get_visual_particle_systems().values():
            name = get_element_name_from_system(system)
            self._n_initial_visual_particles[name] = state[f"{name}_initial_visual_particles"]

    def _serialize(self, state):
        """
        Args:
            state (dict): State in the same format produced by _dump_state()

        Returns:
            np.ndarray: Float array holding the values of @state, in the dict's iteration order
        """
        return np.array(list(state.values()), dtype=float)

    def _deserialize(self, state):
        """
        Args:
            state (np.ndarray): Flat array whose leading entries hold one value per visual particle system

        Returns:
            2-tuple:
                - OrderedDict: Deserialized state, in the same format produced by _dump_state()
                - int: Number of entries of @state that were consumed
        """
        state_dict = OrderedDict()
        for i, system in enumerate(get_visual_particle_systems().values()):
            name = get_element_name_from_system(system)
            # Values are stored as floats when serialized, so cast back to an integer count
            state_dict[f"{name}_initial_visual_particles"] = int(state[i])

        return state_dict, len(state_dict)

supported_systems()

Returns:

| Name | Type | Description |
|------|------|-------------|
|      | list | All systems used in this state, ordered deterministically |

Source code in object_states/covered.py
@classproperty
def supported_systems(self):
    """
    Collects every particle system this state can be used with.

    Returns:
        list: All systems used in this state, ordered deterministically
    """
    # Visual particle systems come first, followed by fluid systems
    visual_systems = list(get_visual_particle_systems().values())
    fluid_systems = list(get_fluid_systems().values())
    return visual_systems + fluid_systems