Source code for packnet_sfm.datasets.dgp_dataset

# Copyright 2020 Toyota Research Institute.  All rights reserved.

import os
import torch
import numpy as np

from dgp.datasets.synchronized_dataset import SynchronizedSceneDataset
from dgp.utils.camera import Camera, generate_depth_map
from dgp.utils.geometry import Pose

from packnet_sfm.utils.misc import make_list
from packnet_sfm.utils.types import is_tensor, is_numpy, is_list

########################################################################################################################
#### FUNCTIONS
########################################################################################################################

def stack_sample(sample):
    """Stack a sample from multiple sensors"""
    # If there is only one sensor don't do anything
    if len(sample) == 1:
        return sample[0]
    # Otherwise, stack sample
    stacked_sample = {}
    for key in sample[0]:
        # Global keys (do not stack)
        if key in ['idx', 'dataset_idx', 'sensor_name', 'filename']:
            stacked_sample[key] = sample[0][key]
        else:
            # Stack torch tensors
            if is_tensor(sample[0][key]):
                stacked_sample[key] = torch.stack([s[key] for s in sample], 0)
            # Stack numpy arrays
            elif is_numpy(sample[0][key]):
                stacked_sample[key] = np.stack([s[key] for s in sample], 0)
            # Stack list
            elif is_list(sample[0][key]):
                stacked_sample[key] = []
                # Stack list of torch tensors
                if is_tensor(sample[0][key][0]):
                    for i in range(len(sample[0][key])):
                        stacked_sample[key].append(
                            torch.stack([s[key][i] for s in sample], 0))
                # Stack list of numpy arrays
                if is_numpy(sample[0][key][0]):
                    for i in range(len(sample[0][key])):
                        stacked_sample[key].append(
                            np.stack([s[key][i] for s in sample], 0))
    # Return stacked sample
    return stacked_sample
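# Illustrative sketch (not part of the original module): with two camera dicts
# that each hold a [3, H, W] image tensor, stack_sample merges them into one
# dict whose stacked entries gain a leading sensor dimension, while global keys
# such as 'idx' are taken from the first sensor only.
#
#   cam_a = {'idx': 0, 'rgb': torch.zeros(3, 192, 640)}
#   cam_b = {'idx': 0, 'rgb': torch.ones(3, 192, 640)}
#   stacked = stack_sample([cam_a, cam_b])
#   assert stacked['rgb'].shape == (2, 3, 192, 640)
#   assert stacked['idx'] == 0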
########################################################################################################################
#### DATASET
########################################################################################################################
class DGPDataset:
    """
    DGP dataset class

    Parameters
    ----------
    path : str
        Path to the dataset
    split : str {'train', 'val', 'test'}
        Which dataset split to use
    cameras : list of str
        Which cameras to get information from
    depth_type : str
        Which lidar will be used to generate ground-truth information
    with_pose : bool
        If enabled pose estimates are also returned
    with_semantic : bool
        If enabled semantic estimates are also returned
    back_context : int
        Size of the backward context
    forward_context : int
        Size of the forward context
    data_transform : Function
        Transformations applied to the sample
    """
    def __init__(self, path, split,
                 cameras=None,
                 depth_type=None,
                 with_pose=False,
                 with_semantic=False,
                 back_context=0,
                 forward_context=0,
                 data_transform=None,
                 ):
        self.path = path
        self.split = split
        self.dataset_idx = 0

        self.bwd = back_context
        self.fwd = forward_context
        self.has_context = back_context + forward_context > 0

        self.num_cameras = len(cameras)
        self.data_transform = data_transform

        self.depth_type = depth_type
        self.with_depth = depth_type is not None
        self.with_pose = with_pose
        self.with_semantic = with_semantic

        self.dataset = SynchronizedSceneDataset(path,
            split=split,
            datum_names=cameras,
            backward_context=back_context,
            forward_context=forward_context,
            requested_annotations=None,
            only_annotated_datums=False,
        )
    def generate_depth_map(self, sample_idx, datum_idx, filename):
        """
        Generates the depth map for a camera by projecting LiDAR information.
        It also caches the depth map following the DGP folder structure, so it's not recalculated.

        Parameters
        ----------
        sample_idx : int
            Sample index
        datum_idx : int
            Datum index
        filename : str
            Filename used for loading / saving

        Returns
        -------
        depth : np.array [H, W]
            Depth map for that datum in that sample
        """
        # Generate depth filename
        filename = '{}/{}.npz'.format(
            os.path.dirname(self.path), filename.format('depth/{}'.format(self.depth_type)))
        # Load and return if exists
        if os.path.exists(filename):
            return np.load(filename)['depth']
        # Otherwise, create, save and return
        else:
            # Get pointcloud
            scene_idx, sample_idx_in_scene, _ = self.dataset.dataset_item_index[sample_idx]
            pc_datum_idx_in_sample = self.dataset.get_datum_index_for_datum_name(
                scene_idx, sample_idx_in_scene, self.depth_type)
            pc_datum_data = self.dataset.get_point_cloud_from_datum(
                scene_idx, sample_idx_in_scene, pc_datum_idx_in_sample)
            # Create camera
            camera_rgb = self.get_current('rgb', datum_idx)
            camera_pose = self.get_current('pose', datum_idx)
            camera_intrinsics = self.get_current('intrinsics', datum_idx)
            camera = Camera(K=camera_intrinsics, p_cw=camera_pose.inverse())
            # Generate depth map
            world_points = pc_datum_data['pose'] * pc_datum_data['point_cloud']
            depth = generate_depth_map(camera, world_points, camera_rgb.size[::-1])
            # Save depth map
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            np.savez_compressed(filename, depth=depth)
            # Return depth map
            return depth
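    # Worked example of the caching path above (hypothetical values, not part of
    # the original source): with self.path = '/data/ddad/ddad.json',
    # depth_type = 'lidar' and a datum filename template of
    # '000150/{}/CAMERA_01/0000000010' (as produced by get_filename below),
    # the depth map is cached at
    # '/data/ddad/000150/depth/lidar/CAMERA_01/0000000010.npz'
    # and reloaded from there on later epochs instead of being reprojected.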
    def get_current(self, key, sensor_idx):
        """Return current timestep of a key from a sensor"""
        return self.sample_dgp[self.bwd][sensor_idx][key]
    def get_backward(self, key, sensor_idx):
        """Return backward timesteps of a key from a sensor"""
        return [] if self.bwd == 0 else \
            [self.sample_dgp[i][sensor_idx][key]
             for i in range(0, self.bwd)]
    def get_forward(self, key, sensor_idx):
        """Return forward timesteps of a key from a sensor"""
        return [] if self.fwd == 0 else \
            [self.sample_dgp[i][sensor_idx][key]
             for i in range(self.bwd + 1, self.bwd + self.fwd + 1)]
    def get_context(self, key, sensor_idx):
        """Get both backward and forward contexts"""
        return self.get_backward(key, sensor_idx) + self.get_forward(key, sensor_idx)
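    # Timestep layout sketch (assumed example, not part of the original source):
    # self.sample_dgp is ordered [backward ..., current, forward ...], so with
    # back_context=1 and forward_context=1 the current frame sits at index
    # self.bwd == 1, get_backward() returns [t-1] and get_forward() returns
    # [t+1], i.e. get_context('rgb', i) == [rgb_{t-1}, rgb_{t+1}].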
    def get_filename(self, sample_idx, datum_idx):
        """
        Returns the filename for an index, following DGP structure

        Parameters
        ----------
        sample_idx : int
            Sample index
        datum_idx : int
            Datum index

        Returns
        -------
        filename : str
            Filename for the datum in that sample
        """
        scene_idx, sample_idx_in_scene, datum_indices = self.dataset.dataset_item_index[sample_idx]
        scene_dir = self.dataset.get_scene_directory(scene_idx)
        filename = self.dataset.get_datum(
            scene_idx, sample_idx_in_scene, datum_indices[datum_idx]).datum.image.filename
        return os.path.splitext(os.path.join(os.path.basename(scene_dir),
                                             filename.replace('rgb', '{}')))[0]
    def __len__(self):
        """Length of dataset"""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Get a dataset sample"""
        # Get DGP sample (if single sensor, make it a list)
        self.sample_dgp = self.dataset[idx]
        self.sample_dgp = [make_list(sample) for sample in self.sample_dgp]

        # Loop over all cameras
        sample = []
        for i in range(self.num_cameras):
            data = {
                'idx': idx, 'dataset_idx': self.dataset_idx,
                'sensor_name': self.get_current('datum_name', i),
                #
                'filename': self.get_filename(idx, i),
                'splitname': '%s_%010d' % (self.split, idx),
                #
                'rgb': self.get_current('rgb', i),
                'intrinsics': self.get_current('intrinsics', i),
            }

            # If depth is returned
            if self.with_depth:
                data.update({
                    'depth': self.generate_depth_map(idx, i, data['filename'])
                })

            # If pose is returned
            if self.with_pose:
                data.update({
                    'extrinsics': self.get_current('extrinsics', i).matrix,
                    'pose': self.get_current('pose', i).matrix,
                })

            # If context is returned
            if self.has_context:
                data.update({
                    'rgb_context': self.get_context('rgb', i),
                })
                # If context pose is returned
                if self.with_pose:
                    # Get original values to calculate relative motion
                    orig_extrinsics = Pose.from_matrix(data['extrinsics'])
                    orig_pose = Pose.from_matrix(data['pose'])
                    data.update({
                        'extrinsics_context':
                            [(orig_extrinsics.inverse() * extrinsics).matrix
                             for extrinsics in self.get_context('extrinsics', i)],
                        'pose_context':
                            [(orig_pose.inverse() * pose).matrix
                             for pose in self.get_context('pose', i)],
                    })

            sample.append(data)

        # Apply same data transformations for all sensors
        if self.data_transform:
            sample = [self.data_transform(smp) for smp in sample]

        # Return sample (stacked if necessary)
        return stack_sample(sample)
########################################################################################################################
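# Example usage (illustrative only; the dataset path, split and camera names
# below are assumptions about a DDAD-style DGP dataset, not part of this module):
#
#   from torch.utils.data import DataLoader
#
#   dataset = DGPDataset(
#       path='/data/ddad/ddad.json',   # hypothetical DGP scene-dataset JSON
#       split='train',
#       cameras=['camera_01'],         # hypothetical datum name
#       depth_type='lidar',            # project LiDAR points to cache GT depth
#       with_pose=True,
#       back_context=1,
#       forward_context=1,
#   )
#   sample = dataset[0]                # dict with 'rgb', 'intrinsics', 'depth', ...
#   loader = DataLoader(dataset, batch_size=4, shuffle=True, num_workers=8)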