# Copyright (c) 2025, The Nav-Suite Project Developers (https://github.com/leggedrobotics/nav-suite/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import itertools
import math
import os
import pickle
import random
import torch

import omni.log
from isaaclab.scene import InteractiveScene

from .trajectory_sampling_cfg import TrajectorySamplingCfg


class TrajectorySampling:
    def __init__(self, cfg: TrajectorySamplingCfg, scene: InteractiveScene):
        # save cfg and scene
        self.cfg = cfg
        self.scene = scene

    def sample_paths(self, num_paths, min_path_length, max_path_length, seed: int = 1) -> torch.Tensor:
        """
        Sample trajectories over the entire terrain.

        Args:
            num_paths: Number of paths to sample.
            min_path_length: Minimum path length.
            max_path_length: Maximum path length.
            seed: Random seed.

        Returns:
            A tensor of shape [num_paths, 7] containing the sampled paths.
        """

        # load paths if they exist
        if self.cfg.enable_saved_paths_loading:
            filename = self._get_save_path_trajectories(seed, num_paths, min_path_length, max_path_length)
            if os.path.isfile(filename):
                with open(filename, "rb") as f:
                    saved_paths = pickle.load(f)
                # return the loaded paths directly
                omni.log.info(
                    f"Loaded {num_paths} paths with length in [{min_path_length},{max_path_length}] generated with seed {seed}."
                )
                return saved_paths

        # analyse terrain if not done yet
        if not hasattr(self, "terrain_analyser"):
            # check if singleton is used and available
            if (
                hasattr(self.cfg.terrain_analysis.class_type, "instance")
                and self.cfg.terrain_analysis.class_type.instance() is not None
            ):
                self.terrain_analyser = self.cfg.terrain_analysis.class_type.instance()
            else:
                self.terrain_analyser = self.cfg.terrain_analysis.class_type(
                    self.cfg.terrain_analysis, scene=self.scene
                )
            if not self.terrain_analyser.complete:
                self.terrain_analyser.analyse()

        # seed the random number generator for reproducible sampling
        random.seed(seed)

        # get index of samples within length
        within_length = (self.terrain_analyser.samples[:, 2] > min_path_length) & (
            self.terrain_analyser.samples[:, 2] <= max_path_length
        )
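        # NOTE: each row of `terrain_analyser.samples` stores (start point index, goal point index, path length),
        # so column 2 is the path length compared against the bounds above (see the data construction below).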

        # apply the within_length filter first
        filtered_samples = self.terrain_analyser.samples[within_length]

        # randomly permute the filtered samples
        rand_idx = torch.randperm(filtered_samples.shape[0], device=self.terrain_analyser.device)

        # select the samples
        selected_samples = filtered_samples[rand_idx][:num_paths]

        # filter edge cases
        if selected_samples.shape[0] == 0:
            raise ValueError(f"No paths found with length [{min_path_length},{max_path_length}]")
        if selected_samples.shape[0] < num_paths:
            omni.log.warn(
                f"Only {selected_samples.shape[0]} paths found with length [{min_path_length},{max_path_length}]"
                f" instead of {num_paths}"
            )

        # get start, goal and path length
        data = torch.zeros((selected_samples.shape[0], 7))
        data[:, :3] = self.terrain_analyser.points[selected_samples[:, 0].type(torch.int64)]
        data[:, 3:6] = self.terrain_analyser.points[selected_samples[:, 1].type(torch.int64)]
        data[:, 6] = selected_samples[:, 2]
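        # resulting layout per row: columns 0-2 are the start point (x, y, z), columns 3-5 the goal point,
        # and column 6 the path length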

        # save data as pickle
        if self.cfg.enable_saved_paths_loading:
            filename = self._get_save_path_trajectories(seed, num_paths, min_path_length, max_path_length)
            with open(filename, "wb") as f:
                pickle.dump(data, f)

        # return the sampled paths
        return data

    def sample_paths_by_terrain(
        self,
        num_paths,
        min_path_length,
        max_path_length,
        seed: int = 1,
        filter_target_within_terrain: bool = True,
        terrain_level_sampling: bool = False,
    ) -> torch.Tensor:
        """
        Sample trajectories per sub-terrain.

        Args:
            num_paths: Total number of paths to sample across the terrain. If terrain_level_sampling is True,
                this many paths are sampled per terrain level instead.
            min_path_length: Minimum path length.
            max_path_length: Maximum path length.
            seed: Random seed.
            filter_target_within_terrain: If True, the target point will be within the same terrain as the
                start point.
            terrain_level_sampling: If True, num_paths paths are sampled for each terrain level instead of
                num_paths paths for the entire terrain.

        Returns:
            A tensor of shape [row, col, paths_per_terrain, 7] containing the sampled paths, or
            [row, paths_per_level, 7] if terrain_level_sampling is True.
        """

        # load paths if they exist
        if self.cfg.enable_saved_paths_loading:
            if self.scene.terrain.cfg.terrain_type == "generator":
                omni.log.warn(
                    "You are loading pre-computed paths for a terrain that is being generated live. "
                    "Make sure the same random seed has been set."
                )
            filename = self._get_save_path_trajectories(seed, num_paths, min_path_length, max_path_length)
            if os.path.isfile(filename):
                with open(filename, "rb") as f:
                    saved_paths = pickle.load(f)
                omni.log.info(
                    f"Loaded {num_paths} paths with length in [{min_path_length},{max_path_length}] generated"
                    f" with seed {seed}."
                )
                return saved_paths

        assert self.scene.terrain.terrain_origins is not None, (
            "Sampling paths by terrain requires terrain origins. If you are using a USD terrain, make sure your "
            "version of IsaacLab-Internal assigns terrain_origins for USDs in the terrain importer."
        )

        # analyse terrain if not done yet
        if not hasattr(self, "terrain_analyser"):
            # check if singleton is used and available
            if (
                hasattr(self.cfg.terrain_analysis.class_type, "instance")
                and self.cfg.terrain_analysis.class_type.instance() is not None
            ):
                self.terrain_analyser = self.cfg.terrain_analysis.class_type.instance()
            else:
                self.terrain_analyser = self.cfg.terrain_analysis.class_type(
                    self.cfg.terrain_analysis, scene=self.scene
                )
            if not self.terrain_analyser.complete:
                self.terrain_analyser.analyse()

        # seed the random number generator for reproducible sampling
        random.seed(seed)

        # get index of samples within length
        within_length = (self.terrain_analyser.samples[:, 2] > min_path_length) & (
            self.terrain_analyser.samples[:, 2] <= max_path_length
        )

        # apply the within_length filter
        filtered_samples = self.terrain_analyser.samples[within_length]
        # sub-terrain (row_idx, col_idx) of each start point
        filtered_samples_subterrains_origins = self.terrain_analyser.sample_terrain_origins[within_length]

        # filter if start and end point within the same terrain
        if filter_target_within_terrain:
            filtered_samples_subterrains_targets = self.terrain_analyser.sample_terrain_targets[within_length]

            # keep only target points within the same terrain as the start points
            same_terrain = torch.all(
                filtered_samples_subterrains_origins == filtered_samples_subterrains_targets, dim=-1
            )
            filtered_samples = filtered_samples[same_terrain]
            filtered_samples_subterrains_origins = filtered_samples_subterrains_origins[same_terrain]

        # randomly permute the filtered samples
        rand_idx = torch.randperm(filtered_samples.shape[0], device=self.terrain_analyser.device)

        # select the samples
        randomized_samples = filtered_samples[rand_idx]
        randomized_samples_subterrains_origins = filtered_samples_subterrains_origins[rand_idx]

        # filter edge cases
        assert (
            randomized_samples.shape[0] > 0
        ), f"No paths found with length [{min_path_length},{max_path_length}]"
        if randomized_samples.shape[0] < num_paths:
            omni.log.warn(
                f"Only {randomized_samples.shape[0]} paths found with length"
                f" [{min_path_length},{max_path_length}] instead of {num_paths}"
            )

        # Make a samples-by-terrain tensor for easy indexing in goal_command. We need the same number of paths
        # per terrain, so we take the minimum number of paths available in any terrain and trim each terrain's
        # paths to that number.
        num_rows, num_cols = self.scene.terrain.terrain_origins.shape[:2]
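        # rows of the terrain-origin grid correspond to terrain levels and columns to terrain types,
        # matching the [num_terrain_levels, num_terrain_types, ...] layout of the tensor built below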
        if terrain_level_sampling:
            terrain_levels, samples_per_terrain_level = torch.unique(
                randomized_samples_subterrains_origins[:, 0], return_counts=True
            )
            assert len(terrain_levels) == num_rows, "Not all terrain levels have paths."
            if samples_per_terrain_level.min().item() < num_paths:
                omni.log.warn(
                    f"Only {samples_per_terrain_level.min().item()} paths found for terrain level"
                    f" {terrain_levels[samples_per_terrain_level.argmin()]} instead of {num_paths}"
                )
                samples_per_terrain_level = samples_per_terrain_level.min().item()
            else:
                samples_per_terrain_level = num_paths

            samples_by_terrain = torch.zeros(num_rows, samples_per_terrain_level, 7)
            for row in range(num_rows):
                mask = randomized_samples_subterrains_origins[:, 0] == row
                clipped = randomized_samples[mask][:samples_per_terrain_level]
                samples_by_terrain[row, :, :3] = self.terrain_analyser.points[clipped[:, 0].type(torch.int64)]
                samples_by_terrain[row, :, 3:6] = self.terrain_analyser.points[clipped[:, 1].type(torch.int64)]
                samples_by_terrain[row, :, 6] = clipped[:, 2]
        else:
            subterrain_idx_origins = (
                randomized_samples_subterrains_origins[:, 0] * num_cols + randomized_samples_subterrains_origins[:, 1]
            )
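            # the (row, col) sub-terrain index of each start point is flattened into a single index so that
            # torch.unique can count the available paths per sub-terrain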
            env_samples, samples_per_terrain = torch.unique(subterrain_idx_origins, return_counts=True)
            assert len(env_samples) == num_rows * num_cols, "Not all terrains have paths."
            if samples_per_terrain.min().item() < num_paths / (num_rows * num_cols):
                omni.log.warn(
                    f"Only {samples_per_terrain.min().item()} paths found per terrain instead of"
                    f" {num_paths / (num_rows * num_cols)}"
                )
                samples_per_terrain = samples_per_terrain.min().item()
            else:
                samples_per_terrain = math.floor(num_paths / (num_rows * num_cols))

            # Make the return tensor, of shape [num_terrain_levels, num_terrain_types, samples_per_terrain, 7]
            samples_by_terrain = torch.zeros(num_rows, num_cols, samples_per_terrain, 7)
            for row, col in itertools.product(range(num_rows), range(num_cols)):
                mask = subterrain_idx_origins == int(row * num_cols + col)
                clipped = randomized_samples[mask][:samples_per_terrain]
                samples_by_terrain[row, col, :, :3] = self.terrain_analyser.points[clipped[:, 0].type(torch.int64)]
                samples_by_terrain[row, col, :, 3:6] = self.terrain_analyser.points[clipped[:, 1].type(torch.int64)]
                samples_by_terrain[row, col, :, 6] = clipped[:, 2]

        # save the sampled paths as pickle
        if self.cfg.enable_saved_paths_loading:
            filename = self._get_save_path_trajectories(seed, num_paths, min_path_length, max_path_length)
            with open(filename, "wb") as f:
                pickle.dump(samples_by_terrain, f)

        # return the per-terrain sampled paths
        return samples_by_terrain

    ###
    # Save paths
    ###

    def _get_save_path_trajectories(self, seed: int, num_path: int, min_len: float, max_len: float) -> str:
        filename = f"paths_seed{seed}_paths{num_path}_min{min_len}_max{max_len}.pkl"
        # get env name
        if isinstance(self.scene.terrain.cfg.usd_path, str):
            terrain_file_path = self.scene.terrain.cfg.usd_path
        else:
            terrain_file_path = None
            omni.log.info("Terrain is generated; trajectories will be saved under the 'logs' directory.")

        if terrain_file_path:
            env_name = os.path.splitext(terrain_file_path)[0]
            # create directory if necessary
            filedir = os.path.join(terrain_file_path, env_name)
            os.makedirs(filedir, exist_ok=True)
            return os.path.join(filedir, filename)
        else:
            os.makedirs("logs", exist_ok=True)
            log_path = os.path.join("logs", filename)
            return os.path.abspath(log_path)
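
# Example usage (a minimal sketch, not part of the module above): it assumes a configured
# `TrajectorySamplingCfg` and an already constructed `InteractiveScene` named `scene` from the
# surrounding IsaacLab / nav-suite setup; the numeric values are illustrative only.
#
#   sampler = TrajectorySampling(cfg=TrajectorySamplingCfg(), scene=scene)
#
#   # 100 start/goal pairs anywhere on the terrain, with path lengths in (2.0, 10.0]
#   paths = sampler.sample_paths(num_paths=100, min_path_length=2.0, max_path_length=10.0, seed=1)
#   starts, goals, lengths = paths[:, :3], paths[:, 3:6], paths[:, 6]
#
#   # paths bucketed per sub-terrain, shape [num_rows, num_cols, paths_per_terrain, 7]
#   paths_by_terrain = sampler.sample_paths_by_terrain(
#       num_paths=400, min_path_length=2.0, max_path_length=10.0, seed=1
#   )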