새소식

논문 리뷰

DTG : inference

  • -

Results

After 1 epoch

1에폭 학습 현황

 

After 5 epochs

5에폭 학습 현황

 

우선 데이터셋으로 제공해주는 pkl 파일 안에 뭐가 있는지 확인해보자. 

Train Dataset

모델 학습을 위한 raw 센서 데이터

  • pose: 로봇의 현재 위치/자세
  • vel: 속도 데이터 (50개 시퀀스)
  • imu: IMU 센서 데이터 (20개 시퀀스)
  • camera: 카메라 이미지
  • lidar: 3D LiDAR 데이터
  • lidar2d: 2D LiDAR 스캔 데이터
  • targets: 목표 위치들 (100개)
  • trajectories: gt trajectory
  • local_map: 로컬 맵 데이터
Data type: <class 'dict'>

Dictionary keys:
pose: type=<class 'numpy.ndarray'>, shape=(3, 4)
First element type: <class 'numpy.ndarray'>
First element shape: (4,)
vel: type=<class 'list'>, shape=(50, 2)
First element type: <class 'list'>
First element shape: (2,)
imu: type=<class 'list'>, shape=(20, 6)
First element type: <class 'list'>
First element shape: (6,)
time: type=<class 'float'>, value=1681656095.9803696
camera: type=<class 'list'>, shape=(1, 720, 1280, 3)
First element type: <class 'numpy.ndarray'>
First element shape: (720, 1280, 3)
lidar: type=<class 'list'>, shape=List of length 3
First element type: <class 'numpy.ndarray'>
First element shape: (25532, 6)
lidar2d: type=<class 'list'>, shape=(3, 16, 1824)
First element type: <class 'numpy.ndarray'>
First element shape: (16, 1824)
targets: type=<class 'list'>, shape=List of length 100
First element type: <class 'list'>
First element shape: List of length 2
trajectories: type=<class 'list'>, shape=(100, 16, 2)
First element type: <class 'list'>
First element shape: (16, 2)
local_map: type=<class 'numpy.ndarray'>, shape=(600, 600)
First element type: <class 'numpy.ndarray'>
First element shape: (600,)

 

/data.pkl

Data type: <class 'dict'>

Dictionary keys:
file_names: type=<class 'list'>, shape=(1034, 2)
First element type: <class 'list'>
First element shape: (2,)
all_positions: type=<class 'numpy.ndarray'>, shape=(2076, 3)
First element type: <class 'numpy.ndarray'>
First element shape: (3,)
network: type=<class 'networkx.classes.graph.Graph'>, value=Graph with 2076 nodes and 18625 edges

내부 데이터 시각화 코드

import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

def visualize_sample_data(data):
    """Draw one dataset sample as a 2x3 grid of diagnostic panels.

    Panels, in subplot order: velocity curves, first camera frame, first 3D
    LiDAR cloud, first 2D LiDAR scan, all ground-truth trajectories and the
    local map. The figure is shown with ``plt.show()``.

    Args:
        data: Mapping providing the keys 'vel', 'camera', 'lidar',
            'lidar2d', 'trajectories' and 'local_map'.
    """
    def annotate(axes, title, x_text, y_text):
        # Shared title / axis-label boilerplate for the 2D and 3D panels.
        axes.set_title(title)
        axes.set_xlabel(x_text)
        axes.set_ylabel(y_text)

    canvas = plt.figure(figsize=(20, 12))

    # Panel 1: the first two velocity columns against the sequence index.
    vel_axes = canvas.add_subplot(231)
    velocity = np.array(data['vel'])
    for column, series_name in enumerate(('X velocity', 'Y velocity')):
        vel_axes.plot(velocity[:, column], label=series_name)
    annotate(vel_axes, 'Velocity Data', 'Time step', 'Velocity')
    vel_axes.legend()

    # Panel 2: first camera frame, without axis ticks.
    image_axes = canvas.add_subplot(232)
    image_axes.imshow(data['camera'][0])
    image_axes.set_title('Camera Image')
    image_axes.axis('off')

    # Panel 3: first LiDAR cloud; columns 0-2 are the coordinates and
    # column 3 drives the marker colour.
    cloud_axes = canvas.add_subplot(233, projection='3d')
    cloud = data['lidar'][0]
    cloud_axes.scatter(cloud[:, 0], cloud[:, 1], cloud[:, 2], c=cloud[:, 3], s=1)
    annotate(cloud_axes, '3D LiDAR Points', 'X', 'Y')
    cloud_axes.set_zlabel('Z')

    # Panel 4: first 2D LiDAR scan rendered as an image.
    scan_axes = canvas.add_subplot(234)
    scan_axes.imshow(data['lidar2d'][0], aspect='auto')
    annotate(scan_axes, '2D LiDAR Scan', 'Angle', 'Channel')

    # Panel 5: every ground-truth trajectory, drawn semi-transparent.
    traj_axes = canvas.add_subplot(235)
    for waypoints in np.array(data['trajectories']):
        traj_axes.plot(waypoints[:, 0], waypoints[:, 1], 'b-', alpha=0.3)
    annotate(traj_axes, 'Ground Truth Trajectories', 'X', 'Y')

    # Panel 6: local map in grayscale.
    map_axes = canvas.add_subplot(236)
    map_axes.imshow(data['local_map'], cmap='gray')
    annotate(map_axes, 'Local Map', 'X', 'Y')

    plt.tight_layout()
    plt.show()
    
import pickle
# Load one pickled sample and plot it.
# NOTE: pickle.load can execute arbitrary code while deserialising, so only
# open files that come from a trusted source.
with open('your_pickle_file.pkl', 'rb') as f:
    data = pickle.load(f)
visualize_sample_data(data)

 

 

원래 코드에서 모델 출력을 담당하는 부분은 이곳이다. 

class HNav(nn.Module):
    def sample(self, input_dict):
        """Run perception and the generator's sampler on one input batch.

        Entries for ``DataDict.path``, ``DataDict.heuristic`` and
        ``DataDict.local_map`` are copied from ``input_dict`` into the result
        when present. The generator output is merged in afterwards, so it
        wins on any key collision.

        Args:
            input_dict: Mapping that must contain ``DataDict.lidar``,
                ``DataDict.vel`` and ``DataDict.target``.

        Returns:
            dict holding the passed-through entries plus everything returned
            by ``self.generator.sample``.
        """
        passthrough_keys = (DataDict.path, DataDict.heuristic, DataDict.local_map)
        result = {key: input_dict[key] for key in passthrough_keys if key in input_dict}

        features = self.perception(
            lidar=input_dict[DataDict.lidar],
            vel=input_dict[DataDict.vel],
            target=input_dict[DataDict.target],
        )
        result.update(self.generator.sample(observation=features))
        return result
# /models/diffusion.py

    @torch.no_grad()
    def sample(self, observation):
        """Generate a trajectory by iteratively denoising Gaussian noise.

        The observation is encoded and turned into a global conditioning
        vector. A random trajectory of shape
        ``(batch, self.waypoints_num, self.waypoint_dim)`` is then refined by
        calling ``self.diff_model`` and ``self.noise_scheduler.step`` once per
        scheduler timestep. Gradients are disabled by the decorator.

        Args:
            observation: Input passed straight to ``self.encoder``.

        Returns:
            dict with ``DataDict.prediction`` (the final trajectory tensor)
            and ``DataDict.all_trajectories`` (a list of per-step numpy
            arrays, empty unless ``self.use_all_paths`` is truthy).
        """
        h = self.encoder(observation)  # B x 512
        h_condition = self.trajectory_condition(h)

        # Unpacking two values requires h_condition to be 2-D; only B is used below.
        B, C = h_condition.shape
        # Starting point of the reverse process: standard-normal noise on the
        # same dtype/device as the conditioning vector.
        trajectory = torch.randn(size=(h_condition.shape[0], self.waypoints_num, self.waypoint_dim),
                                 dtype=h_condition.dtype, device=h_condition.device, generator=None)
        all_trajectories = []
        scheduler = self.noise_scheduler
        # NOTE(review): set_timesteps/timesteps/step(...).prev_sample look like
        # the diffusers scheduler API; confirm against the scheduler class.
        scheduler.set_timesteps(self.time_steps)
        for t in scheduler.timesteps:
            # Stop early once t drops below the cutoff; a negative
            # sample_times disables the cutoff. Presumably timesteps are
            # iterated in descending order; TODO confirm.
            if (self.sample_times >= 0) and (t < self.time_steps - 1 - self.sample_times):
                break
            t = t.to(h_condition.device)
            # The scalar timestep is repeated so every batch element gets one.
            model_output = self.diff_model(trajectory, t.unsqueeze(0).repeat(B, ), local_cond=None,
                                           global_cond=h_condition)
            trajectory = scheduler.step(model_output, t, trajectory, generator=None).prev_sample.contiguous()
            if self.use_all_paths:
                # NOTE(review): this records the raw model output of the step,
                # not the updated trajectory; confirm that is intended.
                all_trajectories.append(model_output.clone().detach().cpu().numpy())
        output = {
            DataDict.prediction: trajectory,
            DataDict.all_trajectories: all_trajectories,
        }
        return output

 

gt를 출력해보면 다음과 같다. 

def plot_prediction(predictions, index=0, figsize=(12, 12)):
    """Plot the ground-truth and predicted paths of one saved record.

    Args:
        predictions: Indexable collection of dict-like records. The record
            at ``index`` may hold 'local_map', 'path' and 'prediction'; each
            is indexed with ``[0]`` before drawing.
        index: Position of the record to draw.
        figsize: Figure size forwarded to ``plt.figure``.
    """
    bound = 30  # half-width of the plotted window, applied to both axes
    record = predictions[index]

    plt.figure(figsize=figsize)

    if 'local_map' in record:
        plt.imshow(record['local_map'][0], cmap='gray',
                   extent=[-bound, bound, bound, -bound], alpha=0.5)

    # (record key, format string, legend label, line width), in drawing order.
    curve_specs = (
        ('path', 'g-', 'Ground Truth', 10),
        ('prediction', 'r--', 'Prediction', 2),
    )
    for key, fmt, legend_label, width in curve_specs:
        if key not in record:
            continue
        points = record[key][0]
        # Column 1 goes on the horizontal axis and column 0 on the vertical
        # one; both are clipped to the plotted window.
        plt.plot(points[:, 1].clip(-bound, bound), points[:, 0].clip(-bound, bound),
                 fmt, label=legend_label, linewidth=width)

    plt.xlim(-bound, bound)
    plt.ylim(-bound, bound)
    plt.grid(True)
    plt.legend()
    plt.title(f'Trajectory Prediction (Sample {index})')
    plt.xlabel('X (m)')
    plt.ylabel('Y (m)')

    plt.show()

 

 

1 에폭으로 돌렸을 때 inference 결과는 이렇다. 

Inference 결과: 경로가 좀 이상하다.

 

Inference code

모델 출력 scaling을 해 줘야 한다. 

import argparse
import pickle
import torch
import numpy as np
from tqdm import tqdm
import os

from src.utils.configs import TrainingConfig, GeneratorType, DataDict
from src.models.model import get_model
from src.utils.functions import to_device, get_device, release_cuda
from src.data_loader.data_loader import evaluation_data_loader

def get_args():
    """Parse the command-line options of the inference script.

    Returns:
        argparse.Namespace with ``checkpoint``, ``data``, ``device``,
        ``batch_size`` and ``output``.
    """
    # (flag, add_argument keyword settings), registered in this order.
    option_table = (
        ('--checkpoint', dict(type=str, required=True, help='Path to model checkpoint')),
        ('--data', dict(type=str, required=True, help='Path to test data')),
        ('--device', dict(type=int, default=-1, help='GPU device ID (-1 for auto)')),
        ('--batch_size', dict(type=int, default=1, help='Batch size for inference')),
        ('--output', dict(type=str, default='predictions.pkl', help='Output file path')),
    )

    cli = argparse.ArgumentParser(description='Model Inference')
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()

def verify_data_format(data_dict):
    """Print shape, dtype and value range of every tensor entry in a batch.

    Non-tensor values are skipped. Zero-element tensors are reported with
    ``range=[empty]`` instead of raising, because ``min()``/``max()`` are
    undefined for them.

    Args:
        data_dict: Mapping from field name to value.
    """
    print("\nInput data format:")
    for key, value in data_dict.items():
        if not isinstance(value, torch.Tensor):
            continue
        if value.numel() == 0:
            # Tensor.min()/max() raise on tensors without elements.
            value_range = "empty"
        else:
            value_range = f"{value.min().item():.3f}, {value.max().item():.3f}"
        print(f"{key}: shape={value.shape}, type={value.dtype}, range=[{value_range}]")

class Inferencer:
    """Restore a trained model from a checkpoint and run it over a data loader."""

    def __init__(self, cfg, checkpoint_path):
        """Build the model, load its weights and switch to inference mode.

        Args:
            cfg: Config object exposing ``gpus.device``, ``model``,
                ``loss.train_poses`` and ``data``.
            checkpoint_path: Path of the checkpoint handed to ``torch.load``.
        """
        self.cfg = cfg

        # A bare "cuda" string is used as-is; anything else goes through get_device.
        if cfg.gpus.device == "cuda":
            self.device = "cuda"
        else:
            self.device = get_device(device=cfg.gpus.device)

        self.generator_type = cfg.model.generator_type
        if self.generator_type == GeneratorType.diffusion:
            self.use_traversability = cfg.model.diffusion.use_traversability
        else:
            self.use_traversability = cfg.model.cvae.estimate_traversability
        self.train_poses = cfg.loss.train_poses
        self.scale_waypoints = 20.0

        # Optional config fields with the same fallbacks as before.
        self.map_resolution = getattr(cfg.data, 'map_resolution', 1.0)
        self.map_range = getattr(cfg.data, 'map_range', 0.0)

        self.model = get_model(config=cfg.model, device=self.device)

        print(f'Loading checkpoint from "{checkpoint_path}"')
        # NOTE: torch.load unpickles the file; only load checkpoints from a
        # trusted source.
        state_dict = torch.load(checkpoint_path, map_location=torch.device(self.device))

        # Checkpoints may wrap the weights under a 'state_dict' entry.
        if 'state_dict' in state_dict:
            model_dict = state_dict['state_dict']
        else:
            model_dict = state_dict

        # strict=False tolerates key mismatches, so report them: a silently
        # half-loaded model produces plausible-looking but wrong predictions.
        load_result = self.model.load_state_dict(model_dict, strict=False)
        missing_keys = list(getattr(load_result, 'missing_keys', None) or [])
        unexpected_keys = list(getattr(load_result, 'unexpected_keys', None) or [])
        if missing_keys:
            print(f"Warning: {len(missing_keys)} model parameter(s) not found in checkpoint: {missing_keys}")
        if unexpected_keys:
            print(f"Warning: {len(unexpected_keys)} checkpoint entrie(s) not used by the model: {unexpected_keys}")
        self.model = self.model.to(self.device)

        self.model.eval()
        # Process-wide switch: autograd stays disabled after construction.
        torch.set_grad_enabled(False)

        print(f"Model device: {next(self.model.parameters()).device}")
        print(f"Generator type: {self.generator_type}")

    def preprocess_data(self, data_dict):
        """Select the model inputs from a batch and add a missing axis.

        A 3-D lidar tensor and a 2-D velocity tensor each get a new axis at
        position 1. Target, path, local map and heuristic entries are copied
        unchanged when present.

        Args:
            data_dict: Batch mapping keyed by ``DataDict`` fields.

        Returns:
            New dict containing only the recognised entries.
        """
        processed_dict = {}

        # (field, rank at which an axis is inserted at position 1)
        for key, rank_without_axis in ((DataDict.lidar, 3), (DataDict.vel, 2)):
            if key not in data_dict:
                continue
            tensor = data_dict[key]
            if len(tensor.shape) == rank_without_axis:
                tensor = tensor.unsqueeze(1)
            processed_dict[key] = tensor

        for key in (DataDict.target, DataDict.path, DataDict.local_map, DataDict.heuristic):
            if key in data_dict:
                processed_dict[key] = data_dict[key]

        return processed_dict

    @staticmethod
    def _polyline_length(waypoints):
        """Return the summed segment length of each trajectory, shaped (B, 1, 1).

        ``waypoints`` is indexed as (batch, waypoint, coordinate).
        """
        steps = waypoints[:, 1:] - waypoints[:, :-1]
        return torch.sqrt((steps ** 2).sum(dim=-1)).sum(dim=-1).reshape(-1, 1, 1)

    @staticmethod
    def _match_path_scale(pred, path):
        """Rescale each predicted trajectory to its reference path's length.

        Every sample is scaled by the ratio of its own reference length to
        its own predicted length, then shifted so the first waypoints
        coincide. The ratio is computed per sample so that trajectories in
        one batch do not influence each other.
        """
        scale = Inferencer._polyline_length(path) / (Inferencer._polyline_length(pred) + 1e-6)
        pred = pred * scale
        return pred + (path[:, 0:1, :] - pred[:, 0:1, :])

    def postprocess_output(self, output_dict):
        """Convert raw model output into trajectories comparable to the path.

        When ``self.train_poses`` is falsy the prediction is accumulated
        along the waypoint axis with ``cumsum`` first. If a reference path is
        available the prediction is then rescaled and shifted onto it; when
        no path is available the prediction is returned without that
        alignment instead of raising ``KeyError``.

        NOTE(review): the alignment uses the ground-truth path, so the
        returned prediction is not a pure model output. Confirm this is
        acceptable for the evaluation being run.

        Args:
            output_dict: Mapping returned by the model.

        Returns:
            dict with the path, prediction, local map and heuristic entries
            that were present in ``output_dict``.
        """
        processed_dict = {}

        path = output_dict[DataDict.path] if DataDict.path in output_dict else None
        if path is not None:
            processed_dict[DataDict.path] = path

        if DataDict.prediction in output_dict:
            pred = output_dict[DataDict.prediction]
            if not self.train_poses:
                pred = torch.cumsum(pred, dim=1)
            if path is not None:
                pred = self._match_path_scale(pred, path)
            processed_dict[DataDict.prediction] = pred

        for key in (DataDict.local_map, DataDict.heuristic):
            if key in output_dict:
                processed_dict[key] = output_dict[key]

        return processed_dict

    def inference_step(self, data_dict):
        """Run one batch through preprocessing, the model and postprocessing.

        Args:
            data_dict: One batch from the data loader.

        Returns:
            The postprocessed output after ``release_cuda``.
        """
        data_dict = self.preprocess_data(data_dict)
        data_dict = to_device(data_dict, device=self.device)

        with torch.no_grad():
            output_dict = self.model(data_dict, sample=True)

        output_dict = self.postprocess_output(output_dict)
        return release_cuda(output_dict)

    @staticmethod
    def _print_output_ranges(output_dict):
        """Print shape, range and mean of every array-like entry."""
        print("\nFirst batch output ranges:")
        for key, value in output_dict.items():
            if isinstance(value, torch.Tensor):
                value = value.cpu().numpy()
            if not isinstance(value, np.ndarray):
                continue
            if value.size == 0:
                # np.min/np.max raise on empty arrays.
                print(f"{key}: shape={value.shape}, empty")
                continue
            print(f"{key}: shape={value.shape}, "
                  f"range=[{np.min(value):.3f}, {np.max(value):.3f}], "
                  f"mean={np.mean(value):.3f}")

    def run_inference(self, data_loader):
        """Run inference over every batch and collect the outputs.

        Value ranges of the first batch's output are printed as a sanity
        check.

        Args:
            data_loader: Iterable of batches.

        Returns:
            list with one output dict per batch, in loader order.
        """
        predictions = []

        for batch_index, data_dict in enumerate(tqdm(data_loader, desc="Running inference")):
            output_dict = self.inference_step(data_dict)
            predictions.append(output_dict)

            if batch_index == 0:
                self._print_output_ranges(output_dict)

        return predictions

def main():
    """Command-line entry point: configure, run inference and save the results.

    Reads the CLI options, writes them into ``TrainingConfig``, prints the
    format of the first batch, runs the ``Inferencer`` over the evaluation
    loader and pickles the list of outputs to the ``--output`` path.
    """
    cli = get_args()

    # TrainingConfig is used directly; it is not instantiated here.
    config = TrainingConfig

    config.data.root = cli.data
    config.data.batch_size = cli.batch_size
    config.data.shuffle = False
    config.data.distributed = False

    # -1 selects automatically, a non-negative id pins a GPU, and any other
    # negative value forces the CPU.
    gpu_index = cli.device
    if gpu_index == -1:
        config.gpus.device = "cuda" if torch.cuda.is_available() else "cpu"
    elif gpu_index >= 0:
        config.gpus.device = f"cuda:{gpu_index}"
    else:
        config.gpus.device = "cpu"

    print(f"Using device: {config.gpus.device}")

    loader = evaluation_data_loader(cfg=config.data)

    # Inspect only the first batch, if the loader yields one at all.
    no_batch = object()
    first_batch = next(iter(loader), no_batch)
    if first_batch is not no_batch:
        verify_data_format(first_batch)

    engine = Inferencer(config, cli.checkpoint)

    results = engine.run_inference(loader)

    print(f"Saving predictions to {cli.output}")
    with open(cli.output, 'wb') as sink:
        pickle.dump(results, sink)


if __name__ == "__main__":
    main()

 

output_dict.path

tensor([[[  0.9600,  -0.4000],
         [  1.9300,  -0.8100],
         [  2.9000,  -1.2200],
         [  3.3600,  -2.0400],
         [  3.6400,  -3.0100],
         [  3.9200,  -3.9800],
         [  4.1900,  -4.9500],
         [  4.4700,  -5.9200],
         [  4.7500,  -6.8900],
         [  5.0300,  -7.8600],
         [  5.3000,  -8.8300],
         [  5.5800,  -9.7900],
         [  5.8500, -10.7500],
         [  5.9800, -11.7100],
         [  6.1100, -12.6700],
         [  6.2400, -13.6300]]], device='cuda:1')

 

output_dict.prediction

tensor([[[ 0.1718, -0.0056],
         [ 0.1897, -0.0169],
         [ 0.1974, -0.0242],
         [ 0.2010, -0.0291],
         [ 0.2027, -0.0323],
         [ 0.2035, -0.0344],
         [ 0.2038, -0.0357],
         [ 0.2039, -0.0366],
         [ 0.2039, -0.0372],
         [ 0.2038, -0.0376],
         [ 0.2037, -0.0379],
         [ 0.2035, -0.0381],
         [ 0.2034, -0.0383],
         [ 0.2033, -0.0384],
         [ 0.2032, -0.0385],
         [ 0.2032, -0.0386]]], device='cuda:1')

 

 

 

 

 

Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.