import os
import cv2
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt
from rmn import RMN
import gradio as gr
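
# Gradio app: runs RMN facial emotion detection on an uploaded video (one sampled frame per
# second), tracks faces with a simple IoU / nearest-center heuristic, and plots per-face
# emotion timelines and emotion proportions for user-selected face IDs.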


def process_video(video_path, share_screen_mode):
    # Create the output directory
    output_dir = 'output'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Initialize the emotion detection model
    print("Initializing emotion detection model...")
    m = RMN()

    # Open the video file
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_interval = int(fps * 1)  # process one frame per second
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    print(f"Total frames: {total_frames}, FPS: {fps}")

    # Create the video writer for the annotated output
    output_video_path = os.path.join(output_dir, 'output_video.avi')
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    current_frame = 0

    # Face ID and emotion bookkeeping
    face_ids = []
    max_face_id = 0
    face_emotions = defaultdict(list)
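    # face_emotions maps face_id -> list of (timestamp in seconds, emotion label)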
    max_faces = 0
    initial_faces = []
    last_detections = {}

    print("Starting video processing...")
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Finished processing video.")
            break

        if share_screen_mode:
            # Crop the rightmost 1/5 of the frame
            x_start = int(frame_width * 4 / 5)
            frame_to_process = frame[:, x_start:]
        else:
            frame_to_process = frame.copy()
            x_start = 0  # no horizontal offset
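        # x_start is added back to the detected x-coordinates below so that boxes and labels
        # land in the right place on the full, uncropped frame.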

        if current_frame % frame_interval == 0:
            print(f"Processing frame {current_frame}...")
            # Detect faces
            detections = m.detect_faces(frame_to_process)
            print(f"Detected {len(detections)} faces.")

            # Update the maximum face count seen so far
            if len(detections) > max_faces:
                max_faces = len(detections)
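            # max_faces caps how many distinct face IDs can be created; once reached, new
            # detections are matched to the nearest existing face instead of getting a new ID.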

            for det in detections:
                xmin = det['xmin']
                ymin = det['ymin']
                xmax = det['xmax']
                ymax = det['ymax']

                matched_id = None
                max_iou = 0
                # Compare against the faces tracked so far
                for face in initial_faces:
                    ixmin, iymin, ixmax, iymax = face['bbox']
                    # Compute IoU between the detection and the tracked face
                    xx1 = max(xmin, ixmin)
                    yy1 = max(ymin, iymin)
                    xx2 = min(xmax, ixmax)
                    yy2 = min(ymax, iymax)
                    inter_area = max(0, xx2 - xx1) * max(0, yy2 - yy1)
                    area1 = (xmax - xmin) * (ymax - ymin)
                    area2 = (ixmax - ixmin) * (iymax - iymin)
                    iou = inter_area / float(area1 + area2 - inter_area + 1e-5)
                    if iou > 0.3 and iou > max_iou:
                        matched_id = face['id']
                        max_iou = iou
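                # No sufficient overlap with any tracked face: either register a new ID or
                # fall back to matching by nearest face center.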
                if matched_id is None:
                    if len(initial_faces) < max_faces:
                        # Register a new face ID
                        matched_id = max_face_id
                        max_face_id += 1
                        initial_faces.append({'id': matched_id, 'bbox': (xmin, ymin, xmax, ymax)})
                    else:
                        # Match to the tracked face with the nearest center
                        cx = (xmin + xmax) / 2
                        cy = (ymin + ymax) / 2
                        min_dist = float('inf')
                        for face in initial_faces:
                            fx = (face['bbox'][0] + face['bbox'][2]) / 2
                            fy = (face['bbox'][1] + face['bbox'][3]) / 2
                            dist = np.sqrt((cx - fx) ** 2 + (cy - fy) ** 2)
                            if dist < min_dist:
                                min_dist = dist
                                matched_id = face['id']

                # Update the matched face's bounding box
                for face in initial_faces:
                    if face['id'] == matched_id:
                        face['bbox'] = (xmin, ymin, xmax, ymax)
                        break

                # Classify the emotion on the cropped face
                face_img = frame_to_process[ymin:ymax, xmin:xmax]
                if face_img.size == 0:
                    continue
                emo_label, _, _ = m.detect_emotion_for_single_face_image(face_img)
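                # Collapse RMN's full emotion set to three classes: keep 'neutral' and 'happy',
                # and label everything else 'confused'.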
                if emo_label not in ['neutral', 'happy']:
                    emo_label = 'confused'

                # Record the emotion with its timestamp
                face_emotions[matched_id].append((current_frame / fps, emo_label))
                print(f"Face {matched_id} emotion: {emo_label}")

                # Store the latest detection, shifted back to original-frame coordinates
                xmin_global = xmin + x_start
                xmax_global = xmax + x_start
                last_detections[matched_id] = (xmin_global, ymin, xmax_global, ymax, emo_label)

        # Draw the latest detections on the original frame so boxes persist between sampled frames
        for face_id, (xmin, ymin, xmax, ymax, emo_label) in last_detections.items():
            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
            cv2.putText(frame, f"ID:{face_id} {emo_label}", (xmin, ymin + 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)

        # Write the annotated frame to the output video
        out.write(frame)
        current_frame += 1

    cap.release()
    out.release()
    print("Finished processing video.")

    # Return the output video path and the per-face emotion data
    return output_video_path, face_emotions


def generate_graphs(selected_ids, face_emotions):
    # Convert the selected IDs from checkbox strings to integers
    selected_ids = [int(face_id) for face_id in selected_ids]
    selected_face_emotions = {face_id: emotions for face_id, emotions in face_emotions.items() if face_id in selected_ids}
    output_dir = 'output'
    emotion_labels = ['confused', 'neutral', 'happy']

    # Plot the emotion timeline for each selected face
    plt.figure(figsize=(15, 10))
    for i, (face_id, emotions) in enumerate(selected_face_emotions.items(), 1):
        times = [t for t, _ in emotions]
        labels = [emotion_labels.index(emo) for _, emo in emotions]
        plt.subplot(len(selected_face_emotions), 1, i)
        plt.plot(times, labels, marker='o')
        plt.title(f"Emotion changes for face {face_id}")
        plt.xlabel('Time (s)')
        plt.ylabel('Emotion')
        plt.yticks([0, 1, 2], emotion_labels)
    plt.tight_layout()
    graph_path = os.path.join(output_dir, "selected_faces_emotions.png")
    plt.savefig(graph_path)
    plt.close()
    print("Saved emotion change graph for selected faces.")

    # Build per-timestamp emotion proportions across the selected faces
    time_points = sorted(set(t for emotions in selected_face_emotions.values() for t, _ in emotions))
    emotion_counts_over_time = {t: defaultdict(int) for t in time_points}
    for emotions in selected_face_emotions.values():
        for t, emo in emotions:
            emotion_counts_over_time[t][emo] += 1
    emotion_proportions_over_time = {t: {emo: 0 for emo in emotion_labels} for t in time_points}
    for t in time_points:
        total_faces = sum(emotion_counts_over_time[t].values())
        if total_faces > 0:
            for emo in emotion_labels:
                emotion_proportions_over_time[t][emo] = emotion_counts_over_time[t][emo] / total_faces
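    # At each timestamp the three proportions sum to 1, computed over the selected faces
    # detected at that moment.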

    plt.figure(figsize=(15, 10))
    for i, emo in enumerate(emotion_labels, 1):
        proportions = [emotion_proportions_over_time[t][emo] for t in time_points]
        plt.subplot(len(emotion_labels), 1, i)
        plt.plot(time_points, proportions, marker='o')
        plt.title(f"Proportion of {emo} over time")
        plt.xlabel('Time (s)')
        plt.ylabel('Proportion')
        plt.ylim(0, 1)
    plt.tight_layout()
    emotion_proportions_path = os.path.join(output_dir, "selected_emotion_proportions_over_time.png")
    plt.savefig(emotion_proportions_path)
    plt.close()
    print("Saved emotion proportion graph for selected faces.")

    return graph_path, emotion_proportions_path


# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Emotion Detection in Videos")
    video_input = gr.Video(label="Upload a video")
    share_screen_checkbox = gr.Checkbox(label="Turn on screen-share mode", value=False)
    process_btn = gr.Button("Process Video")
    video_output = gr.Video(label="Processed Video Output")

    # State holding the per-face emotion data between callbacks
    face_emotions_state = gr.State()

    # Checkbox group listing the detected face IDs
    id_checkbox_group = gr.CheckboxGroup(label="Select Face IDs")
    generate_graphs_btn = gr.Button("Generate Graphs")
    graph_output = gr.Image(label="Emotion Change Graph")
    emotion_proportions_output = gr.Image(label="Emotion Proportions Graph")

    def process_and_get_ids(video, share_screen_mode):
        video_output_path, face_emotions = process_video(video, share_screen_mode)
        face_ids = [str(face_id) for face_id in face_emotions.keys()]
        return video_output_path, gr.update(choices=face_ids), face_emotions
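
    # Processing fills the video output, populates the face-ID checkbox choices, and stores
    # the emotion data in state; graph generation then reads the selected IDs and that state.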
    process_btn.click(
        fn=process_and_get_ids,
        inputs=[video_input, share_screen_checkbox],
        outputs=[video_output, id_checkbox_group, face_emotions_state]
    )
    generate_graphs_btn.click(
        fn=generate_graphs,
        inputs=[id_checkbox_group, face_emotions_state],
        outputs=[graph_output, emotion_proportions_output]
    )

demo.launch(share=True)