Skip to content

simulator

Simulator

Bases: SimulationContext, Serializable

Simulator class for directly interfacing with the physx physics engine.

This is a monolithic class.

All created Simulator() instances will reference the same underlying Simulator object

Parameters:

Name Type Description Default
gravity float

gravity on z direction.

9.81
physics_dt float

dt between physics steps. Defaults to 1.0 / 60.0.

1.0 / 60.0
rendering_dt float

dt between rendering steps. Note: rendering means rendering a frame of the current application and not only rendering a frame to the viewports/ cameras. So UI elements of Isaac Sim will be refereshed with this dt as well if running non-headless. Defaults to 1.0 / 60.0.

1.0 / 60.0
stage_units_in_meters float

The metric units of assets. This will affect gravity value..etc. Defaults to 0.01.

1.0
viewer_width int

width of the camera image, in pixels

gm.DEFAULT_VIEWER_WIDTH
viewer_height int

height of the camera image, in pixels

gm.DEFAULT_VIEWER_HEIGHT
device None or str

specifies the device to be used if running on the gpu with torch backend

None
Source code in omnigibson/simulator.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
class Simulator(SimulationContext, Serializable):
    """
    Simulator class for directly interfacing with the physx physics engine.

    NOTE: This is a monolithic class.
        All created Simulator() instances will reference the same underlying Simulator object

    Args:
        gravity (float): gravity on z direction.
        physics_dt (float): dt between physics steps. Defaults to 1.0 / 60.0.
        rendering_dt (float): dt between rendering steps. Note: rendering means rendering a frame of the current
            application and not only rendering a frame to the viewports/ cameras. So UI elements of Isaac Sim will
            be refereshed with this dt as well if running non-headless. Defaults to 1.0 / 60.0.
        stage_units_in_meters (float): The metric units of assets. This will affect gravity value..etc.
            Defaults to 0.01.
        viewer_width (int): width of the camera image, in pixels
        viewer_height (int): height of the camera image, in pixels
        device (None or str): specifies the device to be used if running on the gpu with torch backend
        """
    _world_initialized = False

    def __init__(
            self,
            gravity=9.81,
            physics_dt=1.0 / 60.0,
            rendering_dt=1.0 / 60.0,
            stage_units_in_meters=1.0,
            viewer_width=gm.DEFAULT_VIEWER_WIDTH,
            viewer_height=gm.DEFAULT_VIEWER_HEIGHT,
            device=None,
    ):
        super().__init__(
            physics_dt=physics_dt,
            rendering_dt=rendering_dt,
            stage_units_in_meters=stage_units_in_meters,
            device=device,
        )

        if self._world_initialized:
            return
        Simulator._world_initialized = True
        self._dc_interface = _dynamic_control.acquire_dynamic_control_interface()
        set_camera_view()
        self._data_logger = DataLogger()
        self._contact_callback = self._physics_context._physx_sim_interface.subscribe_contact_report_events(self._on_contact)

        # Store other internal vars
        self.gravity = gravity

        # Store other references to variables that will be initialized later
        self._viewer = None
        self._viewer_camera = None
        self._camera_mover = None
        self._scene = None

        # Initialize viewer
        # TODO: Make this toggleable so we don't always have a viewer if we don't want to
        self._set_viewer_settings()
        self.viewer_width = viewer_width
        self.viewer_height = viewer_height

        # List of objects that need to be initialized during whenever the next sim step occurs
        self._objects_to_initialize = []

        # Set of categories that can be grasped by assisted grasping
        self.object_state_types = get_states_by_dependency_order()

        # Set of all non-Omniverse transition rules to apply.
        self._transition_rules = DEFAULT_RULES

        # Toggle simulator state once so that downstream omni features can be used without bugs
        # e.g.: particle sampling, which for some reason requires sim.play() to be called at least once
        self.play()
        self.stop()

        # Finally, update the physics settings
        # This needs to be done now, after an initial step + stop for some reason if we want to use GPU
        # dynamics, otherwise we get very strange behavior, e.g., PhysX complains about invalid transforms
        # and crashes
        self._set_physics_engine_settings()

    def __new__(
        cls,
        gravity=9.81,
        physics_dt=1.0 / 60.0,
        rendering_dt=1.0 / 60.0,
        stage_units_in_meters=1.0,
        viewer_width=gm.DEFAULT_VIEWER_WIDTH,
        viewer_height=gm.DEFAULT_VIEWER_HEIGHT,
        device_idx=0,
    ):
        # Overwrite since we have different kwargs
        if Simulator._instance is None:
            Simulator._instance = object.__new__(cls)
        else:
            carb.log_info("Simulator is defined already, returning the previously defined one")
        return Simulator._instance

    def _set_viewer_camera(self, prim_path="/World/viewer_camera"):
        """
        Creates a camera prim dedicated for this viewer at @prim_path if it doesn't exist,
        and sets this camera as the active camera for the viewer

        Args:
            prim_path (str): Path to check for / create the viewer camera
        """
        vp = acquire_viewport_interface()
        viewers_to_names = {vp.get_viewport_window(h): vp.get_viewport_window_name(h) for h in vp.get_instance_list()}
        self._viewer_camera = VisionSensor(
            prim_path=prim_path,
            name=prim_path.split("/")[-1],                  # Assume name is the lowest-level name in the prim_path
            modalities="rgb",
            image_height=self.viewer_height,
            image_width=self.viewer_width,
            viewport_name=viewers_to_names[self._viewer],
        )
        if not self._viewer_camera.loaded:
            self._viewer_camera.load(simulator=self)

        # We update its clipping range and focal length so we get a good FOV and so that it doesn't clip
        # nearby objects (default min is 1 m)
        self._viewer_camera.clipping_range = [0.001, 10000000.0]
        self._viewer_camera.focal_length = 17.0

        # Initialize the sensor
        self._viewer_camera.initialize()

        # Also need to potentially update our camera mover if it already exists
        if self._camera_mover is not None:
            self._camera_mover.set_cam(cam=self._viewer_camera)

    def _set_physics_engine_settings(self):
        """
        Set the physics engine with specified settings
        """
        assert self.is_stopped(), f"Cannot set simulator physics settings while simulation is playing!"
        self._physics_context.set_gravity(value=-self.gravity)
        # Also make sure we invert the collision group filter settings so that different collision groups cannot
        # collide with each other, and modify settings for speed optimization
        self._physics_context.set_invert_collision_group_filter(True)
        self._physics_context.enable_ccd(gm.ENABLE_CCD)
        self._physics_context.enable_flatcache(gm.ENABLE_FLATCACHE)

        # Enable GPU dynamics based on whether we need omni particles feature
        if gm.USE_GPU_DYNAMICS:
            self._physics_context.enable_gpu_dynamics(True)
            self._physics_context.set_broadphase_type("GPU")
        else:
            self._physics_context.enable_gpu_dynamics(False)
            self._physics_context.set_broadphase_type("MBP")

        # Set GPU Pairs capacity and other GPU settings
        self._physics_context.set_gpu_found_lost_pairs_capacity(gm.GPU_PAIRS_CAPACITY)
        self._physics_context.set_gpu_found_lost_aggregate_pairs_capacity(gm.GPU_AGGR_PAIRS_CAPACITY)
        self._physics_context.set_gpu_total_aggregate_pairs_capacity(gm.GPU_AGGR_PAIRS_CAPACITY)
        self._physics_context.set_gpu_max_particle_contacts(gm.GPU_MAX_PARTICLE_CONTACTS)

    @property
    def viewer_visibility(self):
        """
        Returns:
            bool: Whether the viewer is visible or not
        """
        return self._viewer_camera.viewer_visibility

    @viewer_visibility.setter
    def viewer_visibility(self, visible):
        """
        Sets whether the viewer should be visible or not in the Omni UI

        Args:
            visible (bool): Whether the viewer should be visible or not
        """
        self._viewer_camera.viewer_visibility = visible

    @property
    def viewer_height(self):
        """
        Returns:
            int: viewer height of this sensor, in pixels
        """
        # If the viewer camera hasn't been created yet, utilize the default width
        return gm.DEFAULT_VIEWER_HEIGHT if self._viewer_camera is None else self._viewer_camera.image_height

    @viewer_height.setter
    def viewer_height(self, height):
        """
        Sets the viewer height @height for this sensor

        Args:
            height (int): viewer height, in pixels
        """
        self._viewer_camera.image_height = height

    @property
    def viewer_width(self):
        """
        Returns:
            int: viewer width of this sensor, in pixels
        """
        # If the viewer camera hasn't been created yet, utilize the default height
        return gm.DEFAULT_VIEWER_WIDTH if self._viewer_camera is None else self._viewer_camera.image_width

    @viewer_width.setter
    def viewer_width(self, width):
        """
        Sets the viewer width @width for this sensor

        Args:
            width (int): viewer width, in pixels
        """
        self._viewer_camera.image_width = width

    def _set_viewer_settings(self):
        """
        Initializes a reference to the viewer in the App, and sets the frame size
        """
        # Store reference to viewer (see https://docs.omniverse.nvidia.com/app_isaacsim/app_isaacsim/reference_python_snippets.html#get-camera-parameters)
        viewport = acquire_viewport_interface()
        viewport_handle = viewport.get_instance("Viewport")
        self._viewer = viewport.get_viewport_window(viewport_handle)

        # Set viewer camera and frame size
        self._set_viewer_camera()

    def enable_viewer_camera_teleoperation(self):
        """
        Enables keyboard control of the active viewer camera for this simulation
        """
        self._camera_mover = CameraMover(cam=self._viewer_camera)
        self._camera_mover.print_info()

    def import_scene(self, scene):
        """
        Import a scene into the simulator. A scene could be a synthetic one or a realistic Gibson Environment.

        Args:
            scene (Scene): a scene object to load
        """
        assert self.is_stopped(), "Simulator must be stopped while importing a scene!"
        assert isinstance(scene, Scene), "import_scene can only be called with Scene"

        # Clear the existing scene if any
        self.clear()

        self._scene = scene
        self._scene.load(self)

        # Make sure simulator is not running, then start it so that we can initialize the scene
        assert self.is_stopped(), "Simulator must be stopped after importing a scene!"
        self.play()

        # Initialize the scene
        self._scene.initialize()

        # Need to one more step for particle systems to work
        self.step()
        self.stop()

    def initialize_object_on_next_sim_step(self, obj):
        """
        Initializes the object upon the next simulation step

        Args:
            obj (BasePrim): Object to initialize as soon as a new sim step is called
        """
        self._objects_to_initialize.append(obj)

    def import_object(self, obj, register=True, auto_initialize=True):
        """
        Import an object into the simulator.

        Args:
            obj (BaseObject): an object to load
            register (bool): whether to register this object internally in the scene registry
            auto_initialize (bool): If True, will auto-initialize the requested object on the next simulation step.
                Otherwise, we assume that the object will call initialize() on its own!
        """
        assert isinstance(obj, BaseObject), "import_object can only be called with BaseObject"

        # Make sure scene is loaded -- objects should not be loaded unless we have a reference to a scene
        assert self.scene is not None, "import_object needs to be called after import_scene"

        # Load the object in omniverse by adding it to the scene
        self.scene.add_object(obj, self, register=register, _is_call_from_simulator=True)

        # Lastly, additionally add this object automatically to be initialized as soon as another simulator step occurs
        # if requested
        if auto_initialize:
            self.initialize_object_on_next_sim_step(obj=obj)

    def remove_object(self, obj):
        """
        Remove a non-robot object from the simulator.

        Args:
            obj (BaseObject): a non-robot object to load
        """
        self._scene.remove_object(obj, simulator=self)
        self.app.update()

    def _non_physics_step(self):
        """
        Complete any non-physics steps such as state updates.
        """
        assert not self.is_stopped(), f"Simulator must not be stopped in order to run non physics step!"
        # Check to see if any objects should be initialized (only done IF we're playing)
        if len(self._objects_to_initialize) > 0 and self.is_playing():
            for obj in self._objects_to_initialize:
                obj.initialize()
            self._objects_to_initialize = []
            # Also update the scene registry
            # TODO: A better place to put this perhaps?
            self._scene.object_registry.update(keys="root_handle")

        # Propagate states if the feature is enabled
        if gm.ENABLE_OBJECT_STATES:

            # Cache values from all of the micro and macro particle systems.
            # This is used to store system-wide state which can be queried
            # by the object state system.
            for system in self.scene.systems:
                system.cache()

            # Step the object states in global topological order (if the scene exists).
            if self.scene is not None:
                for state_type in self.object_state_types:
                    for obj in self.scene.get_objects_with_state(state_type):
                        # Only update objects that have been initialized so far
                        if obj.initialized:
                            obj.states[state_type].update()

            # Perform system level updates to the micro and macro particle systems.
            # This allows for the states to handle changes in response to changes
            # induced by the object state system.
            for system in self.scene.systems:
                system.update()

            for obj in self.scene.objects:
                # Only update visuals for objects that have been initialized so far
                if isinstance(obj, StatefulObject) and obj.initialized:
                    obj.update_visuals()

    def _omni_update_step(self):
        """
        Step any omni-related things
        """
        # Clear the bounding box cache so that it gets updated during the next time it's called
        BoundingBoxAPI.clear()

    def _transition_rule_step(self):
        """
        Applies all internal non-Omniverse transition rules.
        """
        # Create a dict from rule to filter to objects we care about.
        obj_dict = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
        for obj in self.scene.objects + self.scene.robots:
            for rule in self._transition_rules:
                for fname, f in rule.individual_filters.items():
                    if f(obj):
                        obj_dict[rule]["individual"][fname].append(obj)
                for fname, f in rule.group_filters.items():
                    if f(obj):
                        obj_dict[rule]["group"][fname].append(obj)

        # For each rule, create a subset of the dict and apply it if applicable.
        added_obj_attrs = []
        removed_objs = []
        for rule in self._transition_rules:
            # Skip any rule that has no objects
            if rule not in obj_dict:
                continue
            # Skip any rule that has no group filters if it requires group filters
            group_f_objs = dict()
            if rule.requires_group_filters:
                group_f_objs = obj_dict[rule]["group"]
                if len(group_f_objs) == 0:
                    continue

            # Skip any rule that is missing an individual filter if it requires individual filters
            if rule.requires_individual_filters:
                individual_f_objs = obj_dict[rule]["individual"]
                if not all(fname in individual_f_objs for fname in rule.individual_filters.keys()):
                    continue
                # Get all cartesian cross product over all individual filter objects, and then attempt the transition rule.
                # If objects are to be added / removed, the transition function is
                # expected to return an instance of TransitionResults containing
                # information about those objects.
                # TODO: Consider optimizing itertools.product.
                # TODO: Track what needs to be added / removed at the Scene object level.
                # Comments from a PR on possible changes:
                # - Addition / removal tracking on the Scene object.
                # - Check if the objects are still in the scene in each step.
                for obj_tuple in itertools.product(*list(individual_f_objs.values())):
                    individual_objects = {fname: obj for fname, obj in zip(individual_f_objs.keys(), obj_tuple)}
                    did_transition, transition_output = rule.process(individual_objects=individual_objects, group_objects=group_f_objs)
                    if transition_output is not None:
                        # Transition output is a TransitionResults object
                        added_obj_attrs.extend(transition_output.add)
                        removed_objs.extend(transition_output.remove)
            else:
                # We try the transition rule once, since there's no cartesian cross product of combinations from the
                # individual filters we need to handle
                did_transition, transition_output = rule.process(individual_objects=dict(), group_objects=group_f_objs)
                if transition_output is not None:
                    added_obj_attrs.extend(transition_output.add)
                    removed_objs.extend(transition_output.remove)

        # Process all transition results.
        if len(removed_objs) > 0:
            disclaimer(
                f"We are attempting to remove objects during the transition rule phase of the simulator step.\n"
                f"However, Omniverse currently has a bug when using GPU dynamics where a segfault will occur if an "
                f"object in contact with another object is attempted to be removed.\n"
                f"This bug should be fixed by the next Omniverse release.\n"
                f"In the meantime, we instead teleport these objects to a graveyard location located far outside of "
                f"the scene."
            )
        for i, removed_obj in enumerate(removed_objs):
            # TODO: Ideally we want to remove objects, but because of Omniverse's bug on GPU physics, we simply move
            # the objects into a graveyard for now
            # self.remove_object(removed_obj)
            removed_obj.set_position(np.array(m.OBJECT_GRAVEYARD_POS) + np.ones(3) * i)

        for added_obj_attr in added_obj_attrs:
            new_obj = added_obj_attr.obj
            self.import_object(added_obj_attr.obj)
            pos, orn = added_obj_attr.pos, added_obj_attr.orn
            new_obj.set_position_orientation(position=pos, orientation=orn)

    def reset_scene(self):
        """
        Resets ths scene (if it exists) and its corresponding objects
        """
        if self.scene is not None and self.scene.initialized:
            self.scene.reset()

    def play(self):
        super().play()

        # Update all object / robot handles
        if self.scene is not None and self.scene.initialized:
            for obj in self.scene.objects:
                # Only need to update handles if object is already initialized as well
                if obj.initialized:
                    obj.update_handles()

            for robot in self.scene.robots:
                # Only need to update handles if robot is already initialized as well
                if robot.initialized:
                    robot.update_handles()

        # Check to see if any objects should be initialized
        if len(self._objects_to_initialize) > 0:
            for obj in self._objects_to_initialize:
                obj.initialize()
            self._objects_to_initialize = []
            # Also update the scene registry
            # TODO: A better place to put this perhaps?
            self._scene.object_registry.update(keys="root_handle")

    @property
    def n_physics_timesteps_per_render(self):
        """
        Number of physics timesteps per rendering timestep. rendering_dt has to be a multiple of physics_dt.

        Returns:
            int: Discrete number of physics timesteps to take per step
        """
        n_physics_timesteps_per_render = self.get_rendering_dt() / self.get_physics_dt()
        assert n_physics_timesteps_per_render.is_integer(), "render_timestep must be a multiple of physics_timestep"
        return int(n_physics_timesteps_per_render)

    def step(self, render=True, force_playing=False):
        """
        Step the simulation at self.render_timestep

        Args:
            render (bool): Whether rendering should occur or not
            force_playing (bool): If True, will force physics to propagate (i.e.: set simulation, if paused / stopped,
                to "play" mode)
        """
        # Possibly force playing
        if force_playing and not self.is_playing():
            self.play()

        if render:
            super().step(render=True)
        else:
            for i in range(self.n_physics_timesteps_per_render):
                super().step(render=False)

        # Additionally run non physics things if we have a valid scene
        if self._scene is not None:
            self._omni_update_step()
            if self.is_playing():
                self._non_physics_step()
                if gm.ENABLE_TRANSITION_RULES:
                    self._transition_rule_step()

        # TODO (eric): After stage changes (e.g. pose, texture change), it will take two super().step(render=True) for
        #  the result to propagate to the rendering. We could have called super().render() here but it will introduce
        #  a big performance regression.

    def step_physics(self):
        """
        Step the physics a single step.
        """
        self._physics_context._step(current_time=self.current_time)

    def _on_contact(self, contact_headers, contact_data):
        """
        This callback will be invoked after every PHYSICS step if there is any contact.
        For each of the pair of objects in each contact, we invoke the on_contact function for each of its states
        that subclass ContactSubscribedStateMixin. These states update based on contact events.
        """
        for contact_header in contact_headers:
            actor0 = str(PhysicsSchemaTools.intToSdfPath(contact_header.actor0))
            actor1 = str(PhysicsSchemaTools.intToSdfPath(contact_header.actor1))
            # actor0/1 are prim paths for links that are in contact. Find the corresponding objects.
            actor0_obj = self._scene.object_registry("prim_path", "/".join(actor0.split("/")[:-1]))
            actor1_obj = self._scene.object_registry("prim_path", "/".join(actor1.split("/")[:-1]))
            if actor0_obj is None or actor1_obj is None or not actor0_obj.initialized or not actor1_obj.initialized:
                continue

            for obj0, obj1 in [(actor0_obj, actor1_obj), (actor1_obj, actor0_obj)]:
                if not isinstance(obj0, StatefulObject):
                    continue
                for state_type in obj0.states:
                    if not issubclass(state_type, ContactSubscribedStateMixin):
                        continue
                    obj0.states[state_type].on_contact(obj1, contact_header, contact_data)

    def is_paused(self):
        """
        Returns:
            bool: True if the simulator is paused, otherwise False
        """
        return not (self.is_stopped() or self.is_playing())

    @contextlib.contextmanager
    def stopped(self):
        """
        A context scope for making sure the simulator is stopped during execution within this scope.
        Upon leaving the scope, the prior simulator state is restored.
        """
        # Infer what state we're currently in, then stop, yield, and then restore the original state
        sim_is_playing, sim_is_paused = self.is_playing(), self.is_paused()
        if sim_is_playing or sim_is_paused:
            og.sim.stop()
        yield
        if sim_is_playing: og.sim.play()
        elif sim_is_paused: og.sim.pause()

    @contextlib.contextmanager
    def playing(self):
        """
        A context scope for making sure the simulator is playing during execution within this scope.
        Upon leaving the scope, the prior simulator state is restored.
        """
        # Infer what state we're currently in, then stop, yield, and then restore the original state
        sim_is_stopped, sim_is_paused = self.is_stopped(), self.is_paused()
        if sim_is_stopped or sim_is_paused:
            og.sim.play()
        yield
        if sim_is_stopped: og.sim.stop()
        elif sim_is_paused: og.sim.pause()

    @contextlib.contextmanager
    def paused(self):
        """
        A context scope for making sure the simulator is paused during execution within this scope.
        Upon leaving the scope, the prior simulator state is restored.
        """
        # Infer what state we're currently in, then stop, yield, and then restore the original state
        sim_is_stopped, sim_is_playing = self.is_stopped(), self.is_playing()
        if sim_is_stopped or sim_is_playing:
            og.sim.pause()
        yield
        if sim_is_stopped: og.sim.stop()
        elif sim_is_playing: og.sim.play()

    @contextlib.contextmanager
    def slowed(self, dt):
        """
        A context scope for making the simulator simulation dt slowed, e.g.: for taking micro-steps for propagating
        instantaneous kinematics with minimal impact on physics propagation.

        NOTE: This will set both the physics dt and rendering dt to the same value during this scope.

        Upon leaving the scope, the prior simulator state is restored.
        """
        # Set dt, yield, then restore the original dt
        physics_dt, rendering_dt = self.get_physics_dt(), self.get_rendering_dt()
        self.set_simulation_dt(physics_dt=dt, rendering_dt=dt)
        yield
        self.set_simulation_dt(physics_dt=physics_dt, rendering_dt=rendering_dt)

    @classmethod
    def clear_instance(cls):
        SimulationContext.clear_instance()
        Simulator._world_initialized = None
        return

    def __del__(self):
        SimulationContext.__del__(self)
        Simulator._world_initialized = None
        return

    @property
    def dc(self):
        """
        Returns:
            _dynamic_control.DynamicControl: Dynamic control interface
        """
        return self._dc_interface

    @property
    def scene(self):
        """
        Returns:
            None or Scene: Scene currently loaded in this simulator. If no scene is loaded, returns None
        """
        return self._scene

    @property
    def viewer(self):
        """
        Returns:
            ViewportWindow: Active viewport window instance shown in the omni UI
        """
        return self._viewer

    @property
    def viewer_camera(self):
        """
        Returns:
            VisionSensor: Active camera sensor corresponding to the active viewport window instance shown in the omni UI
        """
        return self._viewer_camera

    @property
    def world_prim(self):
        """
        Returns:
            Usd.Prim: Prim at /World
        """
        return get_prim_at_path(prim_path="/World")

    def _clear_state(self):
        """
        Clears the internal state of this simulation
        """
        # Clear uniquely named items and other internal states
        clear_pu()
        clear_uu()

    def clear(self) -> None:
        """
        Clears the stage leaving the PhysicsScene only if under /World.
        """
        # Stop the physics
        self.stop()

        self._scene = None
        self._data_logger = DataLogger()

        # Load dummy stage, but don't clear sim to prevent circular loops
        self.load_stage(usd_path=f"{og.assets_path}/models/misc/clear_stage.usd")

    def restore(self, json_path):
        """
        Restore a simulation environment from @json_path.

        Args:
            json_path (str): Full path of JSON file to load, which contains information
                to recreate a scene.
        """
        if not json_path.endswith(".json"):
            logging.error(f"You have to define the full json_path to load from. Got: {json_path}")
            return

        # Clear the current stage
        self.clear()

        # Load the info from the json
        with open(json_path, "r") as f:
            scene_info = json.load(f)
        init_info = scene_info["init_info"]
        state = scene_info["state"]

        # Override the init info with our json path
        init_info["args"]["scene_file"] = json_path

        # Also make sure we have any additional modifications necessary from the specific scene
        og.REGISTERED_SCENES[init_info["class_name"]].modify_init_info_for_restoring(init_info=init_info)

        # Recreate and import the saved scene
        recreated_scene = create_object_from_init_info(init_info)
        self.import_scene(scene=recreated_scene)

        # Start the simulation and restore the dynamic state of the scene and then pause again
        self.play()
        self.load_state(state, serialized=False)

        logging.info("The saved simulation environment loaded.")

        return

    def save(self, json_path):
        """
        Saves the current simulation environment to @json_path.

        Args:
            json_path (str): Full path of JSON file to save (should end with .json), which contains information
                to recreate the current scene.
        """
        # Make sure the sim is not stopped, since we need to grab joint states
        assert not self.is_stopped(), "Simulator cannot be stopped when saving to USD!"

        # Make sure there are no objects in the initialization queue, if not, terminate early and notify user
        # Also run other sanity checks before saving
        if len(self._objects_to_initialize) > 0:
            logging.error("There are still objects to initialize! Please take one additional sim step and then save.")
            return
        if not self.scene:
            logging.warning("Scene has not been loaded. Nothing to save.")
            return
        if not json_path.endswith(".json"):
            logging.error(f"You have to define the full json_path to save the scene to. Got: {json_path}")
            return

        # Update scene info
        self.scene.update_objects_info()

        # Dump saved current state and also scene init info
        scene_info = {
            "state": self.scene.dump_state(serialized=False),
            "init_info": self.scene.get_init_info(),
            "objects_info": self.scene.get_objects_info(),
        }

        # Write this to the json file
        Path(os.path.dirname(json_path)).mkdir(parents=True, exist_ok=True)
        with open(json_path, "w+") as f:
            json.dump(scene_info, f, cls=NumpyEncoder, indent=4)

        logging.info("The current simulation environment saved.")

        return

    def get_data_logger(self):
        """
        Returns the data logger of the world.

        Returns:
            DataLogger: Data logger associated with this world
        """
        return self._data_logger

    def load_stage(self, usd_path):
        """
        Open the stage specified by USD file at @usd_path

        Args:
            usd_path (str): Absolute filepath to USD stage that should be loaded
        """
        # Stop the physics if we're playing
        if not self.is_stopped():
            logging.warning("Stopping simulation in order to load stage.")
            self.stop()

        # Store physics dt and rendering dt to reuse later
        # Note that the stage may have been deleted previously; if so, we use the default values
        # of 1/60, 1/60
        try:
            physics_dt = self.get_physics_dt()
        except:
            print("WARNING: Invalid or non-existent physics scene found. Setting physics dt to 1/60.")
            physics_dt = 1/60.
        rendering_dt = self.get_rendering_dt()

        # Clear simulation state
        self._clear_state()

        open_stage(usd_path=usd_path)

        # Re-initialize necessary internal vars
        self._app = omni.kit.app.get_app_interface()
        self._framework = carb.get_framework()
        self._timeline = omni.timeline.get_timeline_interface()
        self._timeline.set_auto_update(True)
        self._dynamic_control = _dynamic_control.acquire_dynamic_control_interface()
        self._cached_rate_limit_enabled = self._settings.get_as_bool("/app/runLoops/main/rateLimitEnabled")
        self._cached_rate_limit_frequency = self._settings.get_as_int("/app/runLoops/main/rateLimitFrequency")
        self._cached_min_frame_rate = self._settings.get_as_int("persistent/simulation/minFrameRate")
        self._loop_runner = omni_loop.acquire_loop_interface()

        self._init_stage(
            physics_dt=physics_dt,
            rendering_dt=rendering_dt,
            stage_units_in_meters=self._initial_stage_units_in_meters,
        )
        self._set_physics_engine_settings()
        self._setup_default_callback_fns()
        self._stage_open_callback = (
            omni.usd.get_context().get_stage_event_stream().create_subscription_to_pop(self._stage_open_callback_fn)
        )
        self._contact_callback = self._physics_context._physx_sim_interface.subscribe_contact_report_events(self._on_contact)

        # Set the viewer camera, and then set its default pose
        self._set_viewer_camera()
        self.viewer_camera.set_position_orientation(
            position=np.array(m.DEFAULT_VIEWER_CAMERA_POS),
            orientation=np.array(m.DEFAULT_VIEWER_CAMERA_QUAT),
        )

    def close(self):
        """
        Shuts down the OmniGibson application
        """
        self._app.shutdown()

    @property
    def device(self):
        """
        Returns:
            device (None or str): Device used in simulation backend
        """
        return self._device

    @device.setter
    def device(self, device):
        """
        Sets the device used for sim backend

        Args:
            device (None or str): Device to set for the simulation backend
        """
        self._device = device
        if self._device is not None and "cuda" in self._device:
            device_id = self._settings.get_as_int("/physics/cudaDevice")
            self._device = f"cuda:{device_id}"

    @property
    def state_size(self):
        # Total state size is the state size of our scene
        return self._scene.state_size

    def _dump_state(self):
        # Default state is from the scene
        return self._scene.dump_state(serialized=False)

    def _load_state(self, state):
        # Default state is from the scene
        self._scene.load_state(state=state, serialized=False)

    def load_state(self, state, serialized=False):
        # We need to make sure the simulator is playing since joint states only get updated when playing
        assert self.is_playing()

        # Run super
        super().load_state(state=state, serialized=serialized)

        # Highlight that at the current step, the non-kinematic states are potentially inaccurate because a sim
        # step is needed to propagate specific states in physics backend
        # TODO: This should be resolved in a future omniverse release!
        disclaimer("Attempting to load simulator state.\n"
                   "Currently, omniverse does not support exclusively stepping kinematics, so we cannot update some "
                   "of our object states relying on updated kinematics until a simulator step is taken!\n"
                   "Object states such as OnTop, Inside, etc. relying on relative spatial information will inaccurate"
                   "until a single sim step is taken.\n"
                   "This should be resolved by the next NVIDIA Isaac Sim release.")

    def _serialize(self, state):
        # Default state is from the scene
        return self._scene.serialize(state=state)

    def _deserialize(self, state):
        # Default state is from the scene
        return self._scene.deserialize(state=state), self._scene.state_size

dc property

Returns:

Type Description

_dynamic_control.DynamicControl: Dynamic control interface

device property writable

Returns:

Name Type Description
device None or str

Device used in simulation backend

n_physics_timesteps_per_render property

Number of physics timesteps per rendering timestep. rendering_dt has to be a multiple of physics_dt.

Returns:

Name Type Description
int

Discrete number of physics timesteps to take per step

scene property

Returns:

Type Description

None or Scene: Scene currently loaded in this simulator. If no scene is loaded, returns None

viewer property

Returns:

Name Type Description
ViewportWindow

Active viewport window instance shown in the omni UI

viewer_camera property

Returns:

Name Type Description
VisionSensor

Active camera sensor corresponding to the active viewport window instance shown in the omni UI

viewer_height property writable

Returns:

Name Type Description
int

viewer height of this sensor, in pixels

viewer_visibility property writable

Returns:

Name Type Description
bool

Whether the viewer is visible or not

viewer_width property writable

Returns:

Name Type Description
int

viewer width of this sensor, in pixels

world_prim property

Returns:

Type Description

Usd.Prim: Prim at /World

clear()

Clears the stage leaving the PhysicsScene only if under /World.

Source code in omnigibson/simulator.py
def clear(self) -> None:
    """
    Clears the stage leaving the PhysicsScene only if under /World.
    """
    # Stop the physics
    self.stop()

    self._scene = None
    self._data_logger = DataLogger()

    # Load dummy stage, but don't clear sim to prevent circular loops
    self.load_stage(usd_path=f"{og.assets_path}/models/misc/clear_stage.usd")

close()

Shuts down the OmniGibson application

Source code in omnigibson/simulator.py
def close(self):
    """
    Shuts down the OmniGibson application
    """
    self._app.shutdown()

enable_viewer_camera_teleoperation()

Enables keyboard control of the active viewer camera for this simulation

Source code in omnigibson/simulator.py
def enable_viewer_camera_teleoperation(self):
    """
    Enables keyboard control of the active viewer camera for this simulation
    """
    self._camera_mover = CameraMover(cam=self._viewer_camera)
    self._camera_mover.print_info()

get_data_logger()

Returns the data logger of the world.

Returns:

Name Type Description
DataLogger

Data logger associated with this world

Source code in omnigibson/simulator.py
def get_data_logger(self):
    """
    Returns the data logger of the world.

    Returns:
        DataLogger: Data logger associated with this world
    """
    return self._data_logger

import_object(obj, register=True, auto_initialize=True)

Import an object into the simulator.

Parameters:

Name Type Description Default
obj BaseObject

an object to load

required
register bool

whether to register this object internally in the scene registry

True
auto_initialize bool

If True, will auto-initialize the requested object on the next simulation step. Otherwise, we assume that the object will call initialize() on its own!

True
Source code in omnigibson/simulator.py
def import_object(self, obj, register=True, auto_initialize=True):
    """
    Import an object into the simulator.

    Args:
        obj (BaseObject): an object to load
        register (bool): whether to register this object internally in the scene registry
        auto_initialize (bool): If True, will auto-initialize the requested object on the next simulation step.
            Otherwise, we assume that the object will call initialize() on its own!
    """
    assert isinstance(obj, BaseObject), "import_object can only be called with BaseObject"

    # Make sure scene is loaded -- objects should not be loaded unless we have a reference to a scene
    assert self.scene is not None, "import_object needs to be called after import_scene"

    # Load the object in omniverse by adding it to the scene
    self.scene.add_object(obj, self, register=register, _is_call_from_simulator=True)

    # Lastly, additionally add this object automatically to be initialized as soon as another simulator step occurs
    # if requested
    if auto_initialize:
        self.initialize_object_on_next_sim_step(obj=obj)

import_scene(scene)

Import a scene into the simulator. A scene could be a synthetic one or a realistic Gibson Environment.

Parameters:

Name Type Description Default
scene Scene

a scene object to load

required
Source code in omnigibson/simulator.py
def import_scene(self, scene):
    """
    Import a scene into the simulator. A scene could be a synthetic one or a realistic Gibson Environment.

    Args:
        scene (Scene): a scene object to load
    """
    assert self.is_stopped(), "Simulator must be stopped while importing a scene!"
    assert isinstance(scene, Scene), "import_scene can only be called with Scene"

    # Clear the existing scene if any
    self.clear()

    self._scene = scene
    self._scene.load(self)

    # Make sure simulator is not running, then start it so that we can initialize the scene
    assert self.is_stopped(), "Simulator must be stopped after importing a scene!"
    self.play()

    # Initialize the scene
    self._scene.initialize()

    # Need to one more step for particle systems to work
    self.step()
    self.stop()

initialize_object_on_next_sim_step(obj)

Initializes the object upon the next simulation step

Parameters:

Name Type Description Default
obj BasePrim

Object to initialize as soon as a new sim step is called

required
Source code in omnigibson/simulator.py
def initialize_object_on_next_sim_step(self, obj):
    """
    Initializes the object upon the next simulation step

    Args:
        obj (BasePrim): Object to initialize as soon as a new sim step is called
    """
    self._objects_to_initialize.append(obj)

is_paused()

Returns:

Name Type Description
bool

True if the simulator is paused, otherwise False

Source code in omnigibson/simulator.py
def is_paused(self):
    """
    Returns:
        bool: True if the simulator is paused, otherwise False
    """
    return not (self.is_stopped() or self.is_playing())

load_stage(usd_path)

Open the stage specified by USD file at @usd_path

Parameters:

Name Type Description Default
usd_path str

Absolute filepath to USD stage that should be loaded

required
Source code in omnigibson/simulator.py
def load_stage(self, usd_path):
    """
    Open the stage specified by USD file at @usd_path

    Args:
        usd_path (str): Absolute filepath to USD stage that should be loaded
    """
    # Stop the physics if we're playing
    if not self.is_stopped():
        logging.warning("Stopping simulation in order to load stage.")
        self.stop()

    # Store physics dt and rendering dt to reuse later
    # Note that the stage may have been deleted previously; if so, we use the default values
    # of 1/60, 1/60
    try:
        physics_dt = self.get_physics_dt()
    except:
        print("WARNING: Invalid or non-existent physics scene found. Setting physics dt to 1/60.")
        physics_dt = 1/60.
    rendering_dt = self.get_rendering_dt()

    # Clear simulation state
    self._clear_state()

    open_stage(usd_path=usd_path)

    # Re-initialize necessary internal vars
    self._app = omni.kit.app.get_app_interface()
    self._framework = carb.get_framework()
    self._timeline = omni.timeline.get_timeline_interface()
    self._timeline.set_auto_update(True)
    self._dynamic_control = _dynamic_control.acquire_dynamic_control_interface()
    self._cached_rate_limit_enabled = self._settings.get_as_bool("/app/runLoops/main/rateLimitEnabled")
    self._cached_rate_limit_frequency = self._settings.get_as_int("/app/runLoops/main/rateLimitFrequency")
    self._cached_min_frame_rate = self._settings.get_as_int("persistent/simulation/minFrameRate")
    self._loop_runner = omni_loop.acquire_loop_interface()

    self._init_stage(
        physics_dt=physics_dt,
        rendering_dt=rendering_dt,
        stage_units_in_meters=self._initial_stage_units_in_meters,
    )
    self._set_physics_engine_settings()
    self._setup_default_callback_fns()
    self._stage_open_callback = (
        omni.usd.get_context().get_stage_event_stream().create_subscription_to_pop(self._stage_open_callback_fn)
    )
    self._contact_callback = self._physics_context._physx_sim_interface.subscribe_contact_report_events(self._on_contact)

    # Set the viewer camera, and then set its default pose
    self._set_viewer_camera()
    self.viewer_camera.set_position_orientation(
        position=np.array(m.DEFAULT_VIEWER_CAMERA_POS),
        orientation=np.array(m.DEFAULT_VIEWER_CAMERA_QUAT),
    )

paused()

A context scope for making sure the simulator is paused during execution within this scope. Upon leaving the scope, the prior simulator state is restored.

Source code in omnigibson/simulator.py
@contextlib.contextmanager
def paused(self):
    """
    A context scope for making sure the simulator is paused during execution within this scope.
    Upon leaving the scope, the prior simulator state is restored.
    """
    # Infer what state we're currently in, then stop, yield, and then restore the original state
    sim_is_stopped, sim_is_playing = self.is_stopped(), self.is_playing()
    if sim_is_stopped or sim_is_playing:
        og.sim.pause()
    yield
    if sim_is_stopped: og.sim.stop()
    elif sim_is_playing: og.sim.play()

playing()

A context scope for making sure the simulator is playing during execution within this scope. Upon leaving the scope, the prior simulator state is restored.

Source code in omnigibson/simulator.py
@contextlib.contextmanager
def playing(self):
    """
    A context scope for making sure the simulator is playing during execution within this scope.
    Upon leaving the scope, the prior simulator state is restored.
    """
    # Infer what state we're currently in, then stop, yield, and then restore the original state
    sim_is_stopped, sim_is_paused = self.is_stopped(), self.is_paused()
    if sim_is_stopped or sim_is_paused:
        og.sim.play()
    yield
    if sim_is_stopped: og.sim.stop()
    elif sim_is_paused: og.sim.pause()

remove_object(obj)

Remove a non-robot object from the simulator.

Parameters:

Name Type Description Default
obj BaseObject

a non-robot object to load

required
Source code in omnigibson/simulator.py
def remove_object(self, obj):
    """
    Remove a non-robot object from the simulator.

    Args:
        obj (BaseObject): a non-robot object to load
    """
    self._scene.remove_object(obj, simulator=self)
    self.app.update()

reset_scene()

Resets ths scene (if it exists) and its corresponding objects

Source code in omnigibson/simulator.py
def reset_scene(self):
    """
    Resets ths scene (if it exists) and its corresponding objects
    """
    if self.scene is not None and self.scene.initialized:
        self.scene.reset()

restore(json_path)

Restore a simulation environment from @json_path.

Parameters:

Name Type Description Default
json_path str

Full path of JSON file to load, which contains information to recreate a scene.

required
Source code in omnigibson/simulator.py
def restore(self, json_path):
    """
    Restore a simulation environment from @json_path.

    Args:
        json_path (str): Full path of JSON file to load, which contains information
            to recreate a scene.
    """
    if not json_path.endswith(".json"):
        logging.error(f"You have to define the full json_path to load from. Got: {json_path}")
        return

    # Clear the current stage
    self.clear()

    # Load the info from the json
    with open(json_path, "r") as f:
        scene_info = json.load(f)
    init_info = scene_info["init_info"]
    state = scene_info["state"]

    # Override the init info with our json path
    init_info["args"]["scene_file"] = json_path

    # Also make sure we have any additional modifications necessary from the specific scene
    og.REGISTERED_SCENES[init_info["class_name"]].modify_init_info_for_restoring(init_info=init_info)

    # Recreate and import the saved scene
    recreated_scene = create_object_from_init_info(init_info)
    self.import_scene(scene=recreated_scene)

    # Start the simulation and restore the dynamic state of the scene and then pause again
    self.play()
    self.load_state(state, serialized=False)

    logging.info("The saved simulation environment loaded.")

    return

save(json_path)

Saves the current simulation environment to @json_path.

Parameters:

Name Type Description Default
json_path str

Full path of JSON file to save (should end with .json), which contains information to recreate the current scene.

required
Source code in omnigibson/simulator.py
def save(self, json_path):
    """
    Saves the current simulation environment to @json_path.

    Args:
        json_path (str): Full path of JSON file to save (should end with .json), which contains information
            to recreate the current scene.
    """
    # Make sure the sim is not stopped, since we need to grab joint states
    assert not self.is_stopped(), "Simulator cannot be stopped when saving to USD!"

    # Make sure there are no objects in the initialization queue, if not, terminate early and notify user
    # Also run other sanity checks before saving
    if len(self._objects_to_initialize) > 0:
        logging.error("There are still objects to initialize! Please take one additional sim step and then save.")
        return
    if not self.scene:
        logging.warning("Scene has not been loaded. Nothing to save.")
        return
    if not json_path.endswith(".json"):
        logging.error(f"You have to define the full json_path to save the scene to. Got: {json_path}")
        return

    # Update scene info
    self.scene.update_objects_info()

    # Dump saved current state and also scene init info
    scene_info = {
        "state": self.scene.dump_state(serialized=False),
        "init_info": self.scene.get_init_info(),
        "objects_info": self.scene.get_objects_info(),
    }

    # Write this to the json file
    Path(os.path.dirname(json_path)).mkdir(parents=True, exist_ok=True)
    with open(json_path, "w+") as f:
        json.dump(scene_info, f, cls=NumpyEncoder, indent=4)

    logging.info("The current simulation environment saved.")

    return

slowed(dt)

A context scope for making the simulator simulation dt slowed, e.g.: for taking micro-steps for propagating instantaneous kinematics with minimal impact on physics propagation.

NOTE: This will set both the physics dt and rendering dt to the same value during this scope.

Upon leaving the scope, the prior simulator state is restored.

Source code in omnigibson/simulator.py
@contextlib.contextmanager
def slowed(self, dt):
    """
    A context scope for making the simulator simulation dt slowed, e.g.: for taking micro-steps for propagating
    instantaneous kinematics with minimal impact on physics propagation.

    NOTE: This will set both the physics dt and rendering dt to the same value during this scope.

    Upon leaving the scope, the prior simulator state is restored.
    """
    # Set dt, yield, then restore the original dt
    physics_dt, rendering_dt = self.get_physics_dt(), self.get_rendering_dt()
    self.set_simulation_dt(physics_dt=dt, rendering_dt=dt)
    yield
    self.set_simulation_dt(physics_dt=physics_dt, rendering_dt=rendering_dt)

step(render=True, force_playing=False)

Step the simulation at self.render_timestep

Parameters:

Name Type Description Default
render bool

Whether rendering should occur or not

True
force_playing bool

If True, will force physics to propagate (i.e.: set simulation, if paused / stopped, to "play" mode)

False
Source code in omnigibson/simulator.py
def step(self, render=True, force_playing=False):
    """
    Step the simulation at self.render_timestep

    Args:
        render (bool): Whether rendering should occur or not
        force_playing (bool): If True, will force physics to propagate (i.e.: set simulation, if paused / stopped,
            to "play" mode)
    """
    # Possibly force playing
    if force_playing and not self.is_playing():
        self.play()

    if render:
        super().step(render=True)
    else:
        for i in range(self.n_physics_timesteps_per_render):
            super().step(render=False)

    # Additionally run non physics things if we have a valid scene
    if self._scene is not None:
        self._omni_update_step()
        if self.is_playing():
            self._non_physics_step()
            if gm.ENABLE_TRANSITION_RULES:
                self._transition_rule_step()

step_physics()

Step the physics a single step.

Source code in omnigibson/simulator.py
def step_physics(self):
    """
    Step the physics a single step.
    """
    self._physics_context._step(current_time=self.current_time)

stopped()

A context scope for making sure the simulator is stopped during execution within this scope. Upon leaving the scope, the prior simulator state is restored.

Source code in omnigibson/simulator.py
@contextlib.contextmanager
def stopped(self):
    """
    A context scope for making sure the simulator is stopped during execution within this scope.
    Upon leaving the scope, the prior simulator state is restored.
    """
    # Infer what state we're currently in, then stop, yield, and then restore the original state
    sim_is_playing, sim_is_paused = self.is_playing(), self.is_paused()
    if sim_is_playing or sim_is_paused:
        og.sim.stop()
    yield
    if sim_is_playing: og.sim.play()
    elif sim_is_paused: og.sim.pause()