import os
import cv2
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt
from rmn import RMN
import gradio as gr
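# Overview: this script samples one frame per second from an uploaded video,
# detects faces with RMN, keeps per-face IDs with a simple IoU/centroid matcher,
# collapses any emotion other than 'neutral' or 'happy' into 'confused',
# writes an annotated output video, and serves the pipeline plus per-face
# emotion graphs through a Gradio interface.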
def process_video(video_path, share_screen_mode):
    # Prepare the output directory
    output_dir = 'output'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Initialize the emotion detection model
    print("Initializing emotion detection model...")
    m = RMN()

    # Open the video file
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_interval = int(fps * 1)  # process one frame per second
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    print(f"Total frames: {total_frames}, FPS: {fps}")

    # Create the video writer for the annotated output
    output_video_path = os.path.join(output_dir, 'output_video.avi')
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    current_frame = 0

    # Face ID and emotion bookkeeping
    max_face_id = 0
    face_emotions = defaultdict(list)
    max_faces = 0
    initial_faces = []
    last_detections = {}

    print("Starting video processing...")
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Finished processing video.")
            break

        if share_screen_mode:
            # Crop the right 1/5 of the frame
            x_start = int(frame_width * 4 / 5)
            frame_to_process = frame[:, x_start:]
        else:
            frame_to_process = frame.copy()
            x_start = 0  # no horizontal offset

        if current_frame % frame_interval == 0:
            print(f"Processing frame {current_frame}...")
            # Detect faces in the (possibly cropped) frame
            detections = m.detect_faces(frame_to_process)
            print(f"Detected {len(detections)} faces.")

            # Update the maximum number of faces seen so far
            if len(detections) > max_faces:
                max_faces = len(detections)
            for det in detections:
                xmin = det['xmin']
                ymin = det['ymin']
                xmax = det['xmax']
                ymax = det['ymax']

                matched_id = None
                max_iou = 0
                # Compare against already-tracked faces
                for face in initial_faces:
                    ixmin, iymin, ixmax, iymax = face['bbox']
                    # Compute IoU between the detection and the stored bounding box
                    xx1 = max(xmin, ixmin)
                    yy1 = max(ymin, iymin)
                    xx2 = min(xmax, ixmax)
                    yy2 = min(ymax, iymax)
                    inter_area = max(0, xx2 - xx1) * max(0, yy2 - yy1)
                    area1 = (xmax - xmin) * (ymax - ymin)
                    area2 = (ixmax - ixmin) * (iymax - iymin)
                    iou = inter_area / float(area1 + area2 - inter_area + 1e-5)
                    if iou > 0.3 and iou > max_iou:
                        matched_id = face['id']
                        max_iou = iou

                if matched_id is None:
                    if len(initial_faces) < max_faces:
                        # Register a new face ID
                        matched_id = max_face_id
                        max_face_id += 1
                        initial_faces.append({'id': matched_id, 'bbox': (xmin, ymin, xmax, ymax)})
                    else:
                        # Fall back to matching by center distance
                        cx = (xmin + xmax) / 2
                        cy = (ymin + ymax) / 2
                        min_dist = float('inf')
                        for face in initial_faces:
                            fx = (face['bbox'][0] + face['bbox'][2]) / 2
                            fy = (face['bbox'][1] + face['bbox'][3]) / 2
                            dist = np.sqrt((cx - fx) ** 2 + (cy - fy) ** 2)
                            if dist < min_dist:
                                min_dist = dist
                                matched_id = face['id']

                # Update the stored bounding box for the matched face
                for face in initial_faces:
                    if face['id'] == matched_id:
                        face['bbox'] = (xmin, ymin, xmax, ymax)
                        break
                # Classify the emotion of the face crop
                face_img = frame_to_process[ymin:ymax, xmin:xmax]
                if face_img.size == 0:
                    continue
                emo_label, _, _ = m.detect_emotion_for_single_face_image(face_img)
                if emo_label not in ['neutral', 'happy']:
                    emo_label = 'confused'

                # Record the emotion at the current timestamp (seconds)
                face_emotions[matched_id].append((current_frame / fps, emo_label))
                print(f"Face {matched_id} emotion: {emo_label}")

                # Store the latest detection, shifted back to original-frame coordinates
                xmin_global = xmin + x_start
                xmax_global = xmax + x_start
                last_detections[matched_id] = (xmin_global, ymin, xmax_global, ymax, emo_label)

        # Draw the most recent detections on the original frame
        for face_id, (xmin, ymin, xmax, ymax, emo_label) in last_detections.items():
            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
            cv2.putText(frame, f"ID:{face_id} {emo_label}", (xmin, ymin + 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)

        # Write the annotated frame to the output video
        out.write(frame)
        current_frame += 1

    cap.release()
    out.release()
    print("Finished processing video.")

    # Return the output video path and the per-face emotion data
    return output_video_path, face_emotions
def generate_graphs(selected_ids, face_emotions):
    # Convert the selected IDs from strings back to integers
    selected_ids = [int(face_id) for face_id in selected_ids]
    selected_face_emotions = {face_id: emotions for face_id, emotions in face_emotions.items()
                              if face_id in selected_ids}

    output_dir = 'output'
    emotion_labels = ['confused', 'neutral', 'happy']

    # Plot emotion changes over time, one subplot per selected face
    plt.figure(figsize=(15, 10))
    for i, (face_id, emotions) in enumerate(selected_face_emotions.items(), 1):
        times = [t for t, _ in emotions]
        labels = [emotion_labels.index(emo) for _, emo in emotions]
        plt.subplot(len(selected_face_emotions), 1, i)
        plt.plot(times, labels, marker='o')
        plt.title(f"Emotion changes for face {face_id}")
        plt.xlabel('Time (s)')
        plt.ylabel('Emotion')
        plt.yticks([0, 1, 2], emotion_labels)
    plt.tight_layout()
    graph_path = os.path.join(output_dir, "selected_faces_emotions.png")
    plt.savefig(graph_path)
    plt.close()
    print("Saved emotion change graph for selected faces.")
    # Plot the proportion of each emotion over time
    time_points = sorted(set(t for emotions in selected_face_emotions.values() for t, _ in emotions))
    emotion_counts_over_time = {t: defaultdict(int) for t in time_points}
    for emotions in selected_face_emotions.values():
        for t, emo in emotions:
            emotion_counts_over_time[t][emo] += 1

    emotion_proportions_over_time = {t: {emo: 0 for emo in emotion_labels} for t in time_points}
    for t in time_points:
        total_faces = sum(emotion_counts_over_time[t].values())
        if total_faces > 0:
            for emo in emotion_labels:
                emotion_proportions_over_time[t][emo] = emotion_counts_over_time[t][emo] / total_faces

    plt.figure(figsize=(15, 10))
    for i, emo in enumerate(emotion_labels, 1):
        proportions = [emotion_proportions_over_time[t][emo] for t in time_points]
        plt.subplot(len(emotion_labels), 1, i)
        plt.plot(time_points, proportions, marker='o')
        plt.title(f"Proportion of {emo} over time")
        plt.xlabel('Time (s)')
        plt.ylabel('Proportion')
        plt.ylim(0, 1)
    plt.tight_layout()
    emotion_proportions_path = os.path.join(output_dir, "selected_emotion_proportions_over_time.png")
    plt.savefig(emotion_proportions_path)
    plt.close()
    print("Saved emotion proportion graph for selected faces.")

    return graph_path, emotion_proportions_path
# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Emotion Detection in Videos")
    video_input = gr.Video(label="Upload a video")
    share_screen_checkbox = gr.Checkbox(label="Turn on share mode", value=False)
    process_btn = gr.Button("Process Video")
    video_output = gr.Video(label="Processed Video Output")

    # State holding the per-face emotion data between steps
    face_emotions_state = gr.State()

    # Checkbox group listing the detected face IDs
    id_checkbox_group = gr.CheckboxGroup(label="Select Face IDs")
    generate_graphs_btn = gr.Button("Generate Graphs")
    graph_output = gr.Image(label="Emotion Change Graph")
    emotion_proportions_output = gr.Image(label="Emotion Proportions Graph")

    def process_and_get_ids(video, share_screen_mode):
        video_output_path, face_emotions = process_video(video, share_screen_mode)
        face_ids = [str(face_id) for face_id in face_emotions.keys()]
        return video_output_path, gr.update(choices=face_ids), face_emotions

    process_btn.click(
        fn=process_and_get_ids,
        inputs=[video_input, share_screen_checkbox],
        outputs=[video_output, id_checkbox_group, face_emotions_state]
    )

    generate_graphs_btn.click(
        fn=generate_graphs,
        inputs=[id_checkbox_group, face_emotions_state],
        outputs=[graph_output, emotion_proportions_output]
    )

demo.launch(share=True)