Point Cloud 를 동심원 모양으로 분할하기 (Python)
- -
1. 문제 상황 : Sparse Point Cloud
RELLIS-3D 데이터셋의 포인트 클라우드를 다루던 도중, 일정 거리 이상으로 멀어지면 포인트 클라우드가 너무 Sparse 해 져서 문제가 자꾸 발생했습니다.
위 이미지는 포인트 클라우드를 Heightmap 이미지로 바꾼 것을 해당 위치에 재구성한 것입니다. 센서에서 멀어질 수록 포인트가 적어지는 것을 볼 수 있습니다. 따라서 이런 "Sparse Point Cloud" 문제를 해결하기 위해서 다양한 방법들이 제시되고 있습니다. 대표적으로는 다음과 같은 방법들이 있습니다.
1) interpolation : K-NN 등을 사용하여 주변의 다른 데이터 포인트를 사용하여 누락된 데이터를 채워 넣는 기술입니다.
2) Octree / KD-Tree 구조 등을 사용해 PC 데이터를 효율적으로 구성하고 조회하고 보간.
3) Super resolution
다양한 방법들이 있지만, 저는 분할하는 방법을 바꿈으로써 Sparse Point Cloud 문제를 해결하려고 했습니다.
이전 포스팅 (https://zzziito.tistory.com/40) 에서도 다룬 <Patchwork: Concentric Zone-based Region-wise Ground Segmentation with Ground Likelihood Estimation Using a 3D LiDAR Sensor> 논문에서 제시한 Concentric Zone Model 입니다.
CZM 기반 분할은, 센서에서 가까운 지역은 촘촘하게, 센서에서 먼 지역은 큼직하게 분할함으로써 각각의 grid 에 있는 포인트의 갯수를 비슷하게 맞추고자 했습니다. 제가 다루고 있는 문제에서도 이와 같은 접근법이 유용할 것이라고 생각했습니다.
또한 분할한 지역을 눈으로 확인하기 위해서 Patchwork 에서는 jsk_msgs polygon 을 이용해서 해당 정보를 발행합니다. 따라서 해당 부분도 반영하여 코드를 작성했습니다.
jsk_msgs 에 관해서는 다음 포스팅을 참고해 주세요.
2. Code
Patchwork 논문에 소개된 CZM 기반 포인트 클라우드 분할 코드는 이곳에 있습니다.
https://github.com/LimHyungTae/patchwork
이 코드는 (ROS / C++ / PCL) 기반으로 작성된 코드인데, 저는 (ROS / Python3 / Open3d) 환경에서 개발을 진행하고자 하여,
CZM 기반 포인트 클라우드 분할 부분 코드를 리팩토링했습니다.
결과
Code
#! /usr/bin/env python3
import rospy
import sensor_msgs.msg import PointCloud2
import sensor_msgs.point_cloud2 as pc2
import jsk_recognition_msgs.msg import PolygonArray
from geometry_msgs.msg import Point32, PolygonStamped
import numpy as np
from math import sin, cos
import math
import sklearn
### DEFINE CONSTANTS ###
MIN_POINTS = 100
min_range_ = 3
min_range_z2_ = 12.3625
min_range_z3_ = 22.025
min_range_z4_ = 41.35
max_range_ = 100.0
min_ranges_ = [min_range_, min_range_z2_, min_range_z3_, min_range_z4_]
sector_const = 32
num_zones = 4
num_sectors_each_zone = [16, 32 ,54, 32]
num_rings_each_zone = [2, 4, 4, 4]
ring_sizes_ = [(min_range_z2_ - min_range_) / num_rings_each_zone[0],
(min_range_z3_ - min_range_z2_) / num_rings_each_zone[1],
(min_range_z4_ - min_range_z3_) / num_rings_each_zone[2],
(max_range_ - min_range_z4_) / num_rings_each_zone[3]]
sector_sizes_ = [2 * math.pi / num_sectors_each_zone[0],
2 * math.pi / num_sectors_each_zone[1],
2 * math.pi / num_sectors_each_zone[2],
2 * math.pi / num_sectors_each_zone[3]]
# Define the color for each layer
layer_colors = [(255, 0, 0), (255, 165, 0), (0, 255, 0), (0, 0, 255)] # Red, Orange, Green, Blue
fields = [pc2.PointField('x', 0, pc2.PointField.FLOAT32, 1),
pc2.PointField('y', 4, pc2.PointField.FLOAT32, 1),
pc2.PointField('z', 8, pc2.PointField.FLOAT32, 1),
# Channel format: name, offset (in bytes), datatype, count
pc2.PointField('rgb', 12, pc2.PointField.FLOAT32, 1)]
### DEFINE NODES ###
pub = rospy.Publisher('output', PointCloud2, queue_size=1)
poly_pub = rospy.Publisher('polygons', PolygonArray, queue_size=1)
### CLASS FOR CZM ###
class Zone:
def __init__(self):
self.points = []
# 센서와 포인트 사이의 거리를 구하기 위한 함수
def xyz2radius(x,y):
return np.sqrt(x**2 + y**2)
def xy2theta(x, y):
theta = math.atan2(y, x)
if theta < 0:
theta += 2 * math.pi # theta가 음수일 경우, 2*pi를 더하여 양수로 변환
return theta
def rgb_to_float(color):
""" Converts 8-bit RGB values to a single floating-point number. """
hex_color = 0x0000 | (int(color[0]) << 16) | (int(color[1]) << 8) | int(color[2])
return struct.unpack('f', struct.pack('I', hex_color))[0]
#jsk_msgs polygon 을 발행하기 위한 함수
# Patchwork 에서는 zone 을 크게 4개로 나누고, zone 을 다시 ring 으로 나누고, ring 을 다시 sector 로 나누어 구역을 구분합니다.
def set_polygons(cloud_msg, zone_idx, r_idx, theta_idx, num_spilt, ring_size, sector_size, min_range):
polygon_stamped = PolygonStamped()
polygon_stamped.header = cloud_msg.header
MARKER_Z_VALUE = 0.0
#여기서 RL, RU, LU, 는 각각 (RL: Right Lower - 오른쪽 아래 모서리), (RU: Right Upper - 오른쪽 위 모서리), (LU: Left Upper - 왼쪽 위 모서리), (LL: Left Lower - 왼쪽 아래 모서리)
#를 뜻합니다.
zone_min_range = min_range[zone_idx]
r_len =r_idx * ring_size[zone_idx] + zone_min_range
angle = theta_idx * sector_size[zone_idx]
point = Point32(x = r_len * cos(angle),
y = r_len * sin(angle),
z = MARKER_Z_VALUE)
polygon_stamped.polygon.points.append(point)
# RU
r_len = r_len + ring_size[zone_idx]
point = Point32(x = r_len * cos(angle),
y = r_len * sin(angle),
z = MARKER_Z_VALUE)
polygon_stamped.polygon.points.append(point)
# RU -> LU
for idx in range(1, num_split + 1):
angle = angle + sector_size[zone_idx] / num_split
point = Point32(x = r_len * cos(angle),
y = r_len * sin(angle),
z = MARKER_Z_VALUE)
polygon_stamped.polygon.points.append(point)
r_len = r_len - ring_size[zone_idx]
point = Point32(x = r_len * cos(angle),
y = r_len * sin(angle),
z = MARKER_Z_VALUE)
polygon_stamped.polygon.points.append(point)
for idx in range(1, num_split):
angle = angle - sector_size[zone_idx] / num_split
point = Point32(x = r_len * cos(angle),
y = r_len * sin(angle),
z = MARKER_Z_VALUE)
polygon_stamped.polygon.points.append(point)
return polygon_stamped
# cloud_cb 는 point cloud 데이터를 받아올 때 수행되는 callback 함수입니다.
def cloud_cb(cloud_msg):
cloud = pc2.read_points(cloud_msg, field_names=("x","y","z","intensity"), skip_nans = True)
czm = [[[Zone() for _ in range(num_sectors_each_zone[zone_idx])]
for _ in range(num_rings_each_zone[zone_idx])
] for zone_idx in range(num_zones)]
for pt in cloud:
x, y, z, intensity = pt
r = xy2radius(x,y)
if r <= max_range_ and r > min_range_:
theta = xy2theta(x,y)
ring_idx =sector_idx = 0
if r < min_range_z2_:
ring_idx = min(int((r - min_range_) / ring_sizes_[0]), num_rings_each_zone[0]-1)
sector_idx = min(int(theta / sector_sizes_[0]), num_sectors_each_zone[0]-1)
czm[0][ring_idx][sector_idx].points.append([x, y, z, intensity, layer_colors[0]])
elif r < min_range_z3_:
ring_idx = min(int((r - min_range_z2_) / ring_sizes_[1]), num_rings_each_zone[1]-1)
sector_idx = min(int(theta / sector_sizes_[1]), num_sectors_each_zone[1]-1)
czm[1][ring_idx][sector_idx].points.append([x, y, z, intensity, layer_colors[1]])
elif r < min_range_z4_:
ring_idx = min(int((r - min_range_z3_) / ring_sizes_[2]), num_rings_each_zone[2]-1)
sector_idx = min(int(theta / sector_sizes_[2]), num_sectors_each_zone[2]-1)
czm[2][ring_idx][sector_idx].points.append([x, y, z, intensity, layer_colors[2]])
else:
ring_idx = min(int((r - min_range_z4_) / ring_sizes_[3]), num_rings_each_zone[3]-1)
sector_idx = min(int(theta / sector_sizes_[3]), num_sectors_each_zone[3]-1)
czm[3][ring_idx][sector_idx].points.append([x, y, z, intensity, layer_colors[3]])
#이때 저는 sector 에 일정 개수 이상의 점이 없다면 해당 섹터를 발행하지 않도록 하기 위해 다음 코드를 추가했습니다.
all_points = []
for layer in czm:
for ring in layer:
for sector in ring:
if len(sector.points) > MIN_POINTS:
all_points.extend(sector.points)
#Prepare PolygonArray message
polgon_msg = PolygonArray()
polygon_msg.header = cloud_msg.header
for zone_idx, layer in enumerate(czm):
for ring_idx, ring in enumerate(layer):
for sector_idx, sector in enumerate(ring):
if len(sector.points) > MIN_POINTS:
polygon_stamped = set_polygons(cloud_msg, zone_idx, ring_idx, 10, ring_sizes_, sector_sizes_, min_ranges_)
polygon_msg.polygons.append(polygon_stamped)
poly_pub.publish(polygon_msg)
if all_points:
output = pc2.create_cloud(cloud_msg.header, fields, [(pt[0], pt[1], pt[2], rgb_to_float(pt[4])) for pt in all_points])
pub.publish(output)
def listener():
rospy.init_node('bumpypatch_node', anonymous=True)
rospy.Subscriber("/os1_cloud_node/points", PointCloud2, cloud_cb)
rospy.spin()
if __name__ == '__main__':
listener()
이 코드의 문제점은, 삼중 for 문이 들어가 있어서 속도가 약간 느리다는 점입니다. 코드 개선 과정 중에 있습니다.
'Point Cloud' 카테고리의 다른 글
PointNet의 input point cloud processing에 대해서 (0) | 2025.02.04 |
---|---|
Point Cloud Descriptor (2) : FPFH, 3DSC, SHOT, NARF (0) | 2023.07.18 |
Point Cloud Descriptor (1) : PFH (0) | 2023.07.15 |
바닥을 찾아보자 - Plane Segmentation (0) | 2023.07.07 |
Point Cloud 를 군집화해보자 - Clustering : K-means, DBSCAN (1) | 2023.07.06 |
소중한 공감 감사합니다