Skip to content

asset_utils

change_data_path()

Changes the data paths for this repo

Source code in omnigibson/utils/asset_utils.py
def change_data_path():
    """
    Interactively edit the data paths stored in this repo's global_config.yaml

    The current entries are printed, a replacement value is requested for every key, the resulting
    entries are printed, and they are written back to the file only if the user answers "y".
    """
    config_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "global_config.yaml")

    def show(title, config):
        # Print @title followed by one "key: value" line per config entry
        print(title)
        for key, value in config.items():
            print("{}: {}".format(key, value))

    with open(config_file) as f:
        global_config = yaml.load(f, Loader=yaml.FullLoader)

    show("Current dataset path:", global_config)

    # Only values of already-existing keys are replaced, so walking a snapshot of the keys is sufficient
    for key in list(global_config):
        global_config[key] = input("Change {} from {} to: ".format(key, global_config[key]))

    show("New dataset path:", global_config)

    if input("Save? [y/n]") != "y":
        return
    with open(config_file, "w") as f:
        yaml.dump(global_config, f)

download_assets()

Download OmniGibson assets

Source code in omnigibson/utils/asset_utils.py
def download_assets():
    """
    Download OmniGibson assets

    If gm.ASSET_PATH already exists, only a notice is printed. Otherwise the asset archive is downloaded
    into a temporary directory and extracted into gm.ASSET_PATH.

    Raises:
        AssertionError: If the download or the extraction fails
    """
    if os.path.exists(gm.ASSET_PATH):
        print("Assets already downloaded.")
        return

    # Local import: only needed for the cleanup on the failure path below
    import shutil

    with tempfile.TemporaryDirectory() as td:
        tmp_file = os.path.join(td, "og_assets.tar.gz")
        path = "https://storage.googleapis.com/gibson_scenes/og_assets_1_0_0.tar.gz"
        log.info(f"Downloading and decompressing demo OmniGibson assets from {path}")

        # Explicit raises instead of `assert` statements: assert statements are removed when Python runs
        # with -O, which would silently skip the download and the extraction altogether
        if not urlretrieve(path, tmp_file, show_progress):
            raise AssertionError("Assets download failed.")

        # Create the target directory only once the archive is available, and remove it again if the
        # extraction does not complete. Otherwise a failed attempt would leave a directory behind that
        # makes the next call report the assets as already downloaded.
        os.makedirs(gm.ASSET_PATH, exist_ok=True)
        extracted = False
        try:
            extracted = subprocess.call(
                ["tar", "-zxf", tmp_file, "--strip-components=1", "--directory", gm.ASSET_PATH]
            ) == 0
        finally:
            if not extracted:
                shutil.rmtree(gm.ASSET_PATH, ignore_errors=True)
        if not extracted:
            raise AssertionError("Assets extraction failed.")

download_demo_data()

Download OmniGibson demo dataset

Source code in omnigibson/utils/asset_utils.py
def download_demo_data():
    """
    Download OmniGibson demo dataset

    If gm.KEY_PATH does not exist, the user agreement is printed and the user is asked to accept it
    (repeatedly, until the answer is "y") before the key is downloaded. If gm.DATASET_PATH does not
    exist, the demo dataset archive is then downloaded and extracted into it.

    Raises:
        AssertionError: If the dataset download or extraction fails
    """
    # Print user agreement
    if os.path.exists(gm.KEY_PATH):
        print("OmniGibson dataset encryption key already installed.")
    else:
        print("\n")
        print_user_agreement()
        while input("Do you agree to the above terms for using OmniGibson dataset? [y/n]") != "y":
            print("You need to agree to the terms for using OmniGibson dataset.")

        download_key()

    if os.path.exists(gm.DATASET_PATH):
        print("OmniGibson dataset already installed.")
        return

    # Local import: only needed for the cleanup on the failure path below
    import shutil

    # Download into a temporary directory so that the archive is deleted once we are done with it
    with tempfile.TemporaryDirectory() as td:
        tmp_file = os.path.join(td, "og_dataset.tar.gz")
        path = "https://storage.googleapis.com/gibson_scenes/og_dataset_demo_1_0_0.tar.gz"
        log.info(f"Downloading and decompressing demo OmniGibson dataset from {path}")

        # Explicit raises instead of `assert` statements: assert statements are removed when Python runs
        # with -O, which would silently skip the download and the extraction altogether
        if not urlretrieve(path, tmp_file, show_progress):
            raise AssertionError("Dataset download failed.")

        # Create the target directory only once the archive is available, and remove it again if the
        # extraction does not complete. Otherwise a failed attempt would leave a directory behind that
        # makes the next call report the dataset as already installed.
        os.makedirs(gm.DATASET_PATH, exist_ok=True)
        extracted = False
        try:
            extracted = subprocess.call(
                ["tar", "-zxf", tmp_file, "--strip-components=1", "--directory", gm.DATASET_PATH]
            ) == 0
        finally:
            if not extracted:
                shutil.rmtree(gm.DATASET_PATH, ignore_errors=True)
        if not extracted:
            raise AssertionError("Dataset extraction failed.")

download_og_dataset()

Download OmniGibson dataset

Source code in omnigibson/utils/asset_utils.py
def download_og_dataset():
    """
    Download OmniGibson dataset

    If gm.KEY_PATH does not exist, the user agreement is printed and the user is asked to accept it
    (repeatedly, until the answer is "y") before the key is downloaded. If gm.DATASET_PATH does not
    exist, the dataset archive is then downloaded and extracted into it.

    Raises:
        AssertionError: If the dataset download or extraction fails
    """
    # Print user agreement
    if os.path.exists(gm.KEY_PATH):
        print("OmniGibson dataset encryption key already installed.")
    else:
        print("\n")
        print_user_agreement()
        while input("Do you agree to the above terms for using OmniGibson dataset? [y/n]") != "y":
            print("You need to agree to the terms for using OmniGibson dataset.")

        download_key()

    if os.path.exists(gm.DATASET_PATH):
        print("OmniGibson dataset already installed.")
        return

    # Local import: only needed for the cleanup on the failure path below
    import shutil

    # Download into a temporary directory so that the archive is deleted once we are done with it
    with tempfile.TemporaryDirectory() as td:
        tmp_file = os.path.join(td, "og_dataset.tar.gz")
        path = "https://storage.googleapis.com/gibson_scenes/og_dataset_1_0_0.tar.gz"
        log.info(f"Downloading and decompressing OmniGibson dataset from {path}")

        # Explicit raises instead of `assert` statements: assert statements are removed when Python runs
        # with -O, which would silently skip the download and the extraction altogether
        if not urlretrieve(path, tmp_file, show_progress):
            raise AssertionError("Dataset download failed.")

        # Create the target directory only once the archive is available, and remove it again if the
        # extraction does not complete. Otherwise a failed attempt would leave a directory behind that
        # makes the next call report the dataset as already installed.
        os.makedirs(gm.DATASET_PATH, exist_ok=True)
        extracted = False
        try:
            extracted = subprocess.call(
                ["tar", "-zxf", tmp_file, "--strip-components=1", "--directory", gm.DATASET_PATH]
            ) == 0
        finally:
            if not extracted:
                shutil.rmtree(gm.DATASET_PATH, ignore_errors=True)
        if not extracted:
            raise AssertionError("Dataset extraction failed.")

get_all_object_categories()

Get all OmniGibson object categories

Returns:

Name Type Description
list

all object categories

Source code in omnigibson/utils/asset_utils.py
def get_all_object_categories():
    """
    List all object categories of the OmniGibson dataset

    Returns:
        list: sorted names of the entries in the dataset's "objects" directory, dot files excluded
    """
    objects_dir = os.path.join(gm.DATASET_PATH, "objects")
    visible_entries = filter(lambda name: not is_dot_file(name), os.listdir(objects_dir))
    return sorted(visible_entries)

get_all_object_category_models(category)

Get all object models from @category

Parameters:

Name Type Description Default
category str

Object category name

required

Returns:

Type Description

list of str: all object models belonging to @category

Source code in omnigibson/utils/asset_utils.py
def get_all_object_category_models(category):
    """
    List all object models that belong to @category

    Args:
        category (str): Object category name

    Returns:
        list of str: sorted names of all object models belonging to @category, or an empty list if the
            category directory does not exist
    """
    category_dir = os.path.join(gm.DATASET_PATH, "objects", category)
    if not os.path.exists(category_dir):
        return []
    return sorted(os.listdir(category_dir))

get_all_object_category_models_with_abilities(category, abilities)

Get all object models from @category whose assets are properly annotated with necessary requirements to support abilities @abilities

Parameters:

Name Type Description Default
category str

Object category name

required
abilities dict

Dictionary mapping requested abilities to keyword arguments to pass to the corresponding object state constructors. The abilities' required annotations will be guaranteed for the returned models

required

Returns:

Type Description

list of str: all object models belonging to @category which are properly annotated with necessary requirements to support the requested list of @abilities

Source code in omnigibson/utils/asset_utils.py
def get_all_object_category_models_with_abilities(category, abilities):
    """
    Get all object models from @category whose assets are properly annotated with necessary requirements to support
    abilities @abilities

    Every model of the category is opened from its encrypted USD file and its default prim is checked
    against the requirements of each requested ability and against every object state (including the
    states' dependencies) associated with that ability.

    Args:
        category (str): Object category name
        abilities (dict): Dictionary mapping requested abilities to keyword arguments to pass to the corresponding
            object state constructors. The abilities' required annotations will be guaranteed for the returned
            models

    Returns:
        list of str: all object models belonging to @category which are properly annotated with necessary requirements
            to support the requested list of @abilities
    """
    # Avoid circular imports
    from omnigibson.objects.dataset_object import DatasetObject
    from omnigibson.object_states.factory import get_requirements_for_ability, get_states_for_ability

    # Get all valid models
    all_models = get_all_object_category_models(category=category)

    # Generate all object states required per object given the requested set of abilities
    abilities_info = {ability: [(state_type, params) for state_type in get_states_for_ability(ability)]
                      for ability, params in abilities.items()}

    # Get mapping for class init kwargs
    state_init_default_kwargs = dict()

    for ability, state_types_and_params in abilities_info.items():
        # NOTE: the list is extended while it is being iterated, so the dependencies appended below are
        # visited by this very loop as well (i.e. their own dependencies get added, too)
        for state_type, _ in state_types_and_params:
            # Add each state's dependencies, too. Note that only required dependencies are added.
            for dependency in state_type.get_dependencies():
                if all(other_state != dependency for other_state, _ in state_types_and_params):
                    state_types_and_params.append((dependency, dict()))

        for state_type, _ in state_types_and_params:
            default_kwargs = inspect.signature(state_type.__init__).parameters
            # Keep only the parameters that declare a default. The "no default" sentinel is checked by
            # identity through the public inspect.Parameter.empty name, so defaults whose type overloads
            # comparison operators cannot interfere with the check.
            state_init_default_kwargs[state_type] = \
                {kwarg: val.default for kwarg, val in default_kwargs.items()
                 if kwarg != "self" and val.default is not inspect.Parameter.empty}

    # Iterate over all models and sanity check each one, making sure they satisfy all the requested @abilities
    valid_models = []

    def supports_abilities(info, obj_prim):
        # True iff @obj_prim is compatible with every ability requirement and every state listed in @info
        for ability, states_and_params in info.items():
            # Check ability requirements
            for requirement in get_requirements_for_ability(ability):
                if not requirement.is_compatible_asset(prim=obj_prim)[0]:
                    return False

            # Check all link states
            for state_type, params in states_and_params:
                # Start from the constructor defaults and let the user-requested params override them
                kwargs = deepcopy(state_init_default_kwargs[state_type])
                kwargs.update(params)
                if not state_type.is_compatible_asset(prim=obj_prim, **kwargs)[0]:
                    return False
        return True

    for model in all_models:
        usd_path = DatasetObject.get_usd_path(category=category, model=model)
        usd_path = usd_path.replace(".usd", ".encrypted.usd")
        with decrypted(usd_path) as fpath:
            stage = lazy.pxr.Usd.Stage.Open(fpath)
            prim = stage.GetDefaultPrim()
            if supports_abilities(abilities_info, prim):
                valid_models.append(model)

    return valid_models

get_all_object_models()

Get OmniGibson all object models

Returns:

Name Type Description
list

all object model paths

Source code in omnigibson/utils/asset_utils.py
def get_all_object_models():
    """
    List the directories of all object models of the OmniGibson dataset

    Returns:
        list: sorted paths of all model directories, i.e. of every directory located inside a category
            directory of the dataset's "objects" directory
    """
    objects_dir = os.path.join(gm.DATASET_PATH, "objects")

    def subdirectories(parent):
        # Full paths of the directories located directly inside @parent
        candidates = (os.path.join(parent, name) for name in os.listdir(parent))
        return [candidate for candidate in candidates if os.path.isdir(candidate)]

    model_dirs = []
    for category_dir in subdirectories(objects_dir):
        model_dirs += subdirectories(category_dir)
    model_dirs.sort()
    return model_dirs

get_all_system_categories()

Get OmniGibson all system categories

Returns:

Name Type Description
list

all system categories

Source code in omnigibson/utils/asset_utils.py
def get_all_system_categories():
    """
    List all system categories of the OmniGibson dataset

    Returns:
        list: sorted names of the entries in the dataset's "systems" directory, dot files excluded
    """
    systems_dir = os.path.join(gm.DATASET_PATH, "systems")
    visible_entries = filter(lambda name: not is_dot_file(name), os.listdir(systems_dir))
    return sorted(visible_entries)

get_attachment_metalinks(category, model)

Get attachment metalinks for an object model

Parameters:

Name Type Description Default
category str

Object category name

required
model str

Object model name

required

Returns:

Type Description

list of str: all attachment metalinks for the object model

Source code in omnigibson/utils/asset_utils.py
def get_attachment_metalinks(category, model):
    """
    Collect the attachment metalinks of an object model

    Opens the model's encrypted USD file and looks at the direct children of its default prim.

    Args:
        category (str): Object category name
        model (str): Object model name

    Returns:
        list of str: names of all "Xform" children whose name contains the AttachedTo metalink prefix
    """
    # Avoid circular imports
    from omnigibson.objects.dataset_object import DatasetObject
    from omnigibson.object_states import AttachedTo

    encrypted_usd_path = DatasetObject.get_usd_path(category=category, model=model).replace(
        ".usd", ".encrypted.usd"
    )
    with decrypted(encrypted_usd_path) as fpath:
        # Hold on to the stage object for as long as its prims are being inspected
        stage = lazy.pxr.Usd.Stage.Open(fpath)
        xform_names = [
            child.GetName() for child in stage.GetDefaultPrim().GetChildren() if child.GetTypeName() == "Xform"
        ]
        return [name for name in xform_names if AttachedTo.metalink_prefix in name]

get_available_g_scenes()

Returns:

Name Type Description
list

available Gibson scenes

Source code in omnigibson/utils/asset_utils.py
def get_available_g_scenes():
    """
    List the available Gibson scenes

    Returns:
        list: sorted names of the entries in og.g_dataset_path, dot files excluded
    """
    scene_names = [entry for entry in os.listdir(og.g_dataset_path) if not is_dot_file(entry)]
    scene_names.sort()
    return scene_names

get_available_og_scenes()

OmniGibson interactive scenes

Returns:

Name Type Description
list

Available OmniGibson interactive scenes

Source code in omnigibson/utils/asset_utils.py
def get_available_og_scenes():
    """
    List the OmniGibson interactive scenes

    Returns:
        list: sorted names of the entries in the dataset's "scenes" directory, excluding dot files and
            the "background" entry
    """
    scenes_dir = os.path.join(gm.DATASET_PATH, "scenes")
    scene_names = []
    for entry in os.listdir(scenes_dir):
        if is_dot_file(entry) or entry == "background":
            continue
        scene_names.append(entry)
    return sorted(scene_names)

get_og_assets_version()

Returns:

Name Type Description
str

OmniGibson asset version

Source code in omnigibson/utils/asset_utils.py
def get_og_assets_version():
    """
    Returns:
        str: OmniGibson asset version, i.e. the output of `git rev-parse HEAD` run against
            gm.DATASET_PATH with surrounding whitespace removed (empty if git printed nothing to stdout)
    """
    process = subprocess.Popen(
        ["git", "-C", gm.DATASET_PATH, "rev-parse", "HEAD"], shell=False, stdout=subprocess.PIPE
    )
    stdout, _ = process.communicate()
    # stdout is a bytes object: decode it rather than calling str() on it, which would return the
    # "b'...'" representation instead of the bare hash
    return stdout.decode("utf-8", errors="replace").strip()

get_og_avg_category_specs()

Load average object specs (dimension and mass) for objects

Returns:

Name Type Description
dict

Average category specifications for all object categories

Source code in omnigibson/utils/asset_utils.py
def get_og_avg_category_specs():
    """
    Load the average object specs (dimension and mass) of the object categories

    Returns:
        dict: Content of the dataset's metadata/avg_category_specs.json file, or an empty dictionary
            (after logging a warning) if that file does not exist
    """
    specs_path = os.path.join(gm.DATASET_PATH, "metadata", "avg_category_specs.json")
    if not os.path.exists(specs_path):
        log.warning(
            "Requested average specs of the object categories in the OmniGibson Dataset of objects, but the "
            "file cannot be found. Did you download the dataset? Returning an empty dictionary"
        )
        return {}

    with open(specs_path) as f:
        specs = json.load(f)
    return specs

get_og_category_ids()

Get OmniGibson object categories

Returns:

Name Type Description
defaultdict

mapping from category name to integer id (the index of its line in metadata/categories.txt); unknown names map to 255

Source code in omnigibson/utils/asset_utils.py
def get_og_category_ids():
    """
    Get the ids of the OmniGibson object categories

    Returns:
        defaultdict: maps each category name listed in the dataset's metadata/categories.txt file (one
            name per line) to the zero-based index of its line; any other key maps to 255
    """
    categories_file = os.path.join(gm.DATASET_PATH, "metadata", "categories.txt")
    with open(categories_file, "r") as fp:
        name_to_id = {line.rstrip(): idx for idx, line in enumerate(fp)}
    return defaultdict(lambda: 255, name_to_id)

get_og_category_path(category_name)

Get OmniGibson object category path

Parameters:

Name Type Description Default
category_name str

object category

required

Returns:

Name Type Description
str

file path to the object category

Source code in omnigibson/utils/asset_utils.py
def get_og_category_path(category_name):
    """
    Resolve the directory of an OmniGibson object category

    Args:
        category_name (str): object category

    Returns:
        str: file path to the object category, located in the dataset's "objects" directory

    Raises:
        AssertionError: If the "objects" directory has no entry named @category_name
    """
    objects_dir = os.path.join(gm.DATASET_PATH, "objects")
    category_dir = os.path.join(objects_dir, category_name)
    assert category_name in os.listdir(objects_dir), "Category {} does not exist".format(category_name)
    return category_dir

get_og_model_path(category_name, model_name)

Get OmniGibson object model path

Parameters:

Name Type Description Default
category_name str

object category

required
model_name str

object model

required

Returns:

Name Type Description
str

file path to the object model

Source code in omnigibson/utils/asset_utils.py
def get_og_model_path(category_name, model_name):
    """
    Resolve the directory of an OmniGibson object model

    Args:
        category_name (str): object category
        model_name (str): object model

    Returns:
        str: file path to the object model, located in the directory of its category

    Raises:
        AssertionError: If the category does not exist or has no entry named @model_name
    """
    category_dir = get_og_category_path(category_name)
    assert model_name in os.listdir(category_dir), \
        "Model {} from category {} does not exist".format(model_name, category_name)
    return os.path.join(category_dir, model_name)

get_og_scene_path(scene_name)

Get OmniGibson scene path

Parameters:

Name Type Description Default
scene_name str

scene name, e.g., "Rs_int"

required

Returns:

Name Type Description
str

file path to the scene name

Source code in omnigibson/utils/asset_utils.py
def get_og_scene_path(scene_name):
    """
    Resolve the directory of an OmniGibson scene

    Args:
        scene_name (str): scene name, e.g., "Rs_int"

    Returns:
        str: file path to the scene, located in the dataset's "scenes" directory

    Raises:
        AssertionError: If the "scenes" directory has no entry named @scene_name
    """
    scenes_dir = os.path.join(gm.DATASET_PATH, "scenes")
    log.info("Scene name: {}".format(scene_name))
    scene_dir = os.path.join(scenes_dir, scene_name)
    assert scene_name in os.listdir(scenes_dir), "Scene {} does not exist".format(scene_name)
    return scene_dir

get_scene_path(scene_id)

Parameters:

Name Type Description Default
scene_id str

scene id, e.g., "Rs_int"

required

Returns:

Name Type Description
str

scene path for this scene_id

Source code in omnigibson/utils/asset_utils.py
def get_scene_path(scene_id):
    """
    Resolve the directory of a scene inside og.g_dataset_path

    Args:
        scene_id (str): scene id, e.g., "Rs_int"

    Returns:
        str: scene path for this scene_id

    Raises:
        AssertionError: If og.g_dataset_path has no entry named @scene_id
    """
    dataset_dir = og.g_dataset_path
    scene_dir = os.path.join(dataset_dir, scene_id)
    assert scene_id in os.listdir(dataset_dir), "Scene {} does not exist".format(scene_id)
    return scene_dir

get_texture_file(mesh_file)

Get texture file

Parameters:

Name Type Description Default
mesh_file str

path to mesh obj file

required

Returns:

Name Type Description
str

texture file path

Source code in omnigibson/utils/asset_utils.py
def get_texture_file(mesh_file):
    """
    Get texture file

    Looks up the first "mtllib" statement of the mesh obj file and then the first "map_Kd" statement of
    the material file it references. Both referenced files are resolved relative to the directory of
    @mesh_file.

    Args:
        mesh_file (str): path to mesh obj file

    Returns:
        None or str: texture file path, or None if the mesh file contains no mtllib statement or the
            material file contains no map_Kd statement
    """
    def first_argument(file_path, keyword):
        # Return the first argument of the first statement of @file_path that starts with @keyword.
        # Only the leading token of a line is compared, so comments or other statements that merely
        # contain @keyword are skipped, as are statements that lack an argument.
        with open(file_path, "r") as f:
            for line in f:
                # A byte order mark may precede the very first statement of the file
                tokens = line.lstrip("\ufeff").split()
                if len(tokens) >= 2 and tokens[0] == keyword:
                    return tokens[1]
        return None

    model_dir = os.path.dirname(mesh_file)

    mtl_name = first_argument(mesh_file, "mtllib")
    if mtl_name is None:
        return None

    texture_name = first_argument(os.path.join(model_dir, mtl_name), "map_Kd")
    if texture_name is None:
        return None

    return os.path.join(model_dir, texture_name)

is_dot_file(p)

Check if a filename starts with a dot. Note that while this does not actually correspond to checking for hidden files on Windows, the files we want to ignore will still start with a dot and thus this works.

Returns:

Name Type Description
bool

True if the given filename starts with a dot

Source code in omnigibson/utils/asset_utils.py
def is_dot_file(p):
    """
    Tell whether a filename begins with a dot.
    This is not the same as checking for hidden files on Windows, but the files we want to ignore
    start with a dot there as well, so the check is sufficient for our purposes.

    Args:
        p (str): filename to check

    Returns:
        bool: True if @p starts with ".", False otherwise
    """
    starts_with_dot = p.startswith(".")
    return starts_with_dot