DTG : inference


After 1 epoch

[Figure: training status after 1 epoch]

 

After 5 epochs

[Figure: training status after 5 epochs]

 

First, let's check what's inside the pkl files provided as the dataset.

Train Dataset

Raw sensor data for model training:

  • pose: the robot's current position/orientation
  • vel: velocity data (sequence of 50)
  • imu: IMU sensor data (sequence of 20)
  • time: timestamp (float, Unix time)
  • camera: camera image
  • lidar: 3D LiDAR point clouds
  • lidar2d: 2D LiDAR scan data
  • targets: target positions (100 of them)
  • trajectories: ground-truth trajectories
  • local_map: local map data
Data type: <class 'dict'>
Dictionary keys:
pose: type=<class 'numpy.ndarray'>, shape=(3, 4)
    First element type: <class 'numpy.ndarray'>, First element shape: (4,)
vel: type=<class 'list'>, shape=(50, 2)
    First element type: <class 'list'>, First element shape: (2,)
imu: type=<class 'list'>, shape=(20, 6)
    First element type: <class 'list'>, First element shape: (6,)
time: type=<class 'float'>, value=1681656095.9803696
camera: type=<class 'list'>, shape=(1, 720, 1280, 3)
    First element type: <class 'numpy.ndarray'>, First element shape: (720, 1280, 3)
lidar: type=<class 'list'>, shape=List of length 3
    First element type: <class 'numpy.ndarray'>, First element shape: (25532, 6)
lidar2d: type=<class 'list'>, shape=(3, 16, 1824)
    First element type: <class 'numpy.ndarray'>, First element shape: (16, 1824)
targets: type=<class 'list'>, shape=List of length 100
    First element type: <class 'list'>, First element shape: List of length 2
trajectories: type=<class 'list'>, shape=(100, 16, 2)
    First element type: <class 'list'>, First element shape: (16, 2)
local_map: type=<class 'numpy.ndarray'>, shape=(600, 600)
    First element type: <class 'numpy.ndarray'>, First element shape: (600,)
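For reference, here is a minimal sketch of the inspection code that produces a dump like the one above (the pickle path is a placeholder):

import pickle
import numpy as np

with open('sample.pkl', 'rb') as f:      # placeholder path
    data = pickle.load(f)

print(f"Data type: {type(data)}")
print("Dictionary keys:")
for key, value in data.items():
    if isinstance(value, np.ndarray):
        print(f"{key}: type={type(value)}, shape={value.shape}")
    elif isinstance(value, list):
        try:
            print(f"{key}: type={type(value)}, shape={np.array(value).shape}")
        except ValueError:               # ragged lists (e.g. lidar) cannot be stacked
            print(f"{key}: type={type(value)}, shape=List of length {len(value)}")
    else:
        print(f"{key}: type={type(value)}, value={value}")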

 

/data.pkl

Data type: <class 'dict'>
Dictionary keys:
file_names: type=<class 'list'>, shape=(1034, 2)
    First element type: <class 'list'>, First element shape: (2,)
all_positions: type=<class 'numpy.ndarray'>, shape=(2076, 3)
    First element type: <class 'numpy.ndarray'>, First element shape: (3,)
network: type=<class 'networkx.classes.graph.Graph'>, value=Graph with 2076 nodes and 18625 edges
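Since data.pkl carries a networkx graph over the 2076 sampled positions, a quick sketch like the following can draw it (this assumes node IDs are integer indices into all_positions; adjust if the graph uses different node keys):

import pickle
import matplotlib.pyplot as plt
import networkx as nx

with open('data.pkl', 'rb') as f:
    meta = pickle.load(f)

positions = meta['all_positions']        # (2076, 3) node coordinates
G = meta['network']                      # Graph with 2076 nodes and 18625 edges

# Assumption: node IDs index into all_positions; take the (x, y) part.
pos = {n: positions[n][:2] for n in G.nodes()}
nx.draw(G, pos=pos, node_size=2, width=0.2)
plt.show()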

Code to visualize the data inside:

import pickle
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D


def visualize_sample_data(data):
    fig = plt.figure(figsize=(20, 12))

    # Velocity over time
    ax1 = fig.add_subplot(231)
    vel_data = np.array(data['vel'])
    ax1.plot(vel_data[:, 0], label='X velocity')
    ax1.plot(vel_data[:, 1], label='Y velocity')
    ax1.set_title('Velocity Data')
    ax1.set_xlabel('Time step')
    ax1.set_ylabel('Velocity')
    ax1.legend()

    # Camera image
    ax2 = fig.add_subplot(232)
    camera_img = data['camera'][0]
    ax2.imshow(camera_img)
    ax2.set_title('Camera Image')
    ax2.axis('off')

    # 3D LiDAR point cloud, colored by the fourth column
    ax3 = fig.add_subplot(233, projection='3d')
    lidar_points = data['lidar'][0]
    ax3.scatter(lidar_points[:, 0], lidar_points[:, 1], lidar_points[:, 2],
                c=lidar_points[:, 3], s=1)
    ax3.set_title('3D LiDAR Points')
    ax3.set_xlabel('X')
    ax3.set_ylabel('Y')
    ax3.set_zlabel('Z')

    # 2D LiDAR scan (channels x angles)
    ax4 = fig.add_subplot(234)
    lidar2d = data['lidar2d'][0]
    ax4.imshow(lidar2d, aspect='auto')
    ax4.set_title('2D LiDAR Scan')
    ax4.set_xlabel('Angle')
    ax4.set_ylabel('Channel')

    # All 100 ground-truth trajectories
    ax5 = fig.add_subplot(235)
    trajectories = np.array(data['trajectories'])
    for traj in trajectories:
        ax5.plot(traj[:, 0], traj[:, 1], 'b-', alpha=0.3)
    ax5.set_title('Ground Truth Trajectories')
    ax5.set_xlabel('X')
    ax5.set_ylabel('Y')

    # Local occupancy map
    ax6 = fig.add_subplot(236)
    ax6.imshow(data['local_map'], cmap='gray')
    ax6.set_title('Local Map')
    ax6.set_xlabel('X')
    ax6.set_ylabel('Y')

    plt.tight_layout()
    plt.show()


with open('your_pickle_file.pkl', 'rb') as f:
    data = pickle.load(f)
visualize_sample_data(data)

 

 

In the original code, this is the part responsible for producing the model output.

class HNav(nn.Module):
    # (other methods omitted)
    def sample(self, input_dict):
        output = {}
        # Pass through ground truth / auxiliary inputs so they are available
        # next to the prediction in the output dict.
        if DataDict.path in input_dict.keys():
            output.update({DataDict.path: input_dict[DataDict.path]})
        if DataDict.heuristic in input_dict.keys():
            output.update({DataDict.heuristic: input_dict[DataDict.heuristic]})
        if DataDict.local_map in input_dict.keys():
            output.update({DataDict.local_map: input_dict[DataDict.local_map]})
        # Encode lidar / velocity / target into an observation, then sample
        # a trajectory from the generator.
        observation = self.perception(lidar=input_dict[DataDict.lidar],
                                      vel=input_dict[DataDict.vel],
                                      target=input_dict[DataDict.target])
        generator_output = self.generator.sample(observation=observation)
        output.update(generator_output)
        return output
# /models/difffusion.py
@torch.no_grad()
def sample(self, observation):
    h = self.encoder(observation)              # B x 512
    h_condition = self.trajectory_condition(h)
    B, C = h_condition.shape
    # Start from Gaussian noise of shape (B, waypoints_num, waypoint_dim)
    trajectory = torch.randn(size=(h_condition.shape[0], self.waypoints_num, self.waypoint_dim),
                             dtype=h_condition.dtype, device=h_condition.device,
                             generator=None)
    all_trajectories = []
    scheduler = self.noise_scheduler
    scheduler.set_timesteps(self.time_steps)
    for t in scheduler.timesteps:
        # Optionally stop after sample_times denoising steps
        if (self.sample_times >= 0) and (t < self.time_steps - 1 - self.sample_times):
            break
        t = t.to(h_condition.device)
        model_output = self.diff_model(trajectory, t.unsqueeze(0).repeat(B),
                                       local_cond=None, global_cond=h_condition)
        # One reverse-diffusion step: denoise the trajectory from t to t-1
        trajectory = scheduler.step(model_output, t, trajectory,
                                    generator=None).prev_sample.contiguous()
        if self.use_all_paths:
            all_trajectories.append(model_output.clone().detach().cpu().numpy())
    output = {
        DataDict.prediction: trajectory,
        DataDict.all_trajectories: all_trajectories,
    }
    return output
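The sampling loop above is a standard reverse-diffusion loop: start from Gaussian noise, let the model predict the noise at each timestep, and call scheduler.step to move one step toward a clean trajectory. Stripped down to its skeleton, it looks like this (a sketch assuming HuggingFace diffusers' DDPMScheduler, which matches the set_timesteps / step(...).prev_sample calls above; the zero tensor stands in for diff_model):

import torch
from diffusers import DDPMScheduler

scheduler = DDPMScheduler(num_train_timesteps=1000)
scheduler.set_timesteps(50)              # number of denoising steps at inference

x = torch.randn(1, 16, 2)                # pure noise: 16 waypoints x (x, y)
for t in scheduler.timesteps:            # from high noise level to low
    noise_pred = torch.zeros_like(x)     # stand-in for diff_model(x, t, global_cond=h)
    x = scheduler.step(noise_pred, t, x).prev_sample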

 

Plotting the ground truth looks like this.

def plot_prediction(predictions, index=0, figsize=(12, 12)):
    data = predictions[index]
    plt.figure(figsize=figsize)
    if 'local_map' in data:
        local_map = data['local_map'][0]
        plt.imshow(local_map, cmap='gray', extent=[-30, 30, 30, -30], alpha=0.5)
    if 'path' in data:
        path = data['path'][0]
        plt.plot(path[:, 1].clip(-30, 30), path[:, 0].clip(-30, 30),
                 'g-', label='Ground Truth', linewidth=10)
    if 'prediction' in data:
        pred = data['prediction'][0]
        plt.plot(pred[:, 1].clip(-30, 30), pred[:, 0].clip(-30, 30),
                 'r--', label='Prediction', linewidth=2)
    plt.xlim(-30, 30)
    plt.ylim(-30, 30)
    plt.grid(True)
    plt.legend()
    plt.title(f'Trajectory Prediction (Sample {index})')
    plt.xlabel('X (m)')
    plt.ylabel('Y (m)')
    plt.show()
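Assuming the inference script below has saved its outputs to predictions.pkl, the plot is produced like this:

import pickle

with open('predictions.pkl', 'rb') as f:
    predictions = pickle.load(f)

plot_prediction(predictions, index=0)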

 

 

Here is the inference result after training for just 1 epoch.

[Figure: inference result] The predicted path looks a bit strange.

The model output needs to be rescaled.

import argparse
import pickle
import torch
import numpy as np
from tqdm import tqdm
import os

from src.utils.configs import TrainingConfig, GeneratorType, DataDict
from src.models.model import get_model
from src.utils.functions import to_device, get_device, release_cuda
from src.data_loader.data_loader import evaluation_data_loader


def get_args():
    parser = argparse.ArgumentParser(description='Model Inference')
    parser.add_argument('--checkpoint', type=str, required=True, help='Path to model checkpoint')
    parser.add_argument('--data', type=str, required=True, help='Path to test data')
    parser.add_argument('--device', type=int, default=-1, help='GPU device ID (-1 for auto)')
    parser.add_argument('--batch_size', type=int, default=1, help='Batch size for inference')
    parser.add_argument('--output', type=str, default='predictions.pkl', help='Output file path')
    return parser.parse_args()


def verify_data_format(data_dict):
    """Verify and print data format information"""
    print("\nInput data format:")
    for key, value in data_dict.items():
        if isinstance(value, torch.Tensor):
            print(f"{key}: shape={value.shape}, type={value.dtype}, "
                  f"range=[{value.min().item():.3f}, {value.max().item():.3f}]")


class Inferencer:
    def __init__(self, cfg, checkpoint_path):
        self.cfg = cfg
        if cfg.gpus.device == "cuda":
            self.device = "cuda"
        else:
            self.device = get_device(device=cfg.gpus.device)
        self.generator_type = cfg.model.generator_type
        self.use_traversability = (cfg.model.diffusion.use_traversability
                                   if self.generator_type == GeneratorType.diffusion
                                   else cfg.model.cvae.estimate_traversability)
        self.train_poses = cfg.loss.train_poses
        self.scale_waypoints = 20.0
        self.map_resolution = cfg.data.map_resolution if hasattr(cfg.data, 'map_resolution') else 1.0
        self.map_range = cfg.data.map_range if hasattr(cfg.data, 'map_range') else 0.0

        self.model = get_model(config=cfg.model, device=self.device)
        print(f'Loading checkpoint from "{checkpoint_path}"')
        state_dict = torch.load(checkpoint_path, map_location=torch.device(self.device))
        if 'state_dict' in state_dict:
            model_dict = state_dict['state_dict']
        else:
            model_dict = state_dict
        self.model.load_state_dict(model_dict, strict=False)
        self.model = self.model.to(self.device)
        self.model.eval()
        torch.set_grad_enabled(False)
        print(f"Model device: {next(self.model.parameters()).device}")
        print(f"Generator type: {self.generator_type}")

    def preprocess_data(self, data_dict):
        """Preprocess input data"""
        processed_dict = {}
        if DataDict.lidar in data_dict:
            lidar_data = data_dict[DataDict.lidar]
            if len(lidar_data.shape) == 3:
                lidar_data = lidar_data.unsqueeze(1)
            processed_dict[DataDict.lidar] = lidar_data
        if DataDict.vel in data_dict:
            vel_data = data_dict[DataDict.vel]
            if len(vel_data.shape) == 2:
                vel_data = vel_data.unsqueeze(1)
            processed_dict[DataDict.vel] = vel_data
        if DataDict.target in data_dict:
            processed_dict[DataDict.target] = data_dict[DataDict.target]
        for key in [DataDict.path, DataDict.local_map, DataDict.heuristic]:
            if key in data_dict:
                processed_dict[key] = data_dict[key]
        return processed_dict

    def postprocess_output(self, output_dict):
        processed_dict = {}
        if DataDict.path in output_dict:
            path = output_dict[DataDict.path]
            processed_dict[DataDict.path] = path
            path_min = path.min().item()
            path_max = path.max().item()
            path_range = path_max - path_min
            path_mean = path.mean().item()
        else:
            path_range = 20.0
            path_mean = 0.0
        if DataDict.prediction in output_dict:
            pred = output_dict[DataDict.prediction]
            path = output_dict[DataDict.path]
            if self.train_poses:
                # Predictions are absolute poses: rescale to the GT path length,
                # then shift so both start from the same point.
                path_diff = path[:, 1:] - path[:, :-1]
                path_dist = torch.sqrt((path_diff ** 2).sum(dim=-1)).sum()
                pred_diff = pred[:, 1:] - pred[:, :-1]
                pred_dist = torch.sqrt((pred_diff ** 2).sum(dim=-1)).sum()
                pred = pred * (path_dist / (pred_dist + 1e-6))
                pred = pred + (path[:, 0:1, :] - pred[:, 0:1, :])
            else:
                # Predictions are per-step deltas: integrate first, then rescale.
                pred = torch.cumsum(pred, dim=1)
                path_diff = path[:, 1:] - path[:, :-1]
                path_dist = torch.sqrt((path_diff ** 2).sum(dim=-1)).sum()
                pred_diff = pred[:, 1:] - pred[:, :-1]
                pred_dist = torch.sqrt((pred_diff ** 2).sum(dim=-1)).sum()
                pred = pred * (path_dist / (pred_dist + 1e-6))
                pred = pred + (path[:, 0:1, :] - pred[:, 0:1, :])
            processed_dict[DataDict.prediction] = pred
        if DataDict.local_map in output_dict:
            processed_dict[DataDict.local_map] = output_dict[DataDict.local_map]
        if DataDict.heuristic in output_dict:
            processed_dict[DataDict.heuristic] = output_dict[DataDict.heuristic]
        return processed_dict

    def inference_step(self, data_dict):
        data_dict = self.preprocess_data(data_dict)
        data_dict = to_device(data_dict, device=self.device)
        with torch.no_grad():
            output_dict = self.model(data_dict, sample=True)
        output_dict = self.postprocess_output(output_dict)
        return release_cuda(output_dict)

    def run_inference(self, data_loader):
        predictions = []
        first_batch = True
        for data_dict in tqdm(data_loader, desc="Running inference"):
            output_dict = self.inference_step(data_dict)
            predictions.append(output_dict)
            if first_batch:
                print("\nFirst batch output ranges:")
                for key, value in output_dict.items():
                    if isinstance(value, (torch.Tensor, np.ndarray)):
                        if isinstance(value, torch.Tensor):
                            value = value.cpu().numpy()
                        print(f"{key}: shape={value.shape}, "
                              f"range=[{np.min(value):.3f}, {np.max(value):.3f}], "
                              f"mean={np.mean(value):.3f}")
                first_batch = False
        return predictions


def main():
    args = get_args()
    cfg = TrainingConfig
    cfg.data.root = args.data
    cfg.data.batch_size = args.batch_size
    cfg.data.shuffle = False
    cfg.data.distributed = False
    if args.device >= 0:
        cfg.gpus.device = f"cuda:{args.device}"
    elif args.device == -1:
        cfg.gpus.device = "cuda" if torch.cuda.is_available() else "cpu"
    else:
        cfg.gpus.device = "cpu"
    print(f"Using device: {cfg.gpus.device}")

    data_loader = evaluation_data_loader(cfg=cfg.data)
    for batch in data_loader:
        verify_data_format(batch)
        break

    inferencer = Inferencer(cfg, args.checkpoint)
    predictions = inferencer.run_inference(data_loader)
    print(f"Saving predictions to {args.output}")
    with open(args.output, 'wb') as f:
        pickle.dump(predictions, f)


if __name__ == "__main__":
    main()
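Assuming the script above is saved as inference.py (the checkpoint and data paths below are placeholders), it is run as:

python inference.py --checkpoint checkpoints/epoch_1.pth --data ./test_data --device 0 --output predictions.pkl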

 

output_dict.path

tensor([[[ 0.9600,  -0.4000],
         [ 1.9300,  -0.8100],
         [ 2.9000,  -1.2200],
         [ 3.3600,  -2.0400],
         [ 3.6400,  -3.0100],
         [ 3.9200,  -3.9800],
         [ 4.1900,  -4.9500],
         [ 4.4700,  -5.9200],
         [ 4.7500,  -6.8900],
         [ 5.0300,  -7.8600],
         [ 5.3000,  -8.8300],
         [ 5.5800,  -9.7900],
         [ 5.8500, -10.7500],
         [ 5.9800, -11.7100],
         [ 6.1100, -12.6700],
         [ 6.2400, -13.6300]]], device='cuda:1')

 

output_dict.prediction

tensor([[[ 0.1718, -0.0056],
         [ 0.1897, -0.0169],
         [ 0.1974, -0.0242],
         [ 0.2010, -0.0291],
         [ 0.2027, -0.0323],
         [ 0.2035, -0.0344],
         [ 0.2038, -0.0357],
         [ 0.2039, -0.0366],
         [ 0.2039, -0.0372],
         [ 0.2038, -0.0376],
         [ 0.2037, -0.0379],
         [ 0.2035, -0.0381],
         [ 0.2034, -0.0383],
         [ 0.2033, -0.0384],
         [ 0.2032, -0.0385],
         [ 0.2032, -0.0386]]], device='cuda:1')
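The scale mismatch is easy to quantify: consecutive ground-truth waypoints are roughly 1 m apart (about 15 m over 16 points), while the raw prediction barely moves. A quick check on the first three waypoints from the two dumps above:

import torch

path = torch.tensor([[0.96, -0.40], [1.93, -0.81], [2.90, -1.22]])        # first 3 GT waypoints
pred = torch.tensor([[0.1718, -0.0056], [0.1897, -0.0169], [0.1974, -0.0242]])

path_dist = (path[1:] - path[:-1]).norm(dim=-1).sum()    # ~2.11 m
pred_dist = (pred[1:] - pred[:-1]).norm(dim=-1).sum()    # ~0.032
print((path_dist / pred_dist).item())                    # scale factor ~66

This is exactly the gap that the path_dist / pred_dist factor in postprocess_output compensates for.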

 

