Skip to content

submission_rapper

RAPPERWrapper

Bases: EnvironmentWrapper

Source code in OmniGibson/omnigibson/learning/wrappers/challenge_submissions/submission_rapper.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
class RAPPERWrapper(EnvironmentWrapper):
    def __init__(
        self,
        env: Environment,
        target_sequence: Optional[List] = None,
        arrival_threshold: float = 0.001,
        waypoint_arrival_threshold: float = 0.01,
        task_name: str = None,
        erosion_radius: float = 0.4,  # Erosion radius for map (meters), includes robot size + safety
    ):
        super().__init__(env=env)
        self.task_idx = TASK_NAMES_TO_INDICES[task_name]
        self.base_dir = os.path.dirname(os.path.dirname(__file__))
        self.planner_dir = os.path.join(self.base_dir, "planner")
        list_path = os.path.join(self.planner_dir, f"{self.task_idx:04d}.txt")
        with open(list_path, "r", encoding="utf-8") as f:
            self.lines = [ln.strip() for ln in f if ln.strip() and not ln.lstrip().startswith("#")]
        self.csv_index = 0
        self.csv_path = self._resolve_csv_from_list(self.lines, self.csv_index)

        # Load replace_range.csv
        self.replace_range = {}  # key: (task_idx:int, skill:str, target:str) -> replace_length:int
        self._load_replace_range()

        if target_sequence is not None:
            self._target_sequence: List[dict] = list(target_sequence)
        else:
            self._target_sequence: List[dict] = self._load_planner_sequence()

        if target_sequence is not None:
            self._target_sequence: List[dict] = list(target_sequence)
        else:
            self._target_sequence: List[dict] = self._load_planner_sequence()

        self._cur_target_idx: int = 0
        self._arrival_threshold: float = float(arrival_threshold)
        self.only_replace_this_list = []

        # Erosion parameter for dynamic map
        self.erosion_radius: float = float(erosion_radius)

        # Action-skill replay
        self._action_active_idx: int = -1
        self._action_step_count: int = 0
        self._action_total_steps: int = 0
        self.moveto_max = 500
        self._done = False

        # A* path plan var
        self._planned_path: Optional[List[th.Tensor]] = None
        self._path_idx: int = 0
        self._waypoint_arrival_threshold: float = float(waypoint_arrival_threshold)
        self._final_target_pos_3d: Optional[th.Tensor] = None
        self._final_target_yaw: Optional[float] = None

        self._defer_ticks_after_reset = 0

        env.load_observation_space()
        print("[MyWrapper] Initialized and reloaded observation space.")

    def _resolve_csv_from_list(self, lines, csv_index: int) -> str:
        csv_path = os.path.expanduser(lines[csv_index])
        return csv_path

    def _load_replace_range(self) -> None:
        """
        Load planner_dir/replace_range.csv file and store in self.replace_range.
        CSV columns: task_idx(int), skill(str), target(str), replace_length(int)
        Key format: (task_idx, skill_lower, target_stripped)
        """
        path = os.path.join(self.planner_dir, "replace_range.csv")
        self.replace_range = {}
        if not os.path.exists(path):
            logger.info(f"[MyWrapper] replace_range.csv not found at {path}; using empty replace_range.")
            return
        try:
            with open(path, "r", encoding="utf-8") as f:
                reader = csv.DictReader(f)
                required = {"task_idx", "skill", "target", "replace_length"}
                if not required.issubset(set(h.strip().lower() for h in reader.fieldnames or [])):
                    logger.warning("[MyWrapper] replace_range.csv header missing required columns; ignoring file.")
                    return
                for i, row in enumerate(reader, start=1):
                    try:
                        t_idx = int(str(row.get("task_idx", "")).strip())
                        skill = str(row.get("skill", "")).strip().lower()
                        target = str(row.get("target", "")).strip()
                        repl = int(str(row.get("replace_length", "")).strip())
                        if not skill or not target:
                            continue
                        self.replace_range[(t_idx, skill, target)] = repl
                    except Exception as e:
                        logger.warning(f"[MyWrapper] replace_range.csv line {i} parse error: {e}")
        except Exception as e:
            logger.warning(f"[MyWrapper] Failed to read replace_range.csv: {e}")

    def _load_planner_sequence(self, csv_path: Optional[str] = None) -> List[dict]:
        steps: List[dict] = []
        csv_path = csv_path if csv_path is not None else self.csv_path
        print(f"[MyWrapper] Using CSV: {csv_path}")
        csv_path = os.path.join(self.planner_dir, csv_path)
        if not os.path.exists(csv_path):
            raise FileNotFoundError(csv_path)
        with open(csv_path, newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                try:
                    skill = str((row.get("skill") or "")).strip().lower()
                    if not skill:
                        continue
                    target_obj = str((row.get("target_obj") or "")).strip()
                    target_robot_pos = (row.get("robot_pos") or "").strip()
                    target_obj_pos = (row.get("obj_pos") or "").strip()
                    action_path_raw = (row.get("action_path") or "").strip()
                    repeat = (row.get("repeat") or "").strip()
                    info = {}
                    if skill == "moveto":
                        info["target"] = target_obj
                        if target_robot_pos and target_obj_pos:
                            target_robot_pos_cleaned = re.sub(r",\s*\]", "]", target_robot_pos)
                            target_obj_pos_cleaned = re.sub(r",\s*\]", "]", target_obj_pos)
                            target_robot_pos_vec = ast.literal_eval(target_robot_pos_cleaned)
                            target_obj_pos_vec = ast.literal_eval(target_obj_pos_cleaned)
                            info["target_robot_pos_vec"] = [float(x) for x in target_robot_pos_vec[:9]]
                            info["target_obj_pos_vec"] = [float(x) for x in target_obj_pos_vec[:9]]
                            info["repeat"] = int(repeat)
                    elif skill == "action":
                        if not action_path_raw:
                            continue
                        info["action_path"] = action_path_raw
                        action_path = action_path_raw
                        if not os.path.isabs(action_path):
                            action_path = os.path.normpath(os.path.join(os.path.dirname(csv_path), action_path))
                        if os.path.exists(action_path):
                            info["action_path_resolved"] = action_path
                            length = self._npz_action_length(action_path)
                            if isinstance(length, int) and length > 0:
                                info["action_total_steps"] = length
                        info["repeat"] = int(repeat)
                    elif skill == "moveto_pose":
                        info["target"] = target_obj
                        if target_robot_pos:
                            target_robot_pos_cleaned = re.sub(r",\s*\]", "]", target_robot_pos)
                            target_robot_pos_vec = ast.literal_eval(target_robot_pos_cleaned)
                            info["target_robot_pos_vec"] = [float(x) for x in target_robot_pos_vec[:9]]
                            info["repeat"] = int(repeat)
                    if skill == "action":
                        desc = f"action {os.path.basename(info.get('action_path', ''))}"
                    elif skill == "moveto":
                        desc = f"move to {info.get('target', '?')}"
                    elif skill == "moveto_pose":
                        desc = f"move to {info.get('target_robot_pos_vec', '?')}"
                    info["desc"] = desc
                    step = {"skill": skill}
                    step.update(info)
                    steps.append(step)
                except Exception:
                    continue
        return steps

    def _switch_to_next_csv_after_current(
        self, only_replace_this: bool = False, replace_length: Optional[int] = None
    ) -> bool:
        if not self._target_sequence:
            return False
        cur = self._target_sequence[self._cur_target_idx % len(self._target_sequence)]
        cur_skill = str(cur.get("skill", "")).strip().lower()
        cur_target = str(cur.get("target", "")).strip()
        cur_repeat = int(cur.get("repeat", 0))

        next_index = self.csv_index + 1

        if next_index >= len(self.lines):
            logger.warning("[MyWrapper] No more CSVs to switch to.")
            return False

        next_csv_rel = self.lines[next_index]
        try:
            next_steps = self._load_planner_sequence(next_csv_rel)
        except Exception as e:
            logger.warning(f"[MyWrapper] Failed to load next CSV: {e}")
            return False

        match_idx = -1
        for i, st in enumerate(next_steps):
            if (
                str(st.get("skill", "")).strip().lower() == cur_skill
                and str(st.get("target", "")).strip() == cur_target
                and int(st.get("repeat", 0)) == cur_repeat
            ):
                match_idx = i
                break

        if match_idx == -1:
            logger.warning("[MyWrapper] Could not align with next CSV (no match).")
            return False

        # -------- 선택적 부분 교체 (replace_length 우선) --------
        if isinstance(replace_length, int) and replace_length > 0:
            # 현재 실패 스텝부터 replace_length 길이만큼만 교체
            slice_end = min(match_idx + replace_length, len(next_steps))
            new_chunk = next_steps[match_idx:slice_end]
            if not new_chunk:
                logger.warning("[MyWrapper] Replacement slice is empty.")
                return False

            old_tail = self._target_sequence[
                self._cur_target_idx + replace_length :
            ]  # 기존 꼬리는 동일 길이만큼 건너뛰고 이어붙임
            self.csv_index = next_index
            self.csv_path = next_csv_rel
            self._target_sequence = list(new_chunk) + old_tail
            self._cur_target_idx = 0

            # 상태 리셋
            self._planned_path = None
            self._path_idx = 0
            self._final_target_pos_3d = None
            self._final_target_yaw = None
            self._action_step_count = 0
            self._action_total_steps = 0
            self._action_active_idx = -1
            self._done = False
            print(
                f"[MyWrapper] Replaced {len(new_chunk)} step(s) from next CSV starting at the failed step; kept the remaining original tail."
            )
            return True

        if only_replace_this:
            # 현재 스텝을 next_steps의 매칭 스텝으로 '대체'하고,
            # 그 이후의 스텝은 기존 시퀀스의 (현재+1)부터 그대로 이어감
            old_tail = self._target_sequence[self._cur_target_idx + 1 :]
            replaced_current = next_steps[match_idx]
            self.csv_index = next_index
            self.csv_path = next_csv_rel
            self._target_sequence = [replaced_current] + old_tail
            self._cur_target_idx = 0
            # 상태 리셋
            self._planned_path = None
            self._path_idx = 0
            self._final_target_pos_3d = None
            self._final_target_yaw = None
            self._action_step_count = 0
            self._action_total_steps = 0
            self._action_active_idx = -1
            self._done = False
            print(
                "[MyWrapper] Replaced only the current step from next CSV; kept the following steps from the original plan."
            )
            return True

        # -------- 기본 동작: 매칭 지점부터 끝까지 전부 교체 --------
        if match_idx + 1 >= len(next_steps):
            logger.warning("[MyWrapper] Could not align or nothing after match.")
            return False
        self.csv_index = next_index
        self.csv_path = next_csv_rel
        self._target_sequence = next_steps[match_idx:]
        self._cur_target_idx = 0
        # 상태 리셋
        self._planned_path = None
        self._path_idx = 0
        self._final_target_pos_3d = None
        self._final_target_yaw = None
        self._action_step_count = 0
        self._action_total_steps = 0
        self._action_active_idx = -1
        self._done = False
        print(
            f"[MyWrapper] Switched to next CSV (entire): {self.csv_path} (start from {next_steps[match_idx].get('skill')} - {next_steps[match_idx].get('target')})"
        )
        return True

    def _npz_action_length(self, path: str) -> Optional[int]:
        try:
            import numpy as np

            p = os.path.expanduser(path)
            if not os.path.exists(p):
                return None
            arr = np.load(p, allow_pickle=True)
            if hasattr(arr, "files"):
                key = "actions" if "actions" in arr.files else (arr.files[0] if arr.files else None)
                if key is not None:
                    a = arr[key]
                    return int(a.shape[0]) if hasattr(a, "shape") and len(a.shape) > 0 else None
        except Exception:
            return None
        return None

    def step(self, action, n_render_iterations=1):
        obs, reward, terminated, truncated, info = self.env.step(action, n_render_iterations=n_render_iterations)
        if self._done:
            return obs, reward, True, truncated, info
        self._inject_robot_and_target(obs)
        return obs, reward, terminated, truncated, info

    def reset(self):
        self._done = False
        self._cur_target_idx = 0
        self._action_active_idx = -1
        self._action_step_count = 0
        self._action_total_steps = 0

        self.csv_index = 0
        self.csv_path = self._resolve_csv_from_list(self.lines, self.csv_index)
        self._target_sequence = self._load_planner_sequence(self.csv_path)

        self._defer_ticks_after_reset = 1

        self._planned_path = None
        self._path_idx = 0
        self._final_target_pos_3d = None
        self._final_target_yaw = None

        ret = self.env.reset()
        self._inject_robot_and_target(ret[0])
        return ret

    def _inject_robot_and_target(self, obs: dict) -> None:
        for k in (
            "aux::target_pos",
            "aux::target_yaw",
            "aux::action_path",
            "aux::target_robot_pos_vec",
            "aux::target_obj_pos_vec",
        ):
            obs.pop(k, None)
        if getattr(self, "_defer_ticks_after_reset", 0) > 0:
            self._defer_ticks_after_reset -= 1
            return
        robot = self.env.robots[0]
        robot_pos, robot_orn = robot.get_position_orientation()
        if not isinstance(robot_pos, th.Tensor):
            robot_pos = th.tensor(robot_pos, dtype=th.float32)
        obs["aux::robot_pos"] = robot_pos.float()
        eulers = T.quat2euler(robot_orn)
        yaw = eulers[2]
        if isinstance(yaw, th.Tensor):
            yaw = float(yaw.view(-1)[0])
        yaw_t = th.tensor(yaw, dtype=th.float32)
        obs["aux::robot_yaw"] = yaw_t
        current_step = self._target_sequence[self._cur_target_idx % len(self._target_sequence)]
        skill = str(current_step.get("skill", "")).strip().lower()
        info = {k: v for k, v in current_step.items() if k != "skill"}
        obs["aux::current_skill"] = skill
        obs["aux::current_spec"] = info.get("desc", str(current_step))
        if skill == "moveto":
            self._do_moveto_planned(info, obs, relative=True)
        elif skill == "moveto_pose":
            self._do_moveto_planned(info, obs, relative=False)
        elif skill == "action":
            act_path = info.get("action_path_resolved") or info.get("action_path")
            if isinstance(act_path, str) and len(act_path) > 0:
                obs["aux::action_path"] = act_path
            self._do_action(info)
        else:
            pass

    def _resolve_target_object(self, spec: str):
        idx = 1
        base = spec
        if "(" in spec and ")" in spec:
            try:
                prefix, rest = spec.split("(", 1)
                sel = rest.split(")", 1)[0].strip()
                base = prefix.strip()
                if sel.lower() == "another":
                    idx = 2
                else:
                    val = int(sel)
                    idx = max(1, val)
            except Exception:
                base = spec
        matches = []
        seen = set()

        def add_unique(objs):
            for o in objs:
                oid = getattr(o, "name", None)
                if oid is None:
                    oid = id(o)
                if oid in seen:
                    continue
                seen.add(oid)
                matches.append(o)

        all_objs = sorted(self.env.scene.objects, key=lambda o: getattr(o, "name", ""))
        base_lower = base.lower()
        add_unique([o for o in all_objs if getattr(o, "name", "") == base])
        add_unique([o for o in all_objs if base_lower in getattr(o, "name", "").lower()])
        add_unique(
            sorted(list(self.env.scene.object_registry("category", base, [])), key=lambda o: getattr(o, "name", ""))
        )
        add_unique([o for o in all_objs if base_lower in getattr(o, "category", "").lower()])
        if not matches:
            return None
        sel_idx = min(len(matches) - 1, idx - 1)
        return matches[sel_idx]

    def _advance_step(self):
        if self._cur_target_idx + 1 == len(self._target_sequence):
            self._done = True
        self._cur_target_idx = (self._cur_target_idx + 1) % len(self._target_sequence)

        self._planned_path = None
        self._path_idx = 0
        self._final_target_pos_3d = None
        self._final_target_yaw = None
        self._action_step_count = 0
        self._action_total_steps = 0
        self._action_active_idx = -1

        current_step = self._target_sequence[self._cur_target_idx % len(self._target_sequence)]
        skill = str(current_step.get("skill", "")).strip().lower()
        desc = str(current_step.get("desc", "")).strip().lower()
        print(f"---------- Doing {skill}: {desc} ----------")

    def _do_moveto_planned(self, info: dict, obs: dict, relative: bool) -> None:
        """
        A* 경로를 생성하고, Policy가 다음 경유지를 쫓아가도록
        Dynamic map을 사용하여 movable objects를 고려함
        """
        robot_pos = obs.get("aux::robot_pos")
        robot: BaseRobot = self.env.robots[0]
        current_map: BaseMap = self.env.scene.trav_map

        if self._planned_path is None:
            self._action_active_idx = self._cur_target_idx
            self._action_step_count = 0
            self._action_total_steps = self.moveto_max
            if relative:
                try:

                    def _yaw_from_pose9(vec9):
                        cos_y, sin_y = float(vec9[5]), float(vec9[8])
                        return math.atan2(sin_y, cos_y)

                    def _pos_from_pose9(vec9):
                        return float(vec9[0]), float(vec9[1]), float(vec9[2])

                    def _rot2d(a):
                        ca, sa = math.cos(a), math.sin(a)
                        return ca, -sa, sa, ca

                    target_name = info.get("target")
                    r_pose9 = info.get("target_robot_pos_vec", None)
                    o_pose9 = info.get("target_obj_pos_vec", None)
                    rx, ry, rz = _pos_from_pose9(r_pose9)
                    ox, oy, oz = _pos_from_pose9(o_pose9)
                    yaw_r_ref = _yaw_from_pose9(r_pose9)
                    yaw_o_ref = _yaw_from_pose9(o_pose9)
                    drx_ref, dry_ref, drz_ref = rx - ox, ry - oy, rz - oz
                    dyaw_ref = yaw_r_ref - yaw_o_ref
                    target_obj = self._resolve_target_object(target_name)
                    if target_obj is None:
                        print(f"[MyWrapper] Target object '{target_name}' not found!")
                        self._planned_path = []
                        return
                    o_pos_w, o_quat_w = target_obj.get_position_orientation()
                    if not isinstance(o_pos_w, th.Tensor):
                        o_pos_w = th.tensor(o_pos_w, dtype=th.float32)
                    yaw_o_w = float(T.quat2euler(o_quat_w)[2])
                    delta = yaw_o_w - yaw_o_ref
                    ca, nsa, sa, ca2 = _rot2d(delta)
                    dx_w = ca * drx_ref + nsa * dry_ref
                    dy_w = sa * drx_ref + ca2 * dry_ref
                    dz_w = drz_ref
                    target_pos = th.stack(
                        [
                            o_pos_w[0] + th.tensor(dx_w, dtype=th.float32),
                            o_pos_w[1] + th.tensor(dy_w, dtype=th.float32),
                            o_pos_w[2] + th.tensor(dz_w, dtype=th.float32),
                        ]
                    ).float()
                    self._final_target_pos_3d = (
                        target_pos.clone().detach().to(dtype=th.float32)
                        if isinstance(target_pos, th.Tensor)
                        else th.tensor(target_pos, dtype=th.float32)
                    )
                    target_yaw = yaw_o_w + dyaw_ref
                    self._final_target_yaw = th.tensor(float(target_yaw), dtype=th.float32)
                except Exception as e:
                    print(f"[MyWrapper] Failed to calculate relative target: {e}")
                    self._planned_path = []
                    return
            else:
                r_pose9 = info.get("target_robot_pos_vec")
                self._final_target_pos_3d = th.tensor(r_pose9[:3], dtype=th.float32)
                cy, sy = float(r_pose9[3 + 2]), float(r_pose9[6 + 2])
                target_yaw = math.atan2(sy, cy)
                self._final_target_yaw = th.tensor(float(target_yaw), dtype=th.float32)

            # ========== Dynamic Map 생성 + A* path plan ==========
            if self._final_target_pos_3d is not None and self._planned_path is None:
                print(f"[MyWrapper] Planning A* path with dynamic map...")
                print(f"            Start: {robot_pos[:2].tolist()}")
                print(f"            Goal:  {self._final_target_pos_3d[:2].tolist()}")

                # 1. Static map 가져오기
                static_map = th.clone(current_map.floor_map[0])

                # 2. Dynamic map 생성 (movable objects 반영)
                dynamic_map = self._update_dynamic_map(static_map, robot)
                print(f"[MyWrapper] Dynamic map updated with movable objects")

                # Save dynamic map visualization immediately after creation
                try:
                    step_desc = info.get("target", "unknown")
                    filename = f"dynamic_map_step_{self._cur_target_idx:03d}_{step_desc}.png"
                    self._save_map_visualization(
                        dynamic_map, robot_pos, self._final_target_pos_3d, None, filename
                    )  # No path yet
                except Exception as e:
                    print(f"[MyWrapper] Map visualization failed: {e}")

                # 3. Erosion 적용 (로봇 크기 고려) - use custom erosion radius
                eroded_map = self._erode_map_custom(dynamic_map, robot)
                try:
                    step_desc = info.get("target", "unknown")
                    filename = f"eroded_map_step_{self._cur_target_idx:03d}_{step_desc}.png"
                    self._save_map_visualization(
                        eroded_map, robot_pos, self._final_target_pos_3d, None, filename
                    )  # No path yet
                except Exception as e:
                    print(f"[MyWrapper] Map visualization failed: {e}")

                # 4. A* 실행
                from omnigibson.utils.motion_planning_utils import astar

                source_map = tuple(current_map.world_to_map(robot_pos[:2]).tolist())
                target_map = tuple(current_map.world_to_map(self._final_target_pos_3d[:2]).tolist())

                path_map = astar(eroded_map, source_map, target_map)

                if path_map is None or len(path_map) == 0:
                    print(f"[MyWrapper] A* failed! Trying to switch to next CSV aligned with current progress...")

                    # --- replace_range 조회: (task_idx, skill, target) -> replace_length ---
                    try:
                        cur_step = self._target_sequence[self._cur_target_idx % len(self._target_sequence)]
                        cur_skill_lookup = str(cur_step.get("skill", "")).strip().lower()
                    except Exception:
                        cur_skill_lookup = "moveto" if relative else "moveto_pose"
                    cur_target_lookup = str(info.get("target", "")).strip()
                    repl_len = self.replace_range.get((self.task_idx, cur_skill_lookup, cur_target_lookup), None)

                    # only_replace_this_list에 해당하고 repl_len이 없으면 현재 스텝만 교체
                    if self.task_idx in getattr(self, "only_replace_this_list", []) and repl_len is None:
                        print("[MyWrapper] Only replace this action (policy list hit, no replace_length override).")
                        possible = self._switch_to_next_csv_after_current(only_replace_this=True)
                    else:
                        # repl_len이 있으면 부분 교체, 없으면 전체 교체
                        if isinstance(repl_len, int) and repl_len > 0:
                            print(
                                f"[MyWrapper] Using replace_length={repl_len} from replace_range for ({self.task_idx}, {cur_skill_lookup}, {cur_target_lookup})."
                            )
                        possible = self._switch_to_next_csv_after_current(
                            only_replace_this=False, replace_length=repl_len
                        )
                    if possible:
                        print(f"[MyWrapper] Switched to next CSV. Will re-inject on next step.")
                        new_step = self._target_sequence[self._cur_target_idx % len(self._target_sequence)]
                        new_skill = str(new_step.get("skill", "")).strip().lower()
                        new_info = {k: v for k, v in new_step.items() if k != "skill"}
                        # 이전 타깃 흔적 정리
                        obs.pop("aux::target_pos", None)
                        obs.pop("aux::target_yaw", None)
                        # 새 스텝을 같은 틱에 주입
                        if new_skill == "moveto":
                            self._do_moveto_planned(new_info, obs, relative=True)
                        elif new_skill == "moveto_pose":
                            self._do_moveto_planned(new_info, obs, relative=False)
                        elif new_skill == "action":
                            self._do_action(new_info)
                        return  # 같은 틱 내 재주입 완료

                    print(f"[MyWrapper] A* failed! Using straight line fallback")
                    distance = th.norm(self._final_target_pos_3d[:2] - robot_pos[:2])
                    num_waypoints = max(30, int(distance / 0.05))  # 5cm 간격
                    current_z = robot_pos[2]
                    self._planned_path = []
                    for i in range(num_waypoints + 1):
                        t = i / num_waypoints
                        x = float(robot_pos[0] * (1 - t) + self._final_target_pos_3d[0] * t)
                        y = float(robot_pos[1] * (1 - t) + self._final_target_pos_3d[1] * t)
                        self._planned_path.append(th.tensor([x, y, current_z], dtype=th.float32))
                    print(f"  Created {len(self._planned_path)} interpolated waypoints")
                else:
                    # A* success - convert map coordinates to world coordinates
                    path_world = current_map.map_to_world(path_map)
                    geo_dist = th.sum(th.norm(path_world[1:] - path_world[:-1], dim=1))

                    current_z = robot_pos[2]
                    self._planned_path = [
                        th.tensor([float(p[0]), float(p[1]), current_z], dtype=th.float32) for p in path_world
                    ]
                    self._planned_path[-1] = self._final_target_pos_3d.clone()
                    print(f"            A* success! {len(self._planned_path)} waypoints, {geo_dist:.2f}m")

                self._path_idx = 0

        # step count
        self._action_step_count += 1
        # follow path
        if not self._planned_path or len(self._planned_path) == 0:
            obs["aux::target_pos"] = self._final_target_pos_3d.float()
            obs["aux::target_yaw"] = th.tensor(float(self._final_target_yaw), dtype=th.float32)
        elif self._path_idx == len(self._planned_path) - 1:
            obs["aux::target_pos"] = self._final_target_pos_3d.float()
            obs["aux::target_yaw"] = th.tensor(float(self._final_target_yaw), dtype=th.float32)
        else:
            waypoint_to_chase = self._planned_path[self._path_idx]
            obs["aux::target_pos"] = waypoint_to_chase.float()
            obs["aux::target_yaw"] = th.tensor(float(self._final_target_yaw), dtype=th.float32)
            # waypoint arrive
            dx = float(waypoint_to_chase[0] - robot_pos[0])
            dy = float(waypoint_to_chase[1] - robot_pos[1])
            dist_to_waypoint = math.sqrt(dx * dx + dy * dy)
            if dist_to_waypoint < self._waypoint_arrival_threshold and self._path_idx < len(self._planned_path) - 1:
                self._action_step_count = 0
                self._path_idx += 1
                print(f"[MyWrapper] Waypoint {self._path_idx}/{len(self._planned_path)}")

        # target arrive
        if self._final_target_pos_3d is None:
            if self._action_step_count >= 100:
                print(f"[MyWrapper] Moveto failed (no target)")
                self._advance_step()
            return

        dx = float(self._final_target_pos_3d[0] - robot_pos[0])
        dy = float(self._final_target_pos_3d[1] - robot_pos[1])
        final_dist = math.sqrt(dx * dx + dy * dy)

        cur_yaw = float(obs.get("aux::robot_yaw", th.tensor(0.0)))
        des_yaw = float(self._final_target_yaw)
        yaw_err = des_yaw - cur_yaw
        while yaw_err > math.pi:
            yaw_err -= 2 * math.pi
        while yaw_err < -math.pi:
            yaw_err += 2 * math.pi
        yaw_ok = abs(yaw_err) <= 0.005  # 0.10 rad ~ 5.7 deg

        if final_dist <= self._arrival_threshold and yaw_ok:
            print(f"[MyWrapper] ✓ Moveto SUCCESS ({final_dist:.3f}m, {yaw_err:.3f}, {self._action_step_count} steps)")
            self._advance_step()
        elif self._action_step_count >= self._action_total_steps:
            print(f"[MyWrapper] ⏱ Moveto TIMEOUT ({final_dist:.3f}m, {yaw_err:.3f})")
            self._advance_step()

    def _do_action(self, info: dict) -> None:
        """Gate an 'action' skill by replay length from an npz action file."""
        action_path = info.get("action_path_resolved") or info.get("action_path")
        # Initialize replay counters on step entry
        if self._action_active_idx != self._cur_target_idx:
            total = info.get("action_total_steps")
            if not isinstance(total, int) or total <= 0:
                total = self._npz_action_length(action_path) or 1
                info["action_total_steps"] = total
            self._action_active_idx = self._cur_target_idx
            self._action_step_count = 0
            self._action_total_steps = int(total)
        # Advance replay counter
        self._action_step_count += 1
        if self._action_step_count >= self._action_total_steps:
            print(f"[MyWrapper] Action(replay) finished after {self._action_step_count} steps !!\n")
            self._action_step_count = 0
            self._action_total_steps = 0
            self._action_active_idx = -1
            self._advance_step()

    def _update_dynamic_map(self, static_map: th.Tensor, robot: BaseRobot) -> th.Tensor:
        """
        Update dynamic traversability map with all objects in real-time
        Overlays current scene objects on top of static map

        Args:
            static_map: Original static traversability map
            robot: Robot object (excluded from obstacles)

        Returns:
            dynamic_map: Map with all objects reflected
        """
        from omnigibson.utils.constants import GROUND_CATEGORIES, STRUCTURE_CATEGORIES

        dynamic_map = static_map.clone()
        current_map: BaseMap = self.env.scene.trav_map

        # Get robot name to exclude it from obstacles
        robot_name = robot.name if hasattr(robot, "name") else None

        # Combine categories to exclude (already in static map)
        exclude_categories = GROUND_CATEGORIES | STRUCTURE_CATEGORIES

        # Additional outdoor/structure objects to exclude
        additional_exclude = {
            "paver",  # Paving stones (covers large areas)
            "rail_fence",  # Fencing (already in static map)
            "bush",  # Bushes (landscaping, already in map)
            "tree",  # Trees (already in static map)
            "downlight",  # Ceiling lights
            "track_light",  # Track lighting
            "room_light",  # Ceiling lights
            "garden_light",  # Garden lights (pole-mounted)
            "wall_mounted_light",  # Wall lights
            "electric_switch",  # Light switches
            "wall_socket",  # Electrical outlets
            "motion_sensor",  # Sensors
            "mirror",  # Wall mirrors
            "decorative_sign",  # Wall signs
            "hook",  # Wall hooks
        }

        exclude_categories = exclude_categories | additional_exclude

        # Iterate through all objects
        for obj in self.env.scene.objects:
            try:
                # Skip the robot itself to avoid blocking start position
                obj_name = getattr(obj, "name", None)
                if obj_name is not None and obj_name == robot_name:
                    continue

                if obj is robot:
                    continue

                # Skip structure and ground categories
                obj_category = getattr(obj, "category", None)
                if obj_category in exclude_categories:
                    continue

                # Get AABB (axis-aligned bounding box)
                try:
                    bbox_center = obj.aabb_center
                    bbox_extent = obj.aabb_extent
                    if bbox_center is None or bbox_extent is None:
                        continue

                    # Shrink AABB slightly to reduce over-blocking (95% of original size)
                    shrink_factor = 0.95
                    shrunk_extent = bbox_extent * shrink_factor

                except Exception:
                    continue

                # Convert world coordinates to map coordinates
                lower_corner = bbox_center - shrunk_extent / 2.0
                upper_corner = bbox_center + shrunk_extent / 2.0

                min_xy_world = lower_corner[:2]
                max_xy_world = upper_corner[:2]

                min_xy_map = current_map.world_to_map(min_xy_world)
                max_xy_map = current_map.world_to_map(max_xy_world)

                # Mark obstacle region in map
                map_h, map_w = dynamic_map.shape
                min_r = max(0, int(min_xy_map[0]))
                max_r = min(map_h, int(max_xy_map[0]))
                min_c = max(0, int(min_xy_map[1]))
                max_c = min(map_w, int(max_xy_map[1]))

                # Ensure minimum 1 pixel is marked (for very small objects)
                if max_r <= min_r:
                    max_r = min_r + 1
                if max_c <= min_c:
                    max_c = min_c + 1

                # Set as obstacle (0 = obstacle)
                dynamic_map[min_r:max_r, min_c:max_c] = 0

            except Exception:
                continue

        return dynamic_map

    def _erode_map_custom(self, trav_map: th.Tensor, robot: BaseRobot) -> th.Tensor:
        """
        Erode traversability map with custom erosion radius

        Args:
            trav_map: Traversability map to erode
            robot: Robot object (unused, kept for signature compatibility)

        Returns:
            Eroded traversability map
        """
        import cv2
        import math

        current_map: BaseMap = self.env.scene.trav_map

        # Use only the configured erosion radius (no robot size calculation)
        erosion_radius = self.erosion_radius

        # Convert to pixels
        radius_pixels = int(math.ceil(erosion_radius / current_map.map_resolution))

        print(f"[MyWrapper] Erosion: radius={erosion_radius:.3f}m ({radius_pixels}px)")

        # Apply erosion
        if radius_pixels > 0:
            kernel = th.ones((radius_pixels, radius_pixels))
            eroded_map = th.tensor(cv2.erode(trav_map.cpu().numpy(), kernel.cpu().numpy()))
        else:
            eroded_map = trav_map.clone()

        return eroded_map

    def _save_map_visualization(
        self,
        map_tensor: th.Tensor,
        robot_pos: th.Tensor,
        target_pos: th.Tensor,
        path: Optional[List[th.Tensor]],
        filename: str = "debug_map.png",
    ) -> None:
        """
        Save map as PNG for debugging

        Args:
            map_tensor: Traversability map (2D tensor)
            robot_pos: Robot position (world coordinates)
            target_pos: Target position (world coordinates)
            path: A* path (world coordinates list)
            filename: Filename to save
        """
        try:
            import cv2
            import numpy as np

            current_map: BaseMap = self.env.scene.trav_map

            # Convert tensor to numpy array
            map_np = map_tensor.cpu().numpy().astype(np.uint8)

            # Convert grayscale to BGR (for color visualization)
            map_bgr = cv2.cvtColor(map_np, cv2.COLOR_GRAY2BGR)

            # Mark robot position (green circle scaled to robot size)
            robot_map = current_map.world_to_map(robot_pos[:2])
            robot_r, robot_c = int(robot_map[0]), int(robot_map[1])

            # Calculate robot radius in pixels (approximate robot base radius ~0.2m)
            robot_radius_world = 0.2  # meters
            robot_radius_pixels = int(robot_radius_world / current_map.map_resolution)
            cv2.circle(map_bgr, (robot_c, robot_r), robot_radius_pixels, (0, 255, 0), 2)  # Green outline

            # Mark target position (red)
            target_map = current_map.world_to_map(target_pos[:2])
            target_r, target_c = int(target_map[0]), int(target_map[1])
            cv2.circle(map_bgr, (target_c, target_r), 5, (0, 0, 255), -1)  # Red dot

            # Mark path (blue)
            if path is not None and len(path) > 0:
                for i, wp in enumerate(path):
                    wp_map = current_map.world_to_map(wp[:2])
                    wp_r, wp_c = int(wp_map[0]), int(wp_map[1])
                    cv2.circle(map_bgr, (wp_c, wp_r), 3, (255, 0, 0), -1)  # Blue

                    # Draw path line
                    if i > 0:
                        prev_wp = path[i - 1]
                        prev_map = current_map.world_to_map(prev_wp[:2])
                        prev_r, prev_c = int(prev_map[0]), int(prev_map[1])
                        cv2.line(map_bgr, (prev_c, prev_r), (wp_c, wp_r), (255, 100, 0), 2)  # Blue line

            # Save file to map_debug directory
            map_debug_dir = os.path.join(self.base_dir, "map_debug")
            os.makedirs(map_debug_dir, exist_ok=True)
            save_path = os.path.join(map_debug_dir, filename)
            cv2.imwrite(save_path, map_bgr)
            print(f"[MyWrapper] Map saved to {save_path}")

        except Exception as e:
            print(f"[MyWrapper] Failed to save map visualization: {e}")