내가 만든 데이터셋 잘 사용하기 : Diffusion Policy의 데이터 전처리 방법들
- -
Imitation Learning(모방 학습)을 사용할 때는 내가 만든 데이터셋을 사용해야 하는 경우가 많이 있다. 특히 로봇 imitation learning의 경우에는 문제 상황 모델링부터 데이터셋 구조, 손실 함수 및 평가 메트릭까지 다 정해야 하는 경우가 종종 있다. 이상한 연구를 할 수록 더 그런 것 같다.
만약 내가 시맨틱 세그멘테이션 성능을 높이는 연구를 한다면 잘 마련된 경기장에서 빨리 뛰기 위해 체력을 기르고 트랙을 꼼꼼히 살피는 방향으로 진행하는 것이 보통이다. (모두가 사용하는 데이터셋에서 범용적인 평가 메트릭을 사용해 좋은 성능 내기 - 개인적으로는 이게 훨씬 어려워 보여욧... )
반면 응용단에서 뭔가 새로운 시도를 하는 로보틱스 연구를 한다면 미개척지에 삽을 들고 냅다 뛰어드는 것과 비슷한 것 같다.
딱하게도 내가 만든 데이터셋은 그 신뢰도부터 의심스럽다. 모두가 사용하는 유명 데이터셋을 사용할 때는
내 모델이 좋지 않아도 말이 되는 결과가 나올 가능성이 있지만,
내가 만든 데이터셋으로는 학습을 돌린 후 결과를 살펴보면 뭔가 대단히 잘못되었는데 대체 어디서부터 손을 봐야 할 지 막막한 경우가 많다.
최근에 나도 내가 (함께) 만든 데이터셋과 뇌피셜의 이상한 구조를 Navigation 태스크에 적용해 본 적이 있는데,
비록 모든 단계를 잘 검증하면서 하려고 노력했지만 초반에는 이런 악마 소환진같은 경로가 나와서 다소 당황했다.
원하는 결과가 나오지 않는 데에는 다양한 원인이 있겠지만 자체제작 데이터셋의 경우에는 전처리만 잘 해주어도 저정도로 trash같은 결과는 좀 방지할 수 있는 것 같다.
https://diffusion-policy.cs.columbia.edu/
Diffusion Policy: Visuomotor Policy Learning via Action Diffusion
This paper introduces Diffusion Policy, a new way of generating robot behavior by representing a robot's visuomotor policy as a conditional denoising diffusion process. We benchmark Diffusion Policy across 12 different tasks from 4 different robot manipula
diffusion-policy.cs.columbia.edu
그런 의미에서 로보틱스 diffusion 기반 waypoint 생성의 신세기 바이블인 diffusion policy의 데이터 전처리 방법들을 살펴 보았다.
Diffusion Policy 코드 구조
diffusion_policy/
├── codecs/
│ └── imagecodecs_numcodecs.py
│
├── common/
│ ├── checkpoint_util.py
│ ├── cv2_util.py
│ ├── env_util.py
│ ├── json_logger.py
│ ├── nested_dict_util.py
│ ├── normalize_util.py
│ ├── pose_trajectory_interpolator.py
│ ├── precise_sleep.py
│ ├── pymunk_override.py
│ ├── pymunk_util.py
│ ├── pytorch_util.py
│ ├── replay_buffer.py
│ ├── robomimic_config_util.py
│ ├── robomimic_util.py
│ ├── sampler.py
│ └── timestamp_accumulator.py
│
├── config/
│ ├── task/
│ │ ├── blockpush_lowdim_seed.yaml
│ │ ├── blockpush_lowdim_seed_abs.yaml
│ │ ├── can_image.yaml
│ │ ├── can_image_abs.yaml
│ │ ├── can_lowdim.yaml
│ │ ├── kitchen_lowdim.yaml
│ │ ├── pusht_image.yaml
│ │ ├── pusht_lowdim.yaml
│ │ ├── real_pusht_image.yaml
│ │ └── ... (기타 태스크 설정)
│ ├── train_bet_lowdim_workspace.yaml
│ └── train_diffusion_transformer_hybrid_workspace.yaml
│
├── dataset/
│ ├── base_dataset.py
│ ├── blockpush_lowdim_dataset.py
│ ├── kitchen_lowdim_dataset.py
│ ├── kitchen_mjl_lowdim_dataset.py
│ ├── mujoco_image_dataset.py
│ ├── pusht_dataset.py
│ ├── pusht_image_dataset.py
│ ├── real_pusht_image_dataset.py
│ ├── robomimic_replay_image_dataset.py
│ └── robomimic_replay_lowdim_dataset.py
│
├── env/
│ ├── block_pushing/
│ ├── kitchen/
│ ├── pusht/
│ └── ... (기타 환경)
│
├── env_runner/
│ ├── base_env_runner.py
│ ├── block_pushing_runner.py
│ ├── kitchen_runner.py
│ └── pusht_runner.py
│
├── model/
│ ├── common/
│ │ ├── normalizer.py
│ │ └── ... (공통 모델 컴포넌트)
│ ├── diffusion/
│ │ └── ... (diffusion 모델)
│ └── policy/
│ └── ... (정책 모델)
│
├── policy/
│ ├── base_policy.py
│ ├── bet_policy.py
│ ├── diffusion_policy.py
│ └── ... (기타 정책)
│
├── real_world/
│ ├── real_env.py
│ ├── real_robot.py
│ └── ... (실제 로봇 관련)
│
├── scripts/
│ ├── eval_policy.py
│ ├── train_policy.py
│ └── ... (실행 스크립트)
│
└── workspace/
├── base_workspace.py
├── train_bet_lowdim_workspace.py
└── ... (학습 작업공간)
데이터로더들은 주로 /model/common/normalizer.py의 LinearNormalizer, SingleFieldLinearNormalizer 클래스를 사용하고 있다.
정규화 : LinearNormalizer 클래스
LinearNormalizer는 두 가지 정규화 모드를 제공하는데,
- limits: 데이터를 지정된 범위(default: -1, 1)로 스케일링
- gaussian: 가우시안 정규화 (평균 0, 표준편차 1)
모드를 제공한다.
- fit() : 데이터의 통계치를 계산하여 정규화 파라미터를 학습
- normalize() / unnormalize()
class LinearNormalizer(DictOfTensorMixin):
avaliable_modes = ['limits', 'gaussian']
@torch.no_grad()
def fit(self,
data: Union[Dict, torch.Tensor, np.ndarray, zarr.Array],
last_n_dims=1,
dtype=torch.float32,
mode='limits',
output_max=1.,
output_min=-1.,
range_eps=1e-4,
fit_offset=True):
if isinstance(data, dict):
for key, value in data.items():
self.params_dict[key] = _fit(value,
last_n_dims=last_n_dims,
dtype=dtype,
mode=mode,
output_max=output_max,
output_min=output_min,
range_eps=range_eps,
fit_offset=fit_offset)
else:
self.params_dict['_default'] = _fit(data,
last_n_dims=last_n_dims,
dtype=dtype,
mode=mode,
output_max=output_max,
output_min=output_min,
range_eps=range_eps,
fit_offset=fit_offset)
def __call__(self, x: Union[Dict, torch.Tensor, np.ndarray]) -> torch.Tensor:
return self.normalize(x)
def __getitem__(self, key: str):
return SingleFieldLinearNormalizer(self.params_dict[key])
def __setitem__(self, key: str , value: 'SingleFieldLinearNormalizer'):
self.params_dict[key] = value.params_dict
def _normalize_impl(self, x, forward=True):
if isinstance(x, dict):
result = dict()
for key, value in x.items():
params = self.params_dict[key]
result[key] = _normalize(value, params, forward=forward)
return result
else:
if '_default' not in self.params_dict:
raise RuntimeError("Not initialized")
params = self.params_dict['_default']
return _normalize(x, params, forward=forward)
def normalize(self, x: Union[Dict, torch.Tensor, np.ndarray]) -> torch.Tensor:
return self._normalize_impl(x, forward=True)
def unnormalize(self, x: Union[Dict, torch.Tensor, np.ndarray]) -> torch.Tensor:
return self._normalize_impl(x, forward=False)
def get_input_stats(self) -> Dict:
if len(self.params_dict) == 0:
raise RuntimeError("Not initialized")
if len(self.params_dict) == 1 and '_default' in self.params_dict:
return self.params_dict['_default']['input_stats']
result = dict()
for key, value in self.params_dict.items():
if key != '_default':
result[key] = value['input_stats']
return result
def get_output_stats(self, key='_default'):
input_stats = self.get_input_stats()
if 'min' in input_stats:
# no dict
return dict_apply(input_stats, self.normalize)
result = dict()
for key, group in input_stats.items():
this_dict = dict()
for name, value in group.items():
this_dict[name] = self.normalize({key:value})[key]
result[key] = this_dict
return result
그래서 각각의 데이터셋 클래스들은 get_normalizer() 메소드로 데이터를 정규화하게 된다. 예를 들어 real_pusht_image_dataset.py에서는
def get_normalizer(self, **kwargs) -> LinearNormalizer:
normalizer = LinearNormalizer()
# action 데이터 정규화
normalizer['action'] = SingleFieldLinearNormalizer.create_fit(
data=self.action_array)
# 다른 observation 데이터 정규화
for key in ['robot_obs']:
normalizer[key] = SingleFieldLinearNormalizer.create_fit(
data=self.obs_dict[key])
return normalizer
여기서 fit()함수는 데이터를 기반으로 정규화 파라미터를 계산하는데, limits 모드에서는 min/max를 사용하고 gaussian 모드의 경우 데이터의 평균과 표준편차를 계산해서 스케일과 offset을 계산한다.
여기서 그러면 어떤 데이터는 limits 모드를 사용하고, 어떤 데이터는 gaussian 모드를 사용할까?
- limits
- 명확한 물리적 한계나 경계가 있는 데이터
- ex. 로봇 관절 각도 (-180 ~ 180)
- 위치 좌표 (맵의 경계 내 좌표)
- 데이터가 비교적 균일하게 분포하는 경우
- 이상치(outlier)가 의미있는 값인 경우
- ex) 내가 다루는 waypoint 좌표 데이터
- 명확한 물리적 한계나 경계가 있는 데이터
- gaussian
- 자연적으로 정규분포를 따르는 데이터
- 센서 측정값
- 노이즈가 포함된 관측 데이터
- 이상치에 민감하지 않아야 하는 경우
- 절대적인 값보다 상대적이 변화가 중요한 경우
- 중심 경향성이 중요한 데이터
- ex) LiDAR나 카메라 센서 데이터
- 자연적으로 정규분포를 따르는 데이터
그래서 diffusion policy의 경우 모든 경우에 limits로 정규화하도록 코드가 되어 있기는 하지만 옵션으로 가우시안을 제공하고 있다.
Padding
내가 만든 데이터셋🍪은 안타깝게도 텐서의 크기가 다 다른 경우가 많다. 배치 학습을 하기 위한 1번째 단계는 텐서 크기를 야물딱지게 잘 맞춰 주는 것이다. 가장 접근하기 쉬운 방법은 1) 크기 맞춰주기 모델을 앞단에 하나 붙여서 찐모델에는 feature를 넣거나 2) 프로크루스테스의 침대마냥
긴 애들은 싹둑 자르고 짧은 애들은 패딩을 해 주는 방법이 있는데 이것이 최선일지는 좀 생각을 해볼 필요가 있다.
/common/replay_buffer.py에는 ReplayBuffer 클래스가 있는데, 데이터 구조는 다음과 같다.
데이터 구조
class ReplayBuffer:
def __init__(self, root):
# root는 두 개의 주요 그룹을 가짐
assert('data' in root) # 실제 시계열 데이터
assert('meta' in root) # 메타데이터
assert('episode_ends' in root['meta']) # 각 에피소드의 끝 지점
데이터 저장 방식
{
'data': {
'joint_positions': array(...), # [total_steps, n_joints]
'images': array(...), # [total_steps, H, W, C]
'actions': array(...) # [total_steps, action_dim]
},
'meta': {
'episode_ends': array([100, 250, 400]) # 각 에피소드의 끝 인덱스
}
}
주요 기능
# 새로운 에피소드 추가
buffer.add_episode({
'joint_positions': episode_joints,
'images': episode_images,
'actions': episode_actions
})
# 특정 에피소드 가져오기
episode = buffer.get_episode(idx)
# 특정 구간 가져오기
slice_data = buffer.get_steps_slice(start, end)
그래서 ReplayBuffer는 마치 여러 개의 서랍이 있는 수납장처럼, 각 서랍(data의 각 key)에는 특정 종류의 데이터가 시간 순서대로 저장되고, 서랍장 옆에 붙어있는 메모(metadata)에는 각 에피소드가 어디서 끝나는지 기록되어 있다.
그래서 실제 학습에 사용될 때에는 ReplayBuffer는 데이터를 저장하고 관리하고, SequenceSampler가 이 데이터에서 학습에 필요한 시퀀스를 추출하고, DataLoader가 이 시퀀스들로 배치를 구성하게 된다.
SequenceSampler
create_indices 함수는 다음과 같이 되어 있다.
def create_indices(episode_ends, sequence_length, episode_mask, pad_before=0, pad_after=0):
# 각 에피소드에 대해
for i in range(len(episode_ends)):
start_idx = episode_ends[i-1] if i > 0 else 0
end_idx = episode_ends[i]
episode_length = end_idx - start_idx
# 가능한 시작 인덱스 범위 계산
min_start = -pad_before # 패딩 허용
max_start = episode_length - sequence_length + pad_after
# 가능한 모든 시퀀스 위치에 대한 인덱스 생성
for idx in range(min_start, max_start+1):
# 실제 버퍼에서의 시작/끝 위치
buffer_start_idx = max(idx, 0) + start_idx
buffer_end_idx = min(idx+sequence_length, episode_length) + start_idx
그래서 여기는 시퀀스의 시작이나 끝에서 부족한 부분은 첫/마지막 프레임으로 채운다.
if (sample_start_idx > 0) or (sample_end_idx < self.sequence_length):
# 패딩이 필요한 경우
if sample_start_idx > 0:
data[:sample_start_idx] = sample[0] # 첫 프레임으로 앞쪽 패딩
if sample_end_idx < self.sequence_length:
data[sample_end_idx:] = sample[-1] # 마지막 프레임으로 뒤쪽 패딩
나는 처음에는 그냥 짧은 시퀀스의 데이터들을 그냥 0으로 패딩을 때렸는데, 그랬더니 이와 같이 경로가 크게 손상되었다.
좀 더 똑똑하게 패딩을 해서 내가 만든 딱한 데이터셋을 이해해야 하는 딥러닝 모델들을 배려해 주도록 하자.
Starting Point, Goal Point를 임베딩하는 방식
로봇 제어나 작업에서 저차원 상태 공간을 다룰 때(이미지가 없는 경우) 사용되는 Lowdim policy의 경우에는, starting_point와 goal_point는 obs_dim으로 local condition 혹은 global condition으로 사용된다.
class DiffusionUnetLowdimPolicy(BaseLowdimPolicy):
def __init__(self,
model: ConditionalUnet1D,
noise_scheduler: DDPMScheduler,
horizon,
obs_dim,
action_dim,
n_action_steps,
n_obs_steps,
num_inference_steps=None,
obs_as_local_cond=False,
obs_as_global_cond=False,
pred_action_steps_only=False,
oa_step_convention=False,
# parameters passed to step
**kwargs):
# kitchen_multitask_v0.py
def _get_obs(self):
t, qp, qv, obj_qp, obj_qv = self.robot.get_obs(
self, robot_noise_ratio=self.robot_noise_ratio)
self.obs_dict = {}
self.obs_dict['t'] = t
self.obs_dict['qp'] = qp # robot joint positions
self.obs_dict['qv'] = qv # robot joint velocities
self.obs_dict['obj_qp'] = obj_qp # object positions
self.obs_dict['obj_qv'] = obj_qv # object velocities
self.obs_dict['goal'] = self.goal
if self.goal_concat:
return np.concatenate([
self.obs_dict['qp'],
self.obs_dict['obj_qp'],
self.obs_dict['goal']
])
# kitchen_mjl_lowdim_dataset.py
obs = np.concatenate([
qpos[:,:9], # robot positions
qpos[:,-21:], # object positions
np.zeros((len(qpos),30),dtype=np.float32) # goal
], axis=-1)
따라서 robot position, object position과 같이 concat되어서 들어가고 있다.
근데 예를 들어 ViPlanner(https://github.com/leggedrobotics/viplanner)에서는
def forward(self, x, goal):
# goal point를 linear layer를 통해 임베딩
goal = self.fg(goal[:, 0:3]) # fg는 nn.Linear(3, goal_channels)
# 임베딩된 goal을 feature map 크기로 확장
goal = goal[:, :, None, None].expand(-1, -1, x.shape[2], x.shape[3])
# 기존 feature와 goal 임베딩을 채널 차원에서 연결
x = torch.cat((x, goal), dim=1)
이와 같이 벡터에서 concat하는 식이 아니라 채널 차원에서 연결을 해 주고 있다.
이런 방식들 (1. vector concat 혹은 2. Feature Map에서 Channel-wise concat)의 기능적인 장단점을 생각해보면,
장점 |
단점 | |
vector concat | 1. 불필요한 Spatial expansion이 없다 2. Transformer 구조와의 호환성 |
1. 공간적 구조 손실 2. spatial location 별 상호작용이 제한적 |
channel-wise concat | 1. 공간 정보 유지 2. CNN 구조와 호환성 3. local feature와 goal 정보의 상호 작용 |
1. 불필요한 중복 (동일한 goal 정보가 모든 spatial location에 복사됨) 2. 메모리 사용량 증가 |
이렇게 꼽아볼 수 있을 것 같다.
따라서 예를 들어 로봇 제어처럼 low-dimensional state space를 다룬다면 vector concat 방식이 적합할 것 같고, 이미지 기반의 spatial 정보가 중요한 태스크에서는 channel-wise concat이 더 적합할 것 같다. 실제로도 Diffusion policy에서는 1)을, 이미지 기반의 ViPlanner는 2) 번 방식을 채택하고 있다.
더욱이 Diffusion policy는 Goal과 state의 직접적인 관계 모델링이 중요하고 계산을 빨리 해야 하기 때문에 이 경우에는 1번 방식이 유리하다고 판단한 것 같다.
'딥러닝' 카테고리의 다른 글
Domain Generalization : Data Manipulation Methods (1) | 2023.12.06 |
---|---|
Domain Generalization (0) | 2023.12.06 |
Backdoor Attack on Self-Supervised Learning (1) | 2023.11.24 |
MAE : Masked AutoEncoder🤿 (1) | 2023.11.23 |
Contrastive Learning : BYOL (Bootstrap Your Own Latent) (2) | 2023.11.20 |
소중한 공감 감사합니다