Paper Review
DTG : inference
Results
After 1 epoch
After 5 epochs
First, let's check what is inside the pkl files provided as the dataset.
Train Dataset
Raw sensor data used for model training:
- pose: the robot's current position/orientation
- vel: velocity data (sequence of 50)
- imu: IMU sensor data (sequence of 20)
- camera: camera image
- lidar: 3D LiDAR data
- lidar2d: 2D LiDAR scan data
- targets: target positions (100 of them)
- trajectories: ground-truth trajectories
- local_map: local map data
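The dumps below were produced by walking each pickle and printing per-key types and shapes. Something like the following sketch would do it (the describe helper and the file path are mine, not from the repo):

import pickle
import numpy as np

def describe(data):
    print(f"Data type: {type(data)}")
    print("Dictionary keys:")
    for key, value in data.items():
        if isinstance(value, np.ndarray):
            print(f"  {key}: type={type(value)}, shape={value.shape}")
        elif isinstance(value, list):
            try:
                shape = np.array(value).shape  # works for rectangular nested lists
            except ValueError:
                shape = f"List of length {len(value)}"  # ragged lists
            print(f"  {key}: type={type(value)}, shape={shape}")
            if value:
                print(f"    First element type: {type(value[0])}")
        else:
            print(f"  {key}: type={type(value)}, value={value}")

with open('sample.pkl', 'rb') as f:  # placeholder path
    describe(pickle.load(f))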
Data type: <class 'dict'>
Dictionary keys:
pose: type=<class 'numpy.ndarray'>, shape=(3, 4)
First element type: <class 'numpy.ndarray'>
First element shape: (4,)
vel: type=<class 'list'>, shape=(50, 2)
First element type: <class 'list'>
First element shape: (2,)
imu: type=<class 'list'>, shape=(20, 6)
First element type: <class 'list'>
First element shape: (6,)
time: type=<class 'float'>, value=1681656095.9803696
camera: type=<class 'list'>, shape=(1, 720, 1280, 3)
First element type: <class 'numpy.ndarray'>
First element shape: (720, 1280, 3)
lidar: type=<class 'list'>, shape=List of length 3
First element type: <class 'numpy.ndarray'>
First element shape: (25532, 6)
lidar2d: type=<class 'list'>, shape=(3, 16, 1824)
First element type: <class 'numpy.ndarray'>
First element shape: (16, 1824)
targets: type=<class 'list'>, shape=List of length 100
First element type: <class 'list'>
First element shape: List of length 2
trajectories: type=<class 'list'>, shape=(100, 16, 2)
First element type: <class 'list'>
First element shape: (16, 2)
local_map: type=<class 'numpy.ndarray'>, shape=(600, 600)
First element type: <class 'numpy.ndarray'>
First element shape: (600,)
/data.pkl
Data type: <class 'dict'>
Dictionary keys:
file_names: type=<class 'list'>, shape=(1034, 2)
First element type: <class 'list'>
First element shape: (2,)
all_positions: type=<class 'numpy.ndarray'>, shape=(2076, 3)
First element type: <class 'numpy.ndarray'>
First element shape: (3,)
network: type=<class 'networkx.classes.graph.Graph'>, value=Graph with 2076 nodes and 18625 edge
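data.pkl additionally carries a networkx graph over the 2076 recorded positions. A quick way to poke at it, assuming it loads with plain pickle as above:

import pickle

with open('data.pkl', 'rb') as f:
    meta = pickle.load(f)

g = meta['network']                 # networkx.Graph: 2076 nodes, 18625 edges
print(g.number_of_nodes(), g.number_of_edges())
print(meta['all_positions'].shape)  # (2076, 3): one 3D position per node
print(meta['file_names'][0])        # first of the 1034 two-element entries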
Code to visualize the data inside a sample:
import pickle

import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # registers the 3d projection


def visualize_sample_data(data):
    fig = plt.figure(figsize=(20, 12))

    # 1) Velocity sequence (50 x 2)
    ax1 = fig.add_subplot(231)
    vel_data = np.array(data['vel'])
    ax1.plot(vel_data[:, 0], label='X velocity')
    ax1.plot(vel_data[:, 1], label='Y velocity')
    ax1.set_title('Velocity Data')
    ax1.set_xlabel('Time step')
    ax1.set_ylabel('Velocity')
    ax1.legend()

    # 2) Camera image (720 x 1280 x 3)
    ax2 = fig.add_subplot(232)
    camera_img = data['camera'][0]
    ax2.imshow(camera_img)
    ax2.set_title('Camera Image')
    ax2.axis('off')

    # 3) 3D LiDAR point cloud, colored by the 4th channel
    ax3 = fig.add_subplot(233, projection='3d')
    lidar_points = data['lidar'][0]
    ax3.scatter(lidar_points[:, 0],
                lidar_points[:, 1],
                lidar_points[:, 2],
                c=lidar_points[:, 3],
                s=1)
    ax3.set_title('3D LiDAR Points')
    ax3.set_xlabel('X')
    ax3.set_ylabel('Y')
    ax3.set_zlabel('Z')

    # 4) 2D LiDAR scan (16 channels x 1824 angles)
    ax4 = fig.add_subplot(234)
    lidar2d = data['lidar2d'][0]
    ax4.imshow(lidar2d, aspect='auto')
    ax4.set_title('2D LiDAR Scan')
    ax4.set_xlabel('Angle')
    ax4.set_ylabel('Channel')

    # 5) All 100 ground-truth trajectories (16 waypoints each)
    ax5 = fig.add_subplot(235)
    trajectories = np.array(data['trajectories'])
    for traj in trajectories:
        ax5.plot(traj[:, 0], traj[:, 1], 'b-', alpha=0.3)
    ax5.set_title('Ground Truth Trajectories')
    ax5.set_xlabel('X')
    ax5.set_ylabel('Y')

    # 6) Local occupancy map (600 x 600)
    ax6 = fig.add_subplot(236)
    ax6.imshow(data['local_map'], cmap='gray')
    ax6.set_title('Local Map')
    ax6.set_xlabel('X')
    ax6.set_ylabel('Y')

    plt.tight_layout()
    plt.show()


with open('your_pickle_file.pkl', 'rb') as f:
    data = pickle.load(f)
visualize_sample_data(data)
In the original code, this is the part responsible for producing the model output.
class HNav(nn.Module):
    def sample(self, input_dict):
        output = {}
        # Pass the ground-truth path, heuristic, and local map through
        # unchanged so they can be compared against the prediction later.
        if DataDict.path in input_dict.keys():
            output.update({DataDict.path: input_dict[DataDict.path]})
        if DataDict.heuristic in input_dict.keys():
            output.update({DataDict.heuristic: input_dict[DataDict.heuristic]})
        if DataDict.local_map in input_dict.keys():
            output.update({DataDict.local_map: input_dict[DataDict.local_map]})
        # Encode LiDAR, velocity, and target into the conditioning observation.
        observation = self.perception(lidar=input_dict[DataDict.lidar], vel=input_dict[DataDict.vel],
                                      target=input_dict[DataDict.target])
        generator_output = self.generator.sample(observation=observation)
        output.update(generator_output)
        return output
# /models/diffusion.py
@torch.no_grad()
def sample(self, observation):
    h = self.encoder(observation)  # B x 512
    h_condition = self.trajectory_condition(h)
    B, C = h_condition.shape
    # Start from pure Gaussian noise of shape (B, waypoints_num, waypoint_dim).
    trajectory = torch.randn(size=(h_condition.shape[0], self.waypoints_num, self.waypoint_dim),
                             dtype=h_condition.dtype, device=h_condition.device, generator=None)
    all_trajectories = []
    scheduler = self.noise_scheduler
    scheduler.set_timesteps(self.time_steps)
    for t in scheduler.timesteps:
        # Optionally stop early after sample_times denoising steps.
        if (self.sample_times >= 0) and (t < self.time_steps - 1 - self.sample_times):
            break
        t = t.to(h_condition.device)
        model_output = self.diff_model(trajectory, t.unsqueeze(0).repeat(B, ), local_cond=None,
                                       global_cond=h_condition)
        # One reverse-diffusion step: recover the previous (less noisy) trajectory.
        trajectory = scheduler.step(model_output, t, trajectory, generator=None).prev_sample.contiguous()
        if self.use_all_paths:
            all_trajectories.append(model_output.clone().detach().cpu().numpy())
    output = {
        DataDict.prediction: trajectory,
        DataDict.all_trajectories: all_trajectories,
    }
    return output
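To see what the denoising loop is doing in isolation: the set_timesteps/step calls match the scheduler API of the diffusers library, so here is a minimal self-contained sketch of the same conditional reverse-diffusion pattern, with a hypothetical DummyDenoiser standing in for self.diff_model:

import torch
from diffusers import DDPMScheduler

# Hypothetical stand-in for self.diff_model: predicts noise from the noisy
# trajectory, the timestep, and a global conditioning vector.
class DummyDenoiser(torch.nn.Module):
    def __init__(self, waypoint_dim=2, cond_dim=256):
        super().__init__()
        self.net = torch.nn.Linear(waypoint_dim + cond_dim + 1, waypoint_dim)

    def forward(self, traj, t, global_cond):
        B, N, D = traj.shape
        t_feat = t.float().view(B, 1, 1).expand(B, N, 1)
        c_feat = global_cond.unsqueeze(1).expand(B, N, global_cond.shape[-1])
        return self.net(torch.cat([traj, t_feat, c_feat], dim=-1))

model = DummyDenoiser()
scheduler = DDPMScheduler(num_train_timesteps=1000)
scheduler.set_timesteps(10)  # 10 reverse-diffusion steps at inference time

B, N, D = 4, 16, 2            # batch, waypoints_num, waypoint_dim
cond = torch.randn(B, 256)    # stands in for h_condition
traj = torch.randn(B, N, D)   # start from pure Gaussian noise

for t in scheduler.timesteps:
    noise_pred = model(traj, t.repeat(B), cond)
    traj = scheduler.step(noise_pred, t, traj).prev_sample

print(traj.shape)  # torch.Size([4, 16, 2])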
The ground truth can be plotted as follows.
import matplotlib.pyplot as plt

def plot_prediction(predictions, index=0, figsize=(12, 12)):
    data = predictions[index]
    plt.figure(figsize=figsize)
    if 'local_map' in data:
        local_map = data['local_map'][0]
        plt.imshow(local_map, cmap='gray', extent=[-30, 30, 30, -30], alpha=0.5)
    if 'path' in data:
        path = data['path'][0]
        # Axes are swapped (y, x) to match the local map orientation.
        plt.plot(path[:, 1].clip(-30, 30), path[:, 0].clip(-30, 30),
                 'g-', label='Ground Truth', linewidth=10)
    if 'prediction' in data:
        pred = data['prediction'][0]
        plt.plot(pred[:, 1].clip(-30, 30), pred[:, 0].clip(-30, 30),
                 'r--', label='Prediction', linewidth=2)
    plt.xlim(-30, 30)
    plt.ylim(-30, 30)
    plt.grid(True)
    plt.legend()
    plt.title(f'Trajectory Prediction (Sample {index})')
    plt.xlabel('X (m)')
    plt.ylabel('Y (m)')
    plt.show()
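A minimal way to use this on the pickle written by the inference script below (assuming its default output name, predictions.pkl):

import pickle

with open('predictions.pkl', 'rb') as f:
    predictions = pickle.load(f)

plot_prediction(predictions, index=0)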
This is the inference result after training for 1 epoch.
Inference code
The model output needs to be rescaled (see postprocess_output below).
import argparse
import pickle
import torch
import numpy as np
from tqdm import tqdm
import os

from src.utils.configs import TrainingConfig, GeneratorType, DataDict
from src.models.model import get_model
from src.utils.functions import to_device, get_device, release_cuda
from src.data_loader.data_loader import evaluation_data_loader


def get_args():
    parser = argparse.ArgumentParser(description='Model Inference')
    parser.add_argument('--checkpoint', type=str, required=True, help='Path to model checkpoint')
    parser.add_argument('--data', type=str, required=True, help='Path to test data')
    parser.add_argument('--device', type=int, default=-1, help='GPU device ID (-1 for auto)')
    parser.add_argument('--batch_size', type=int, default=1, help='Batch size for inference')
    parser.add_argument('--output', type=str, default='predictions.pkl', help='Output file path')
    return parser.parse_args()


def verify_data_format(data_dict):
    """Verify and print data format information."""
    print("\nInput data format:")
    for key, value in data_dict.items():
        if isinstance(value, torch.Tensor):
            print(f"{key}: shape={value.shape}, type={value.dtype}, "
                  f"range=[{value.min().item():.3f}, {value.max().item():.3f}]")


class Inferencer:
    def __init__(self, cfg, checkpoint_path):
        self.cfg = cfg
        if cfg.gpus.device == "cuda":
            self.device = "cuda"
        else:
            self.device = get_device(device=cfg.gpus.device)
        self.generator_type = cfg.model.generator_type
        self.use_traversability = (cfg.model.diffusion.use_traversability
                                   if self.generator_type == GeneratorType.diffusion
                                   else cfg.model.cvae.estimate_traversability)
        self.train_poses = cfg.loss.train_poses
        self.scale_waypoints = 20.0
        self.map_resolution = cfg.data.map_resolution if hasattr(cfg.data, 'map_resolution') else 1.0
        self.map_range = cfg.data.map_range if hasattr(cfg.data, 'map_range') else 0.0

        self.model = get_model(config=cfg.model, device=self.device)
        print(f'Loading checkpoint from "{checkpoint_path}"')
        state_dict = torch.load(checkpoint_path, map_location=torch.device(self.device))
        if 'state_dict' in state_dict:
            model_dict = state_dict['state_dict']
        else:
            model_dict = state_dict
        self.model.load_state_dict(model_dict, strict=False)
        self.model = self.model.to(self.device)
        self.model.eval()
        torch.set_grad_enabled(False)
        print(f"Model device: {next(self.model.parameters()).device}")
        print(f"Generator type: {self.generator_type}")

    def preprocess_data(self, data_dict):
        """Preprocess input data."""
        processed_dict = {}
        if DataDict.lidar in data_dict:
            lidar_data = data_dict[DataDict.lidar]
            if len(lidar_data.shape) == 3:
                lidar_data = lidar_data.unsqueeze(1)
            processed_dict[DataDict.lidar] = lidar_data
        if DataDict.vel in data_dict:
            vel_data = data_dict[DataDict.vel]
            if len(vel_data.shape) == 2:
                vel_data = vel_data.unsqueeze(1)
            processed_dict[DataDict.vel] = vel_data
        if DataDict.target in data_dict:
            processed_dict[DataDict.target] = data_dict[DataDict.target]
        for key in [DataDict.path, DataDict.local_map, DataDict.heuristic]:
            if key in data_dict:
                processed_dict[key] = data_dict[key]
        return processed_dict

    def postprocess_output(self, output_dict):
        processed_dict = {}
        if DataDict.path in output_dict:
            path = output_dict[DataDict.path]
            processed_dict[DataDict.path] = path
            path_min = path.min().item()
            path_max = path.max().item()
            path_range = path_max - path_min
            path_mean = path.mean().item()
        else:
            path_range = 20.0
            path_mean = 0.0
        if DataDict.prediction in output_dict:
            pred = output_dict[DataDict.prediction]
            path = output_dict[DataDict.path]
            if self.train_poses:
                # Predictions are absolute poses: rescale to the GT path length,
                # then shift so both trajectories share the same start point.
                path_diff = path[:, 1:] - path[:, :-1]
                path_dist = torch.sqrt((path_diff ** 2).sum(dim=-1)).sum()
                pred_diff = pred[:, 1:] - pred[:, :-1]
                pred_dist = torch.sqrt((pred_diff ** 2).sum(dim=-1)).sum()
                pred = pred * (path_dist / (pred_dist + 1e-6))
                pred = pred + (path[:, 0:1, :] - pred[:, 0:1, :])
            else:
                # Predictions are waypoint increments: integrate them first,
                # then rescale and shift as above.
                pred = torch.cumsum(pred, dim=1)
                path_diff = path[:, 1:] - path[:, :-1]
                path_dist = torch.sqrt((path_diff ** 2).sum(dim=-1)).sum()
                pred_diff = pred[:, 1:] - pred[:, :-1]
                pred_dist = torch.sqrt((pred_diff ** 2).sum(dim=-1)).sum()
                pred = pred * (path_dist / (pred_dist + 1e-6))
                pred = pred + (path[:, 0:1, :] - pred[:, 0:1, :])
            processed_dict[DataDict.prediction] = pred
        if DataDict.local_map in output_dict:
            processed_dict[DataDict.local_map] = output_dict[DataDict.local_map]
        if DataDict.heuristic in output_dict:
            processed_dict[DataDict.heuristic] = output_dict[DataDict.heuristic]
        return processed_dict

    def inference_step(self, data_dict):
        data_dict = self.preprocess_data(data_dict)
        data_dict = to_device(data_dict, device=self.device)
        with torch.no_grad():
            output_dict = self.model(data_dict, sample=True)
        output_dict = self.postprocess_output(output_dict)
        return release_cuda(output_dict)

    def run_inference(self, data_loader):
        predictions = []
        first_batch = True
        for data_dict in tqdm(data_loader, desc="Running inference"):
            output_dict = self.inference_step(data_dict)
            predictions.append(output_dict)
            if first_batch:
                print("\nFirst batch output ranges:")
                for key, value in output_dict.items():
                    if isinstance(value, (torch.Tensor, np.ndarray)):
                        if isinstance(value, torch.Tensor):
                            value = value.cpu().numpy()
                        print(f"{key}: shape={value.shape}, "
                              f"range=[{np.min(value):.3f}, {np.max(value):.3f}], "
                              f"mean={np.mean(value):.3f}")
                first_batch = False
        return predictions


def main():
    args = get_args()
    cfg = TrainingConfig
    cfg.data.root = args.data
    cfg.data.batch_size = args.batch_size
    cfg.data.shuffle = False
    cfg.data.distributed = False
    if args.device >= 0:
        cfg.gpus.device = f"cuda:{args.device}"
    elif args.device == -1:
        cfg.gpus.device = "cuda" if torch.cuda.is_available() else "cpu"
    else:
        cfg.gpus.device = "cpu"
    print(f"Using device: {cfg.gpus.device}")

    data_loader = evaluation_data_loader(cfg=cfg.data)
    for batch in data_loader:
        verify_data_format(batch)
        break

    inferencer = Inferencer(cfg, args.checkpoint)
    predictions = inferencer.run_inference(data_loader)

    print(f"Saving predictions to {args.output}")
    with open(args.output, 'wb') as f:
        pickle.dump(predictions, f)


if __name__ == "__main__":
    main()
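Assuming the script is saved as inference.py (my name for it; checkpoint and data paths are placeholders), it can be invoked like:

python inference.py --checkpoint checkpoints/model.pth --data /path/to/test_data --device 0 --output predictions.pkl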
output_dict.path
tensor([[[ 0.9600, -0.4000],
[ 1.9300, -0.8100],
[ 2.9000, -1.2200],
[ 3.3600, -2.0400],
[ 3.6400, -3.0100],
[ 3.9200, -3.9800],
[ 4.1900, -4.9500],
[ 4.4700, -5.9200],
[ 4.7500, -6.8900],
[ 5.0300, -7.8600],
[ 5.3000, -8.8300],
[ 5.5800, -9.7900],
[ 5.8500, -10.7500],
[ 5.9800, -11.7100],
[ 6.1100, -12.6700],
[ 6.2400, -13.6300]]], device='cuda:1')
output_dict.prediction
tensor([[[ 0.1718, -0.0056],
[ 0.1897, -0.0169],
[ 0.1974, -0.0242],
[ 0.2010, -0.0291],
[ 0.2027, -0.0323],
[ 0.2035, -0.0344],
[ 0.2038, -0.0357],
[ 0.2039, -0.0366],
[ 0.2039, -0.0372],
[ 0.2038, -0.0376],
[ 0.2037, -0.0379],
[ 0.2035, -0.0381],
[ 0.2034, -0.0383],
[ 0.2033, -0.0384],
[ 0.2032, -0.0385],
[ 0.2032, -0.0386]]], device='cuda:1')
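These dumps show why the rescaling in postprocess_output is needed: the raw diffusion output sits roughly in [-0.04, 0.21] while the ground-truth path spans over ten meters. A minimal numpy rendition of the rescale-and-anchor step (hypothetical excerpt values from the dumps above; the cumsum of the non-train_poses branch omitted):

import numpy as np

# Shapes follow the dumps above, truncated to 3 waypoints: (1, 3, 2).
path = np.array([[[0.96, -0.40], [1.93, -0.81], [2.90, -1.22]]])  # GT excerpt
pred = np.array([[[0.17, -0.01], [0.19, -0.02], [0.20, -0.02]]])  # raw output excerpt

# Total polyline length of each trajectory.
path_dist = np.sqrt(((path[:, 1:] - path[:, :-1]) ** 2).sum(-1)).sum()
pred_dist = np.sqrt(((pred[:, 1:] - pred[:, :-1]) ** 2).sum(-1)).sum()

# Rescale the prediction to the GT length, then anchor both at the same start.
pred = pred * (path_dist / (pred_dist + 1e-6))
pred = pred + (path[:, 0:1, :] - pred[:, 0:1, :])

print(pred[0, 0], path[0, 0])  # identical start points after anchoring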