import copy
import pickle
from typing import Any, Dict, List, Tuple, Union

import numpy as np
import torch
import torch.distributed as dist
from datasets import Dataset

from .cocoeval import COCOeval

# Typings
_TYPING_BOX = Tuple[float, float, float, float]
_TYPING_SCORES = List[float]
_TYPING_LABELS = List[int]
_TYPING_BOXES = List[_TYPING_BOX]
_TYPING_PRED_REF = Union[_TYPING_SCORES, _TYPING_LABELS, _TYPING_BOXES]
_TYPING_PREDICTION = Dict[str, _TYPING_PRED_REF]
_TYPING_REFERENCE = Dict[str, _TYPING_PRED_REF]
_TYPING_PREDICTIONS = Dict[int, _TYPING_PREDICTION]


def convert_to_xywh(boxes: torch.Tensor) -> torch.Tensor:
    """
    Convert bounding boxes from (xmin, ymin, xmax, ymax) format to (x, y, width, height) format.

    Args:
        boxes (torch.Tensor): Tensor of shape (N, 4) representing bounding boxes in \
            (xmin, ymin, xmax, ymax) format.

    Returns:
        torch.Tensor: Tensor of shape (N, 4) representing bounding boxes in (x, y, width, height) \
            format.
    """
    xmin, ymin, xmax, ymax = boxes.unbind(1)
    return torch.stack((xmin, ymin, xmax - xmin, ymax - ymin), dim=1)
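
# Usage sketch (illustrative values, not part of the module): a 20x40 box
# anchored at (10, 20) converts from corner format to COCO's xywh format.
#
#   boxes = torch.tensor([[10.0, 20.0, 30.0, 60.0]])
#   convert_to_xywh(boxes)  # -> tensor([[10., 20., 20., 40.]])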


def create_common_coco_eval(
    coco_eval: COCOeval, img_ids: List[int], eval_imgs: np.ndarray
) -> None:
    """
    Create a common COCO evaluation by merging image IDs and evaluation images into the \
        coco_eval object.

    Args:
        coco_eval (COCOeval): COCO evaluation object.
        img_ids (List[int]): List of image IDs.
        eval_imgs (np.ndarray): Array of per-image evaluation results.
    """
    img_ids, eval_imgs = merge(img_ids, eval_imgs)
    img_ids = list(img_ids)
    eval_imgs = list(eval_imgs.flatten())

    coco_eval.evalImgs = eval_imgs
    coco_eval.params.imgIds = img_ids
    coco_eval._paramsEval = copy.deepcopy(coco_eval.params)
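
# End-to-end sketch (hypothetical `coco_gt`/`coco_dt` COCO objects): a typical
# distributed evaluation scores the local images, reshapes `evalImgs` so the
# image axis comes last, then merges everything before accumulating.
#
#   coco_eval = COCOeval(coco_gt, coco_dt, iouType="bbox")
#   coco_eval.params.imgIds = local_img_ids
#   coco_eval.evaluate()
#   eval_imgs = np.asarray(coco_eval.evalImgs).reshape(
#       -1, len(coco_eval.params.areaRng), len(local_img_ids)
#   )
#   create_common_coco_eval(coco_eval, local_img_ids, eval_imgs)
#   coco_eval.accumulate()
#   coco_eval.summarize()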


def merge(img_ids: List[int], eval_imgs: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Merge image IDs and evaluation images across all processes.

    Args:
        img_ids (List[int]): Image IDs from the current process.
        eval_imgs (np.ndarray): Evaluation results from the current process.

    Returns:
        Tuple[np.ndarray, np.ndarray]: Merged image IDs and evaluation images.
    """
    all_img_ids = all_gather(img_ids)
    all_eval_imgs = all_gather(eval_imgs)

    merged_img_ids = []
    for p in all_img_ids:
        merged_img_ids.extend(p)

    merged_eval_imgs = []
    for p in all_eval_imgs:
        merged_eval_imgs.append(p)

    merged_img_ids = np.array(merged_img_ids)
    merged_eval_imgs = np.concatenate(merged_eval_imgs, 2)

    # keep only unique (and in sorted order) images
    merged_img_ids, idx = np.unique(merged_img_ids, return_index=True)
    merged_eval_imgs = merged_eval_imgs[..., idx]

    return merged_img_ids, merged_eval_imgs
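
# Single-process sketch: with world_size == 1, merge() reduces to sorting and
# de-duplicating the local image IDs; the image axis is assumed to be last.
#
#   ids = [3, 1, 3]
#   imgs = np.zeros((2, 4, 3))          # (categories, area ranges, images)
#   merged_ids, merged = merge(ids, imgs)
#   # merged_ids -> array([1, 3]); merged.shape -> (2, 4, 2)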


def all_gather(data: Any) -> List[Any]:
    """
    Run all_gather on arbitrary picklable data (not necessarily tensors).

    Args:
        data (Any): Any picklable object.

    Returns:
        List[Any]: List of data gathered from each rank.
    """
    world_size = get_world_size()
    if world_size == 1:
        return [data]

    # serialize to a byte tensor (legacy storage API); staged on the GPU
    # because the default NCCL backend only gathers CUDA tensors
    buffer = pickle.dumps(data)
    storage = torch.ByteStorage.from_buffer(buffer)
    tensor = torch.ByteTensor(storage).to("cuda")

    # obtain the tensor size of each rank
    local_size = torch.tensor([tensor.numel()], device="cuda")
    size_list = [torch.tensor([0], device="cuda") for _ in range(world_size)]
    dist.all_gather(size_list, local_size)
    size_list = [int(size.item()) for size in size_list]
    max_size = max(size_list)

    # receiving tensors from all ranks: pad to a common size because
    # torch's all_gather does not support gathering tensors of different shapes
    tensor_list = []
    for _ in size_list:
        tensor_list.append(torch.empty((max_size,), dtype=torch.uint8, device="cuda"))
    if local_size != max_size:
        padding = torch.empty(
            size=(max_size - local_size,), dtype=torch.uint8, device="cuda"
        )
        tensor = torch.cat((tensor, padding), dim=0)
    dist.all_gather(tensor_list, tensor)

    # trim the padding and unpickle each rank's payload
    data_list = []
    for size, tensor in zip(size_list, tensor_list):
        buffer = tensor.cpu().numpy().tobytes()[:size]
        data_list.append(pickle.loads(buffer))

    return data_list
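
# Usage sketch (hypothetical payload): inside a job launched with torchrun,
# every rank contributes its local predictions and receives all of them.
# Requires CUDA, since the payload is staged on the GPU for the gather.
#
#   local_preds = {"scores": [0.9], "labels": [1]}
#   gathered = all_gather(local_preds)  # len(gathered) == get_world_size()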


def get_world_size() -> int:
    """
    Get the number of processes in the distributed environment.

    Returns:
        int: Number of processes.
    """
    if not is_dist_avail_and_initialized():
        return 1
    return dist.get_world_size()


def is_dist_avail_and_initialized() -> bool:
    """
    Check if a distributed environment is available and initialized.

    Returns:
        bool: True if a distributed environment is available and initialized, False otherwise.
    """
    return dist.is_available() and dist.is_initialized()
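
# Sketch: these guards let the same evaluation script run both under torchrun
# and as a plain single process without any code changes.
#
#   if is_dist_avail_and_initialized():
#       dist.barrier()  # sync ranks before merging results
#   print(f"evaluating across {get_world_size()} process(es)")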