"""Per-face emotion detection for uploaded videos.

Runs the RMN emotion model on roughly one frame per second, tracks faces across
frames with a simple IoU / nearest-centroid matcher, writes an annotated output
video, and exposes the workflow through a Gradio UI that plots per-face emotion
timelines and emotion proportions over time.
"""

import os
from collections import defaultdict

import cv2
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
from rmn import RMN


def process_video(video_path, share_screen_mode):
    # Initialize the output directory
    output_dir = 'output'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Initialize the emotion detection model
    print("Initializing emotion detection model...")
    m = RMN()

    # Open the video file
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_interval = max(1, int(fps))  # process roughly one frame per second
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    print(f"Total frames: {total_frames}, FPS: {fps}")

    # Create the video writer
    output_video_path = os.path.join(output_dir, 'output_video.avi')
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    current_frame = 0

    # Face-ID tracking state and per-face emotion data
    max_face_id = 0
    face_emotions = defaultdict(list)
    max_faces = 0
    initial_faces = []
    last_detections = {}

    print("Starting video processing...")
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Finished processing video.")
            break

        if share_screen_mode:
            # Crop the right-hand 1/5 of the frame (webcam strip in screen-share layouts)
            x_start = int(frame_width * 4 / 5)
            frame_to_process = frame[:, x_start:]
        else:
            frame_to_process = frame.copy()
            x_start = 0  # no horizontal offset

        if current_frame % frame_interval == 0:
            print(f"Processing frame {current_frame}...")

            # Detect faces
            detections = m.detect_faces(frame_to_process)
            print(f"Detected {len(detections)} faces.")

            # Update the maximum face count seen so far
            if len(detections) > max_faces:
                max_faces = len(detections)

            for det in detections:
                xmin = det['xmin']
                ymin = det['ymin']
                xmax = det['xmax']
                ymax = det['ymax']

                matched_id = None
                max_iou = 0

                # Compare against already-tracked faces
                for face in initial_faces:
                    ixmin, iymin, ixmax, iymax = face['bbox']
                    # Compute IoU with the tracked bounding box
                    xx1 = max(xmin, ixmin)
                    yy1 = max(ymin, iymin)
                    xx2 = min(xmax, ixmax)
                    yy2 = min(ymax, iymax)
                    inter_area = max(0, xx2 - xx1) * max(0, yy2 - yy1)
                    area1 = (xmax - xmin) * (ymax - ymin)
                    area2 = (ixmax - ixmin) * (iymax - iymin)
                    iou = inter_area / float(area1 + area2 - inter_area + 1e-5)
                    if iou > 0.3 and iou > max_iou:
                        matched_id = face['id']
                        max_iou = iou

                if matched_id is None:
                    if len(initial_faces) < max_faces:
                        # Register a new face ID
                        matched_id = max_face_id
                        max_face_id += 1
                        initial_faces.append({'id': matched_id, 'bbox': (xmin, ymin, xmax, ymax)})
                    else:
                        # Fall back to nearest-centroid matching
                        cx = (xmin + xmax) / 2
                        cy = (ymin + ymax) / 2
                        min_dist = float('inf')
                        for face in initial_faces:
                            fx = (face['bbox'][0] + face['bbox'][2]) / 2
                            fy = (face['bbox'][1] + face['bbox'][3]) / 2
                            dist = np.sqrt((cx - fx) ** 2 + (cy - fy) ** 2)
                            if dist < min_dist:
                                min_dist = dist
                                matched_id = face['id']

                # Update the matched face's bounding box
                for face in initial_faces:
                    if face['id'] == matched_id:
                        face['bbox'] = (xmin, ymin, xmax, ymax)
                        break

                # Get the emotion label for the face crop
                face_img = frame_to_process[ymin:ymax, xmin:xmax]
                if face_img.size == 0:
                    continue
                emo_label, _, _ = m.detect_emotion_for_single_face_image(face_img)
                if emo_label not in ['neutral', 'happy']:
                    emo_label = 'confused'

                # Record the emotion with its timestamp
                face_emotions[matched_id].append((current_frame / fps, emo_label))
                print(f"Face {matched_id} emotion: {emo_label}")

                # Store the latest detection, shifting coordinates back to the original frame
                xmin_global = xmin + x_start
                xmax_global = xmax + x_start
                last_detections[matched_id] = (xmin_global, ymin, xmax_global, ymax, emo_label)

        # Draw the most recent detections on the original frame
        for face_id, (xmin, ymin, xmax, ymax, emo_label) in last_detections.items():
            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
            cv2.putText(frame, f"ID:{face_id} {emo_label}", (xmin, ymin + 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)

        # Write the annotated frame to the output video
        out.write(frame)
        current_frame += 1

    cap.release()
    out.release()
    print("Finished processing video.")

    # Return the output video path and the per-face emotion data
    return output_video_path, face_emotions


def generate_graphs(selected_ids, face_emotions):
    # Convert selected_ids from strings (CheckboxGroup values) to integers
    selected_ids = [int(face_id) for face_id in selected_ids]
    selected_face_emotions = {face_id: emotions for face_id, emotions in face_emotions.items()
                              if face_id in selected_ids}

    output_dir = 'output'
    emotion_labels = ['confused', 'neutral', 'happy']

    # Plot emotion changes over time for each selected face
    plt.figure(figsize=(15, 10))
    for i, (face_id, emotions) in enumerate(selected_face_emotions.items(), 1):
        times = [t for t, _ in emotions]
        labels = [emotion_labels.index(emo) for _, emo in emotions]
        plt.subplot(len(selected_face_emotions), 1, i)
        plt.plot(times, labels, marker='o')
        plt.title(f"Emotion changes for face {face_id}")
        plt.xlabel('Time (s)')
        plt.ylabel('Emotion')
        plt.yticks([0, 1, 2], emotion_labels)
    plt.tight_layout()
    graph_path = os.path.join(output_dir, "selected_faces_emotions.png")
    plt.savefig(graph_path)
    plt.close()
    print("Saved emotion change graph for selected faces.")

    # Plot the proportion of each emotion over time
    time_points = sorted(set(t for emotions in selected_face_emotions.values() for t, _ in emotions))
    emotion_counts_over_time = {t: defaultdict(int) for t in time_points}
    for emotions in selected_face_emotions.values():
        for t, emo in emotions:
            emotion_counts_over_time[t][emo] += 1
    emotion_proportions_over_time = {t: {emo: 0 for emo in emotion_labels} for t in time_points}
    for t in time_points:
        total_faces = sum(emotion_counts_over_time[t].values())
        if total_faces > 0:
            for emo in emotion_labels:
                emotion_proportions_over_time[t][emo] = emotion_counts_over_time[t][emo] / total_faces

    plt.figure(figsize=(15, 10))
    for i, emo in enumerate(emotion_labels, 1):
        proportions = [emotion_proportions_over_time[t][emo] for t in time_points]
        plt.subplot(len(emotion_labels), 1, i)
        plt.plot(time_points, proportions, marker='o')
        plt.title(f"Proportion of {emo} over time")
        plt.xlabel('Time (s)')
        plt.ylabel('Proportion')
        plt.ylim(0, 1)
    plt.tight_layout()
    emotion_proportions_path = os.path.join(output_dir, "selected_emotion_proportions_over_time.png")
    plt.savefig(emotion_proportions_path)
    plt.close()
    print("Saved emotion proportion graph for selected faces.")

    return graph_path, emotion_proportions_path


# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Emotion Detection in Videos")
    video_input = gr.Video(label="Upload a video")
    share_screen_checkbox = gr.Checkbox(label="Turn on share mode", value=False)
    process_btn = gr.Button("Process Video")
    video_output = gr.Video(label="Processed Video Output")

    # State holding the per-face emotion data returned by process_video
    face_emotions_state = gr.State()

    # Checkbox group listing the detected face IDs
    id_checkbox_group = gr.CheckboxGroup(label="Select Face IDs")
    generate_graphs_btn = gr.Button("Generate Graphs")
    graph_output = gr.Image(label="Emotion Change Graph")
    emotion_proportions_output = gr.Image(label="Emotion Proportions Graph")

    def process_and_get_ids(video, share_screen_mode):
        video_output_path, face_emotions = process_video(video, share_screen_mode)
        face_ids = [str(face_id) for face_id in face_emotions.keys()]
        return video_output_path, gr.update(choices=face_ids), face_emotions

    process_btn.click(
        fn=process_and_get_ids,
        inputs=[video_input, share_screen_checkbox],
        outputs=[video_output, id_checkbox_group, face_emotions_state]
    )
    generate_graphs_btn.click(
        fn=generate_graphs,
        inputs=[id_checkbox_group, face_emotions_state],
        outputs=[graph_output, emotion_proportions_output]
    )

demo.launch(share=True)