새소식

논문 리뷰

SafeDiffuser : code explained

  • -

https://github.com/Weixy21/SafeDiffuser

 

GitHub - Weixy21/SafeDiffuser: Safe Planning with Diffusion Probabilistic Models

Safe Planning with Diffusion Probabilistic Models. Contribute to Weixy21/SafeDiffuser development by creating an account on GitHub.

github.com

 

Filetree

# SafeDiffuser Project Structure

├── LICENSE
├── README.md
├── azure/                  # Azure 클라우드 관련 설정
│   ├── Dockerfile
│   ├── config.py
│   ├── launch.py
│   └── ...
├── config/                 # 설정 파일
│   ├── locomotion.py
│   └── maze2d.py
└── diffuser/              # 메인 코드
    ├── __init__.py
    ├── datasets/          # 데이터셋 관련 코드
    │   ├── buffer.py
    │   ├── d4rl.py
    │   ├── normalization.py
    │   ├── preprocessing.py
    │   └── sequence.py
    └── environments/      # 환경 관련 코드
        ├── ant.py
        ├── assets/       # 환경 에셋 파일들
        │   ├── ant.xml
        │   ├── half_cheetah.xml
        │   ├── hopper.xml
        │   └── walker2d.xml
        └── ...

 

Install Mujoco 2.0.0

mkdir -p ~/.mujoco

# MuJoCo 2.0.0 다운로드 (Linux용)
wget https://www.roboti.us/download/mujoco200_linux.zip

unzip mujoco200_linux.zip -d ~/.mujoco/
wget https://www.roboti.us/file/mjkey.txt -P ~/.mujoco/


echo 'export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:~/.mujoco/mujoco200/bin' >> ~/.bashrc
echo 'export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/nvidia-515' >> ~/.bashrc
# nvidia driver 위치 확인하기 : 저기 없으면 
echo 'export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/x86_64-linux-gnu' >> ~/.bashrc

source ~/.bashrc

sudo apt update
sudo apt install -y libgl1-mesa-dev libglew-dev patchelf python3-pip python3-numpy python3-scipy

pip install mujoco-py==2.0.2.13

pip install gdown typed-argument-parser sk-video d4rl
pip install git+https://github.com/Farama-Foundation/d4rl@master#egg=d4rl

pip install 에서 cython에러가 날 경우 소스를 다운받아서 수정해서 빌드하기

pip download mujoco-py==2.0.2.13
tar -xzf mujoco-py-2.0.2.13.tar.gz
cd mujoco-py-2.0.2.13

sed -i 's/void c_warning_callback(const char \*msg) except \* nogil:/void c_warning_callback(const char \*msg) noexcept nogil:/g' mujoco_py/cymj.pyx
sed -i 's/void c_error_callback(const char \*msg) except \* nogil:/void c_error_callback(const char \*msg) noexcept nogil:/g' mujoco_py/cymj.pyx

pip install -e .

우선 나는 Maze2D 환경을 테스트할 예정이라, git switch maze2d로 브랜치를 바꿔 준다. 

python3 script... 를 실행할 때 tap관련 이슈 뜨면

pip uninstall -y tap
pip uninstall -y mc-bin-client

pip install typed-argument-parser
pip install gym==0.23.1
pip install git+https://github.com/Farama-Foundation/d4rl@master#egg=d4rl

 

실행

python scripts/train.py --config config.maze2d --dataset maze2d-large-v1

여기서 TypeError: 'int' object is not subscriptable 라는 에러가 뜨면 Diffusion.py에서

# 이전
        ####################### SafeDiffusers 
        # x = self.invariance(x, xp1)    # RoS
        x = self.invariance_cf(x, xp1)  # RoS closed form  # <- 이 줄이 활성화되어 있었음
        # x = self.invariance_relax(x, xp1, t) # ReS
        # x = self.invariance_relax_cf(x, xp1, t)   #ReS closed form    
        # x = self.invariance_time(x, xp1, t)   # TVS
        # x = self.invariance_time_cf(x, xp1, t)  # TVS closed form
        return x
        
# 이후
        ####################### SafeDiffusers 
        # x = self.invariance(x, xp1)    # RoS
        # x = self.invariance_cf(x, xp1)  # RoS closed form  # <- 이 줄을 주석 처리함
        # x = self.invariance_relax(x, xp1, timesteps) # ReS
        # x = self.invariance_relax_cf(x, xp1, timesteps)   #ReS closed form    
        # x = self.invariance_time(x, xp1, timesteps)   #RoS time-dependent
        # x = self.invariance_time_cf(x, xp1, timesteps)   #RoS time-dependent closed form
        return xp1  # <- x 대신 xp1을 반환
# 이전

    @torch.no_grad()
    def p_sample_loop(self, shape, cond, verbose=True, return_diffusion=False):
        device = self.betas.device

        batch_size = shape[0]
        x = torch.randn(shape, device=device)
        x = apply_conditioning(x, cond, self.action_dim)

        if return_diffusion: diffusion = [x]

        progress = utils.Progress(self.n_timesteps) if verbose else utils.Silent()
        safe1, safe2 = [], []
        for i in reversed(range(0, self.n_timesteps)):  #-50 change here for the number of diffusion steps,
            if i < 0:
                i = 0
            timesteps = torch.full((batch_size,), i, device=device, dtype=torch.long)
            x = self.p_sample(x, cond, timesteps)
            x = apply_conditioning(x, cond, self.action_dim)
            safe1.append(self.safe1.unsqueeze(0))
            safe2.append(self.safe2.unsqueeze(0))
            progress.update({'t': i})

            if return_diffusion: diffusion.append(x)
        
        self.safe1 = torch.cat(safe1, dim=0)
        self.safe2 = torch.cat(safe2, dim=0)

        progress.close()
        # pdb.set_trace()
        if return_diffusion:
            return x, torch.stack(diffusion, dim=1)
        else:
            return x
            
            
 # 이후

    @torch.no_grad()
    def p_sample_loop(self, shape, cond, verbose=True, return_diffusion=False):
        device = self.betas.device

        batch_size = shape[0]
        x = torch.randn(shape, device=device)
        x = apply_conditioning(x, cond, self.action_dim)

        if return_diffusion: diffusion = [x]

        progress = utils.Progress(self.n_timesteps) if verbose else utils.Silent()
        for i in reversed(range(0, self.n_timesteps)):
            if i < 0:
                i = 0
            timesteps = torch.full((batch_size,), i, device=device, dtype=torch.long)
            x = self.p_sample(x, cond, timesteps)
            x = apply_conditioning(x, cond, self.action_dim)
            progress.update({'t': i})

            if return_diffusion: diffusion.append(x)
        
        progress.close()
        # pdb.set_trace()
        if return_diffusion:
            return x, torch.stack(diffusion, dim=1)
        else:
            return x

그럼 이렇게 실행이 되고, 

이런 st로 이미지가 저장이 된다.  

Robust-safe Diffuser

이건 diffusion.py에 구현되어 있다.  

    @torch.no_grad()   #only for sampling
    def invariance_cf(self, x, xp1):  # closed form solution,  RoS-diffuser for maze2d-large-v1

        x = x.squeeze(0)
        xp1 = xp1.squeeze(0)

        nBatch = x.shape[0]
        ref = xp1 - x

        #normalize obstacle 1, x-1, y-0  x = 1/12*np.cos(theta) + 5.5/12, y = 1/9*np.sin(theta) + 5/9
        xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
        yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
        off_x = 2*(5.8-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
        off_y = 2*(5-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1

        #CBF
        b0 = ((x[:,2:3] - off_y)/yr)**2 + ((x[:,3:4] - off_x)/xr)**2 - 1 - 0.01  # robust term 09/25
        Lfb = 0
        Lgbu1 = 2*((x[:,2:3] - off_y)/yr)/yr
        Lgbu2 = 2*((x[:,3:4] - off_x)/xr)/xr

        G0 = torch.cat([-Lgbu1, -Lgbu2], dim = 1)
        k = 1
        h0 = Lfb + k*b0

        self.safe1 = torch.min(b0[:,0] + 0.01)  # robust term 09/25

        #normalize obstacle 2,  x = 1/12*np.sqrt(np.abs(np.cos(theta)))*np.sign(np.cos(theta)) + 5.3/12, y = 1/9*np.sqrt(np.abs(np.sin(theta)))*np.sign(np.sin(theta)) + 2/9
        xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
        yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
        off_x = 2*(5.3-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
        off_y = 2*(2-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1

        #CBF
        b = ((x[:,2:3] - off_y)/yr)**4 + ((x[:,3:4] - off_x)/xr)**4 - 1 - 0.01 # robust term 09/25
        Lfb = 0
        Lgbu1 = 4*((x[:,2:3] - off_y)/yr)**3/yr
        Lgbu2 = 4*((x[:,3:4] - off_x)/xr)**3/xr

        self.safe2 = torch.min(b[:,0]+ 0.01) # robust term 09/25

        G1 = torch.cat([-Lgbu1, -Lgbu2], dim = 1)
        k = 1
        h1 = Lfb + k*b
        
   
        q = -ref[:,2:4].to(b.device)
        
        y1_bar = 1*G0  # H or Q = identity matrix
        y2_bar = 1*G1
        u_bar = -1*q
        p1_bar = h0 - torch.sum(G0*u_bar,dim = 1).unsqueeze(1)
        p2_bar = h1 - torch.sum(G1*u_bar,dim = 1).unsqueeze(1)

        G = torch.cat([torch.sum(y1_bar*y1_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y1_bar*y2_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y2_bar*y1_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y2_bar*y2_bar,dim = 1).unsqueeze(1).unsqueeze(0)], dim = 0)
        #G = 1*[y1_bar*y1_bar', y1_bar*y2_bar'; y2_bar*y1_bar', y2_bar*y2_bar']
        w_p1_bar = torch.clamp(p1_bar, max=0)
        w_p2_bar = torch.clamp(p2_bar, max=0)

        # G 0-(1,1), 1-(1,2), 2-(2,1), 3-(2,2)
        lambda1 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, torch.zeros_like(p1_bar), torch.where(G[1]*w_p1_bar < G[0]*p2_bar, w_p1_bar/G[0], torch.clamp(G[3]*p1_bar - G[2]*p2_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))
        
        lambda2 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, w_p2_bar/G[3], torch.where(G[1]*w_p1_bar < G[0]*p2_bar, torch.zeros_like(p1_bar), torch.clamp(G[0]*p2_bar - G[1]*p1_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))

        out = lambda1*y1_bar + lambda2*y2_bar + u_bar
        rt = xp1.clone()      
        rt[:,2:4] = x[:,2:4] + out
        # print(out)
        rt = rt.unsqueeze(0)
        return rt

이 함수는 전 포스팅에서도 설명했듯이 Robust-safe Diffuser(CBF 제약을 만족하도록 경로를 생성하는 디퓨져)에 대한 구현이다.

def invariance_cf(self, x, xp1):  # closed form solution,  RoS-diffuser for maze2d-large-v1

# x: 현재 디퓨전 단계의 상태
# xp1: 다음 디퓨전 단계의 예측 상태(수정 전)

x = x.squeeze(0)
xp1 = xp1.squeeze(0)
nBatch = x.shape[0]
ref = xp1 - x

# 배치 차원을 제거하고 참조 방향(ref)을 계산
# ref는 원래의 디퓨전 방향(τj-τj+1)

# 첫 번째 장애물에 대한 CBF 설정

xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
off_x = 2*(5.8-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
off_y = 2*(5-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1

# 첫 번째 장애물은 원형으로, 정규화된 공간에서의 중심점과 반지름을 설정

b0 = ((x[:,2:3] - off_y)/yr)**2 + ((x[:,3:4] - off_x)/xr)**2 - 1 - 0.01  # robust term 09/25

# 원형 장애물에 대한 제어 장벽 함수(CBF) b0를 정의
# "점이 원 바깥에 있으면 양수, 원 내부에 있으면 음수"

Lfb = 0
Lgbu1 = 2*((x[:,2:3] - off_y)/yr)/yr
Lgbu2 = 2*((x[:,3:4] - off_x)/xr)/xr

G0 = torch.cat([-Lgbu1, -Lgbu2], dim = 1)
k = 1
h0 = Lfb + k*b0

# CBF의 리 도함수를 계산

q = -ref[:,2:4].to(b.device)
        
y1_bar = 1*G0  # H or Q = identity matrix
y2_bar = 1*G1
u_bar = -1*q
p1_bar = h0 - torch.sum(G0*u_bar,dim = 1).unsqueeze(1)
p2_bar = h1 - torch.sum(G1*u_bar,dim = 1).unsqueeze(1)

# q는 원래 방향의 반대 방향
# y1_bar, y2_bar는 장애물에 대한 CBF 제약조건의 그래디언트
# u_bar는 초기 제어 입력
# p1_bar, p2_bar는 CBF 제약조건의 잔여 부분

out = lambda1*y1_bar + lambda2*y2_bar + u_bar
rt = xp1.clone()      
rt[:,2:4] = x[:,2:4] + out
rt = rt.unsqueeze(0)
return rt

# 계산된 최적 제어 입력 out을 사용하여 상태를 업데이트
# 기존 예측 상태 xp1을 복제하고, 위치 부분(인덱스 2:4)만 수정된 값으로 대체
# 결과를 원래 형태로 되돌려(배치 차원 추가) 반환

그래서 이 함수의 핵심은 

  • 두 개의 장애물(원형 및 타원)에 대한 제어 장벽 함수(CBF)를 정의하고
  • 각 CBF에 대해 그래디언트와 안전 조건을 계산하고
  • 원래 디퓨전 방향에서 최소로 벗어나면서 안전 조건을 만족하는 제어 입력을 계산하고
  • 계산된 제어 입력으로 상태를 업데이트

하는 것이다. 

  • Quadratic Programming(QP) 최적화란 목적 함수가 이차식(Quadratic function)이고, 제약 조건이 선형인 최적화 문제이다. 
최소화: (1/2)x^T Q x + c^T x
제약 조건: Ax ≤ b
           Ex = d
  • x는 결정 변수(우리가 찾고자 하는 값)
  • Q는 대칭 행렬
  • c, b, d는 벡터
  • A, E는 행렬

이 문제에서 QP는

  • 최소화: ‖uj - (τj-τj+1)/Δτ‖² (원래 디퓨전 방향과의 차이 최소화)
  • 제약 조건: h(ujₖ|xjₖ) ≥ 0 (안전 조건 만족)

 

다만 이런 문제는 반복적인 계산이 필요해 계산 비용이 높기 때문에 이 코드에서는 Closed-form solution을 이용한다. 

lambda1 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, torch.zeros_like(p1_bar), torch.where(G[1]*w_p1_bar < G[0]*p2_bar, w_p1_bar/G[0], torch.clamp(G[3]*p1_bar - G[2]*p2_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))
        
lambda2 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, w_p2_bar/G[3], torch.where(G[1]*w_p1_bar < G[0]*p2_bar, torch.zeros_like(p1_bar), torch.clamp(G[0]*p2_bar - G[1]*p1_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))

out = lambda1*y1_bar + lambda2*y2_bar + u_bar

이 코드는 KKT(Karush-Kuhn-Tucker) 조건을 기반으로 라그랑주 승수 lambda1과 lambda2를 직접 계산한다. 

 

Relaxed-safe Diffuser

마찬가지로 diffusion.py에 구현되어 있다. 이 코드는 Robust-safe diffuser(RoS)에 비해서 local trap 문제를 해결하기 위해서 변형을 가한 구조이다. 변화는 다음과 같다. 

  • RoS와 비교해서 추가적인 입력 t(현재 디퓨젼 시간 단계)가 있다. 
    • 디퓨젼 시간 t에 따라 완화 변수 가중치 sign을 설정한다. 
  • 디퓨젼 초기 단계에서는 안전 제약 조건을 완화하여 local trap에서 벗어날 수 있게 한다. 
  • 디퓨젼이 진행됨에 따라 (t<10) 완화를 제거하여 최종 결과가 안전 제약 조건을 만족하도록 한다. 
  • 초기 디퓨젼 단계에서는 안전 제약 조건을 일시적으로 완화하여 시스템이 local trap에 갇히는 것을 방지하고 디퓨젼 후반부에서는 제약 조건을 강화하여 최종 결과가 안전하도록 보장한다. 
    @torch.no_grad()   #only for sampling
    def invariance_relax_cf(self, x, xp1, t):  # closed-form solution, ReS-diffuser for maze2d-large-v1

        x = x.squeeze(0)
        xp1 = xp1.squeeze(0)

        nBatch = x.shape[0]
        ref = xp1 - x

        #normalize obstacle 1, x-1, y-0  x = 1/12*np.cos(theta) + 5.5/12, y = 1/9*np.sin(theta) + 5/9
        xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
        yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
        off_x = 2*(5.8-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
        off_y = 2*(5-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1

        #CBF
        b = ((x[:,2:3] - off_y)/yr)**2 + ((x[:,3:4] - off_x)/xr)**2 - 1 - 0.01
        Lfb = 0
        Lgbu1 = 2*((x[:,2:3] - off_y)/yr)/yr
        Lgbu2 = 2*((x[:,3:4] - off_x)/xr)/xr

        self.safe1 = torch.min(b[:,0] + 0.01)

        if t >= 10:   # debug  10
            sign = 100   #relax
        else:
            sign = 0   #non-relax

        rx0 = torch.zeros_like(Lgbu1).to(b.device)
        rx1 = sign*torch.ones_like(Lgbu1).to(b.device)

        G0 = torch.cat([-Lgbu1, -Lgbu2, rx1, rx0], dim = 1)
        k = 1
        h0 = Lfb + k*b

        #normalize obstacle 2,  x = 1/12*np.sqrt(np.abs(np.cos(theta)))*np.sign(np.cos(theta)) + 5.3/12, y = 1/9*np.sqrt(np.abs(np.sin(theta)))*np.sign(np.sin(theta)) + 2/9
        xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
        yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
        off_x = 2*(5.3-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
        off_y = 2*(2-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1

        #CBF
        b = ((x[:,2:3] - off_y)/yr)**4 + ((x[:,3:4] - off_x)/xr)**4 - 1 - 0.01
        Lfb = 0
        Lgbu1 = 4*((x[:,2:3] - off_y)/yr)**3/yr
        Lgbu2 = 4*((x[:,3:4] - off_x)/xr)**3/xr

        self.safe2 = torch.min(b[:,0] + 0.01)

        G1 = torch.cat([-Lgbu1, -Lgbu2, rx0, rx1], dim = 1)
        k = 1
        h1 = Lfb + k*b
        
   
        q = -ref[:,2:4].to(G0.device)
        q0 = torch.zeros_like(q).to(G0.device)
        q = torch.cat([q, q0], dim = 1)

        y1_bar = 1*G0  # H or Q = identity matrix
        y2_bar = 1*G1
        u_bar = -1*q
        p1_bar = h0 - torch.sum(G0*u_bar,dim = 1).unsqueeze(1)
        p2_bar = h1 - torch.sum(G1*u_bar,dim = 1).unsqueeze(1)

        G = torch.cat([torch.sum(y1_bar*y1_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y1_bar*y2_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y2_bar*y1_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y2_bar*y2_bar,dim = 1).unsqueeze(1).unsqueeze(0)], dim = 0)
        #G = 1*[y1_bar*y1_bar', y1_bar*y2_bar'; y2_bar*y1_bar', y2_bar*y2_bar']
        w_p1_bar = torch.clamp(p1_bar, max=0)
        w_p2_bar = torch.clamp(p2_bar, max=0)

        # G 0-(1,1), 1-(1,2), 2-(2,1), 3-(2,2)
        lambda1 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, torch.zeros_like(p1_bar), torch.where(G[1]*w_p1_bar < G[0]*p2_bar, w_p1_bar/G[0], torch.clamp(G[3]*p1_bar - G[2]*p2_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))
        
        lambda2 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, w_p2_bar/G[3], torch.where(G[1]*w_p1_bar < G[0]*p2_bar, torch.zeros_like(p1_bar), torch.clamp(G[0]*p2_bar - G[1]*p1_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))

        out = lambda1*y1_bar + lambda2*y2_bar + u_bar
        rt = xp1.clone()      
        rt[:,2:4] = x[:,2:4] + out[:,0:2]
        rt = rt.unsqueeze(0)
        return rt

 

Time-varying-safe Diffuser

 

이 함수는 RoS, ReS와는 다른 방식으로 local trap 문제를 해결하는데, '안전 영역 자체를 확장'함으로써 제약을 적용하도록 한다. 

  • Relaxed-Safe Diffuser: 제어 입력 벡터에 완화 변수를 추가
  • Time-Varying-Safe Diffuser: CBF 함수 정의에 시간 의존적 항을 추가
    • 최적화 변수의 차원은 더 작지만, CBF 함수와 리 도함수가 더 복잡
@torch.no_grad()   #only for sampling
    def invariance_time_cf(self, x, xp1, t):  # closed-form solution, TVS-diffuser for maze2d-large-v1
        t_bias = 5  #50 

        x = x.squeeze(0)
        xp1 = xp1.squeeze(0)

        nBatch = x.shape[0]
        ref = xp1 - x

        #normalize obstacle 1, x-1, y-0  x = 1/12*np.cos(theta) + 5.5/12, y = 1/9*np.sin(theta) + 5/9
        xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
        yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
        off_x = 2*(5.8-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
        off_y = 2*(5-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1

        #CBF
        b = ((x[:,2:3] - off_y)/yr)**2 + ((x[:,3:4] - off_x)/xr)**2 - nn.Sigmoid()(t_bias - t) -0.01
        Lfb = nn.Sigmoid()(t_bias - t)*(1 - nn.Sigmoid()(t_bias - t))
        Lgbu1 = 2*((x[:,2:3] - off_y)/yr)/yr
        Lgbu2 = 2*((x[:,3:4] - off_x)/xr)/xr

        self.safe1 = torch.min(b[:,0] + 0.01)

        G0 = torch.cat([-Lgbu1, -Lgbu2], dim = 1)
        k = 1  #0.3
        h0 = Lfb + k*b

        #normalize obstacle 2,  x = 1/12*np.sqrt(np.abs(np.cos(theta)))*np.sign(np.cos(theta)) + 5.3/12, y = 1/9*np.sqrt(np.abs(np.sin(theta)))*np.sign(np.sin(theta)) + 2/9
        xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1])
        yr = 2*1/(self.norm_maxs[0] - self.norm_mins[0])
        off_x = 2*(5.3-0.5 - self.norm_mins[1])/(self.norm_maxs[1] - self.norm_mins[1]) - 1
        off_y = 2*(2-0.5 - self.norm_mins[0])/(self.norm_maxs[0] - self.norm_mins[0]) - 1

        #CBF
        b = ((x[:,2:3] - off_y)/yr)**4 + ((x[:,3:4] - off_x)/xr)**4 - nn.Sigmoid()(t_bias - t) - 0.01
        Lfb = nn.Sigmoid()(t_bias - t)*(1 - nn.Sigmoid()(t_bias - t))
        Lgbu1 = 4*((x[:,2:3] - off_y)/yr)**3/yr
        Lgbu2 = 4*((x[:,3:4] - off_x)/xr)**3/xr

        self.safe2 = torch.min(b[:,0] + 0.01)

        G1 = torch.cat([-Lgbu1, -Lgbu2], dim = 1)
        k = 1  #0.4
        h1 = Lfb + k*b
        
   
        q = -ref[:,2:4].to(G0.device)
        
        y1_bar = 1*G0  # H or Q = identity matrix
        y2_bar = 1*G1
        u_bar = -1*q
        p1_bar = h0 - torch.sum(G0*u_bar,dim = 1).unsqueeze(1)
        p2_bar = h1 - torch.sum(G1*u_bar,dim = 1).unsqueeze(1)

        G = torch.cat([torch.sum(y1_bar*y1_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y1_bar*y2_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y2_bar*y1_bar,dim = 1).unsqueeze(1).unsqueeze(0), torch.sum(y2_bar*y2_bar,dim = 1).unsqueeze(1).unsqueeze(0)], dim = 0)
        #G = 1*[y1_bar*y1_bar', y1_bar*y2_bar'; y2_bar*y1_bar', y2_bar*y2_bar']
        w_p1_bar = torch.clamp(p1_bar, max=0)
        w_p2_bar = torch.clamp(p2_bar, max=0)

        # G 0-(1,1), 1-(1,2), 2-(2,1), 3-(2,2)
        lambda1 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, torch.zeros_like(p1_bar), torch.where(G[1]*w_p1_bar < G[0]*p2_bar, w_p1_bar/G[0], torch.clamp(G[3]*p1_bar - G[2]*p2_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))
        
        lambda2 = torch.where(G[2]*w_p2_bar < G[3]*p1_bar, w_p2_bar/G[3], torch.where(G[1]*w_p1_bar < G[0]*p2_bar, torch.zeros_like(p1_bar), torch.clamp(G[0]*p2_bar - G[1]*p1_bar, max=0)/(G[0]*G[3] - G[1]*G[2])))

        out = lambda1*y1_bar + lambda2*y2_bar + u_bar
        rt = xp1.clone()      
        rt[:,2:4] = x[:,2:4] + out
        # print(out)
        rt = rt.unsqueeze(0)
        return rt

 

Sampling

여러 옵션을 제공하고 있다. 

  • 원본 디퓨저만 사용
  • Shield와 Gradient Descent 방법
    • Shield: 안전하지 않은 상태를 차단하고 안전한 영역으로 투사
    • GD: 그래디언트 방법으로 안전 영역으로 유도

 

Result

 

원본 디퓨져

0 / 5000 / 10000

 

50000 / 100000 / 1999000

 

CBF 적용하여 Inference 하기

 

        # return xp1
        x = self.invariance_cf(x, xp1)  # RoS closed form  # RoS 방식 사용
        return x  # xp1 대신 x를 반환

일단 냅다 주석을 풀고 실행하면 이런 에러가 뜬다. 

https://github.com/Weixy21/SafeDiffuser/issues/2

 

TypeError: 'int' object is not subscriptable · Issue #2 · Weixy21/SafeDiffuser

In diffuser/models/diffusion.py, the following block of code seems to fail when running python scripts/train.py --dataset maze2d-medium-v1. x = xp1 xr = 2*1/(self.norm_maxs[1] - self.norm_mins[1]) ...

github.com

학습 중에는 Diffusion invariance를 사용하지 않고, inference 시에만 diffusion invariance를 사용한다는 얘기인데, Safe Diffuser의 safe contraints가 inference time에만 적용된다는 뜻이다.  (학습 중에 이런 제약을 적용하면 모델이 데이터로부터 분포를 잘 학습하는 것을 방해할 수 있기 때문에?)

어쨌든 그래서 plan_maze2d.py를 실행할 때

# /config/maze2d.py
'checkpoint': 'state_1960000.pt',  # 체크포인트 파일 추가
python scripts/plan_maze2d.py --config config.maze2d --dataset maze2d-large-v1 --logbase logs/maze2d-large-v1/diffusion --diffusion_loadpath H384_T256

Robust-Safe Diffuser
Relaxed-Safe Diffuser
Time-varying Safe Diffuser

 

Conditioning

 

우선 maze2d 환경에서 observation은 4차원 벡터로 들어온다. 

[x, y, dx, dy]
# x, y: 2D 공간에서 현재 위치 좌표
# dx, dy: 각각 x축과 y축 방향의 속도

observation = np.array([ 0.94875744,  2.93648809, -0.01347715,  0.06358764])
#                          x           y            dx          dy
    observation = np.array([ 0.94875744,  2.93648809, -0.01347715,  0.06358764])   # fix the initial position and final destination for comparison (not needed for general testing)
    total_reward = 0
    for t in range(env.max_episode_steps):

        state = env.state_vector().copy()

        ## can replan if desired, but the open-loop plans are good enough for maze2d
        ## that we really only need to plan once
        if t == 0:

            cond[0] = observation
            start = time.time()
            action, samples, diffusion_paths, safe1, safe2, elbo = policy(cond, batch_size=args.batch_size)
            end = time.time()
            comp_time.append(end-start)
            elbo_batch.append(elbo)
            
    #############################       single test
            # cond[0] = observation
            # action, samples, diffusion_paths, safe1, safe2 = policy(cond, batch_size=args.batch_size)  #policy.normalizer.normalizers['observations'].mins
            actions = samples.actions[0]
            sequence = samples.observations[0]
            diffusion_paths = diffusion_paths[0]
# policies.py
## run reverse diffusion process
        self.diffusion_model.norm_mins = self.normalizer.normalizers['observations'].mins
        self.diffusion_model.norm_maxs = self.normalizer.normalizers['observations'].maxs
        sample, diffusion = self.diffusion_model(conditions)
        sample, diffusion = self.diffusion_model(conditions)
    @torch.no_grad()
    def conditional_sample(self, cond, *args, horizon=None, return_diffusion = True, **kwargs):
        '''
            conditions : [ (time, state), ... ]
        '''
        device = self.betas.device
        batch_size = len(cond[0])
        horizon = horizon or self.horizon
        shape = (batch_size, horizon, self.transition_dim)

        return self.p_sample_loop(shape, cond, return_diffusion= return_diffusion, *args, **kwargs)   ## debug

어쨌든 최종적으로 호출되는 것은 이 conditional_sample 함수이다. 

goal point를 [4,4]로 했을 때.

 

Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.