Spaces:
Sleeping
Sleeping
| import io | |
| import base64 | |
| import inspect | |
| import sys | |
| import os | |
| import types | |
| import shutil | |
| from unittest.mock import MagicMock | |
| import numpy as np | |
| import cv2 | |
| import torch | |
| import joblib | |
| import pandas as pd | |
| from pathlib import Path | |
| from scipy.spatial import distance | |
| from torchvision import transforms | |
| from PIL import Image | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, File, UploadFile, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
| from huggingface_hub import hf_hub_download | |
# --- Compatibility patches for NumPy and inspect ---
# Alias ``inspect.getargspec`` to ``getfullargspec`` when the running
# interpreter does not provide it.
try:
    inspect.getargspec
except AttributeError:
    inspect.getargspec = inspect.getfullargspec

# Re-create the scalar aliases (``np.int``, ``np.float``, ...) on NumPy builds
# that lack them; aliases that already exist are left untouched.
for attr, typ in {
    "int": int,
    "float": float,
    "complex": complex,
    "bool": bool,
    "object": object,
    "str": str,
    "unicode": str,
}.items():
    if hasattr(np, attr):
        continue
    setattr(np, attr, typ)
# --- Pyrender / OpenGL stand-ins (headless environment fix) ---
# Register a placeholder ``pyrender`` module whose listed attributes all
# resolve to MagicMock, so a later ``import pyrender`` finds this module
# instead of the real package.
pyrender_mock = types.ModuleType("pyrender")
for _attr in (
    "Scene",
    "Mesh",
    "Node",
    "PerspectiveCamera",
    "DirectionalLight",
    "PointLight",
    "SpotLight",
    "OffscreenRenderer",
    "RenderFlags",
    "Viewer",
    "MetallicRoughnessMaterial",
):
    setattr(pyrender_mock, _attr, MagicMock)
sys.modules["pyrender"] = pyrender_mock

# Empty placeholder modules for the PyOpenGL import paths; any entry that is
# already present in sys.modules is kept as-is.
for _mod in (
    "OpenGL",
    "OpenGL.GL",
    "OpenGL.GL.framebufferobjects",
    "OpenGL.platform",
    "OpenGL.error",
):
    sys.modules.setdefault(_mod, types.ModuleType(_mod))

os.environ["PYOPENGL_PLATFORM"] = "osmesa"
# --- Hugging Face model integration ---
REPO_ID = "SondosM/api_GP"


def get_hf_file(filename, is_mano=False):
    """Fetch ``filename`` from the ``REPO_ID`` Hub repository.

    Args:
        filename: Path of the file inside the repository.
        is_mano: When true, the downloaded file is additionally copied into
            ``./mano_data`` (flat, under its base name) and that copy's path
            is returned instead.

    Returns:
        The path reported by ``hf_hub_download``, or the ``./mano_data`` copy
        when ``is_mano`` is set.
    """
    print(f"Downloading {filename} from {REPO_ID}...")
    cached_path = hf_hub_download(repo_id=REPO_ID, filename=filename)
    if not is_mano:
        return cached_path

    # WiLoR expects the MANO assets in a local ./mano_data folder.
    mano_dir = "./mano_data"
    os.makedirs(mano_dir, exist_ok=True)
    local_copy = os.path.join(mano_dir, os.path.basename(filename))
    if os.path.exists(local_copy):
        # A copy is already in place; it is reused without re-copying.
        return local_copy
    shutil.copy(cached_path, local_copy)
    print(f"Copied {filename} to {local_copy}")
    return local_copy
# --- Resolve model file paths following the Hub repository layout ---
print("Initializing model file paths...")

# MANO assets: fetched for their side effect of being copied into ./mano_data;
# the returned paths are not kept.
for _mano_file in (
    "mano_data/mano_data/mano_mean_params.npz",
    "mano_data/mano_data/MANO_LEFT.pkl",
    "mano_data/mano_data/MANO_RIGHT.pkl",
):
    get_hf_file(_mano_file, is_mano=True)

# Local checkout of the WiLoR sources (added to sys.path when models load).
WILOR_REPO_PATH = "./WiLoR"

# Model weights and configuration
WILOR_CKPT = get_hf_file(
    "pretrained_models/pretrained_models/wilor_final.ckpt"
)
WILOR_CFG = get_hf_file(
    "pretrained_models/pretrained_models/model_config.yaml"
)
DETECTOR_PATH = get_hf_file(
    "pretrained_models/pretrained_models/detector.pt"
)

# Sign classifier
CLASSIFIER_PATH = get_hf_file("classifier.pkl")
# Run inference on the GPU whenever CUDA is usable, otherwise on the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Preprocessing applied to every hand crop before it is fed to WiLoR:
# conversion to a tensor followed by per-channel normalisation.
WILOR_TRANSFORM = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Model handles; they stay None until load_models() fills them in.
wilor_model = yolo_detector = classifier = None
def load_models():
    """Load WiLoR, the YOLO detector and the classifier into module globals.

    ``WILOR_REPO_PATH`` is pushed to the front of ``sys.path`` before the
    ``wilor`` package is imported. The WiLoR model is moved to ``DEVICE`` and
    switched to evaluation mode.
    """
    global wilor_model, yolo_detector, classifier

    # Deferred imports: they run only after the repo path is on sys.path
    # (presumably because ``wilor`` is importable only from that checkout).
    sys.path.insert(0, WILOR_REPO_PATH)
    from wilor.models import load_wilor
    from ultralytics import YOLO

    print(f"Loading WiLoR on {DEVICE}...")
    wilor_model, _model_cfg = load_wilor(
        checkpoint_path=WILOR_CKPT,
        cfg_path=WILOR_CFG,
    )
    wilor_model.to(DEVICE)
    wilor_model.eval()

    print("Loading YOLO detector...")
    yolo_detector = YOLO(DETECTOR_PATH)

    print("Loading RandomForest classifier...")
    # NOTE: joblib.load unpickles the file, so the classifier artifact must
    # come from a trusted source.
    classifier = joblib.load(CLASSIFIER_PATH)

    print("✅ All models loaded successfully!")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: load every model once before serving requests.

    Wrapped with ``asynccontextmanager`` because FastAPI's ``lifespan``
    argument expects an async context manager; handing it a bare async
    generator function is a deprecated form.

    Args:
        app: The FastAPI application being started (not used here).
    """
    load_models()
    # Everything before the yield runs at startup; nothing runs at shutdown.
    yield
# ASGI application; models are loaded through the ``lifespan`` hook.
app = FastAPI(
    title="Arabic Sign Language Interpreter",
    lifespan=lifespan,
)

# CORS is left fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
def extract_features(crop_rgb: np.ndarray) -> np.ndarray | None:
    """Run WiLoR on a hand crop and assemble the classifier feature vector.

    The vector is the flattened ``global_orient`` followed by the flattened
    ``hand_pose`` from the predicted MANO parameters, followed by seven
    distances between the joints at indices 4, 8, 12, 16 and 20 (presumably
    the fingertips), each divided by the distance between joints 0 and 9.

    Args:
        crop_rgb: Image crop of the hand; it is resized to 256x256 first.

    Returns:
        The 1-D feature vector, or ``None`` when the model output lacks
        ``pred_mano_params`` or ``pred_keypoints_3d``.
    """
    resized = cv2.resize(crop_rgb, (256, 256))
    batch = WILOR_TRANSFORM(resized).unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        output = wilor_model({"img": batch})

    has_required_keys = (
        "pred_mano_params" in output and "pred_keypoints_3d" in output
    )
    if not has_required_keys:
        return None

    # Pose part: global orientation first, then the per-joint hand pose.
    mano_params = output["pred_mano_params"]
    hand_pose = mano_params["hand_pose"][0].cpu().numpy().flatten()
    global_orient = mano_params["global_orient"][0].cpu().numpy().flatten()
    theta = np.concatenate([global_orient, hand_pose])

    # Distance part, normalised by the joint 0 -> joint 9 distance (the small
    # epsilon avoids a division by zero).
    joints = output["pred_keypoints_3d"][0].cpu().numpy()
    first_tip, *other_tips = (joints[idx] for idx in (4, 8, 12, 16, 20))
    scale = distance.euclidean(joints[0], joints[9]) + 1e-8
    from_first = [
        distance.euclidean(first_tip, tip) / scale for tip in other_tips
    ]
    between_neighbours = [
        distance.euclidean(tip, next_tip) / scale
        for tip, next_tip in zip(other_tips, other_tips[1:])
    ]
    return np.concatenate([theta, from_first + between_neighbours])
def get_3d_joints(crop_rgb: np.ndarray) -> np.ndarray:
    """Return WiLoR's predicted 3D keypoints for a hand crop.

    Args:
        crop_rgb: Image crop of the hand; it is resized to 256x256 first.

    Returns:
        The first batch entry of ``pred_keypoints_3d`` as a NumPy array.
    """
    model_input = WILOR_TRANSFORM(cv2.resize(crop_rgb, (256, 256)))
    model_input = model_input.unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        prediction = wilor_model({"img": model_input})
        keypoints = prediction["pred_keypoints_3d"][0]
        return keypoints.cpu().numpy()
def read_image_from_upload(file_bytes: bytes) -> np.ndarray:
    """Decode the raw bytes of an uploaded image into a colour image array.

    Args:
        file_bytes: Raw content of the uploaded file.

    Returns:
        The decoded image as produced by ``cv2.imdecode`` with
        ``cv2.IMREAD_COLOR``.

    Raises:
        HTTPException: 400 when the payload is empty or cannot be decoded.
    """
    # cv2.imdecode raises on an empty buffer instead of returning None, which
    # would surface as a 500; reject empty uploads with the same 400 instead.
    if not file_bytes:
        raise HTTPException(status_code=400, detail="Invalid image format.")

    arr = np.frombuffer(file_bytes, np.uint8)
    img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if img is None:
        raise HTTPException(status_code=400, detail="Invalid image format.")
    return img
# Without a route decorator this handler is never registered on ``app`` and
# the endpoint is unreachable.
# NOTE(review): path "/" assumed from the handler name — confirm.
@app.get("/")
def root():
    """Health check: report that the service is running and on which device."""
    return {"status": "running", "device": DEVICE}
# Without a route decorator this handler is never registered on ``app``.
# NOTE(review): path "/predict" assumed from the handler name — confirm
# against the API clients.
@app.post("/predict")
async def predict(file: UploadFile = File(...)):
    """Classify the sign shown by the first detected hand in an uploaded image.

    Args:
        file: Uploaded image file.

    Returns:
        JSON with ``prediction``, ``confidence`` (highest class probability,
        rounded to 4 decimals), ``hand_side`` and the clamped ``bbox`` as
        ``[x1, y1, x2, y2]``.

    Raises:
        HTTPException: 400 for an undecodable image, 422 when no hand is
            detected or the crop is empty, 500 when feature extraction fails.
    """
    raw = await file.read()
    img_bgr = read_image_from_upload(raw)
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

    results = yolo_detector.predict(img_rgb, conf=0.5, verbose=False, device=DEVICE)
    if not results[0].boxes:
        raise HTTPException(status_code=422, detail="No hand detected.")

    # Only the first detection is used.
    box = results[0].boxes.xyxy[0].cpu().numpy().astype(int)
    label_id = int(results[0].boxes.cls[0].cpu().item())
    # Detector class 0 is reported as the left hand, any other class as right.
    hand_side = "left" if label_id == 0 else "right"

    # Clamp the box to the image bounds before cropping.
    x1, y1, x2, y2 = box
    h, w = img_rgb.shape[:2]
    x1, y1, x2, y2 = max(0, x1), max(0, y1), min(w, x2), min(h, y2)
    crop = img_rgb[y1:y2, x1:x2]
    if crop.size == 0:
        raise HTTPException(status_code=422, detail="Empty hand crop.")

    features = extract_features(crop)
    if features is None:
        raise HTTPException(status_code=500, detail="Feature extraction failed.")

    # Fit the features to the width the classifier was trained with: surplus
    # values are dropped and missing trailing values stay zero.
    expected_cols = classifier.feature_names_in_
    final_vector = np.zeros(len(expected_cols))
    limit = min(len(features), len(final_vector))
    final_vector[:limit] = features[:limit]
    feat_df = pd.DataFrame([final_vector], columns=expected_cols)

    prediction = classifier.predict(feat_df)[0]
    proba = classifier.predict_proba(feat_df)[0]
    return JSONResponse({
        "prediction": str(prediction),
        "confidence": round(float(proba.max()), 4),
        "hand_side": hand_side,
        "bbox": [int(x1), int(y1), int(x2), int(y2)],
    })
# Without a route decorator this handler is never registered on ``app``.
# NOTE(review): path "/predict_with_skeleton" assumed from the handler name —
# confirm against the API clients.
@app.post("/predict_with_skeleton")
async def predict_with_skeleton(file: UploadFile = File(...)):
    """Classify the sign and also return the 3D joints and the hand crop.

    Args:
        file: Uploaded image file.

    Returns:
        JSON with ``prediction``, ``confidence`` (highest class probability,
        rounded to 4 decimals), ``hand_side``, the clamped ``bbox`` as
        ``[x1, y1, x2, y2]``, ``joints_3d`` (nested lists) and ``crop_b64``
        (the crop as a base64-encoded PNG).

    Raises:
        HTTPException: 400 for an undecodable image, 422 when no hand is
            detected or the crop is empty, 500 when feature extraction or
            PNG encoding fails.
    """
    raw = await file.read()
    img_bgr = read_image_from_upload(raw)
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

    results = yolo_detector.predict(img_rgb, conf=0.5, verbose=False, device=DEVICE)
    if not results[0].boxes:
        raise HTTPException(status_code=422, detail="No hand detected.")

    # Only the first detection is used.
    box = results[0].boxes.xyxy[0].cpu().numpy().astype(int)
    label_id = int(results[0].boxes.cls[0].cpu().item())
    # Detector class 0 is reported as the left hand, any other class as right.
    hand_side = "left" if label_id == 0 else "right"

    # Clamp the box to the image bounds before cropping.
    x1, y1, x2, y2 = box
    h, w = img_rgb.shape[:2]
    x1, y1, x2, y2 = max(0, x1), max(0, y1), min(w, x2), min(h, y2)
    crop = img_rgb[y1:y2, x1:x2]
    # Same guards as the plain prediction handler: an empty crop would make
    # cv2.resize fail, and a None feature vector would break len() below.
    if crop.size == 0:
        raise HTTPException(status_code=422, detail="Empty hand crop.")

    features = extract_features(crop)
    if features is None:
        raise HTTPException(status_code=500, detail="Feature extraction failed.")
    # NOTE(review): this runs WiLoR a second time on the same crop; the joints
    # could be shared with extract_features to halve the inference cost.
    joints = get_3d_joints(crop)

    # Fit the features to the width the classifier was trained with: surplus
    # values are dropped and missing trailing values stay zero.
    expected_cols = classifier.feature_names_in_
    final_vector = np.zeros(len(expected_cols))
    limit = min(len(features), len(final_vector))
    final_vector[:limit] = features[:limit]
    feat_df = pd.DataFrame([final_vector], columns=expected_cols)

    prediction = classifier.predict(feat_df)[0]
    proba = classifier.predict_proba(feat_df)[0]

    # imencode expects BGR channel order, so convert the RGB crop back first.
    encoded_ok, buf = cv2.imencode(".png", cv2.cvtColor(crop, cv2.COLOR_RGB2BGR))
    if not encoded_ok:
        raise HTTPException(status_code=500, detail="Failed to encode hand crop.")
    crop_b64 = base64.b64encode(buf).decode("utf-8")

    return JSONResponse({
        "prediction": str(prediction),
        "confidence": round(float(proba.max()), 4),
        "hand_side": hand_side,
        "bbox": [int(x1), int(y1), int(x2), int(y2)],
        "joints_3d": joints.tolist(),
        "crop_b64": crop_b64,
    })
if __name__ == "__main__":
    # Direct execution: serve the ``app`` object of module ``app`` on all
    # interfaces, port 7860, with auto-reload disabled.
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=7860,
        reload=False,
    )