Spaces:
Running
Running
| from __future__ import annotations | |
| import sys | |
| import tempfile | |
| import cv2 | |
| import huggingface_hub | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| sys.path.insert(0, "ViTPose/") | |
| from mmdet.apis import inference_detector, init_detector | |
| from mmpose.apis import ( | |
| inference_top_down_pose_model, | |
| init_pose_model, | |
| process_mmdet_results, | |
| vis_pose_result, | |
| ) | |
| class DetModel: | |
| MODEL_DICT = { | |
| "YOLOX-tiny": { | |
| "config": "mmdet_configs/configs/yolox/yolox_tiny_8x8_300e_coco.py", | |
| "model": "https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_tiny_8x8_300e_coco/yolox_tiny_8x8_300e_coco_20211124_171234-b4047906.pth", | |
| }, | |
| "YOLOX-s": { | |
| "config": "mmdet_configs/configs/yolox/yolox_s_8x8_300e_coco.py", | |
| "model": "https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_s_8x8_300e_coco/yolox_s_8x8_300e_coco_20211121_095711-4592a793.pth", | |
| }, | |
| "YOLOX-l": { | |
| "config": "mmdet_configs/configs/yolox/yolox_l_8x8_300e_coco.py", | |
| "model": "https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_l_8x8_300e_coco/yolox_l_8x8_300e_coco_20211126_140236-d3bd2b23.pth", | |
| }, | |
| "YOLOX-x": { | |
| "config": "mmdet_configs/configs/yolox/yolox_x_8x8_300e_coco.py", | |
| "model": "https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_x_8x8_300e_coco/yolox_x_8x8_300e_coco_20211126_140254-1ef88d67.pth", | |
| }, | |
| } | |
| def __init__(self): | |
| self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") | |
| self._load_all_models_once() | |
| self.model_name = "YOLOX-l" | |
| self.model = self._load_model(self.model_name) | |
| def _load_all_models_once(self) -> None: | |
| for name in self.MODEL_DICT: | |
| self._load_model(name) | |
| def _load_model(self, name: str) -> nn.Module: | |
| d = self.MODEL_DICT[name] | |
| return init_detector(d["config"], d["model"], device=self.device) | |
| def set_model(self, name: str) -> None: | |
| if name == self.model_name: | |
| return | |
| self.model_name = name | |
| self.model = self._load_model(name) | |
| def detect_and_visualize(self, image: np.ndarray, score_threshold: float) -> tuple[list[np.ndarray], np.ndarray]: | |
| out = self.detect(image) | |
| vis = self.visualize_detection_results(image, out, score_threshold) | |
| return out, vis | |
| def detect(self, image: np.ndarray) -> list[np.ndarray]: | |
| image = image[:, :, ::-1] # RGB -> BGR | |
| out = inference_detector(self.model, image) | |
| return out | |
| def visualize_detection_results( | |
| self, image: np.ndarray, detection_results: list[np.ndarray], score_threshold: float = 0.3 | |
| ) -> np.ndarray: | |
| person_det = [detection_results[0]] + [np.array([]).reshape(0, 5)] * 79 | |
| image = image[:, :, ::-1] # RGB -> BGR | |
| vis = self.model.show_result( | |
| image, person_det, score_thr=score_threshold, bbox_color=None, text_color=(200, 200, 200), mask_color=None | |
| ) | |
| return vis[:, :, ::-1] # BGR -> RGB | |
| class PoseModel: | |
| MODEL_DICT = { | |
| "ViTPose-B (single-task train)": { | |
| "config": "ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_base_coco_256x192.py", | |
| "model": "models/vitpose-b.pth", | |
| }, | |
| "ViTPose-L (single-task train)": { | |
| "config": "ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_large_coco_256x192.py", | |
| "model": "models/vitpose-l.pth", | |
| }, | |
| "ViTPose-B (multi-task train, COCO)": { | |
| "config": "ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_base_coco_256x192.py", | |
| "model": "models/vitpose-b-multi-coco.pth", | |
| }, | |
| "ViTPose-L (multi-task train, COCO)": { | |
| "config": "ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_large_coco_256x192.py", | |
| "model": "models/vitpose-l-multi-coco.pth", | |
| }, | |
| } | |
| def __init__(self): | |
| self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") | |
| self.model_name = "ViTPose-B (multi-task train, COCO)" | |
| self.model = self._load_model(self.model_name) | |
| def _load_all_models_once(self) -> None: | |
| for name in self.MODEL_DICT: | |
| self._load_model(name) | |
| def _load_model(self, name: str) -> nn.Module: | |
| d = self.MODEL_DICT[name] | |
| ckpt_path = huggingface_hub.hf_hub_download("public-data/ViTPose", d["model"]) | |
| model = init_pose_model(d["config"], ckpt_path, device=self.device) | |
| return model | |
| def set_model(self, name: str) -> None: | |
| if name == self.model_name: | |
| return | |
| self.model_name = name | |
| self.model = self._load_model(name) | |
| def predict_pose_and_visualize( | |
| self, | |
| image: np.ndarray, | |
| det_results: list[np.ndarray], | |
| box_score_threshold: float, | |
| kpt_score_threshold: float, | |
| vis_dot_radius: int, | |
| vis_line_thickness: int, | |
| ) -> tuple[list[dict[str, np.ndarray]], np.ndarray]: | |
| out = self.predict_pose(image, det_results, box_score_threshold) | |
| vis = self.visualize_pose_results(image, out, kpt_score_threshold, vis_dot_radius, vis_line_thickness) | |
| return out, vis | |
| def predict_pose( | |
| self, image: np.ndarray, det_results: list[np.ndarray], box_score_threshold: float = 0.5 | |
| ) -> list[dict[str, np.ndarray]]: | |
| image = image[:, :, ::-1] # RGB -> BGR | |
| person_results = process_mmdet_results(det_results, 1) | |
| out, _ = inference_top_down_pose_model( | |
| self.model, image, person_results=person_results, bbox_thr=box_score_threshold, format="xyxy" | |
| ) | |
| return out | |
| def visualize_pose_results( | |
| self, | |
| image: np.ndarray, | |
| pose_results: list[dict[str, np.ndarray]], | |
| kpt_score_threshold: float = 0.3, | |
| vis_dot_radius: int = 4, | |
| vis_line_thickness: int = 1, | |
| ) -> np.ndarray: | |
| image = image[:, :, ::-1] # RGB -> BGR | |
| vis = vis_pose_result( | |
| self.model, | |
| image, | |
| pose_results, | |
| kpt_score_thr=kpt_score_threshold, | |
| radius=vis_dot_radius, | |
| thickness=vis_line_thickness, | |
| ) | |
| return vis[:, :, ::-1] # BGR -> RGB | |
| class AppModel: | |
| def __init__(self): | |
| self.det_model = DetModel() | |
| self.pose_model = PoseModel() | |
| def run( | |
| self, | |
| video_path: str, | |
| det_model_name: str, | |
| pose_model_name: str, | |
| box_score_threshold: float, | |
| max_num_frames: int, | |
| kpt_score_threshold: float, | |
| vis_dot_radius: int, | |
| vis_line_thickness: int, | |
| ) -> tuple[str, list[list[dict[str, np.ndarray]]]]: | |
| if video_path is None: | |
| return | |
| self.det_model.set_model(det_model_name) | |
| self.pose_model.set_model(pose_model_name) | |
| cap = cv2.VideoCapture(video_path) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| preds_all = [] | |
| fourcc = cv2.VideoWriter_fourcc(*"mp4v") | |
| out_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) | |
| writer = cv2.VideoWriter(out_file.name, fourcc, fps, (width, height)) | |
| for _ in range(max_num_frames): | |
| ok, frame = cap.read() | |
| if not ok: | |
| break | |
| rgb_frame = frame[:, :, ::-1] | |
| det_preds = self.det_model.detect(rgb_frame) | |
| preds, vis = self.pose_model.predict_pose_and_visualize( | |
| rgb_frame, det_preds, box_score_threshold, kpt_score_threshold, vis_dot_radius, vis_line_thickness | |
| ) | |
| preds_all.append(preds) | |
| writer.write(vis[:, :, ::-1]) | |
| cap.release() | |
| writer.release() | |
| return out_file.name, preds_all | |
| def visualize_pose_results( | |
| self, | |
| video_path: str, | |
| pose_preds_all: list[list[dict[str, np.ndarray]]], | |
| kpt_score_threshold: float, | |
| vis_dot_radius: int, | |
| vis_line_thickness: int, | |
| ) -> str: | |
| if video_path is None or pose_preds_all is None: | |
| return | |
| cap = cv2.VideoCapture(video_path) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| fourcc = cv2.VideoWriter_fourcc(*"mp4v") | |
| out_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) | |
| writer = cv2.VideoWriter(out_file.name, fourcc, fps, (width, height)) | |
| for pose_preds in pose_preds_all: | |
| ok, frame = cap.read() | |
| if not ok: | |
| break | |
| rgb_frame = frame[:, :, ::-1] | |
| vis = self.pose_model.visualize_pose_results( | |
| rgb_frame, pose_preds, kpt_score_threshold, vis_dot_radius, vis_line_thickness | |
| ) | |
| writer.write(vis[:, :, ::-1]) | |
| cap.release() | |
| writer.release() | |
| return out_file.name | |