| """ | |
| File: face_utils.py | |
| Author: Elena Ryumina and Dmitry Ryumin | |
| Description: This module contains utility functions related to facial landmarks and image processing. | |
| License: MIT License | |
| """ | |
import numpy as np
import pandas as pd
import math
import subprocess
import torchaudio
import torch
import os
from PIL import Image
from torchvision import transforms

# Importing necessary components for the Gradio app
from app.config import NAME_EMO_AUDIO, DICT_CE, config_data
from app.plot import plot_compound_expression_prediction, plot_audio


def norm_coordinates(normalized_x, normalized_y, image_width, image_height):
    x_px = min(math.floor(normalized_x * image_width), image_width - 1)
    y_px = min(math.floor(normalized_y * image_height), image_height - 1)
    return x_px, y_px
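
# Converts normalized [0, 1] landmark coordinates to pixel coordinates clamped
# to the image bounds. For example, the center of a 640x480 frame maps to:
#   norm_coordinates(0.5, 0.5, 640, 480) -> (320, 240)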


def get_box(fl, w, h):
    idx_to_coors = {}
    for idx, landmark in enumerate(fl.landmark):
        landmark_px = norm_coordinates(landmark.x, landmark.y, w, h)
        if landmark_px:
            idx_to_coors[idx] = landmark_px

    coors = np.asarray(list(idx_to_coors.values()))
    x_min = np.min(coors[:, 0])
    y_min = np.min(coors[:, 1])
    endX = np.max(coors[:, 0])
    endY = np.max(coors[:, 1])

    (startX, startY) = (max(0, x_min), max(0, y_min))
    (endX, endY) = (min(w - 1, endX), min(h - 1, endY))

    return startX, startY, endX, endY
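
# A minimal usage sketch, assuming the landmarks come from MediaPipe Face Mesh
# (each `fl` in `results.multi_face_landmarks` carries a `.landmark` list with
# normalized coordinates):
#   startX, startY, endX, endY = get_box(fl, frame_width, frame_height)
#   face_crop = frame[startY:endY, startX:endX]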


def pth_processing(fp):
    class PreprocessInput(torch.nn.Module):
        def __init__(self):
            super(PreprocessInput, self).__init__()

        def forward(self, x):
            x = x.to(torch.float32)
            # Reverse the channel axis (RGB -> BGR) and subtract per-channel means.
            x = torch.flip(x, dims=(0,))
            x[0, :, :] -= 91.4953
            x[1, :, :] -= 103.8827
            x[2, :, :] -= 131.0912
            return x

    def get_img_torch(img, target_size=(224, 224)):
        transform = transforms.Compose([transforms.PILToTensor(), PreprocessInput()])
        img = img.resize(target_size, Image.Resampling.NEAREST)
        img = transform(img)
        img = torch.unsqueeze(img, 0)
        return img

    return get_img_torch(fp)
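
# A hedged usage sketch: `fp` is expected to be a PIL image, e.g.
#   face = Image.open("face.jpg")    # hypothetical path
#   tensor = pth_processing(face)    # shape (1, 3, 224, 224), float32
# The subtracted constants match commonly used VGGFace-style per-channel
# BGR means.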


def convert_webm_to_mp4(input_file):
    path_save = input_file.split('.')[0] + ".mp4"
    if not os.path.exists(path_save):
        ff_video = "ffmpeg -i {} -c:v copy -c:a aac -strict experimental {}".format(
            input_file, path_save
        )
        subprocess.call(ff_video, shell=True)
    return path_save
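
# Note: `-c:v copy` remuxes the video stream into the MP4 container without
# re-encoding, while the audio track is re-encoded to AAC, so only the audio
# pays a transcoding cost.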


def convert_mp4_to_mp3(path, frame_indices, fps, sampling_rate=16000):
    # Despite the name, this extracts the audio track to a 16-bit PCM WAV file.
    path_save = path.split('.')[0] + ".wav"
    if not os.path.exists(path_save):
        ff_audio = "ffmpeg -i {} -vn -acodec pcm_s16le -ar 44100 -ac 2 {}".format(
            path, path_save
        )
        subprocess.call(ff_audio, shell=True)
    wav, sr = torchaudio.load(path_save)

    num_frames = wav.numpy().shape[1]
    time_axis = [i / sr for i in range(num_frames)]
    plt = plot_audio(time_axis, wav, frame_indices, fps, (12, 2))

    # Downmix to mono and resample to the target rate expected by the audio model.
    if wav.size(0) > 1:
        wav = wav.mean(dim=0, keepdim=True)
    if sr != sampling_rate:
        transform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=sampling_rate)
        wav = transform(wav)
        sr = sampling_rate
    assert sr == sampling_rate
    return wav.squeeze(0), plt
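
# Example call, assuming a 30 fps video and indices from
# get_evenly_spaced_frame_indices (defined below):
#   wav, audio_plot = convert_mp4_to_mp3("video.mp4", frame_indices, fps=30)
# `wav` is a mono 1-D tensor at 16 kHz; `audio_plot` is the waveform figure.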


def pad_wav(wav, max_length):
    current_length = len(wav)
    if current_length < max_length:
        # Tile the signal until it covers max_length, then truncate.
        repetitions = (max_length + current_length - 1) // current_length
        wav = torch.cat([wav] * repetitions, dim=0)[:max_length]
    elif current_length > max_length:
        wav = wav[:max_length]
    return wav
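
# For example, a 3-sample signal padded to length 7 is repeated cyclically:
#   pad_wav(torch.tensor([1., 2., 3.]), 7) -> tensor([1., 2., 3., 1., 2., 3., 1.])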


def pad_wav_zeros(wav, max_length, mode="constant"):
    if mode == "mean":
        # Pad on the right with the mean value of the signal.
        wav = torch.nn.functional.pad(
            wav,
            (0, max(0, max_length - wav.shape[0])),
            mode="constant",
            value=torch.mean(wav),
        )
    else:
        wav = torch.nn.functional.pad(
            wav, (0, max(0, max_length - wav.shape[0])), mode=mode
        )
    return wav
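
# With the default mode, the signal is right-padded with zeros:
#   pad_wav_zeros(torch.tensor([1., 2., 3.]), 5) -> tensor([1., 2., 3., 0., 0.])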


def softmax(matrix):
    # Row-wise softmax with the usual max-subtraction trick for numerical stability.
    exp_matrix = np.exp(matrix - np.max(matrix, axis=1, keepdims=True))
    return exp_matrix / np.sum(exp_matrix, axis=1, keepdims=True)
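
# Each row sums to one, e.g.:
#   softmax(np.array([[0.0, 0.0], [1.0, 3.0]]))
#   -> [[0.5, 0.5], [~0.119, ~0.881]]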


def get_compound_expression(pred, com_emo):
    # Each compound expression is scored as the sum of the probabilities of the
    # two basic emotions that define it (index pairs taken from `com_emo`).
    pred = np.asarray(pred)
    prob = np.zeros((len(pred), len(com_emo)))
    for idx, (_, v) in enumerate(com_emo.items()):
        idx_1 = v[0]
        idx_2 = v[1]
        prob[:, idx] = pred[:, idx_1] + pred[:, idx_2]
    return prob
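
# A hedged sketch of the expected mapping (the real pairs live in DICT_CE;
# the name and indices below are hypothetical):
#   com_emo = {"Fearfully Surprised": (3, 5), ...}
#   prob[:, 0] = pred[:, 3] + pred[:, 5]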


def get_image_location(curr_video, frame):
    # Convert a frame file name into a 1-based, zero-padded image path.
    frame = int(frame.split(".")[0]) + 1
    frame = str(frame).zfill(5) + ".jpg"
    return f"{curr_video}/{frame}"


def save_txt(column_names, file_names, labels, save_name):
    data_lines = [",".join(column_names)]
    for file_name, label in zip(file_names, labels):
        data_lines.append(f"{file_name},{label}")
    with open(save_name, "w") as file:
        for line in data_lines:
            file.write(line + "\n")


def get_mix_pred(emo_pred, ce_prob):
    # If the basic-emotion prediction is confident enough, keep it; otherwise
    # fall back to the compound-expression label, offset by 6 so that compound
    # labels follow the basic-emotion labels in a single index space.
    pred = []
    for idx, curr_pred in enumerate(emo_pred):
        if np.max(curr_pred) > config_data.CONFIDENCE_BE:
            pred.append(np.argmax(curr_pred))
        else:
            pred.append(ce_prob[idx] + 6)
    return pred
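
# Sketch, with a hypothetical CONFIDENCE_BE of 0.5: a frame whose basic-emotion
# probabilities peak at 0.9 keeps that argmax, while a flat distribution
# (max <= 0.5) is relabeled with its compound-expression index + 6.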


def get_c_expr_db_pred(
    stat_df: pd.DataFrame,
    dyn_df: pd.DataFrame,
    audio_df: pd.DataFrame,
    name_video: str,
    weights_1: list[float],
    frame_indices: list[int],
) -> tuple[pd.DataFrame, object]:
    """
    Predict compound expressions using audio-visual emotional probabilities, optimized weights, and rules.

    Args:
        stat_df (pd.DataFrame): DataFrame containing static visual probabilities.
        dyn_df (pd.DataFrame): DataFrame containing dynamic visual probabilities.
        audio_df (pd.DataFrame): DataFrame containing audio probabilities.
        name_video (str): Name of the video.
        weights_1 (list[float]): List of weights for the Dirichlet-based fusion.
        frame_indices (list[int]): Frame indices to annotate on the prediction plot.

    Returns:
        tuple[pd.DataFrame, object]: DataFrame with per-model compound expression
        predictions and the corresponding prediction plot.
    """
| stat_df["image_location"] = [ | |
| f"{name_video}/{str(f+1).zfill(5)}.jpg" for f in stat_df.index | |
| ] | |
| dyn_df["image_location"] = [ | |
| f"{name_video}/{str(f+1).zfill(5)}.jpg" for f in dyn_df.index | |
| ] | |
| image_location = dyn_df.image_location.tolist() | |
| stat_df = stat_df[stat_df.image_location.isin(image_location)][NAME_EMO_AUDIO[:-1]].values | |
| dyn_df = softmax( | |
| dyn_df[dyn_df.image_location.isin(image_location)][NAME_EMO_AUDIO[:-1]].values | |
| ) | |
| audio_df = audio_df.groupby(["frames"]).mean().reset_index() | |
| audio_df = audio_df.rename(columns={"frames": "image_location"}) | |
| audio_df["image_location"] = [ | |
| get_image_location(name_video, i) for i in audio_df.image_location | |
| ] | |
| audio_df = softmax( | |
| audio_df[audio_df.image_location.isin(image_location)][NAME_EMO_AUDIO[:-1]].values | |
| ) | |
| if len(image_location) > len(audio_df): | |
| last_pred_audio = audio_df[-1] | |
| audio_df = np.vstack( | |
| (audio_df, [last_pred_audio] * (len(image_location) - len(audio_df))) | |
| ) | |
| predictions = [stat_df, dyn_df, audio_df] | |
| num_predictions = len(predictions) | |
| if weights_1: | |
| final_predictions = predictions[0] * weights_1[0] | |
| for i in range(1, num_predictions): | |
| final_predictions += predictions[i] * weights_1[i] | |
| else: | |
| final_predictions = np.sum(predictions, axis=0) / num_predictions | |
| av_prob = np.argmax(get_compound_expression( | |
| final_predictions, DICT_CE, | |
| ), axis=1) | |
| vs_prob = get_compound_expression( | |
| predictions[0], DICT_CE) | |
| vd_prob = get_compound_expression( | |
| predictions[1], DICT_CE) | |
| a_prob = get_compound_expression( | |
| predictions[2], DICT_CE) | |
| av_pred = get_mix_pred(final_predictions, av_prob) | |
| vs_pred = get_mix_pred(predictions[0], np.argmax(vs_prob, axis=1)) | |
| vd_pred = get_mix_pred(predictions[1], np.argmax(vd_prob, axis=1)) | |
| a_pred = get_mix_pred(predictions[2], np.argmax(a_prob, axis=1)) | |
| dict_pred_final = {'Audio-visual fusion':av_pred, 'Static visual model':vs_pred,'Dynamic visual model':vd_pred,'Audio model':a_pred} | |
| plt = plot_compound_expression_prediction( | |
| dict_preds = dict_pred_final, | |
| save_path = None, | |
| frame_indices = frame_indices, | |
| title = "Basic emotion and compound expression predictions") | |
| df = pd.DataFrame(dict_pred_final) | |
| return df, plt | |
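
# A note on the fusion step: with weights_1 = [w_s, w_d, w_a], the final
# probabilities are w_s * static + w_d * dynamic + w_a * audio; passing an
# empty list falls back to the unweighted mean of the three models.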


def get_evenly_spaced_frame_indices(total_frames, num_frames=10):
    if total_frames <= num_frames:
        return list(range(total_frames))
    step = total_frames / num_frames
    return [int(np.round(i * step)) for i in range(num_frames)]
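
# For example:
#   get_evenly_spaced_frame_indices(100, 10)
#   -> [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]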