Spaces:
Running
on
Zero
Running
on
Zero
| # adapted from https://github.com/jzhangbs/DTUeval-python | |
| import numpy as np | |
| import open3d as o3d | |
| import sklearn.neighbors as skln | |
| from tqdm import tqdm | |
| from scipy.io import loadmat | |
| import multiprocessing as mp | |
| import argparse | |
| def sample_single_tri(input_): | |
| n1, n2, v1, v2, tri_vert = input_ | |
| c = np.mgrid[:n1+1, :n2+1] | |
| c += 0.5 | |
| c[0] /= max(n1, 1e-7) | |
| c[1] /= max(n2, 1e-7) | |
| c = np.transpose(c, (1,2,0)) | |
| k = c[c.sum(axis=-1) < 1] # m2 | |
| q = v1 * k[:,:1] + v2 * k[:,1:] + tri_vert | |
| return q | |
| def write_vis_pcd(file, points, colors): | |
| pcd = o3d.geometry.PointCloud() | |
| pcd.points = o3d.utility.Vector3dVector(points) | |
| pcd.colors = o3d.utility.Vector3dVector(colors) | |
| o3d.io.write_point_cloud(file, pcd) | |
| if __name__ == '__main__': | |
| mp.freeze_support() | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('--data', type=str, default='data_in.ply') | |
| parser.add_argument('--scan', type=int, default=1) | |
| parser.add_argument('--mode', type=str, default='mesh', choices=['mesh', 'pcd']) | |
| parser.add_argument('--dataset_dir', type=str, default='.') | |
| parser.add_argument('--vis_out_dir', type=str, default='.') | |
| parser.add_argument('--downsample_density', type=float, default=0.2) | |
| parser.add_argument('--patch_size', type=float, default=60) | |
| parser.add_argument('--max_dist', type=float, default=20) | |
| parser.add_argument('--visualize_threshold', type=float, default=10) | |
| args = parser.parse_args() | |
| thresh = args.downsample_density | |
| if args.mode == 'mesh': | |
| pbar = tqdm(total=9) | |
| pbar.set_description('read data mesh') | |
| data_mesh = o3d.io.read_triangle_mesh(args.data) | |
| vertices = np.asarray(data_mesh.vertices) | |
| triangles = np.asarray(data_mesh.triangles) | |
| tri_vert = vertices[triangles] | |
| pbar.update(1) | |
| pbar.set_description('sample pcd from mesh') | |
| v1 = tri_vert[:,1] - tri_vert[:,0] | |
| v2 = tri_vert[:,2] - tri_vert[:,0] | |
| l1 = np.linalg.norm(v1, axis=-1, keepdims=True) | |
| l2 = np.linalg.norm(v2, axis=-1, keepdims=True) | |
| area2 = np.linalg.norm(np.cross(v1, v2), axis=-1, keepdims=True) | |
| non_zero_area = (area2 > 0)[:,0] | |
| l1, l2, area2, v1, v2, tri_vert = [ | |
| arr[non_zero_area] for arr in [l1, l2, area2, v1, v2, tri_vert] | |
| ] | |
| thr = thresh * np.sqrt(l1 * l2 / area2) | |
| n1 = np.floor(l1 / thr) | |
| n2 = np.floor(l2 / thr) | |
| with mp.Pool() as mp_pool: | |
| new_pts = mp_pool.map(sample_single_tri, ((n1[i,0], n2[i,0], v1[i:i+1], v2[i:i+1], tri_vert[i:i+1,0]) for i in range(len(n1))), chunksize=1024) | |
| new_pts = np.concatenate(new_pts, axis=0) | |
| data_pcd = np.concatenate([vertices, new_pts], axis=0) | |
| elif args.mode == 'pcd': | |
| pbar = tqdm(total=8) | |
| pbar.set_description('read data pcd') | |
| data_pcd_o3d = o3d.io.read_point_cloud(args.data) | |
| data_pcd = np.asarray(data_pcd_o3d.points) | |
| pbar.update(1) | |
| pbar.set_description('random shuffle pcd index') | |
| shuffle_rng = np.random.default_rng() | |
| shuffle_rng.shuffle(data_pcd, axis=0) | |
| pbar.update(1) | |
| pbar.set_description('downsample pcd') | |
| nn_engine = skln.NearestNeighbors(n_neighbors=1, radius=thresh, algorithm='kd_tree', n_jobs=-1) | |
| nn_engine.fit(data_pcd) | |
| rnn_idxs = nn_engine.radius_neighbors(data_pcd, radius=thresh, return_distance=False) | |
| mask = np.ones(data_pcd.shape[0], dtype=np.bool_) | |
| for curr, idxs in enumerate(rnn_idxs): | |
| if mask[curr]: | |
| mask[idxs] = 0 | |
| mask[curr] = 1 | |
| data_down = data_pcd[mask] | |
| pbar.update(1) | |
| pbar.set_description('masking data pcd') | |
| obs_mask_file = loadmat(f'{args.dataset_dir}/ObsMask/ObsMask{args.scan}_10.mat') | |
| ObsMask, BB, Res = [obs_mask_file[attr] for attr in ['ObsMask', 'BB', 'Res']] | |
| BB = BB.astype(np.float32) | |
| patch = args.patch_size | |
| inbound = ((data_down >= BB[:1]-patch) & (data_down < BB[1:]+patch*2)).sum(axis=-1) ==3 | |
| data_in = data_down[inbound] | |
| data_grid = np.around((data_in - BB[:1]) / Res).astype(np.int32) | |
| grid_inbound = ((data_grid >= 0) & (data_grid < np.expand_dims(ObsMask.shape, 0))).sum(axis=-1) ==3 | |
| data_grid_in = data_grid[grid_inbound] | |
| in_obs = ObsMask[data_grid_in[:,0], data_grid_in[:,1], data_grid_in[:,2]].astype(np.bool_) | |
| data_in_obs = data_in[grid_inbound][in_obs] | |
| pbar.update(1) | |
| pbar.set_description('read STL pcd') | |
| stl_pcd = o3d.io.read_point_cloud(f'{args.dataset_dir}/Points/stl/stl{args.scan:03}_total.ply') | |
| stl = np.asarray(stl_pcd.points) | |
| pbar.update(1) | |
| pbar.set_description('compute data2stl') | |
| nn_engine.fit(stl) | |
| dist_d2s, idx_d2s = nn_engine.kneighbors(data_in_obs, n_neighbors=1, return_distance=True) | |
| max_dist = args.max_dist | |
| mean_d2s = dist_d2s[dist_d2s < max_dist].mean() | |
| pbar.update(1) | |
| pbar.set_description('compute stl2data') | |
| ground_plane = loadmat(f'{args.dataset_dir}/ObsMask/Plane{args.scan}.mat')['P'] | |
| stl_hom = np.concatenate([stl, np.ones_like(stl[:,:1])], -1) | |
| above = (ground_plane.reshape((1,4)) * stl_hom).sum(-1) > 0 | |
| stl_above = stl[above] | |
| nn_engine.fit(data_in) | |
| dist_s2d, idx_s2d = nn_engine.kneighbors(stl_above, n_neighbors=1, return_distance=True) | |
| mean_s2d = dist_s2d[dist_s2d < max_dist].mean() | |
| pbar.update(1) | |
| pbar.set_description('visualize error') | |
| vis_dist = args.visualize_threshold | |
| R = np.array([[1,0,0]], dtype=np.float64) | |
| G = np.array([[0,1,0]], dtype=np.float64) | |
| B = np.array([[0,0,1]], dtype=np.float64) | |
| W = np.array([[1,1,1]], dtype=np.float64) | |
| data_color = np.tile(B, (data_down.shape[0], 1)) | |
| data_alpha = dist_d2s.clip(max=vis_dist) / vis_dist | |
| data_color[ np.where(inbound)[0][grid_inbound][in_obs] ] = R * data_alpha + W * (1-data_alpha) | |
| data_color[ np.where(inbound)[0][grid_inbound][in_obs][dist_d2s[:,0] >= max_dist] ] = G | |
| write_vis_pcd(f'{args.vis_out_dir}/vis_{args.scan:03}_d2s.ply', data_down, data_color) | |
| stl_color = np.tile(B, (stl.shape[0], 1)) | |
| stl_alpha = dist_s2d.clip(max=vis_dist) / vis_dist | |
| stl_color[ np.where(above)[0] ] = R * stl_alpha + W * (1-stl_alpha) | |
| stl_color[ np.where(above)[0][dist_s2d[:,0] >= max_dist] ] = G | |
| write_vis_pcd(f'{args.vis_out_dir}/vis_{args.scan:03}_s2d.ply', stl, stl_color) | |
| pbar.update(1) | |
| pbar.set_description('done') | |
| pbar.close() | |
| over_all = (mean_d2s + mean_s2d) / 2 | |
| print(mean_d2s, mean_s2d, over_all) | |
| import json | |
| with open(f'{args.vis_out_dir}/results.json', 'w') as fp: | |
| json.dump({ | |
| 'mean_d2s': mean_d2s, | |
| 'mean_s2d': mean_s2d, | |
| 'overall': over_all, | |
| }, fp, indent=True) | |