SafeDiffuser : code explained
- -
https://github.com/Weixy21/SafeDiffuser
GitHub - Weixy21/SafeDiffuser: Safe Planning with Diffusion Probabilistic Models
Safe Planning with Diffusion Probabilistic Models. Contribute to Weixy21/SafeDiffuser development by creating an account on GitHub.
github.com
Filetree
# SafeDiffuser Project Structure
├── LICENSE
├── README.md
├── azure/ # Azure 클라우드 관련 설정
│ ├── Dockerfile
│ ├── config.py
│ ├── launch.py
│ └── ...
├── config/ # 설정 파일
│ ├── locomotion.py
│ └── maze2d.py
└── diffuser/ # 메인 코드
├── __init__.py
├── datasets/ # 데이터셋 관련 코드
│ ├── buffer.py
│ ├── d4rl.py
│ ├── normalization.py
│ ├── preprocessing.py
│ └── sequence.py
└── environments/ # 환경 관련 코드
├── ant.py
├── assets/ # 환경 에셋 파일들
│ ├── ant.xml
│ ├── half_cheetah.xml
│ ├── hopper.xml
│ └── walker2d.xml
└── ...
Install Mujoco 2.0.0
mkdir -p ~/.mujoco
# MuJoCo 2.0.0 다운로드 (Linux용)
wget https://www.roboti.us/download/mujoco200_linux.zip
unzip mujoco200_linux.zip -d ~/.mujoco/
wget https://www.roboti.us/file/mjkey.txt -P ~/.mujoco/
echo 'export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:~/.mujoco/mujoco200/bin' >> ~/.bashrc
echo 'export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/nvidia-515' >> ~/.bashrc
# nvidia driver 위치 확인하기 : 저기 없으면
echo 'export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/x86_64-linux-gnu' >> ~/.bashrc
source ~/.bashrc
sudo apt update
sudo apt install -y libgl1-mesa-dev libglew-dev patchelf python3-pip python3-numpy python3-scipy
pip install mujoco-py==2.0.2.13
pip install gdown typed-argument-parser sk-video d4rl
pip install git+https://github.com/Farama-Foundation/d4rl@master#egg=d4rl
pip install 에서 cython에러가 날 경우 소스를 다운받아서 수정해서 빌드하기
pip download mujoco-py==2.0.2.13
tar -xzf mujoco-py-2.0.2.13.tar.gz
cd mujoco-py-2.0.2.13
sed -i 's/void c_warning_callback(const char \*msg) except \* nogil:/void c_warning_callback(const char \*msg) noexcept nogil:/g' mujoco_py/cymj.pyx
sed -i 's/void c_error_callback(const char \*msg) except \* nogil:/void c_error_callback(const char \*msg) noexcept nogil:/g' mujoco_py/cymj.pyx
pip install -e .
우선 나는 Maze2D 환경을 테스트할 예정이라, git switch maze2d로 브랜치를 바꿔 준다.
python3 script... 를 실행할 때 tap관련 이슈 뜨면
pip uninstall -y tap
pip uninstall -y mc-bin-client
pip install typed-argument-parser
pip install gym==0.23.1
pip install git+https://github.com/Farama-Foundation/d4rl@master#egg=d4rl
실행
python scripts/train.py --config config.maze2d --dataset maze2d-large-v1
여기서 TypeError: 'int' object is not subscriptable 라는 에러가 뜨면 Diffusion.py에서
# 이전
####################### SafeDiffusers
# x = self.invariance(x, xp1) # RoS
x = self.invariance_cf(x, xp1) # RoS closed form # <- 이 줄이 활성화되어 있었음
# x = self.invariance_relax(x, xp1, t) # ReS
# x = self.invariance_relax_cf(x, xp1, t) #ReS closed form
# x = self.invariance_time(x, xp1, t) # TVS
# x = self.invariance_time_cf(x, xp1, t) # TVS closed form
return x
# 이후
####################### SafeDiffusers
# x = self.invariance(x, xp1) # RoS
# x = self.invariance_cf(x, xp1) # RoS closed form # <- 이 줄을 주석 처리함
# x = self.invariance_relax(x, xp1, timesteps) # ReS
# x = self.invariance_relax_cf(x, xp1, timesteps) #ReS closed form
# x = self.invariance_time(x, xp1, timesteps) #RoS time-dependent
# x = self.invariance_time_cf(x, xp1, timesteps) #RoS time-dependent closed form
return xp1 # <- x 대신 xp1을 반환
# 이전
@torch.no_grad()
def p_sample_loop(self, shape, cond, verbose=True, return_diffusion=False):
device = self.betas.device
batch_size = shape[0]
x = torch.randn(shape, device=device)
x = apply_conditioning(x, cond, self.action_dim)
if return_diffusion: diffusion = [x]
progress = utils.Progress(self.n_timesteps) if verbose else utils.Silent()
safe1, safe2 = [], []
for i in reversed(range(0, self.n_timesteps)): #-50 change here for the number of diffusion steps,
if i < 0:
i = 0
timesteps = torch.full((batch_size,), i, device=device, dtype=torch.long)
x = self.p_sample(x, cond, timesteps)
x = apply_conditioning(x, cond, self.action_dim)
safe1.append(self.safe1.unsqueeze(0))
safe2.append(self.safe2.unsqueeze(0))
progress.update({'t': i})
if return_diffusion: diffusion.append(x)
self.safe1 = torch.cat(safe1, dim=0)
self.safe2 = torch.cat(safe2, dim=0)
progress.close()
# pdb.set_trace()
if return_diffusion:
return x, torch.stack(diffusion, dim=1)
else:
return x
# 이후
@torch.no_grad()
def p_sample_loop(self, shape, cond, verbose=True, return_diffusion=False):
device = self.betas.device
batch_size = shape[0]
x = torch.randn(shape, device=device)
x = apply_conditioning(x, cond, self.action_dim)
if return_diffusion: diffusion = [x]
progress = utils.Progress(self.n_timesteps) if verbose else utils.Silent()
for i in reversed(range(0, self.n_timesteps)):
if i < 0:
i = 0
timesteps = torch.full((batch_size,), i, device=device, dtype=torch.long)
x = self.p_sample(x, cond, timesteps)
x = apply_conditioning(x, cond, self.action_dim)
progress.update({'t': i})
if return_diffusion: diffusion.append(x)
progress.close()
# pdb.set_trace()
if return_diffusion:
return x, torch.stack(diffusion, dim=1)
else:
return x
그럼 이렇게 실행이 되고,
이런 st로 이미지가 저장이 된다.
Robust-safe Diffuser
이건 diffusion.py에 구현되어 있다.
@torch.no_grad() #only for sampling
def invariance_cf(self, x, xp1): # closed form solution, RoS-diffuser for maze2d-large-v1
x = x.squeeze(0)
xp1 = xp1.squeeze(0)
nBatch = x.shape[0]
ref = xp1 - x
#normalize obstacle 1, x-1, y-0 x = 1/12*np.cos(theta) + 5.5/12, y = 1/9*np.sin(theta) + 5/9
xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
off_x = 2*(5.8-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
off_y = 2*(5-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1
#CBF
b0 = ((x[:,2:3] - off_y)/yr)**2 + ((x[:,3:4] - off_x)/xr)**2 - 1 - 0.01 # robust term 09/25
Lfb = 0
Lgbu1 = 2*((x[:,2:3] - off_y)/yr)/yr
Lgbu2 = 2*((x[:,3:4] - off_x)/xr)/xr
G0 = torch.cat([-Lgbu1, -Lgbu2], dim = 1)
k = 1
h0 = Lfb + k*b0
self.safe1 = torch.min(b0[:,0] + 0.01) # robust term 09/25
#normalize obstacle 2, x = 1/12*np.sqrt(np.abs(np.cos(theta)))*np.sign(np.cos(theta)) + 5.3/12, y = 1/9*np.sqrt(np.abs(np.sin(theta)))*np.sign(np.sin(theta)) + 2/9
xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
off_x = 2*(5.3-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
off_y = 2*(2-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1
#CBF
b = ((x[:,2:3] - off_y)/yr)**4 + ((x[:,3:4] - off_x)/xr)**4 - 1 - 0.01 # robust term 09/25
Lfb = 0
Lgbu1 = 4*((x[:,2:3] - off_y)/yr)**3/yr
Lgbu2 = 4*((x[:,3:4] - off_x)/xr)**3/xr
self.safe2 = torch.min(b[:,0]+ 0.01) # robust term 09/25
G1 = torch.cat([-Lgbu1, -Lgbu2], dim = 1)
k = 1
h1 = Lfb + k*b
q = -ref[:,2:4].to(b.device)
y1_bar = 1*G0 # H or Q = identity matrix
y2_bar = 1*G1
u_bar = -1*q
p1_bar = h0 - torch.sum(G0*u_bar,dim = 1).unsqueeze(1)
p2_bar = h1 - torch.sum(G1*u_bar,dim = 1).unsqueeze(1)
G = torch.cat([torch.sum(y1_bar*y1_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y1_bar*y2_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y2_bar*y1_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y2_bar*y2_bar,dim = 1).unsqueeze(1).unsqueeze(0)], dim = 0)
#G = 1*[y1_bar*y1_bar', y1_bar*y2_bar'; y2_bar*y1_bar', y2_bar*y2_bar']
w_p1_bar = torch.clamp(p1_bar, max=0)
w_p2_bar = torch.clamp(p2_bar, max=0)
# G 0-(1,1), 1-(1,2), 2-(2,1), 3-(2,2)
lambda1 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, torch.zeros_like(p1_bar), torch.where(G[1]*w_p1_bar < G[0]*p2_bar, w_p1_bar/G[0], torch.clamp(G[3]*p1_bar - G[2]*p2_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))
lambda2 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, w_p2_bar/G[3], torch.where(G[1]*w_p1_bar < G[0]*p2_bar, torch.zeros_like(p1_bar), torch.clamp(G[0]*p2_bar - G[1]*p1_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))
out = lambda1*y1_bar + lambda2*y2_bar + u_bar
rt = xp1.clone()
rt[:,2:4] = x[:,2:4] + out
# print(out)
rt = rt.unsqueeze(0)
return rt
이 함수는 전 포스팅에서도 설명했듯이 Robust-safe Diffuser(CBF 제약을 만족하도록 경로를 생성하는 디퓨져)에 대한 구현이다.
def invariance_cf(self, x, xp1): # closed form solution, RoS-diffuser for maze2d-large-v1
# x: 현재 디퓨전 단계의 상태
# xp1: 다음 디퓨전 단계의 예측 상태(수정 전)
x = x.squeeze(0)
xp1 = xp1.squeeze(0)
nBatch = x.shape[0]
ref = xp1 - x
# 배치 차원을 제거하고 참조 방향(ref)을 계산
# ref는 원래의 디퓨전 방향(τj-τj+1)
# 첫 번째 장애물에 대한 CBF 설정
xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
off_x = 2*(5.8-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
off_y = 2*(5-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1
# 첫 번째 장애물은 원형으로, 정규화된 공간에서의 중심점과 반지름을 설정
b0 = ((x[:,2:3] - off_y)/yr)**2 + ((x[:,3:4] - off_x)/xr)**2 - 1 - 0.01 # robust term 09/25
# 원형 장애물에 대한 제어 장벽 함수(CBF) b0를 정의
# "점이 원 바깥에 있으면 양수, 원 내부에 있으면 음수"
Lfb = 0
Lgbu1 = 2*((x[:,2:3] - off_y)/yr)/yr
Lgbu2 = 2*((x[:,3:4] - off_x)/xr)/xr
G0 = torch.cat([-Lgbu1, -Lgbu2], dim = 1)
k = 1
h0 = Lfb + k*b0
# CBF의 리 도함수를 계산
q = -ref[:,2:4].to(b.device)
y1_bar = 1*G0 # H or Q = identity matrix
y2_bar = 1*G1
u_bar = -1*q
p1_bar = h0 - torch.sum(G0*u_bar,dim = 1).unsqueeze(1)
p2_bar = h1 - torch.sum(G1*u_bar,dim = 1).unsqueeze(1)
# q는 원래 방향의 반대 방향
# y1_bar, y2_bar는 장애물에 대한 CBF 제약조건의 그래디언트
# u_bar는 초기 제어 입력
# p1_bar, p2_bar는 CBF 제약조건의 잔여 부분
out = lambda1*y1_bar + lambda2*y2_bar + u_bar
rt = xp1.clone()
rt[:,2:4] = x[:,2:4] + out
rt = rt.unsqueeze(0)
return rt
# 계산된 최적 제어 입력 out을 사용하여 상태를 업데이트
# 기존 예측 상태 xp1을 복제하고, 위치 부분(인덱스 2:4)만 수정된 값으로 대체
# 결과를 원래 형태로 되돌려(배치 차원 추가) 반환
그래서 이 함수의 핵심은
- 두 개의 장애물(원형 및 타원)에 대한 제어 장벽 함수(CBF)를 정의하고
- 각 CBF에 대해 그래디언트와 안전 조건을 계산하고
- 원래 디퓨전 방향에서 최소로 벗어나면서 안전 조건을 만족하는 제어 입력을 계산하고
- 계산된 제어 입력으로 상태를 업데이트
하는 것이다.
- Quadratic Programming(QP) 최적화란 목적 함수가 이차식(Quadratic function)이고, 제약 조건이 선형인 최적화 문제이다.
최소화: (1/2)x^T Q x + c^T x
제약 조건: Ax ≤ b
Ex = d
- x는 결정 변수(우리가 찾고자 하는 값)
- Q는 대칭 행렬
- c, b, d는 벡터
- A, E는 행렬
이 문제에서 QP는
- 최소화: ‖uj - (τj-τj+1)/Δτ‖² (원래 디퓨전 방향과의 차이 최소화)
- 제약 조건: h(ujₖ|xjₖ) ≥ 0 (안전 조건 만족)
다만 이런 문제는 반복적인 계산이 필요해 계산 비용이 높기 때문에 이 코드에서는 Closed-form solution을 이용한다.
lambda1 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, torch.zeros_like(p1_bar), torch.where(G[1]*w_p1_bar < G[0]*p2_bar, w_p1_bar/G[0], torch.clamp(G[3]*p1_bar - G[2]*p2_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))
lambda2 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, w_p2_bar/G[3], torch.where(G[1]*w_p1_bar < G[0]*p2_bar, torch.zeros_like(p1_bar), torch.clamp(G[0]*p2_bar - G[1]*p1_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))
out = lambda1*y1_bar + lambda2*y2_bar + u_bar
이 코드는 KKT(Karush-Kuhn-Tucker) 조건을 기반으로 라그랑주 승수 lambda1과 lambda2를 직접 계산한다.
Relaxed-safe Diffuser
마찬가지로 diffusion.py에 구현되어 있다. 이 코드는 Robust-safe diffuser(RoS)에 비해서 local trap 문제를 해결하기 위해서 변형을 가한 구조이다. 변화는 다음과 같다.
- RoS와 비교해서 추가적인 입력 t(현재 디퓨젼 시간 단계)가 있다.
- 디퓨젼 시간 t에 따라 완화 변수 가중치 sign을 설정한다.
- 디퓨젼 초기 단계에서는 안전 제약 조건을 완화하여 local trap에서 벗어날 수 있게 한다.
- 디퓨젼이 진행됨에 따라 (t<10) 완화를 제거하여 최종 결과가 안전 제약 조건을 만족하도록 한다.
- 초기 디퓨젼 단계에서는 안전 제약 조건을 일시적으로 완화하여 시스템이 local trap에 갇히는 것을 방지하고 디퓨젼 후반부에서는 제약 조건을 강화하여 최종 결과가 안전하도록 보장한다.
@torch.no_grad() #only for sampling
def invariance_relax_cf(self, x, xp1, t): # closed-form solution, ReS-diffuser for maze2d-large-v1
x = x.squeeze(0)
xp1 = xp1.squeeze(0)
nBatch = x.shape[0]
ref = xp1 - x
#normalize obstacle 1, x-1, y-0 x = 1/12*np.cos(theta) + 5.5/12, y = 1/9*np.sin(theta) + 5/9
xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
off_x = 2*(5.8-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
off_y = 2*(5-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1
#CBF
b = ((x[:,2:3] - off_y)/yr)**2 + ((x[:,3:4] - off_x)/xr)**2 - 1 - 0.01
Lfb = 0
Lgbu1 = 2*((x[:,2:3] - off_y)/yr)/yr
Lgbu2 = 2*((x[:,3:4] - off_x)/xr)/xr
self.safe1 = torch.min(b[:,0] + 0.01)
if t >= 10: # debug 10
sign = 100 #relax
else:
sign = 0 #non-relax
rx0 = torch.zeros_like(Lgbu1).to(b.device)
rx1 = sign*torch.ones_like(Lgbu1).to(b.device)
G0 = torch.cat([-Lgbu1, -Lgbu2, rx1, rx0], dim = 1)
k = 1
h0 = Lfb + k*b
#normalize obstacle 2, x = 1/12*np.sqrt(np.abs(np.cos(theta)))*np.sign(np.cos(theta)) + 5.3/12, y = 1/9*np.sqrt(np.abs(np.sin(theta)))*np.sign(np.sin(theta)) + 2/9
xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
off_x = 2*(5.3-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
off_y = 2*(2-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1
#CBF
b = ((x[:,2:3] - off_y)/yr)**4 + ((x[:,3:4] - off_x)/xr)**4 - 1 - 0.01
Lfb = 0
Lgbu1 = 4*((x[:,2:3] - off_y)/yr)**3/yr
Lgbu2 = 4*((x[:,3:4] - off_x)/xr)**3/xr
self.safe2 = torch.min(b[:,0] + 0.01)
G1 = torch.cat([-Lgbu1, -Lgbu2, rx0, rx1], dim = 1)
k = 1
h1 = Lfb + k*b
q = -ref[:,2:4].to(G0.device)
q0 = torch.zeros_like(q).to(G0.device)
q = torch.cat([q, q0], dim = 1)
y1_bar = 1*G0 # H or Q = identity matrix
y2_bar = 1*G1
u_bar = -1*q
p1_bar = h0 - torch.sum(G0*u_bar,dim = 1).unsqueeze(1)
p2_bar = h1 - torch.sum(G1*u_bar,dim = 1).unsqueeze(1)
G = torch.cat([torch.sum(y1_bar*y1_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y1_bar*y2_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y2_bar*y1_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y2_bar*y2_bar,dim = 1).unsqueeze(1).unsqueeze(0)], dim = 0)
#G = 1*[y1_bar*y1_bar', y1_bar*y2_bar'; y2_bar*y1_bar', y2_bar*y2_bar']
w_p1_bar = torch.clamp(p1_bar, max=0)
w_p2_bar = torch.clamp(p2_bar, max=0)
# G 0-(1,1), 1-(1,2), 2-(2,1), 3-(2,2)
lambda1 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, torch.zeros_like(p1_bar), torch.where(G[1]*w_p1_bar < G[0]*p2_bar, w_p1_bar/G[0], torch.clamp(G[3]*p1_bar - G[2]*p2_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))
lambda2 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, w_p2_bar/G[3], torch.where(G[1]*w_p1_bar < G[0]*p2_bar, torch.zeros_like(p1_bar), torch.clamp(G[0]*p2_bar - G[1]*p1_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))
out = lambda1*y1_bar + lambda2*y2_bar + u_bar
rt = xp1.clone()
rt[:,2:4] = x[:,2:4] + out[:,0:2]
rt = rt.unsqueeze(0)
return rt
Time-varying-safe Diffuser
이 함수는 RoS, ReS와는 다른 방식으로 local trap 문제를 해결하는데, '안전 영역 자체를 확장'함으로써 제약을 적용하도록 한다.
- Relaxed-Safe Diffuser: 제어 입력 벡터에 완화 변수를 추가
- Time-Varying-Safe Diffuser: CBF 함수 정의에 시간 의존적 항을 추가
- 최적화 변수의 차원은 더 작지만, CBF 함수와 리 도함수가 더 복잡
@torch.no_grad() #only for sampling
def invariance_time_cf(self, x, xp1, t): # closed-form solution, TVS-diffuser for maze2d-large-v1
t_bias = 5 #50
x = x.squeeze(0)
xp1 = xp1.squeeze(0)
nBatch = x.shape[0]
ref = xp1 - x
#normalize obstacle 1, x-1, y-0 x = 1/12*np.cos(theta) + 5.5/12, y = 1/9*np.sin(theta) + 5/9
xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
off_x = 2*(5.8-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
off_y = 2*(5-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1
#CBF
b = ((x[:,2:3] - off_y)/yr)**2 + ((x[:,3:4] - off_x)/xr)**2 - nn.Sigmoid()(t_bias - t) -0.01
Lfb = nn.Sigmoid()(t_bias - t)*(1 - nn.Sigmoid()(t_bias - t))
Lgbu1 = 2*((x[:,2:3] - off_y)/yr)/yr
Lgbu2 = 2*((x[:,3:4] - off_x)/xr)/xr
self.safe1 = torch.min(b[:,0] + 0.01)
G0 = torch.cat([-Lgbu1, -Lgbu2], dim = 1)
k = 1 #0.3
h0 = Lfb + k*b
#normalize obstacle 2, x = 1/12*np.sqrt(np.abs(np.cos(theta)))*np.sign(np.cos(theta)) + 5.3/12, y = 1/9*np.sqrt(np.abs(np.sin(theta)))*np.sign(np.sin(theta)) + 2/9
xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
off_x = 2*(5.3-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
off_y = 2*(2-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1
#CBF
b = ((x[:,2:3] - off_y)/yr)**4 + ((x[:,3:4] - off_x)/xr)**4 - nn.Sigmoid()(t_bias - t) - 0.01
Lfb = nn.Sigmoid()(t_bias - t)*(1 - nn.Sigmoid()(t_bias - t))
Lgbu1 = 4*((x[:,2:3] - off_y)/yr)**3/yr
Lgbu2 = 4*((x[:,3:4] - off_x)/xr)**3/xr
self.safe2 = torch.min(b[:,0] + 0.01)
G1 = torch.cat([-Lgbu1, -Lgbu2], dim = 1)
k = 1 #0.4
h1 = Lfb + k*b
q = -ref[:,2:4].to(G0.device)
y1_bar = 1*G0 # H or Q = identity matrix
y2_bar = 1*G1
u_bar = -1*q
p1_bar = h0 - torch.sum(G0*u_bar,dim = 1).unsqueeze(1)
p2_bar = h1 - torch.sum(G1*u_bar,dim = 1).unsqueeze(1)
G = torch.cat([torch.sum(y1_bar*y1_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y1_bar*y2_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y2_bar*y1_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y2_bar*y2_bar,dim = 1).unsqueeze(1).unsqueeze(0)], dim = 0)
#G = 1*[y1_bar*y1_bar', y1_bar*y2_bar'; y2_bar*y1_bar', y2_bar*y2_bar']
w_p1_bar = torch.clamp(p1_bar, max=0)
w_p2_bar = torch.clamp(p2_bar, max=0)
# G 0-(1,1), 1-(1,2), 2-(2,1), 3-(2,2)
lambda1 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, torch.zeros_like(p1_bar), torch.where(G[1]*w_p1_bar < G[0]*p2_bar, w_p1_bar/G[0], torch.clamp(G[3]*p1_bar - G[2]*p2_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))
lambda2 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, w_p2_bar/G[3], torch.where(G[1]*w_p1_bar < G[0]*p2_bar, torch.zeros_like(p1_bar), torch.clamp(G[0]*p2_bar - G[1]*p1_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))
out = lambda1*y1_bar + lambda2*y2_bar + u_bar
rt = xp1.clone()
rt[:,2:4] = x[:,2:4] + out
# print(out)
rt = rt.unsqueeze(0)
return rt
Sampling
여러 옵션을 제공하고 있다.
- 원본 디퓨저만 사용
- Shield와 Gradient Descent 방법
- Shield: 안전하지 않은 상태를 차단하고 안전한 영역으로 투사
- GD: 그래디언트 방법으로 안전 영역으로 유도
Result
원본 디퓨져
CBF 적용하여 Inference 하기
# return xp1
x = self.invariance_cf(x, xp1) # RoS closed form # RoS 방식 사용
return x # xp1 대신 x를 반환
일단 냅다 주석을 풀고 실행하면 이런 에러가 뜬다.
https://github.com/Weixy21/SafeDiffuser/issues/2
TypeError: 'int' object is not subscriptable · Issue #2 · Weixy21/SafeDiffuser
In diffuser/models/diffusion.py, the following block of code seems to fail when running python scripts/train.py --dataset maze2d-medium-v1. x = xp1 xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1]) ...
github.com
학습 중에는 Diffusion invariance를 사용하지 않고, inference 시에만 diffusion invariance를 사용한다는 얘기인데, Safe Diffuser의 safe contraints가 inference time에만 적용된다는 뜻이다. (학습 중에 이런 제약을 적용하면 모델이 데이터로부터 분포를 잘 학습하는 것을 방해할 수 있기 때문에?)
어쨌든 그래서 plan_maze2d.py를 실행할 때
# /config/maze2d.py
'checkpoint': 'state_1960000.pt', # 체크포인트 파일 추가
python scripts/plan_maze2d.py --config config.maze2d --dataset maze2d-large-v1 --logbase logs/maze2d-large-v1/diffusion --diffusion_loadpath H384_T256
Conditioning
우선 maze2d 환경에서 observation은 4차원 벡터로 들어온다.
[x, y, dx, dy]
# x, y: 2D 공간에서 현재 위치 좌표
# dx, dy: 각각 x축과 y축 방향의 속도
observation = np.array([ 0.94875744, 2.93648809, -0.01347715, 0.06358764])
# x y dx dy
observation = np.array([ 0.94875744, 2.93648809, -0.01347715, 0.06358764]) # fix the initial position and final destination for comparison (not needed for general testing)
total_reward = 0
for t in range(env.max_episode_steps):
state = env.state_vector().copy()
## can replan if desired, but the open-loop plans are good enough for maze2d
## that we really only need to plan once
if t == 0:
cond[0] = observation
start = time.time()
action, samples, diffusion_paths, safe1, safe2, elbo = policy(cond, batch_size=args.batch_size)
end = time.time()
comp_time.append(end-start)
elbo_batch.append(elbo)
############################# single test
# cond[0] = observation
# action, samples, diffusion_paths, safe1, safe2 = policy(cond, batch_size=args.batch_size) #policy.normalizer.normalizers['observations'].mins
actions = samples.actions[0]
sequence = samples.observations[0]
diffusion_paths = diffusion_paths[0]
# policies.py
## run reverse diffusion process
self.diffusion_model.norm_mins = self.normalizer.normalizers['observations'].mins
self.diffusion_model.norm_maxs = self.normalizer.normalizers['observations'].maxs
sample, diffusion = self.diffusion_model(conditions)
sample, diffusion = self.diffusion_model(conditions)
@torch.no_grad()
def conditional_sample(self, cond, *args, horizon=None, return_diffusion = True, **kwargs):
'''
conditions : [ (time, state), ... ]
'''
device = self.betas.device
batch_size = len(cond[0])
horizon = horizon or self.horizon
shape = (batch_size, horizon, self.transition_dim)
return self.p_sample_loop(shape, cond, return_diffusion= return_diffusion, *args, **kwargs) ## debug
어쨌든 최종적으로 호출되는 것은 이 conditional_sample 함수이다.
'논문 리뷰' 카테고리의 다른 글
SafeDiffuser: Safe Planning with Diffusion Probabilistic Models (0) | 2025.04.12 |
---|---|
Flow Matching for Generative Modeling (0) | 2025.03.31 |
Planning with Diffusion for Flexible Behavior Synthesis (0) | 2025.03.19 |
Diffusion Policy:Visuomotor Policy Learning via Action Diffusion (0) | 2025.03.19 |
LDP: A Local Diffusion Planner for Efficient Robot Navigation and Collision Avoidance (0) | 2025.03.17 |
소중한 공감 감사합니다