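Machine Vision: A Python Face Recognition and Photo Management Tool, a Full Pipeline from Detection to Organization

A Python face clustering tool: MTCNN detection, InsightFace embeddings, and cosine-similarity clustering, with multiprocess batch processing, saving of similar groups, and duplicate photo deletion. Suited to organizing speaker/face datasets.


A Python Face Clustering and Similarity Analysis Tool: A Full Pipeline from Detection to Organization

Preface

Counting the number of distinct speakers is an important step when preparing a dataset. With that in mind, I built an automated face clustering and similarity analysis tool that processes every image in a directory: it detects faces, extracts features, identifies similar faces, groups them automatically, and offers duplicate photo deletion. This article walks through the tool's principles, core techniques, and code structure, covering the technical details of the full pipeline from face detection to photo organization.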

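Models and Algorithms

The tool combines several established computer vision models and algorithms. Its core stack has four parts: face detection, feature extraction, similarity computation, and clustering:

1. Face detection: MTCNN

MTCNN (Multi-task Cascaded Convolutional Networks) is a cascade of three convolutional networks (P-Net, R-Net, and O-Net) that jointly performs face detection and facial landmark localization. Compared with traditional face detection algorithms, it offers a good balance of speed and accuracy and is more robust to occluded and multi-angle faces.

In this tool, MTCNN locates the face in each image and outputs its bounding box (bbox). In the code below, an image is skipped when MTCNN finds no face, and the box is recorded alongside the result; InsightFace then runs its own detector on the full image before extracting features.

2. Face feature extraction: InsightFace

InsightFace is an open-source face analysis toolkit that ships with high-performance face recognition models. It converts a face image into a fixed-dimensional feature vector (an embedding): different photos of the same person produce similar vectors, while vectors of different people differ substantially.

The tool uses InsightFace's FaceAnalysis module to extract features. The resulting embeddings are discriminative enough to serve as the basis for the similarity computation that follows.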

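3. Similarity computation: cosine similarity

Cosine similarity measures the difference in direction between two vectors. It ranges over [-1, 1], and the closer the value is to 1, the more closely the two vectors are aligned. For face embeddings it is an effective measure of how similar two faces are: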

$$\text{cosine\_similarity}(A, B) = \frac{A \cdot B}{\|A\| \cdot \|B\|}$$

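The tool applies a similarity threshold (0.65 by default): two faces whose similarity exceeds the threshold are treated as the same person.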
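To make the formula and the threshold concrete, here is a minimal pure-Python sketch. The helper names `cosine_similarity_pair` and `is_same_person` are illustrative and do not appear in the tool, which uses scikit-learn's `cosine_similarity` on the whole feature matrix; returning 0.0 for a zero vector is a convention chosen for this sketch.

```python
import math


def cosine_similarity_pair(a, b):
    """Cosine similarity between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # convention for this sketch: a zero vector matches nothing
    return dot / (norm_a * norm_b)


def is_same_person(a, b, threshold=0.65):
    """Apply the tool's decision rule: similarity strictly above the threshold."""
    return cosine_similarity_pair(a, b) > threshold
```

Because only the direction of the vectors matters, scaling an embedding by a positive constant does not change the result.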

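4. Clustering: grouping based on the similarity matrix

Once the feature vectors for all images are available, the tool computes the full pairwise similarity matrix, identifies the similar pairs, and then merges similar faces into groups with a greedy clustering pass. The pass takes each unassigned face in turn as a seed and pulls in every unassigned face whose similarity to that seed exceeds the threshold, so the result depends on processing order and is not transitive. The method is simple, but it is efficient on small to medium image sets (a few thousand images), where the O(n²) matrix is still cheap to compute, and its results are easy to interpret.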
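For comparison, the sketch below groups faces by connected components instead of by seed. This is a different technique from the tool's greedy pass, shown only as a possible alternative: if A matches B and B matches C, all three end up together even when A and C fall below the threshold. The function name `group_by_connected_components` is hypothetical, and the input is assumed to be a square similarity matrix (nested lists or a NumPy array).

```python
def group_by_connected_components(similarity, threshold=0.65):
    """Group indices whose similarity graph (edges above threshold) is connected."""
    n = len(similarity)
    parent = list(range(n))  # union-find forest, one node per face

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving keeps trees shallow
            x = parent[x]
        return x

    def union(x, y):
        root_x, root_y = find(x), find(y)
        if root_x != root_y:
            # Attach the larger root to the smaller one for a stable result
            parent[max(root_x, root_y)] = min(root_x, root_y)

    for i in range(n):
        for j in range(i + 1, n):
            if similarity[i][j] > threshold:
                union(i, j)

    groups = {}
    for idx in range(n):
        groups.setdefault(find(idx), []).append(idx)
    # Like the tool, keep only groups with more than one member
    return [members for members in groups.values() if len(members) > 1]
```

The trade-off is chaining: one borderline match can merge two otherwise distinct people, so a stricter threshold may be needed with this variant.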

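Code Walkthrough

The core code is encapsulated in the FaceRecognitionSystem class and uses multiprocessing for throughput. It is organized into five modules: initialization, image processing, similarity analysis, result handling, and auxiliary analysis.

1. Initialization and multiprocessing setup

To speed up processing of large image sets, the tool processes images in parallel across multiple processes. Deep learning models cannot be shared directly between processes, so the initialize_worker function initializes a separate MTCNN and InsightFace model in each worker process:

def initialize_worker():
    """Initialize the models for each worker process."""
    global detector, app
    if detector is None:
        detector = MTCNN()  # initialize the MTCNN face detector
    if app is None:
        app = FaceAnalysis(providers=['CPUExecutionProvider'])  # initialize InsightFace
        app.prepare(ctx_id=0, det_size=(640, 640))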

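2. Image processing flow

Single-image processing (the process_image function)

This function is the unit of work executed by each worker process. It:

  • Reads the image and converts it to RGB (MTCNN expects RGB input)
  • Detects faces with MTCNN and obtains the bounding box
  • Extracts the face feature vector with InsightFace
  • Returns the image path, feature vector, and bounding box (if a face was detected)

def process_image(image_path):
    try:
        image = cv2.imread(image_path)  # OpenCV loads images as BGR
        if image is None:
            return None
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # BGR to RGB for MTCNN
        results = detector.detect_faces(rgb_image)  # detect faces with MTCNN
        if results:
            # InsightFace expects the BGR array from cv2.imread, so pass `image`, not `rgb_image`
            faces = app.get(image)  # extract features with InsightFace
            if faces:
                x, y, w, h = results[0]['box']  # take the first face box
                return image_path, faces[0].embedding, (int(x), int(y), int(w), int(h))
    except Exception as e:
        print(f"Error while processing image {image_path}: {e}")
    return None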
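When an image contains several faces, MTCNN's first box and InsightFace's first face are not guaranteed to be the same person. One way to pair them is by intersection over union (IoU), sketched below. The helper names are hypothetical, and the sketch assumes MTCNN boxes are `(x, y, width, height)` while InsightFace boxes (`face.bbox`) are `(x1, y1, x2, y2)`; the `min_iou` default of 0.3 is an arbitrary starting point.

```python
def xyxy_to_xywh(box):
    """Convert (x1, y1, x2, y2) corners to (x, y, width, height)."""
    x1, y1, x2, y2 = box
    return (x1, y1, x2 - x1, y2 - y1)


def iou_xywh(box_a, box_b):
    """Intersection over union of two (x, y, width, height) boxes."""
    ax, ay, aw, ah = box_a
    bx, by, bw, bh = box_b
    inter_w = min(ax + aw, bx + bw) - max(ax, bx)
    inter_h = min(ay + ah, by + bh) - max(ay, by)
    if inter_w <= 0 or inter_h <= 0:
        return 0.0  # the boxes do not overlap
    inter = inter_w * inter_h
    union = aw * ah + bw * bh - inter
    return inter / union if union > 0 else 0.0


def best_match(target_xywh, candidate_boxes_xyxy, min_iou=0.3):
    """Index of the candidate that overlaps the target most, or None."""
    best_index, best_score = None, min_iou
    for index, box in enumerate(candidate_boxes_xyxy):
        score = iou_xywh(target_xywh, xyxy_to_xywh(box))
        if score >= best_score:
            best_index, best_score = index, score
    return best_index
```

Inside `process_image`, this could be used as `best_match(results[0]['box'], [f.bbox for f in faces])` to choose which embedding to return, falling back to skipping the image when the result is `None`.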

Batch directory processing (the process_directory method)

This method collects every image file in the given directory, calls process_image in parallel through a process pool, and keeps the valid results (images that contain a face), storing their feature vectors, paths, and bounding boxes. Because imap_unordered is used, results arrive in completion order rather than in directory order:

def process_directory(self, input_dir):
    image_extensions = ['.jpg', '.jpeg', '.png', '.bmp']
    image_files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) 
                  if os.path.isfile(os.path.join(input_dir, f)) 
                  and os.path.splitext(f)[1].lower() in image_extensions]
    
    with Pool(processes=2, initializer=initialize_worker) as pool:  # 2 worker processes
        results = list(tqdm(pool.imap_unordered(process_image, image_files), 
                          total=len(image_files), desc="Processing images"))
    
    for result in results:
        if result:
            image_path, feature, bbox = result
            self.face_features.append(feature)
            self.file_paths.append(image_path)
            self.bboxes.append(bbox)

3. Similar face analysis (the find_similar_faces method)

This method holds the core analysis logic and has three steps:

  1. Compute the cosine similarity matrix over all feature vectors
  2. Select similar face pairs based on the threshold
  3. Cluster similar faces into groups and compute each group's average similarity

def find_similar_faces(self):
    if len(self.face_features) < 2:
        return []
    
    # Compute the cosine similarity matrix
    features_array = np.array(self.face_features)
    similarity_matrix = cosine_similarity(features_array)
    
    # Cluster similar faces
    groups = []
    used = set()
    for i in range(len(self.face_features)):
        if i not in used:
            group = [i]
            used.add(i)
            # Find every unassigned face that is similar to the current seed face
            for j in range(len(self.face_features)):
                if j not in used and similarity_matrix[i][j] > self.similarity_threshold:
                    group.append(j)
                    used.add(j)
            if len(group) > 1:
                groups.append(group)
    
    # Convert indices to file paths and compute the average similarity within each group
    result_groups = []
    for group in groups:
        group_files = [self.file_paths[idx] for idx in group]
        avg_sim = self._calculate_group_similarity(group, similarity_matrix)
        result_groups.append({'files': group_files, 'average_similarity': avg_sim})
    
    return sorted(result_groups, key=lambda x: x['average_similarity'], reverse=True)
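The excerpt calls `_calculate_group_similarity`, which averages the similarity over every unordered pair in a group. A standalone sketch of that averaging is shown here; the function name `mean_pairwise_similarity` is illustrative, and it accepts either nested lists or a NumPy array as the matrix.

```python
from itertools import combinations


def mean_pairwise_similarity(group, similarity_matrix):
    """Average similarity over all unordered index pairs in `group`."""
    if len(group) < 2:
        return 0.0  # a single face has no pair to compare
    pairs = list(combinations(group, 2))  # each (i, j) with i before j, once
    total = sum(similarity_matrix[i][j] for i, j in pairs)
    return total / len(pairs)
```

For a group of k faces this visits k(k-1)/2 pairs, so the diagonal (each face compared with itself) never inflates the average.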

4. Result handling

Saving grouped results (the save_similar_groups method)

Copies each group of similar faces into its own folder and writes a CSV results file that lists, for every image, its group, file name, and the group's average similarity. Images that were not grouped are copied to the ungrouped directory.

    def save_similar_groups(self, output_dir, similar_groups):
        """
        Save similar faces into separate folders.
        :param output_dir: output directory
        :param similar_groups: groups of similar faces
        """
        os.makedirs(output_dir, exist_ok=True)
        
        # Collect one row per file for the CSV
        results = []
        for i, group in enumerate(similar_groups):
            group_dir = os.path.join(output_dir, f"group_{i+1}_similarity_{group['average_similarity']:.4f}")
            os.makedirs(group_dir, exist_ok=True)
            
            for file_path in group['files']:
                file_name = os.path.basename(file_path)
                dest_path = os.path.join(group_dir, file_name)
                shutil.copy2(file_path, dest_path)
                
                results.append({
                    'group': i+1,
                    'file': file_name,
                    'similarity': group['average_similarity']
                })
        
        # Save the ungrouped files (those with no similar face found)
        all_group_files = set()
        for group in similar_groups:
            all_group_files.update(group['files'])
        
        ungrouped_dir = os.path.join(output_dir, "ungrouped")
        os.makedirs(ungrouped_dir, exist_ok=True)
        
        for file_path in self.file_paths:
            if file_path not in all_group_files:
                file_name = os.path.basename(file_path)
                dest_path = os.path.join(ungrouped_dir, file_name)
                shutil.copy2(file_path, dest_path)
                results.append({
                    'group': 'ungrouped',
                    'file': file_name,
                    'similarity': 0.0
                })
        
        # Write the results to CSV
        df = pd.DataFrame(results)
        df.to_csv(os.path.join(output_dir, "similarity_results.csv"), index=False)
        print(f"Results saved to the {output_dir} directory")

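Duplicate photo deletion (the delete_one_duplicate_per_group method)

Deletes one randomly chosen photo from each similar group to reduce redundant storage (user confirmation is required). The deletion acts on the original files in the input directory and cannot be undone, and a group means "same person" rather than "identical photo", so review the groups before confirming.

    def delete_one_duplicate_per_group(self, similar_groups):
        """
        Delete one randomly chosen photo from each similar group.
        :param similar_groups: groups of similar faces
        """
        if not similar_groups:
            print("No similar photo groups found; nothing to delete.")
            return

        print("\nDeleting one random duplicate photo from each group...")
        deleted_count = 0
        for i, group in enumerate(similar_groups):
            if len(group['files']) > 1:
                # Pick one photo at random to delete
                file_to_delete = random.choice(group['files'])
                try:
                    os.remove(file_to_delete)
                    print(f"Group {i+1}: deleted photo -> {os.path.basename(file_to_delete)}")
                    deleted_count += 1
                except OSError as e:
                    print(f"Error while deleting file {file_to_delete}: {e}")
        print(f"\nDeleted {deleted_count} duplicate photos in total.")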
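A more cautious variant moves the chosen file to a holding folder instead of removing it, so a wrong grouping can be undone. The sketch below is not part of the tool: the function name `quarantine_one_per_group`, the `quarantine_dir` parameter, and the rule of picking the smallest file (rather than a random one) are all assumptions made for illustration.

```python
import os
import shutil


def quarantine_one_per_group(similar_groups, quarantine_dir):
    """Move one file per group into quarantine_dir; return (source, destination) pairs."""
    os.makedirs(quarantine_dir, exist_ok=True)
    moved = []
    for group in similar_groups:
        # Ignore paths that no longer exist, e.g. after an earlier run
        existing = [path for path in group['files'] if os.path.isfile(path)]
        if len(existing) < 2:
            continue  # never remove the last remaining photo of a group
        # Deterministic choice: the smallest file, ties broken by path
        candidate = min(existing, key=lambda path: (os.path.getsize(path), path))
        destination = os.path.join(quarantine_dir, os.path.basename(candidate))
        if os.path.exists(destination):
            continue  # do not overwrite a file that is already quarantined
        shutil.move(candidate, destination)
        moved.append((candidate, destination))
    return moved
```

The returned pairs can be logged or used to restore files by moving each destination back to its source.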

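Result cleanup (the clear_results method)

Deletes the generated results directory so that processing can be rerun from scratch.

    def clear_results(self, output_dir):
        """
        Remove the generated results directory.
        :param output_dir: output directory
        """
        if os.path.exists(output_dir):
            try:
                shutil.rmtree(output_dir)
                print(f"\nResults directory cleared: {output_dir}")
            except OSError as e:
                print(f"Error while clearing directory {output_dir}: {e}")
        else:
            print("\nResults directory does not exist; nothing to clear.")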

6. Main function logic

The main block supports two modes (skip=0 reprocesses the images, skip=1 loads existing results). The input directory, output directory, and similarity threshold are configurable, and the user is asked interactively whether to delete duplicate photos and whether to clear the results.

if __name__ == "__main__":
    # Configuration
    script_dir = os.path.dirname(os.path.abspath(__file__))
    INPUT_DIRECTORY = os.path.join(script_dir, "face_dir")  # replace with your image directory
    OUTPUT_DIRECTORY = os.path.join(script_dir, "face_recognition_results")
    SIMILARITY_THRESHOLD = 0.65  # adjust as needed
    
    # skip=0: reprocess the images | skip=1: skip processing and work from the results file
    skip = 1
    
    face_system = FaceRecognitionSystem(similarity_threshold=SIMILARITY_THRESHOLD)
    similar_groups = []

    if skip == 1:
        print("Mode: skip=1. Trying to load from the existing results file...")
        csv_path = os.path.join(OUTPUT_DIRECTORY, "similarity_results.csv")
        if os.path.exists(csv_path):
            df = pd.read_csv(csv_path)
            # Keep only the records that belong to a group
            grouped_df = df[df['group'] != 'ungrouped'].copy()
            
            # pd.to_numeric to handle potential string group IDs
            grouped_df['group'] = pd.to_numeric(grouped_df['group'])

            for group_id, group_data in grouped_df.groupby('group'):
                # Rebuild full paths from the file names
                files = [os.path.join(INPUT_DIRECTORY, fname) for fname in group_data['file']]
                avg_sim = group_data['similarity'].iloc[0]
                similar_groups.append({
                    'files': files,
                    'average_similarity': avg_sim
                })
            print(f"Loaded {len(similar_groups)} similar groups from {csv_path}.")
        else:
            print(f"Error: results file {csv_path} not found.")
            print("Switching to skip=0 mode to reprocess the images.")
            skip = 0  # fall back to reprocessing, as the message above states

    if skip == 0:
        print("Mode: skip=0. Processing images...")
        # Process every image in the input directory
        face_system.process_directory(INPUT_DIRECTORY)
        # Find similar faces
        similar_groups = face_system.find_similar_faces()
        print(f"\nFound {len(similar_groups)} groups that may contain photos of the same person")
        # Save the results
        if similar_groups:
            face_system.save_similar_groups(OUTPUT_DIRECTORY, similar_groups)

    # --- Follow-up actions ---
    if similar_groups:
        # Ask whether to delete duplicate photos
        choice_delete = input("\nDelete one random photo from each similar group? (y/n): ").lower()
        if choice_delete in ['y', 'yes']:
            face_system.delete_one_duplicate_per_group(similar_groups)
    else:
        print("\nNo similar groups available to act on.")

    # Ask whether to clear the results
    choice_clear = input("\nClear the results generated by this run? (y/n): ").lower()
    if choice_clear in ['y', 'yes']:
        face_system.clear_results(OUTPUT_DIRECTORY)
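The configuration above is edited directly in the script. If command-line control is preferred, the same four settings could be exposed with argparse, as in the sketch below. The flag names (`--input-dir`, `--output-dir`, `--threshold`, `--skip`) are assumptions and are not part of the original tool; the defaults mirror the constants in the main block, except that `--skip` defaults to 0 here.

```python
import argparse


def parse_args(argv=None):
    """Parse the tool's settings from the command line (hypothetical flags)."""
    parser = argparse.ArgumentParser(
        description="Face clustering and similarity analysis"
    )
    parser.add_argument("--input-dir", default="face_dir",
                        help="directory that contains the images")
    parser.add_argument("--output-dir", default="face_recognition_results",
                        help="directory for grouped copies and the CSV")
    parser.add_argument("--threshold", type=float, default=0.65,
                        help="cosine similarity above which two faces match")
    parser.add_argument("--skip", type=int, choices=[0, 1], default=0,
                        help="0 reprocesses images, 1 loads existing results")
    args = parser.parse_args(argv)
    # Cosine similarity lies in [-1, 1], so reject thresholds outside that range
    if not -1.0 <= args.threshold <= 1.0:
        parser.error("--threshold must be within [-1, 1]")
    return args
```

The parsed values would then replace the constants, for example `FaceRecognitionSystem(similarity_threshold=args.threshold)`.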

Full Code

import os
import cv2
import numpy as np
import pandas as pd
from mtcnn import MTCNN
from insightface.app import FaceAnalysis
from sklearn.metrics.pairwise import cosine_similarity
import shutil
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
import random

# Globals used to initialize the models once per worker process
detector = None
app = None

def initialize_worker():
    """Initialize the models for each worker process."""
    global detector, app
    if detector is None:
        detector = MTCNN()
    if app is None:
        app = FaceAnalysis(providers=['CPUExecutionProvider'])
        app.prepare(ctx_id=0, det_size=(640, 640))

def process_image(image_path):
    """
    Process a single image: detect a face and extract its features (designed to run in a worker process).
    :param image_path: image path
    :return: (image_path, feature, bbox) or None; bbox=(x,y,w,h)
    """
    try:
        image = cv2.imread(image_path)  # OpenCV loads images as BGR
        if image is None:
            return None
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # MTCNN expects RGB
        results = detector.detect_faces(rgb_image)
        if results:
            # InsightFace expects the BGR array from cv2.imread, so pass `image`, not `rgb_image`
            faces = app.get(image)
            if faces:
                # MTCNN and InsightFace may return faces in different orders; the recorded box is
                # MTCNN's first box, while the embedding comes from InsightFace's first face
                x, y, w, h = results[0]['box']
                return image_path, faces[0].embedding, (int(x), int(y), int(w), int(h))
    except Exception as e:
        print(f"Error while processing image {image_path}: {e}")
    return None

class FaceRecognitionSystem:
    def __init__(self, similarity_threshold=0.65):
        """
        Initialize the face detection and recognition system.
        :param similarity_threshold: similarity threshold for treating two faces as the same person
        """
        # Similarity threshold; values above it are treated as the same person
        self.similarity_threshold = similarity_threshold
        
        # Face features and their corresponding file paths
        self.face_features = []
        self.file_paths = []
        self.valid_files = []
        self.bboxes = []  # face boxes (x,y,w,h), aligned one-to-one with file_paths
        # Age/gender model state
        self.age_net = None
        self.gender_net = None
        self.detector_net = None
        self.age_list = ['(0-2)', '(4-6)', '(8-12)', '(15-20)', '(25-32)', '(38-43)', '(48-53)', '(60-100)']
        self.gender_list = ['Male', 'Female']
        self.age_gender_loaded = False
        
    def process_directory(self, input_dir):
        """
        Process every image in a directory using multiple processes.
        :param input_dir: directory that contains the images
        """
        image_extensions = ['.jpg', '.jpeg', '.png', '.bmp']
        image_files = [
            os.path.join(input_dir, f) 
            for f in os.listdir(input_dir) 
            if os.path.isfile(os.path.join(input_dir, f)) 
            and os.path.splitext(f)[1].lower() in image_extensions
        ]
        
        # Set the number of worker processes here (max_workers)
        # None uses all available CPU cores
        max_workers = 2
        
        num_to_print = max_workers if max_workers is not None else cpu_count()
        print(f"Found {len(image_files)} image files; processing with {num_to_print} processes...")
        
        # Create the process pool
        with Pool(processes=max_workers, initializer=initialize_worker) as pool:
            # imap_unordered yields results as they finish, which drives the progress bar
            results = list(tqdm(
                pool.imap_unordered(process_image, image_files), 
                total=len(image_files),
                desc="Processing images"
            ))

        for result in results:
            if result:
                image_path, feature, bbox = result
                self.face_features.append(feature)
                self.file_paths.append(image_path)
                self.valid_files.append(os.path.basename(image_path))
                self.bboxes.append(bbox)
        
        print(f"Successfully processed {len(self.face_features)} images that contain a face")
    
    def find_similar_faces(self):
        """
        Find similar faces (possibly the same person).
        :return: list of similar face groups
        """
        if len(self.face_features) < 2:
            return []
            
        # Compute the cosine similarity between all features
        features_array = np.array(self.face_features)
        similarity_matrix = cosine_similarity(features_array)
        
        # Collect similar face pairs (kept for inspection; the grouping below does not use this list)
        similar_pairs = []
        n = len(self.face_features)
        
        for i in range(n):
            for j in range(i + 1, n):
                if similarity_matrix[i][j] > self.similarity_threshold:
                    similar_pairs.append((
                        self.file_paths[i], 
                        self.file_paths[j], 
                        similarity_matrix[i][j]
                    ))
        
        # Cluster similar faces into groups
        groups = []
        used = set()
        
        for i in range(n):
            if i not in used:
                group = [i]
                used.add(i)
                
                # Find every unassigned face that is similar to the current seed face
                for j in range(n):
                    if j not in used and similarity_matrix[i][j] > self.similarity_threshold:
                        group.append(j)
                        used.add(j)
                
                if len(group) > 1:  # keep only groups with more than one face
                    groups.append(group)
        
        # Convert indices to file paths
        result_groups = []
        for group in groups:
            group_files = [self.file_paths[idx] for idx in group]
            # Compute the average similarity within the group
            avg_sim = self._calculate_group_similarity(group, similarity_matrix)
            result_groups.append({
                'files': group_files,
                'average_similarity': avg_sim
            })
        
        # Sort by average similarity within the group, highest first
        result_groups.sort(key=lambda x: x['average_similarity'], reverse=True)
        
        return result_groups
    
    def _calculate_group_similarity(self, group, similarity_matrix):
        """Compute the average pairwise similarity within a group."""
        if len(group) <= 1:
            return 0.0
            
        total = 0.0
        count = 0
        
        for i in range(len(group)):
            for j in range(i + 1, len(group)):
                idx_i = group[i]
                idx_j = group[j]
                total += similarity_matrix[idx_i][idx_j]
                count += 1
        
        return total / count if count > 0 else 0.0
    
    def save_similar_groups(self, output_dir, similar_groups):
        """
        Save similar faces into separate folders.
        :param output_dir: output directory
        :param similar_groups: groups of similar faces
        """
        os.makedirs(output_dir, exist_ok=True)
        
        # Collect one row per file for the CSV
        results = []
        for i, group in enumerate(similar_groups):
            group_dir = os.path.join(output_dir, f"group_{i+1}_similarity_{group['average_similarity']:.4f}")
            os.makedirs(group_dir, exist_ok=True)
            
            for file_path in group['files']:
                file_name = os.path.basename(file_path)
                dest_path = os.path.join(group_dir, file_name)
                shutil.copy2(file_path, dest_path)
                
                results.append({
                    'group': i+1,
                    'file': file_name,
                    'similarity': group['average_similarity']
                })
        
        # Save the ungrouped files (those with no similar face found)
        all_group_files = set()
        for group in similar_groups:
            all_group_files.update(group['files'])
        
        ungrouped_dir = os.path.join(output_dir, "ungrouped")
        os.makedirs(ungrouped_dir, exist_ok=True)
        
        for file_path in self.file_paths:
            if file_path not in all_group_files:
                file_name = os.path.basename(file_path)
                dest_path = os.path.join(ungrouped_dir, file_name)
                shutil.copy2(file_path, dest_path)
                results.append({
                    'group': 'ungrouped',
                    'file': file_name,
                    'similarity': 0.0
                })
        
        # Write the results to CSV
        df = pd.DataFrame(results)
        df.to_csv(os.path.join(output_dir, "similarity_results.csv"), index=False)
        print(f"Results saved to the {output_dir} directory")

    def delete_one_duplicate_per_group(self, similar_groups):
        """
        Delete one randomly chosen photo from each similar group.
        :param similar_groups: groups of similar faces
        """
        if not similar_groups:
            print("No similar photo groups found; nothing to delete.")
            return

        print("\nDeleting one random duplicate photo from each group...")
        deleted_count = 0
        for i, group in enumerate(similar_groups):
            if len(group['files']) > 1:
                # Pick one photo at random to delete
                file_to_delete = random.choice(group['files'])
                try:
                    os.remove(file_to_delete)
                    print(f"Group {i+1}: deleted photo -> {os.path.basename(file_to_delete)}")
                    deleted_count += 1
                except OSError as e:
                    print(f"Error while deleting file {file_to_delete}: {e}")
        print(f"\nDeleted {deleted_count} duplicate photos in total.")

    def clear_results(self, output_dir):
        """
        Remove the generated results directory.
        :param output_dir: output directory
        """
        if os.path.exists(output_dir):
            try:
                shutil.rmtree(output_dir)
                print(f"\nResults directory cleared: {output_dir}")
            except OSError as e:
                print(f"Error while clearing directory {output_dir}: {e}")
        else:
            print("\nResults directory does not exist; nothing to clear.")

    def ensure_age_gender_models(self, base_dir):
        """Load the age/gender detection models (only once).
        The model directory is resolved as: (parent of the current script directory)/age_gender
        """
        if self.age_gender_loaded:
            return True
        try:
            # base_dir is the face subdirectory, so step up one level
            project_root = os.path.dirname(base_dir)  # parent directory (Lip_reading)
            model_dir = os.path.join(project_root, 'age_gender')

            age_prototxt = os.path.join(model_dir, 'age_deploy.prototxt')
            age_caffemodel = os.path.join(model_dir, 'age_net.caffemodel')
            gender_prototxt = os.path.join(model_dir, 'gender_deploy.prototxt')
            gender_caffemodel = os.path.join(model_dir, 'gender_net.caffemodel')
            face_pb = os.path.join(model_dir, 'opencv_face_detector_uint8.pb')
            face_pbtxt = os.path.join(model_dir, 'opencv_face_detector.pbtxt')

            needed = [age_prototxt, age_caffemodel, gender_prototxt, gender_caffemodel, face_pb, face_pbtxt]
            for p in needed:
                if not os.path.exists(p):
                    print(f"Missing model file: {p}; skipping age/gender statistics.")
                    return False

            self.age_net = cv2.dnn.readNet(age_caffemodel, age_prototxt)
            self.gender_net = cv2.dnn.readNet(gender_caffemodel, gender_prototxt)
            self.detector_net = cv2.dnn.readNetFromTensorflow(face_pb, face_pbtxt)
            self.age_gender_loaded = True
            print(f"Age/gender models loaded. (model directory: {model_dir})")
            return True
        except Exception as e:
            print(f"Failed to load age/gender models: {e}")
            return False

    def predict_age_gender_for_image(self, image_path):
        """Predict the age range and gender of the highest-confidence face in one image. Returns (gender, age_range) or (None, None)."""
        if not self.age_gender_loaded:
            return (None, None)
        img = cv2.imread(image_path)
        if img is None:
            return (None, None)
        h, w = img.shape[:2]
        blob = cv2.dnn.blobFromImage(img, 1.0, (300, 300), [104, 117, 123], False, False)
        self.detector_net.setInput(blob)
        detections = self.detector_net.forward()
        best_conf = 0
        face_box = None
        for i in range(detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence > 0.6 and confidence > best_conf:
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                x1, y1, x2, y2 = box.astype(int)
                x1, y1 = max(0, x1), max(0, y1)
                x2, y2 = min(w - 1, x2), min(h - 1, y2)
                face_box = (x1, y1, x2, y2)
                best_conf = confidence
        if face_box is None:
            return (None, None)
        x1, y1, x2, y2 = face_box
        face_roi = img[y1:y2, x1:x2].copy()
        if face_roi.size == 0:
            return (None, None)
        blob_face = cv2.dnn.blobFromImage(face_roi, 1.0, (227, 227), (78.4263377603, 87.7689143744, 114.895847746), swapRB=False)
        # gender
        self.gender_net.setInput(blob_face)
        gender_preds = self.gender_net.forward()
        gender = self.gender_list[gender_preds[0].argmax()]
        # age
        self.age_net.setInput(blob_face)
        age_preds = self.age_net.forward()
        age_range = self.age_list[age_preds[0].argmax()]
        return (gender, age_range)

    def analyze_age_gender(self, base_dir, output_dir, similar_groups):
        """
        Run age/gender statistics over all images and write two files:
        1) age_gender_details.csv: one row per image -> file name, group_id, gender, age_range
        2) age_gender_stats.txt: summary statistics
        """
        if not self.ensure_age_gender_models(base_dir):
            return
        records = []
        # Build the file-to-group mapping
        file_to_group = {}
        for idx, g in enumerate(similar_groups, start=1):
            for f in g['files']:
                file_to_group[os.path.basename(f)] = idx
        # Even when there are no groups, every existing entry in file_paths is analyzed
        target_files = self.file_paths if self.file_paths else []

        if not target_files:
            print("No images available for age/gender statistics.")
            return
        print("Running age/gender statistics...")
        gender_counter = {}
        age_counter = {}
        for path in tqdm(target_files, desc="Age/Gender"):
            gender, age_range = self.predict_age_gender_for_image(path)
            base_name = os.path.basename(path)
            group_id = file_to_group.get(base_name, 'ungrouped')
            records.append({
                'file': base_name,
                'group_id': group_id,
                'gender': gender if gender else 'Unknown',
                'age_range': age_range if age_range else 'Unknown'
            })
            if gender:
                gender_counter[gender] = gender_counter.get(gender, 0) + 1
            else:
                gender_counter['Unknown'] = gender_counter.get('Unknown', 0) + 1
            if age_range:
                age_counter[age_range] = age_counter.get(age_range, 0) + 1
            else:
                age_counter['Unknown'] = age_counter.get('Unknown', 0) + 1
        # Save the per-image details
        os.makedirs(output_dir, exist_ok=True)
        details_path = os.path.join(output_dir, 'age_gender_details.csv')
        pd.DataFrame(records).to_csv(details_path, index=False)
        # Summary
        stats_lines = ["Gender Statistics:"]
        for k, v in gender_counter.items():
            stats_lines.append(f"  {k}: {v}")
        stats_lines.append("\nAge Range Statistics:")
        for k, v in age_counter.items():
            stats_lines.append(f"  {k}: {v}")
        stats_path = os.path.join(output_dir, 'age_gender_stats.txt')
        with open(stats_path, 'w', encoding='utf-8') as fw:
            fw.write('\n'.join(stats_lines))
        print(f"Age/gender statistics complete; saved to: {details_path} and {stats_path}")

if __name__ == "__main__":
    # Configuration
    script_dir = os.path.dirname(os.path.abspath(__file__))
    INPUT_DIRECTORY = os.path.join(script_dir, "face_dir")  # replace with your image directory
    OUTPUT_DIRECTORY = os.path.join(script_dir, "face_recognition_results")
    SIMILARITY_THRESHOLD = 0.65  # adjust as needed
    
    # skip=0: reprocess the images | skip=1: skip processing and work from the results file
    skip = 1
    
    face_system = FaceRecognitionSystem(similarity_threshold=SIMILARITY_THRESHOLD)
    similar_groups = []

    if skip == 1:
        print("Mode: skip=1. Trying to load from the existing results file...")
        csv_path = os.path.join(OUTPUT_DIRECTORY, "similarity_results.csv")
        if os.path.exists(csv_path):
            df = pd.read_csv(csv_path)
            # Keep only the records that belong to a group
            grouped_df = df[df['group'] != 'ungrouped'].copy()
            
            # pd.to_numeric to handle potential string group IDs
            grouped_df['group'] = pd.to_numeric(grouped_df['group'])

            for group_id, group_data in grouped_df.groupby('group'):
                # Rebuild full paths from the file names
                files = [os.path.join(INPUT_DIRECTORY, fname) for fname in group_data['file']]
                avg_sim = group_data['similarity'].iloc[0]
                similar_groups.append({
                    'files': files,
                    'average_similarity': avg_sim
                })
            print(f"Loaded {len(similar_groups)} similar groups from {csv_path}.")
        else:
            print(f"Error: results file {csv_path} not found.")
            print("Switching to skip=0 mode to reprocess the images.")
            skip = 0  # fall back to reprocessing, as the message above states

    if skip == 0:
        print("Mode: skip=0. Processing images...")
        # Process every image in the input directory
        face_system.process_directory(INPUT_DIRECTORY)
        # Find similar faces
        similar_groups = face_system.find_similar_faces()
        print(f"\nFound {len(similar_groups)} groups that may contain photos of the same person")
        # Save the results
        if similar_groups:
            face_system.save_similar_groups(OUTPUT_DIRECTORY, similar_groups)

    # --- Follow-up actions ---
    if similar_groups:
        # Ask whether to delete duplicate photos
        choice_delete = input("\nDelete one random photo from each similar group? (y/n): ").lower()
        if choice_delete in ['y', 'yes']:
            face_system.delete_one_duplicate_per_group(similar_groups)
    else:
        print("\nNo similar groups available to act on.")

    # Ask whether to clear the results
    choice_clear = input("\nClear the results generated by this run? (y/n): ").lower()
    if choice_clear in ['y', 'yes']:
        face_system.clear_results(OUTPUT_DIRECTORY)


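Summary

By combining MTCNN and InsightFace with cosine similarity and a clustering step, the tool automates the full pipeline from face detection and feature extraction through similarity analysis. Its main strengths are:

  1. Efficiency: parallel processing across multiple processes speeds up the handling of large image sets;
  2. Practicality: it supports grouping of similar faces, duplicate deletion, and age/gender analysis, which covers everyday photo organization needs;
  3. Extensibility: the code is clearly structured, and results can be improved further by tuning the similarity threshold, swapping models, or refining the clustering algorithm.

I hope this tool helps you manage photos more efficiently and gives those learning face recognition a practical example to refer to.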


Source: https://blog.csdn.net/2403_87969572/article/details/152168951

Scraped at (ISO, local time): 2026-05-18 05:16:52


文章目錄


基於Python的人臉聚類與相似性分析工具:從檢測到整理的全流程實現

前言

在數據集中,Speaker的數量統計是非常重要的一個環節。基於此,我開發了一個自動化的人臉聚類與相似性分析工具,能夠快速處理目錄中的所有圖片,檢測人臉、提取特徵、識別相似人臉並自動分組,提供重複照片刪除功能。本文將詳細介紹這個工具的實現原理、核心技術及代碼結構,幫助大家理解從人臉檢測到照片整理的全流程技術細節。

模型及算法介紹

該工具整合了多個計算機視覺領域的經典模型和算法,核心技術棧包括人臉檢測、特徵提取、相似性計算和聚類分析四部分:

1. 人臉檢測:MTCNN

MTCNN(Multi-task Cascaded Convolutional Networks)是一種多任務級聯卷積神經網絡,能夠同時完成人臉檢測、關鍵點定位等任務。相比傳統的人臉檢測算法,MTCNN具有檢測速度快、準確率高的特點,尤其在處理遮擋、多角度人臉時表現優異。

在本工具中,MTCNN的主要作用是定位圖片中的人臉位置,輸出人臉邊界框(bbox),為後續的特徵提取提供準確的區域範圍。

2. 人臉特徵提取:InsightFace

InsightFace是一個開源的人臉分析工具包,內置了高性能的人臉特徵提取模型。它能夠將人臉圖像轉換為固定維度的特徵向量(嵌入向量),同一人的不同照片會生成相似的向量,而不同人的向量差異較大。

工具中使用InsightFace的FaceAnalysis模塊提取特徵,生成的特徵向量具有良好的區分性,為後續的相似性計算奠定基礎。

3. 相似性計算:餘弦相似度

餘弦相似度是衡量兩個向量方向差異的指標,取值範圍為[-1, 1],值越接近1表示向量方向越相似。對於人臉特徵向量,餘弦相似度能夠有效反映兩張人臉的相似程度:

cosine_similarity ( A , B )

A ⋅ B ∣ ∣ A ∣ ∣ ⋅ ∣ ∣ B ∣ ∣ \text{cosine\_similarity}(A,B) = \frac{A \cdot B}{||A|| \cdot ||B||} cosine_similarity(A,B)=∣∣A∣∣⋅∣∣B∣∣A⋅B​

工具中設置了相似度閾值(默認0.65),超過該閾值的人臉被判定為同一人。

4. 聚類算法:基於相似度矩陣的分組

在得到所有圖片的特徵向量後,工具通過計算全量特徵的相似度矩陣,構建相似人臉對,再通過貪心聚類算法將相似的人臉合併為組。這種方法雖然簡單,但在中小規模圖片集(數千張)上效率較高,且能保證聚類結果的可解釋性。

代碼實現介紹

工具的核心代碼封裝在FaceRecognitionSystem類中,配合多進程處理提高效率,整體結構清晰,分為初始化、圖片處理、相似性分析、結果處理和輔助分析五大模塊。

1. 初始化與多進程配置

為了提高大規模圖片處理的效率,工具使用多進程並行處理圖片。由於深度學習模型無法直接在多進程間共享,因此通過initialize_worker函數為每個進程單獨初始化MTCNN和InsightFace模型:

def initialize_worker():
    """初始化每個工作進程的模型"""
    global detector, app
    if detector is None:
        detector = MTCNN()  # 初始化MTCNN人臉檢測器
    if app is None:
        app = FaceAnalysis(providers=['CPUExecutionProvider'])  # 初始化InsightFace
        app.prepare(ctx_id=0, det_size=(640, 640))

2. 圖片處理流程

單張圖片處理(process_image函數)

該函數是多進程的核心任務單元,負責:

  • 讀取圖片並轉換為RGB格式(MTCNN要求輸入為RGB)
  • 使用MTCNN檢測人臉,獲取邊界框
  • 使用InsightFace提取人臉特徵向量
  • 返回圖片路徑、特徵向量和邊界框(若檢測到人臉)
def process_image(image_path):
    try:
        image = cv2.imread(image_path)
        if image is None:
            return None
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # BGR轉RGB
        results = detector.detect_faces(rgb_image)  # MTCNN檢測人臉
        if results:
            faces = app.get(rgb_image)  # InsightFace提取特徵
            if faces:
                x, y, w, h = results[0]['box']  # 取第一個人臉框
                return image_path, faces[0].embedding, (int(x), int(y), int(w), int(h))
    except Exception as e:
        print(f"處理圖片 {image_path} 時出錯: {str(e)}")
    return None
目錄批量處理(process_directory方法)

該方法遍歷指定目錄下的所有圖片文件,使用進程池並行調用process_image,收集有效結果(包含人臉的圖片)並存儲特徵向量、路徑和邊界框:

def process_directory(self, input_dir):
    image_extensions = ['.jpg', '.jpeg', '.png', '.bmp']
    image_files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) 
                  if os.path.isfile(os.path.join(input_dir, f)) 
                  and os.path.splitext(f)[1].lower() in image_extensions]
    
    with Pool(processes=2, initializer=initialize_worker) as pool:  # 2個進程並行
        results = list(tqdm(pool.imap_unordered(process_image, image_files), 
                          total=len(image_files), desc="處理圖片"))
    
    for result in results:
        if result:
            image_path, feature, bbox = result
            self.face_features.append(feature)
            self.file_paths.append(image_path)
            self.bboxes.append(bbox)

3. Similar face analysis (find_similar_faces method)

This method holds the core analysis logic, in three steps:

  1. Compute the cosine similarity matrix over all embeddings
  2. Select similar faces based on the threshold
  3. Cluster similar faces into groups and compute each group's average similarity

def find_similar_faces(self):
    if len(self.face_features) < 2:
        return []
    
    # Compute the cosine similarity matrix
    features_array = np.array(self.face_features)
    similarity_matrix = cosine_similarity(features_array)
    
    # Cluster similar faces
    groups = []
    used = set()
    for i in range(len(self.face_features)):
        if i not in used:
            group = [i]
            used.add(i)
            # Collect every unused face that is similar to the current (seed) face
            for j in range(len(self.face_features)):
                if j not in used and similarity_matrix[i][j] > self.similarity_threshold:
                    group.append(j)
                    used.add(j)
            if len(group) > 1:
                groups.append(group)
    
    # Map indices to file paths and compute each group's average similarity
    result_groups = []
    for group in groups:
        group_files = [self.file_paths[idx] for idx in group]
        avg_sim = self._calculate_group_similarity(group, similarity_matrix)
        result_groups.append({'files': group_files, 'average_similarity': avg_sim})
    
    return sorted(result_groups, key=lambda x: x['average_similarity'], reverse=True)
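
The grouping above compares each candidate only with the seed image of its group. If transitive grouping is preferred (A matches B and B matches C, so all three share a group), one alternative is to take connected components over the thresholded pairs. The sketch below uses union-find, which is a swapped-in technique and not the tool's method; the name `connected_groups` and the toy matrix are assumptions made for illustration.

```python
def connected_groups(sim, threshold):
    """Group indices by connected components of the 'similar' graph."""
    n = len(sim)
    parent = list(range(n))

    def find(x):
        # Follow parent links to the root, shortening the path as we go.
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    # Union every pair whose similarity exceeds the threshold.
    for i in range(n):
        for j in range(i + 1, n):
            if sim[i][j] > threshold:
                root_i, root_j = find(i), find(j)
                if root_i != root_j:
                    parent[root_j] = root_i

    components = {}
    for idx in range(n):
        components.setdefault(find(idx), []).append(idx)
    # Keep only groups with at least two members, as the tool does.
    return [members for members in components.values() if len(members) > 1]


# Toy matrix: 0~1 and 1~2 are similar, 0 and 2 are not, 3 matches nobody.
sim = [
    [1.0, 0.8, 0.3, 0.1],
    [0.8, 1.0, 0.7, 0.2],
    [0.3, 0.7, 1.0, 0.1],
    [0.1, 0.2, 0.1, 1.0],
]
groups = connected_groups(sim, threshold=0.65)
```

The trade-off is that a chain of borderline matches can merge different people into one group, so a transitive variant like this may call for a stricter threshold.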

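4. Result handling

Saving group results (save_similar_groups method)

Copies each group of similar faces into its own folder and writes a CSV file listing every image's file name, its group, and the group's average similarity; images that matched no one are copied into the ungrouped directory.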

    def save_similar_groups(self, output_dir, similar_groups):
        """
        Copy similar faces into separate folders.
        :param output_dir: output directory
        :param similar_groups: groups of similar faces
        """
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        
        # Collect one CSV row per copied file
        results = []
        for i, group in enumerate(similar_groups):
            group_dir = os.path.join(output_dir, f"group_{i+1}_similarity_{group['average_similarity']:.4f}")
            os.makedirs(group_dir, exist_ok=True)
            
            for file_path in group['files']:
                file_name = os.path.basename(file_path)
                dest_path = os.path.join(group_dir, file_name)
                shutil.copy2(file_path, dest_path)
                
                results.append({
                    'group': i+1,
                    'file': file_name,
                    'similarity': group['average_similarity']
                })
        
        # Copy the ungrouped files (no similar face found)
        all_group_files = set()
        for group in similar_groups:
            all_group_files.update(group['files'])
        
        ungrouped_dir = os.path.join(output_dir, "ungrouped")
        os.makedirs(ungrouped_dir, exist_ok=True)
        
        for file_path in self.file_paths:
            if file_path not in all_group_files:
                file_name = os.path.basename(file_path)
                dest_path = os.path.join(ungrouped_dir, file_name)
                shutil.copy2(file_path, dest_path)
                results.append({
                    'group': 'ungrouped',
                    'file': file_name,
                    'similarity': 0.0
                })
        
        # Write the results to CSV
        df = pd.DataFrame(results)
        df.to_csv(os.path.join(output_dir, "similarity_results.csv"), index=False)
        print(f"Results saved to {output_dir}")

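Duplicate photo deletion (delete_one_duplicate_per_group method)

Randomly deletes one photo from each similar group to reduce redundant storage. The method itself deletes without prompting; the main script asks for confirmation before calling it.

    def delete_one_duplicate_per_group(self, similar_groups):
        """
        Randomly delete one photo from each similar group.
        :param similar_groups: groups of similar faces
        """
        if not similar_groups:
            print("No similar photo groups found; nothing to delete.")
            return

        print("\nRandomly deleting one duplicate photo from each group...")
        deleted_count = 0
        for i, group in enumerate(similar_groups):
            if len(group['files']) > 1:
                # Pick one photo at random to delete
                file_to_delete = random.choice(group['files'])
                try:
                    os.remove(file_to_delete)
                    print(f"Group {i+1}: deleted photo -> {os.path.basename(file_to_delete)}")
                    deleted_count += 1
                except OSError as e:
                    print(f"Error deleting file {file_to_delete}: {e}")
        print(f"\nDeleted {deleted_count} duplicate photos in total.")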

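Result cleanup (clear_results method)

Deletes the generated results directory so the images can be processed again from scratch.

    def clear_results(self, output_dir):
        """
        Remove the generated results directory.
        :param output_dir: output directory
        """
        if os.path.exists(output_dir):
            try:
                shutil.rmtree(output_dir)
                print(f"\nResults directory cleared: {output_dir}")
            except OSError as e:
                print(f"Error clearing directory {output_dir}: {e}")
        else:
            print("\nResults directory does not exist; nothing to clear.")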

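6. Main function logic

The main block supports two modes (skip=0 reprocesses the images, skip=1 loads existing results). The input directory, output directory, and similarity threshold are set as constants at the top of the block, and the script then asks interactively whether to delete duplicate photos and whether to clear the results.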

if __name__ == "__main__":
    # Configuration
    script_dir = os.path.dirname(os.path.abspath(__file__))
    INPUT_DIRECTORY = os.path.join(script_dir, "face_dir")  # replace with your image directory
    OUTPUT_DIRECTORY = os.path.join(script_dir, "face_recognition_results")
    SIMILARITY_THRESHOLD = 0.65  # adjust as needed
    
    # skip=0: reprocess the images | skip=1: skip processing and work from the results file
    skip = 1
    
    face_system = FaceRecognitionSystem(similarity_threshold=SIMILARITY_THRESHOLD)
    similar_groups = []

    if skip == 1:
        print("Mode: skip=1. Trying to load the existing results file...")
        csv_path = os.path.join(OUTPUT_DIRECTORY, "similarity_results.csv")
        if os.path.exists(csv_path):
            df = pd.read_csv(csv_path)
            # Keep only the rows that belong to a group
            grouped_df = df[df['group'] != 'ungrouped'].copy()
            
            # pd.to_numeric to handle potential string group IDs
            grouped_df['group'] = pd.to_numeric(grouped_df['group'])

            for group_id, group_data in grouped_df.groupby('group'):
                # Rebuild full paths from the file names
                files = [os.path.join(INPUT_DIRECTORY, fname) for fname in group_data['file']]
                avg_sim = group_data['similarity'].iloc[0]
                similar_groups.append({
                    'files': files,
                    'average_similarity': avg_sim
                })
            print(f"Loaded {len(similar_groups)} similar groups from {csv_path}.")
        else:
            print(f"Error: results file {csv_path} not found.")
            print("Switching to skip=0 mode to reprocess the images.")
            skip = 0  # fall back to reprocessing, as the message above says

    if skip == 0:
        print("Mode: skip=0. Processing images...")
        # Detect faces and extract embeddings
        face_system.process_directory(INPUT_DIRECTORY)
        # Find similar faces
        similar_groups = face_system.find_similar_faces()
        print(f"\nFound {len(similar_groups)} groups that may contain the same person")
        # Save the results
        if similar_groups:
            face_system.save_similar_groups(OUTPUT_DIRECTORY, similar_groups)

    # --- Follow-up actions ---
    if similar_groups:
        # Ask whether to delete duplicate photos
        choice_delete = input("\nDelete one random photo from each similar group? (y/n): ").lower()
        if choice_delete in ['y', 'yes']:
            face_system.delete_one_duplicate_per_group(similar_groups)
    else:
        print("\nNo similar groups available to act on.")

    # Ask whether to clear the results
    choice_clear = input("\nClear the results generated by this run? (y/n): ").lower()
    if choice_clear in ['y', 'yes']:
        face_system.clear_results(OUTPUT_DIRECTORY)
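
The main block keeps its settings as constants that have to be edited in the source. As one possible alternative, the same settings could come from the command line. This is a minimal sketch with `argparse`; the flag names (`--input-dir`, `--output-dir`, `--threshold`, `--skip`) and the default of `--skip 0` are assumptions for illustration and are not part of the original script.

```python
import argparse


def parse_args(argv=None):
    """Parse hypothetical command-line options mirroring the script's constants."""
    parser = argparse.ArgumentParser(description="Face clustering tool")
    parser.add_argument("--input-dir", default="face_dir",
                        help="directory containing the images")
    parser.add_argument("--output-dir", default="face_recognition_results",
                        help="directory for grouped copies and the CSV")
    parser.add_argument("--threshold", type=float, default=0.65,
                        help="cosine similarity threshold")
    parser.add_argument("--skip", type=int, choices=[0, 1], default=0,
                        help="0: reprocess images, 1: load the existing CSV")
    return parser.parse_args(argv)


# Passing an explicit list makes the parser easy to try without a shell.
args = parse_args(["--threshold", "0.7", "--skip", "1"])
```

The parsed values could then replace `INPUT_DIRECTORY`, `OUTPUT_DIRECTORY`, `SIMILARITY_THRESHOLD`, and `skip` in the main block.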

Complete code

import os
import cv2
import numpy as np
import pandas as pd
from mtcnn import MTCNN
from insightface.app import FaceAnalysis
from sklearn.metrics.pairwise import cosine_similarity
import shutil
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
import random

# Module-level globals, initialized once per worker process
detector = None
app = None

def initialize_worker():
    """Initialize the models for each worker process."""
    global detector, app
    if detector is None:
        detector = MTCNN()
    if app is None:
        app = FaceAnalysis(providers=['CPUExecutionProvider'])
        app.prepare(ctx_id=0, det_size=(640, 640))

def process_image(image_path):
    """
    Process a single image: detect a face and extract its embedding (designed to run in a worker process).
    :param image_path: path to the image
    :return: (image_path, feature, bbox) or None; bbox=(x, y, w, h)
    """
    try:
        image = cv2.imread(image_path)
        if image is None:
            return None
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = detector.detect_faces(rgb_image)
        if results:
            # InsightFace expects the BGR image as loaded by OpenCV
            faces = app.get(image)
            if faces:
                # MTCNN and InsightFace may order detections differently; the box below is
                # MTCNN's first, so with several faces it may not match the embedding's face
                x, y, w, h = results[0]['box']
                return image_path, faces[0].embedding, (int(x), int(y), int(w), int(h))
    except Exception as e:
        print(f"Error processing image {image_path}: {str(e)}")
    return None

class FaceRecognitionSystem:
    def __init__(self, similarity_threshold=0.65):
        """
        Initialize the face detection and recognition system.
        :param similarity_threshold: similarity threshold for treating two faces as the same person
        """
        # Similarity threshold; pairs above it are treated as the same person
        self.similarity_threshold = similarity_threshold
        
        # Face embeddings and the file paths they came from
        self.face_features = []
        self.file_paths = []
        self.valid_files = []
        self.bboxes = []  # face boxes (x, y, w, h), aligned one-to-one with file_paths
        # Age/gender model state
        self.age_net = None
        self.gender_net = None
        self.detector_net = None
        self.age_list = ['(0-2)', '(4-6)', '(8-12)', '(15-20)', '(25-32)', '(38-43)', '(48-53)', '(60-100)']
        self.gender_list = ['Male', 'Female']
        self.age_gender_loaded = False
        
    def process_directory(self, input_dir):
        """
        Process every image in a directory using multiple processes.
        :param input_dir: directory containing the images
        """
        image_extensions = ['.jpg', '.jpeg', '.png', '.bmp']
        image_files = [
            os.path.join(input_dir, f) 
            for f in os.listdir(input_dir) 
            if os.path.isfile(os.path.join(input_dir, f)) 
            and os.path.splitext(f)[1].lower() in image_extensions
        ]
        
        # Number of worker processes (max_workers)
        # Set to None to use all available CPU cores
        max_workers = 2
        
        num_to_print = max_workers if max_workers is not None else cpu_count()
        print(f"Found {len(image_files)} image files; processing with {num_to_print} processes...")
        
        # Create the process pool
        with Pool(processes=max_workers, initializer=initialize_worker) as pool:
            # imap_unordered yields results as they finish, which drives the progress bar
            results = list(tqdm(
                pool.imap_unordered(process_image, image_files), 
                total=len(image_files),
                desc="Processing images"
            ))

        for result in results:
            if result:
                image_path, feature, bbox = result
                self.face_features.append(feature)
                self.file_paths.append(image_path)
                self.valid_files.append(os.path.basename(image_path))
                self.bboxes.append(bbox)
        
        print(f"Processed {len(self.face_features)} images containing a face")
    
    def find_similar_faces(self):
        """
        Find similar faces (possibly the same person).
        :return: list of similar-face groups
        """
        if len(self.face_features) < 2:
            return []
            
        # Compute the cosine similarity between all embeddings
        features_array = np.array(self.face_features)
        similarity_matrix = cosine_similarity(features_array)
        
        # Collect similar face pairs (kept for inspection; the grouping below does not use them)
        similar_pairs = []
        n = len(self.face_features)
        
        for i in range(n):
            for j in range(i + 1, n):
                if similarity_matrix[i][j] > self.similarity_threshold:
                    similar_pairs.append((
                        self.file_paths[i], 
                        self.file_paths[j], 
                        similarity_matrix[i][j]
                    ))
        
        # Cluster similar faces into groups
        groups = []
        used = set()
        
        for i in range(n):
            if i not in used:
                group = [i]
                used.add(i)
                
                # Collect every unused face that is similar to the current (seed) face
                for j in range(n):
                    if j not in used and similarity_matrix[i][j] > self.similarity_threshold:
                        group.append(j)
                        used.add(j)
                
                if len(group) > 1:  # keep only groups with more than one face
                    groups.append(group)
        
        # Map indices to file paths
        result_groups = []
        for group in groups:
            group_files = [self.file_paths[idx] for idx in group]
            # Compute the group's average similarity
            avg_sim = self._calculate_group_similarity(group, similarity_matrix)
            result_groups.append({
                'files': group_files,
                'average_similarity': avg_sim
            })
        
        # Sort by average within-group similarity, highest first
        result_groups.sort(key=lambda x: x['average_similarity'], reverse=True)
        
        return result_groups
    
    def _calculate_group_similarity(self, group, similarity_matrix):
        """Compute the average pairwise similarity within a group."""
        if len(group) <= 1:
            return 0.0
            
        total = 0.0
        count = 0
        
        for i in range(len(group)):
            for j in range(i + 1, len(group)):
                idx_i = group[i]
                idx_j = group[j]
                total += similarity_matrix[idx_i][idx_j]
                count += 1
        
        return total / count if count > 0 else 0.0
    
    def save_similar_groups(self, output_dir, similar_groups):
        """
        Copy similar faces into separate folders.
        :param output_dir: output directory
        :param similar_groups: groups of similar faces
        """
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        
        # Collect one CSV row per copied file
        results = []
        for i, group in enumerate(similar_groups):
            group_dir = os.path.join(output_dir, f"group_{i+1}_similarity_{group['average_similarity']:.4f}")
            os.makedirs(group_dir, exist_ok=True)
            
            for file_path in group['files']:
                file_name = os.path.basename(file_path)
                dest_path = os.path.join(group_dir, file_name)
                shutil.copy2(file_path, dest_path)
                
                results.append({
                    'group': i+1,
                    'file': file_name,
                    'similarity': group['average_similarity']
                })
        
        # Copy the ungrouped files (no similar face found)
        all_group_files = set()
        for group in similar_groups:
            all_group_files.update(group['files'])
        
        ungrouped_dir = os.path.join(output_dir, "ungrouped")
        os.makedirs(ungrouped_dir, exist_ok=True)
        
        for file_path in self.file_paths:
            if file_path not in all_group_files:
                file_name = os.path.basename(file_path)
                dest_path = os.path.join(ungrouped_dir, file_name)
                shutil.copy2(file_path, dest_path)
                results.append({
                    'group': 'ungrouped',
                    'file': file_name,
                    'similarity': 0.0
                })
        
        # Write the results to CSV
        df = pd.DataFrame(results)
        df.to_csv(os.path.join(output_dir, "similarity_results.csv"), index=False)
        print(f"Results saved to {output_dir}")

    def delete_one_duplicate_per_group(self, similar_groups):
        """
        Randomly delete one photo from each similar group.
        :param similar_groups: groups of similar faces
        """
        if not similar_groups:
            print("No similar photo groups found; nothing to delete.")
            return

        print("\nRandomly deleting one duplicate photo from each group...")
        deleted_count = 0
        for i, group in enumerate(similar_groups):
            if len(group['files']) > 1:
                # Pick one photo at random to delete
                file_to_delete = random.choice(group['files'])
                try:
                    os.remove(file_to_delete)
                    print(f"Group {i+1}: deleted photo -> {os.path.basename(file_to_delete)}")
                    deleted_count += 1
                except OSError as e:
                    print(f"Error deleting file {file_to_delete}: {e}")
        print(f"\nDeleted {deleted_count} duplicate photos in total.")

    def clear_results(self, output_dir):
        """
        Remove the generated results directory.
        :param output_dir: output directory
        """
        if os.path.exists(output_dir):
            try:
                shutil.rmtree(output_dir)
                print(f"\nResults directory cleared: {output_dir}")
            except OSError as e:
                print(f"Error clearing directory {output_dir}: {e}")
        else:
            print("\nResults directory does not exist; nothing to clear.")

    def ensure_age_gender_models(self, base_dir):
        """Load the age/gender models (only once).
        The model directory is resolved as: (parent of the current script directory)/age_gender
        """
        if self.age_gender_loaded:
            return True
        try:
            # base_dir is the face subdirectory, so go up one level
            project_root = os.path.dirname(base_dir)  # parent directory (Lip_reading)
            model_dir = os.path.join(project_root, 'age_gender')

            age_prototxt = os.path.join(model_dir, 'age_deploy.prototxt')
            age_caffemodel = os.path.join(model_dir, 'age_net.caffemodel')
            gender_prototxt = os.path.join(model_dir, 'gender_deploy.prototxt')
            gender_caffemodel = os.path.join(model_dir, 'gender_net.caffemodel')
            face_pb = os.path.join(model_dir, 'opencv_face_detector_uint8.pb')
            face_pbtxt = os.path.join(model_dir, 'opencv_face_detector.pbtxt')

            needed = [age_prototxt, age_caffemodel, gender_prototxt, gender_caffemodel, face_pb, face_pbtxt]
            for p in needed:
                if not os.path.exists(p):
                    print(f"Missing model file: {p}; skipping age/gender statistics.")
                    return False

            self.age_net = cv2.dnn.readNet(age_caffemodel, age_prototxt)
            self.gender_net = cv2.dnn.readNet(gender_caffemodel, gender_prototxt)
            self.detector_net = cv2.dnn.readNetFromTensorflow(face_pb, face_pbtxt)
            self.age_gender_loaded = True
            print(f"Age/gender models loaded. (model directory: {model_dir})")
            return True
        except Exception as e:
            print(f"Failed to load age/gender models: {e}")
            return False

    def predict_age_gender_for_image(self, image_path):
        """Predict the age range and gender of the most confident detected face in one image. Returns (gender, age_range) or (None, None)."""
        if not self.age_gender_loaded:
            return (None, None)
        img = cv2.imread(image_path)
        if img is None:
            return (None, None)
        h, w = img.shape[:2]
        blob = cv2.dnn.blobFromImage(img, 1.0, (300, 300), [104, 117, 123], False, False)
        self.detector_net.setInput(blob)
        detections = self.detector_net.forward()
        best_conf = 0
        face_box = None
        for i in range(detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence > 0.6 and confidence > best_conf:
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                x1, y1, x2, y2 = box.astype(int)
                x1, y1 = max(0, x1), max(0, y1)
                x2, y2 = min(w - 1, x2), min(h - 1, y2)
                face_box = (x1, y1, x2, y2)
                best_conf = confidence
        if face_box is None:
            return (None, None)
        x1, y1, x2, y2 = face_box
        face_roi = img[y1:y2, x1:x2].copy()
        if face_roi.size == 0:
            return (None, None)
        blob_face = cv2.dnn.blobFromImage(face_roi, 1.0, (227, 227), (78.4263377603, 87.7689143744, 114.895847746), swapRB=False)
        # gender
        self.gender_net.setInput(blob_face)
        gender_preds = self.gender_net.forward()
        gender = self.gender_list[gender_preds[0].argmax()]
        # age
        self.age_net.setInput(blob_face)
        age_preds = self.age_net.forward()
        age_range = self.age_list[age_preds[0].argmax()]
        return (gender, age_range)

    def analyze_age_gender(self, base_dir, output_dir, similar_groups):
        """
        Run age/gender statistics over all images and write two files:
        1) age_gender_details.csv: per image -> file name, group_id, gender, age_range
        2) age_gender_stats.txt: summary statistics
        """
        if not self.ensure_age_gender_models(base_dir):
            return
        records = []
        # Build the file-to-group mapping
        file_to_group = {}
        for idx, g in enumerate(similar_groups, start=1):
            for f in g['files']:
                file_to_group[os.path.basename(f)] = idx
        # Even without groups, run the statistics over every known file path
        target_files = self.file_paths if self.file_paths else []

        if not target_files:
            print("No images available for age/gender statistics.")
            return
        print("Starting age/gender statistics...")
        gender_counter = {}
        age_counter = {}
        for path in tqdm(target_files, desc="Age/Gender"):
            gender, age_range = self.predict_age_gender_for_image(path)
            base_name = os.path.basename(path)
            group_id = file_to_group.get(base_name, 'ungrouped')
            records.append({
                'file': base_name,
                'group_id': group_id,
                'gender': gender if gender else 'Unknown',
                'age_range': age_range if age_range else 'Unknown'
            })
            if gender:
                gender_counter[gender] = gender_counter.get(gender, 0) + 1
            else:
                gender_counter['Unknown'] = gender_counter.get('Unknown', 0) + 1
            if age_range:
                age_counter[age_range] = age_counter.get(age_range, 0) + 1
            else:
                age_counter['Unknown'] = age_counter.get('Unknown', 0) + 1
        # Save the per-image details
        os.makedirs(output_dir, exist_ok=True)
        details_path = os.path.join(output_dir, 'age_gender_details.csv')
        pd.DataFrame(records).to_csv(details_path, index=False)
        # Summary
        stats_lines = ["Gender Statistics:"]
        for k,v in gender_counter.items():
            stats_lines.append(f"  {k}: {v}")
        stats_lines.append("\nAge Range Statistics:")
        for k,v in age_counter.items():
            stats_lines.append(f"  {k}: {v}")
        stats_path = os.path.join(output_dir, 'age_gender_stats.txt')
        with open(stats_path, 'w', encoding='utf-8') as fw:
            fw.write('\n'.join(stats_lines))
        print(f"Age/gender statistics finished; saved to: {details_path} and {stats_path}")

if __name__ == "__main__":
    # Configuration
    script_dir = os.path.dirname(os.path.abspath(__file__))
    INPUT_DIRECTORY = os.path.join(script_dir, "face_dir")  # replace with your image directory
    OUTPUT_DIRECTORY = os.path.join(script_dir, "face_recognition_results")
    SIMILARITY_THRESHOLD = 0.65  # adjust as needed
    
    # skip=0: reprocess the images | skip=1: skip processing and work from the results file
    skip = 1
    
    face_system = FaceRecognitionSystem(similarity_threshold=SIMILARITY_THRESHOLD)
    similar_groups = []

    if skip == 1:
        print("Mode: skip=1. Trying to load the existing results file...")
        csv_path = os.path.join(OUTPUT_DIRECTORY, "similarity_results.csv")
        if os.path.exists(csv_path):
            df = pd.read_csv(csv_path)
            # Keep only the rows that belong to a group
            grouped_df = df[df['group'] != 'ungrouped'].copy()
            
            # pd.to_numeric to handle potential string group IDs
            grouped_df['group'] = pd.to_numeric(grouped_df['group'])

            for group_id, group_data in grouped_df.groupby('group'):
                # Rebuild full paths from the file names
                files = [os.path.join(INPUT_DIRECTORY, fname) for fname in group_data['file']]
                avg_sim = group_data['similarity'].iloc[0]
                similar_groups.append({
                    'files': files,
                    'average_similarity': avg_sim
                })
            print(f"Loaded {len(similar_groups)} similar groups from {csv_path}.")
        else:
            print(f"Error: results file {csv_path} not found.")
            print("Switching to skip=0 mode to reprocess the images.")
            skip = 0  # fall back to reprocessing, as the message above says

    if skip == 0:
        print("Mode: skip=0. Processing images...")
        # Detect faces and extract embeddings
        face_system.process_directory(INPUT_DIRECTORY)
        # Find similar faces
        similar_groups = face_system.find_similar_faces()
        print(f"\nFound {len(similar_groups)} groups that may contain the same person")
        # Save the results
        if similar_groups:
            face_system.save_similar_groups(OUTPUT_DIRECTORY, similar_groups)

    # --- Follow-up actions ---
    if similar_groups:
        # Ask whether to delete duplicate photos
        choice_delete = input("\nDelete one random photo from each similar group? (y/n): ").lower()
        if choice_delete in ['y', 'yes']:
            face_system.delete_one_duplicate_per_group(similar_groups)
    else:
        print("\nNo similar groups available to act on.")

    # Ask whether to clear the results
    choice_clear = input("\nClear the results generated by this run? (y/n): ").lower()
    if choice_clear in ['y', 'yes']:
        face_system.clear_results(OUTPUT_DIRECTORY)


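Summary

By combining MTCNN and InsightFace with cosine similarity and a simple clustering step, this tool automates the full pipeline from face detection and feature extraction to similarity analysis. Its main strengths are:

  1. Efficiency: images are processed in parallel across worker processes, which speeds up large collections;
  2. Practicality: it supports grouping similar faces, deleting duplicates, and age/gender analysis, which covers everyday photo organization needs;
  3. Extensibility: the code structure is clear, and results can be improved further by tuning the similarity threshold, swapping models, or refining the clustering algorithm.

I hope this tool helps you manage photos more efficiently and serves as a practical reference for anyone learning face recognition.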

Computer Vision: A Python-Based Face Recognition and Photo Management Tool — Full Pipeline from Detection to Organization

Python face clustering with MTCNN, InsightFace embeddings, cosine similarity grouping, multiprocessing, and duplicate photo cleanup.



Python-Based Face Clustering and Similarity Analysis Tool: Full Pipeline from Detection to Organization

Introduction

In many datasets, counting speakers is an important step. Building on that need, I developed an automated face clustering and similarity analysis tool that can quickly process all images in a directory: detect faces, extract embeddings, identify similar faces and group them automatically, and optionally remove duplicate photos. This article explains how the tool works — the core techniques and code structure — so readers can follow the full pipeline from face detection through photo organization.

Models and Algorithms

The tool combines several classic models and algorithms from computer vision. The technical stack has four main parts: face detection, feature extraction, similarity computation, and clustering.

1. Face Detection: MTCNN

MTCNN (Multi-task Cascaded Convolutional Networks) is a cascade of three small CNNs (P-Net, R-Net, and O-Net) that performs face detection and five-point landmark localization jointly. Compared with traditional face detectors, it is generally more accurate while remaining reasonably fast, and it copes better with partial occlusion and varied poses.

In this tool, MTCNN locates faces in each image and outputs bounding boxes (bbox), providing precise regions for downstream feature extraction.

2. Face Feature Extraction: InsightFace

InsightFace is an open-source face analysis toolkit with high-performance embedding models. It maps a face crop to a fixed-dimensional feature vector (embedding): images of the same person yield similar vectors, while different people tend to differ more strongly.

The tool uses InsightFace’s FaceAnalysis module for embeddings. The resulting vectors are discriminative enough to support reliable similarity scoring.

3. Similarity Calculation: Cosine Similarity

Cosine similarity measures how aligned two vectors are. Its range is [-1, 1]; values closer to 1 mean more similar directions. For face embeddings, cosine similarity reflects how similar two faces appear:

$$\text{cosine\_similarity}(A, B) = \frac{A \cdot B}{\|A\| \cdot \|B\|}$$

The tool uses a similarity threshold (default 0.65). Pairs above the threshold are treated as the same identity.
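
As a quick illustration of the formula and the threshold test, here is a minimal dependency-free sketch. The helper name `cosine_sim` and the toy 3-dimensional vectors are assumptions made for illustration; real embeddings are much longer, and the tool itself relies on scikit-learn's `cosine_similarity`.

```python
import math


def cosine_sim(a, b):
    """Cosine similarity of two equal-length vectors (hypothetical helper)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # convention chosen here for zero vectors
    return dot / (norm_a * norm_b)


THRESHOLD = 0.65  # same default as the tool

face_a = [1.0, 2.0, 3.0]
face_b = [2.0, 4.0, 6.5]   # nearly the same direction as face_a
face_c = [3.0, -1.0, 0.0]  # a very different direction

same_ab = cosine_sim(face_a, face_b) > THRESHOLD  # passes the threshold
same_ac = cosine_sim(face_a, face_c) > THRESHOLD  # does not pass
```

Because only the direction matters, scaling a vector by a positive factor leaves its similarity to any other vector unchanged.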

4. Clustering Algorithm: Grouping via Similarity Matrix

After embeddings are computed for all images, the tool builds a full pairwise similarity matrix, derives similar face pairs, and merges them with a greedy clustering procedure. The approach is simple but efficient on small-to-medium collections (thousands of images) and keeps results easy to interpret.
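
To make the greedy step concrete, the sketch below runs a seed-based grouping of this kind on a toy similarity matrix. The function name `greedy_groups` and the matrix values are assumptions for illustration; the loop structure follows the description above, not necessarily every detail of the tool.

```python
def greedy_groups(sim, threshold):
    """Seed-based greedy grouping over a square similarity matrix.

    Each unused index becomes a seed and absorbs every unused index whose
    similarity to the seed exceeds the threshold. Singletons are dropped.
    """
    n = len(sim)
    used = set()
    groups = []
    for i in range(n):
        if i in used:
            continue
        group = [i]
        used.add(i)
        for j in range(n):
            if j not in used and sim[i][j] > threshold:
                group.append(j)
                used.add(j)
        if len(group) > 1:
            groups.append(group)
    return groups


# Toy matrix: 0~1 and 1~2 are similar, 0 and 2 are not, 3 matches nobody.
sim = [
    [1.0, 0.8, 0.3, 0.1],
    [0.8, 1.0, 0.7, 0.2],
    [0.3, 0.7, 1.0, 0.1],
    [0.1, 0.2, 0.1, 1.0],
]
groups = greedy_groups(sim, threshold=0.65)
```

In this toy case image 2 stays ungrouped even though it is similar to image 1, because membership is decided against the seed (image 0) only. That keeps each group easy to interpret, at the cost of depending on processing order.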

Code Implementation Overview

The core logic lives in the FaceRecognitionSystem class, with multiprocessing for throughput. The design splits into initialization, image processing, similarity analysis, result handling, and auxiliary analysis.

1. Initialization and Multiprocessing Configuration

To speed up large batches, images are processed in parallel. Deep models cannot be shared trivially across processes, so initialize_worker loads MTCNN and InsightFace per worker process:

def initialize_worker():
    """Initialize the models for each worker process."""
    global detector, app
    if detector is None:
        detector = MTCNN()  # MTCNN face detector
    if app is None:
        app = FaceAnalysis(providers=['CPUExecutionProvider'])  # InsightFace
        app.prepare(ctx_id=0, det_size=(640, 640))
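
The same per-worker initialization pattern can be tried without any deep learning dependencies. In this minimal sketch, `load_model`, `init_worker`, `work`, and `run` are hypothetical names, and the dictionary stands in for an expensive model object.

```python
from multiprocessing import Pool

_model = None  # per-process global, filled in by the initializer


def load_model():
    """Stand-in for an expensive model load (hypothetical)."""
    return {"scale": 2}


def init_worker():
    """Runs once in each worker process before it receives any task."""
    global _model
    if _model is None:
        _model = load_model()


def work(x):
    """Task function; relies on init_worker having run in this process."""
    return x * _model["scale"]


def run(items, processes=2):
    """Map work over items in a pool whose workers each load the model once."""
    with Pool(processes=processes, initializer=init_worker) as pool:
        # imap_unordered yields results in completion order, so sort them.
        return sorted(pool.imap_unordered(work, items))
```

Calling `run([1, 2, 3])` from under an `if __name__ == "__main__":` guard starts two workers, each of which loads its own copy of the stand-in model exactly once.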

2. Image Processing Pipeline

Single Image Processing (process_image function)

This function is the main worker task. It:

  • Loads the image and converts it to RGB (required by MTCNN)
  • Runs MTCNN to obtain bounding boxes
  • Uses InsightFace to extract a face embedding
  • Returns the image path, embedding, and bbox when a face is found

def process_image(image_path):
    try:
        image = cv2.imread(image_path)
        if image is None:
            return None
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # BGR to RGB for MTCNN
        results = detector.detect_faces(rgb_image)  # detect faces with MTCNN
        if results:
            faces = app.get(image)  # InsightFace expects the BGR image as loaded by OpenCV
            if faces:
                x, y, w, h = results[0]['box']  # take the first MTCNN face box
                return image_path, faces[0].embedding, (int(x), int(y), int(w), int(h))
    except Exception as e:
        print(f"Error processing image {image_path}: {str(e)}")
    return None

Directory Batch Processing (process_directory method)

This method scans the given directory (non-recursively) for image files, runs process_image in parallel via a process pool, and collects valid results (images with a detected face), storing embeddings, paths, and boxes:

def process_directory(self, input_dir):
    image_extensions = ['.jpg', '.jpeg', '.png', '.bmp']
    image_files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) 
                  if os.path.isfile(os.path.join(input_dir, f)) 
                  and os.path.splitext(f)[1].lower() in image_extensions]
    
    with Pool(processes=2, initializer=initialize_worker) as pool:  # two worker processes
        results = list(tqdm(pool.imap_unordered(process_image, image_files), 
                          total=len(image_files), desc="Processing images"))
    
    for result in results:
        if result:
            image_path, feature, bbox = result
            self.face_features.append(feature)
            self.file_paths.append(image_path)
            self.bboxes.append(bbox)
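
The file-collection step can also be written with `pathlib`. This is a small sketch, assuming a flat input directory as in the method above; `list_image_files` is a hypothetical helper name, and sorting is added here only so that the task list is reproducible between runs.

```python
from pathlib import Path

IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp"}


def list_image_files(input_dir):
    """Return sorted paths of image files located directly inside input_dir."""
    return sorted(
        str(p)
        for p in Path(input_dir).iterdir()
        if p.is_file() and p.suffix.lower() in IMAGE_EXTENSIONS
    )
```

Subdirectories are skipped even if their names end in an image extension, and the extension check is case-insensitive, matching the behavior of the list comprehension above.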

3. Similar Face Analysis (find_similar_faces method)

This is the core analysis routine. It:

  1. Builds the cosine similarity matrix over all embeddings
  2. Filters pairs above the threshold
  3. Clusters similar faces into groups and computes average within-group similarity

def find_similar_faces(self):
    if len(self.face_features) < 2:
        return []
    
    # Compute the cosine similarity matrix
    features_array = np.array(self.face_features)
    similarity_matrix = cosine_similarity(features_array)
    
    # Cluster similar faces
    groups = []
    used = set()
    for i in range(len(self.face_features)):
        if i not in used:
            group = [i]
            used.add(i)
            # Collect every unused face that is similar to the current (seed) face
            for j in range(len(self.face_features)):
                if j not in used and similarity_matrix[i][j] > self.similarity_threshold:
                    group.append(j)
                    used.add(j)
            if len(group) > 1:
                groups.append(group)
    
    # Map indices to file paths and compute each group's average similarity
    result_groups = []
    for group in groups:
        group_files = [self.file_paths[idx] for idx in group]
        avg_sim = self._calculate_group_similarity(group, similarity_matrix)
        result_groups.append({'files': group_files, 'average_similarity': avg_sim})
    
    return sorted(result_groups, key=lambda x: x['average_similarity'], reverse=True)
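
The snippet above calls `_calculate_group_similarity` without showing it. As a compact sketch of the quantity it is described as computing (the mean similarity over all unordered pairs in a group), the following standalone function could be used; the name `mean_pairwise_similarity` and the toy matrix are assumptions for illustration.

```python
from itertools import combinations


def mean_pairwise_similarity(group, sim):
    """Average sim[i][j] over all unordered pairs of indices in group."""
    pairs = list(combinations(group, 2))
    if not pairs:
        return 0.0  # a group with fewer than two members has no pairs
    return sum(sim[i][j] for i, j in pairs) / len(pairs)


sim = [
    [1.0, 0.9, 0.7],
    [0.9, 1.0, 0.8],
    [0.7, 0.8, 1.0],
]
avg = mean_pairwise_similarity([0, 1, 2], sim)  # mean of 0.9, 0.7 and 0.8
```

Note that each pair must index the matrix with two different members, `sim[i][j]`; reading the diagonal, `sim[j][j]`, would always return 1.0 and inflate the average.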

4. Result Handling Features

Saving Group Results (save_similar_groups method)

Similar-face groups are copied into separate folders, and a CSV is written with paths and average similarity per group. Images that never matched anyone go into an ungrouped folder.

    def save_similar_groups(self, output_dir, similar_groups):
        """
        Copy similar faces into separate folders.
        :param output_dir: output directory
        :param similar_groups: groups of similar faces
        """
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        
        # Collect one CSV row per copied file
        results = []
        for i, group in enumerate(similar_groups):
            group_dir = os.path.join(output_dir, f"group_{i+1}_similarity_{group['average_similarity']:.4f}")
            os.makedirs(group_dir, exist_ok=True)
            
            for file_path in group['files']:
                file_name = os.path.basename(file_path)
                dest_path = os.path.join(group_dir, file_name)
                shutil.copy2(file_path, dest_path)
                
                results.append({
                    'group': i+1,
                    'file': file_name,
                    'similarity': group['average_similarity']
                })
        
        # Copy the ungrouped files (no similar face found)
        all_group_files = set()
        for group in similar_groups:
            all_group_files.update(group['files'])
        
        ungrouped_dir = os.path.join(output_dir, "ungrouped")
        os.makedirs(ungrouped_dir, exist_ok=True)
        
        for file_path in self.file_paths:
            if file_path not in all_group_files:
                file_name = os.path.basename(file_path)
                dest_path = os.path.join(ungrouped_dir, file_name)
                shutil.copy2(file_path, dest_path)
                results.append({
                    'group': 'ungrouped',
                    'file': file_name,
                    'similarity': 0.0
                })
        
        # 保存结果到CSV
        df = pd.DataFrame(results)
        df.to_csv(os.path.join(output_dir, "similarity_results.csv"), index=False)
        print(f"结果已保存到 {output_dir} 目录")

Duplicate Photo Deletion (delete_one_duplicate_per_group method)

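Deletes one randomly chosen image from each similar group to reduce redundant storage. The method removes the original files at their source paths (not the copies in the output folder) and does not prompt by itself; the y/n confirmation happens in the main function.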

    def delete_one_duplicate_per_group(self, similar_groups):
        """
        Delete one randomly chosen photo from each similar group.
        :param similar_groups: groups of similar faces
        """
        if not similar_groups:
            print("No similar photo groups found; nothing to delete.")
            return

        print("\nDeleting one randomly chosen photo from each group...")
        deleted_count = 0
        for i, group in enumerate(similar_groups):
            if len(group['files']) > 1:
                # Pick the photo to delete at random
                file_to_delete = random.choice(group['files'])
                try:
                    os.remove(file_to_delete)
                    print(f"Group {i+1}: deleted -> {os.path.basename(file_to_delete)}")
                    deleted_count += 1
                except OSError as e:
                    print(f"Error while deleting {file_to_delete}: {e}")
        print(f"\nDeleted {deleted_count} duplicate photos in total.")
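
Because `os.remove` is irreversible, a more cautious variant is to move the chosen file into a quarantine folder instead of deleting it. The sketch below is not part of the tool: the function name `quarantine_one_per_group`, the `quarantine_dir` parameter, and the optional `rng` argument are all assumptions made for illustration. It expects the same `similar_groups` structure as above.

```python
import os
import random
import shutil

def quarantine_one_per_group(similar_groups, quarantine_dir, rng=None):
    """Move one randomly chosen file per group into quarantine_dir.

    Returns a list of (original_path, new_path) tuples so the move can be undone.
    """
    rng = rng or random.Random()
    os.makedirs(quarantine_dir, exist_ok=True)
    moved = []
    for group in similar_groups:
        # Skip paths that no longer exist (e.g. removed by an earlier run)
        files = [f for f in group['files'] if os.path.exists(f)]
        if len(files) < 2:
            continue
        chosen = rng.choice(files)
        # Basenames are unique here because the tool reads a single flat directory
        dest = os.path.join(quarantine_dir, os.path.basename(chosen))
        shutil.move(chosen, dest)
        moved.append((chosen, dest))
    return moved
```

Restoring a file is then a matter of moving it back from the second path to the first, and the quarantine folder can be emptied once the result has been checked by eye.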

Result Cleanup (clear_results method)

Deletes generated output directories so you can rerun from scratch.

    def clear_results(self, output_dir):
        """
        Remove the generated results directory.
        :param output_dir: output directory
        """
        if os.path.exists(output_dir):
            try:
                shutil.rmtree(output_dir)
                print(f"\nResults directory removed: {output_dir}")
            except OSError as e:
                print(f"Error while removing {output_dir}: {e}")
        else:
            print("\nResults directory does not exist; nothing to clear.")

6. Main Function Logic

The main entry supports two modes (skip=0 reprocesses images; skip=1 loads existing CSV results). Users configure input/output directories and the similarity threshold, then interactively choose whether to delete duplicates and clear outputs.

if __name__ == "__main__":
    # Configuration
    script_dir = os.path.dirname(os.path.abspath(__file__))
    INPUT_DIRECTORY = os.path.join(script_dir, "face_dir")  # replace with your image directory
    OUTPUT_DIRECTORY = os.path.join(script_dir, "face_recognition_results")
    SIMILARITY_THRESHOLD = 0.65  # adjust as needed
    
    # skip=0: reprocess the images | skip=1: skip processing and work from the saved results file
    skip = 1
    
    face_system = FaceRecognitionSystem(similarity_threshold=SIMILARITY_THRESHOLD)
    similar_groups = []

    if skip == 1:
        print("Mode: skip=1. Trying to load existing results...")
        csv_path = os.path.join(OUTPUT_DIRECTORY, "similarity_results.csv")
        if os.path.exists(csv_path):
            df = pd.read_csv(csv_path)
            # Keep only rows that belong to a group
            grouped_df = df[df['group'] != 'ungrouped'].copy()
            
            # The column mixes numbers with 'ungrouped', so group IDs may be read as strings
            grouped_df['group'] = pd.to_numeric(grouped_df['group'])

            for group_id, group_data in grouped_df.groupby('group'):
                # Rebuild full paths from the stored file names
                files = [os.path.join(INPUT_DIRECTORY, fname) for fname in group_data['file']]
                avg_sim = group_data['similarity'].iloc[0]
                similar_groups.append({
                    'files': files,
                    'average_similarity': avg_sim
                })
            print(f"Loaded {len(similar_groups)} similar groups from {csv_path}.")
        else:
            print(f"Error: results file {csv_path} not found.")
            print("Falling back to skip=0 and reprocessing the images.")
            skip = 0  # actually switch modes so the block below runs

    if skip == 0:
        print("Mode: skip=0. Processing images...")
        # Detect faces and extract embeddings
        face_system.process_directory(INPUT_DIRECTORY)
        # Find similar faces
        similar_groups = face_system.find_similar_faces()
        print(f"\nFound {len(similar_groups)} groups that may contain the same person")
        # Save the results
        if similar_groups:
            face_system.save_similar_groups(OUTPUT_DIRECTORY, similar_groups)

    # --- Follow-up actions ---
    if similar_groups:
        # Ask whether to delete duplicates
        choice_delete = input("\nDelete one random photo from each similar group? (y/n): ").lower()
        if choice_delete in ['y', 'yes']:
            face_system.delete_one_duplicate_per_group(similar_groups)
    else:
        print("\nNo similar groups to operate on.")

    # Ask whether to clear the results
    choice_clear = input("\nClear the results generated by this run? (y/n): ").lower()
    if choice_clear in ['y', 'yes']:
        face_system.clear_results(OUTPUT_DIRECTORY)
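
The skip=1 branch relies on the CSV layout written by save_similar_groups (columns group, file, similarity). As a rough sketch of that round trip, the helper below rebuilds the group list from an in-memory CSV. The function name `load_groups_from_csv` and the sample rows are assumptions for illustration; the `astype(str)` comparison is an extra guard I added so the filter behaves the same whether pandas reads the column as strings or as integers.

```python
import io
import os
import pandas as pd

def load_groups_from_csv(csv_source, input_dir):
    """Rebuild the similar-group list from a similarity_results.csv-style table."""
    df = pd.read_csv(csv_source)
    # The 'group' column mixes integers with the literal 'ungrouped'
    grouped = df[df['group'].astype(str) != 'ungrouped'].copy()
    grouped['group'] = pd.to_numeric(grouped['group'])
    groups = []
    for _, rows in grouped.groupby('group'):
        groups.append({
            'files': [os.path.join(input_dir, name) for name in rows['file']],
            # Every row of a group stores the same average, so the first is enough
            'average_similarity': float(rows['similarity'].iloc[0]),
        })
    return groups

# Invented sample rows in the same layout the tool writes
sample_csv = io.StringIO(
    "group,file,similarity\n"
    "1,a.jpg,0.91\n"
    "1,b.jpg,0.91\n"
    "2,c.jpg,0.72\n"
    "2,d.jpg,0.72\n"
    "ungrouped,e.jpg,0.0\n"
)
groups = load_groups_from_csv(sample_csv, "face_dir")
print(len(groups))                                          # 2
print([os.path.basename(p) for p in groups[0]['files']])    # ['a.jpg', 'b.jpg']
```

Only file names are stored in the CSV, so the paths are rebuilt against the input directory. If the images have been moved since the results were written, the rebuilt paths will no longer point at real files.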

Complete Code

import os
import cv2
import numpy as np
import pandas as pd
from mtcnn import MTCNN
from insightface.app import FaceAnalysis
from sklearn.metrics.pairwise import cosine_similarity
import shutil
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
import random

# Module-level globals; each worker process fills these in via initialize_worker
detector = None
app = None

def initialize_worker():
    """Initialize the models once per worker process."""
    global detector, app
    if detector is None:
        detector = MTCNN()
    if app is None:
        app = FaceAnalysis(providers=['CPUExecutionProvider'])
        app.prepare(ctx_id=0, det_size=(640, 640))

def process_image(image_path):
    """
    Detect a face in one image and extract its embedding (runs inside a worker process).
    :param image_path: path to the image
    :return: (image_path, feature, bbox) or None; bbox=(x, y, w, h)
    """
    try:
        image = cv2.imread(image_path)
        if image is None:
            return None
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # MTCNN expects RGB
        results = detector.detect_faces(rgb_image)
        if results:
            # InsightFace follows the OpenCV convention and expects the BGR image
            faces = app.get(image)
            if faces:
                # The two detectors may order faces differently: the bbox comes from
                # MTCNN's first detection, the embedding from InsightFace's first face
                x, y, w, h = results[0]['box']
                return image_path, faces[0].embedding, (int(x), int(y), int(w), int(h))
    except Exception as e:
        print(f"Error while processing {image_path}: {str(e)}")
    return None

class FaceRecognitionSystem:
    def __init__(self, similarity_threshold=0.65):
        """
        Set up the face detection and recognition system.
        :param similarity_threshold: similarity above which two faces count as the same person
        """
        # Faces with similarity above this value are treated as the same person
        self.similarity_threshold = similarity_threshold
        
        # Face embeddings and the file paths they came from
        self.face_features = []
        self.file_paths = []
        self.valid_files = []
        self.bboxes = []  # face boxes (x, y, w, h), aligned one-to-one with file_paths
        # Age/gender model state
        self.age_net = None
        self.gender_net = None
        self.detector_net = None
        self.age_list = ['(0-2)', '(4-6)', '(8-12)', '(15-20)', '(25-32)', '(38-43)', '(48-53)', '(60-100)']
        self.gender_list = ['Male', 'Female']
        self.age_gender_loaded = False
        
    def process_directory(self, input_dir):
        """
        Process every image in a directory using multiple processes.
        :param input_dir: directory containing the images
        """
        image_extensions = ['.jpg', '.jpeg', '.png', '.bmp']
        image_files = [
            os.path.join(input_dir, f) 
            for f in os.listdir(input_dir) 
            if os.path.isfile(os.path.join(input_dir, f)) 
            and os.path.splitext(f)[1].lower() in image_extensions
        ]
        
        # Number of worker processes (max_workers)
        # None lets Pool use all available CPU cores
        max_workers = 2
        
        num_to_print = max_workers if max_workers is not None else cpu_count()
        print(f"Found {len(image_files)} image files; processing with {num_to_print} processes...")
        
        # Create the process pool; each worker loads its own models
        with Pool(processes=max_workers, initializer=initialize_worker) as pool:
            # imap_unordered yields results as they finish, which drives the progress bar
            results = list(tqdm(
                pool.imap_unordered(process_image, image_files), 
                total=len(image_files),
                desc="Processing images"
            ))

        for result in results:
            if result:
                image_path, feature, bbox = result
                self.face_features.append(feature)
                self.file_paths.append(image_path)
                self.valid_files.append(os.path.basename(image_path))
                self.bboxes.append(bbox)
        
        print(f"Successfully processed {len(self.face_features)} images containing a face")
    
    def find_similar_faces(self):
        """
        Find similar faces (likely the same person).
        :return: list of similar-face groups
        """
        if len(self.face_features) < 2:
            return []
            
        # Cosine similarity between every pair of embeddings
        features_array = np.array(self.face_features)
        similarity_matrix = cosine_similarity(features_array)
        
        # Collect similar pairs (kept for inspection; the grouping below does not use them)
        similar_pairs = []
        n = len(self.face_features)
        
        for i in range(n):
            for j in range(i + 1, n):
                if similarity_matrix[i][j] > self.similarity_threshold:
                    similar_pairs.append((
                        self.file_paths[i], 
                        self.file_paths[j], 
                        similarity_matrix[i][j]
                    ))
        
        # Greedily cluster similar faces into groups
        groups = []
        used = set()
        
        for i in range(n):
            if i not in used:
                group = [i]
                used.add(i)
                
                # Claim every unused face that is similar to the current seed
                for j in range(n):
                    if j not in used and similarity_matrix[i][j] > self.similarity_threshold:
                        group.append(j)
                        used.add(j)
                
                if len(group) > 1:  # keep only groups with more than one face
                    groups.append(group)
        
        # Convert indices to file paths
        result_groups = []
        for group in groups:
            group_files = [self.file_paths[idx] for idx in group]
            # Average similarity within the group
            avg_sim = self._calculate_group_similarity(group, similarity_matrix)
            result_groups.append({
                'files': group_files,
                'average_similarity': avg_sim
            })
        
        # Sort groups by average similarity, highest first
        result_groups.sort(key=lambda x: x['average_similarity'], reverse=True)
        
        return result_groups
    
    def _calculate_group_similarity(self, group, similarity_matrix):
        """Return the mean pairwise similarity within a group."""
        if len(group) <= 1:
            return 0.0
            
        total = 0.0
        count = 0
        
        for i in range(len(group)):
            for j in range(i + 1, len(group)):
                # Look up the similarity between the two distinct members
                idx_i = group[i]
                idx_j = group[j]
                total += similarity_matrix[idx_i][idx_j]
                count += 1
        
        return total / count if count > 0 else 0.0
    
    def save_similar_groups(self, output_dir, similar_groups):
        """
        Copy each group of similar faces into its own folder and write a CSV summary.
        :param output_dir: output directory
        :param similar_groups: groups returned by find_similar_faces
        """
        os.makedirs(output_dir, exist_ok=True)
        
        # Collect one CSV row per copied file
        results = []
        for i, group in enumerate(similar_groups):
            group_dir = os.path.join(output_dir, f"group_{i+1}_similarity_{group['average_similarity']:.4f}")
            os.makedirs(group_dir, exist_ok=True)
            
            for file_path in group['files']:
                file_name = os.path.basename(file_path)
                dest_path = os.path.join(group_dir, file_name)
                shutil.copy2(file_path, dest_path)
                
                results.append({
                    'group': i+1,
                    'file': file_name,
                    'similarity': group['average_similarity']
                })
        
        # Copy files that matched no other face into an "ungrouped" folder
        all_group_files = set()
        for group in similar_groups:
            all_group_files.update(group['files'])
        
        ungrouped_dir = os.path.join(output_dir, "ungrouped")
        os.makedirs(ungrouped_dir, exist_ok=True)
        
        for file_path in self.file_paths:
            if file_path not in all_group_files:
                file_name = os.path.basename(file_path)
                dest_path = os.path.join(ungrouped_dir, file_name)
                shutil.copy2(file_path, dest_path)
                results.append({
                    'group': 'ungrouped',
                    'file': file_name,
                    'similarity': 0.0
                })
        
        # Write all rows to the CSV
        df = pd.DataFrame(results)
        df.to_csv(os.path.join(output_dir, "similarity_results.csv"), index=False)
        print(f"Results saved to {output_dir}")

    def delete_one_duplicate_per_group(self, similar_groups):
        """
        Delete one randomly chosen photo from each similar group.
        :param similar_groups: groups of similar faces
        """
        if not similar_groups:
            print("No similar photo groups found; nothing to delete.")
            return

        print("\nDeleting one randomly chosen photo from each group...")
        deleted_count = 0
        for i, group in enumerate(similar_groups):
            if len(group['files']) > 1:
                # Pick the photo to delete at random
                file_to_delete = random.choice(group['files'])
                try:
                    os.remove(file_to_delete)
                    print(f"Group {i+1}: deleted -> {os.path.basename(file_to_delete)}")
                    deleted_count += 1
                except OSError as e:
                    print(f"Error while deleting {file_to_delete}: {e}")
        print(f"\nDeleted {deleted_count} duplicate photos in total.")

    def clear_results(self, output_dir):
        """
        Remove the generated results directory.
        :param output_dir: output directory
        """
        if os.path.exists(output_dir):
            try:
                shutil.rmtree(output_dir)
                print(f"\nResults directory removed: {output_dir}")
            except OSError as e:
                print(f"Error while removing {output_dir}: {e}")
        else:
            print("\nResults directory does not exist; nothing to clear.")

    def ensure_age_gender_models(self, base_dir):
        """Load the age/gender models (only once).
        The model directory is resolved as: (parent of base_dir)/age_gender
        """
        if self.age_gender_loaded:
            return True
        try:
            # base_dir is the face subdirectory, so step up one level
            project_root = os.path.dirname(base_dir)  # parent directory (Lip_reading)
            model_dir = os.path.join(project_root, 'age_gender')

            age_prototxt = os.path.join(model_dir, 'age_deploy.prototxt')
            age_caffemodel = os.path.join(model_dir, 'age_net.caffemodel')
            gender_prototxt = os.path.join(model_dir, 'gender_deploy.prototxt')
            gender_caffemodel = os.path.join(model_dir, 'gender_net.caffemodel')
            face_pb = os.path.join(model_dir, 'opencv_face_detector_uint8.pb')
            face_pbtxt = os.path.join(model_dir, 'opencv_face_detector.pbtxt')

            needed = [age_prototxt, age_caffemodel, gender_prototxt, gender_caffemodel, face_pb, face_pbtxt]
            for p in needed:
                if not os.path.exists(p):
                    print(f"Missing model file: {p}; skipping age/gender statistics.")
                    return False

            self.age_net = cv2.dnn.readNet(age_caffemodel, age_prototxt)
            self.gender_net = cv2.dnn.readNet(gender_caffemodel, gender_prototxt)
            self.detector_net = cv2.dnn.readNetFromTensorflow(face_pb, face_pbtxt)
            self.age_gender_loaded = True
            print(f"Age/gender models loaded. (model directory: {model_dir})")
            return True
        except Exception as e:
            print(f"Failed to load age/gender models: {e}")
            return False

    def predict_age_gender_for_image(self, image_path):
        """Predict gender and age range for the most confident face in one image. Returns (gender, age_range) or (None, None)."""
        if not self.age_gender_loaded:
            return (None, None)
        img = cv2.imread(image_path)
        if img is None:
            return (None, None)
        h, w = img.shape[:2]
        blob = cv2.dnn.blobFromImage(img, 1.0, (300, 300), [104, 117, 123], False, False)
        self.detector_net.setInput(blob)
        detections = self.detector_net.forward()
        best_conf = 0
        face_box = None
        for i in range(detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence > 0.6 and confidence > best_conf:
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                x1, y1, x2, y2 = box.astype(int)
                x1, y1 = max(0, x1), max(0, y1)
                x2, y2 = min(w - 1, x2), min(h - 1, y2)
                face_box = (x1, y1, x2, y2)
                best_conf = confidence
        if face_box is None:
            return (None, None)
        x1, y1, x2, y2 = face_box
        face_roi = img[y1:y2, x1:x2].copy()
        if face_roi.size == 0:
            return (None, None)
        blob_face = cv2.dnn.blobFromImage(face_roi, 1.0, (227, 227), (78.4263377603, 87.7689143744, 114.895847746), swapRB=False)
        # gender
        self.gender_net.setInput(blob_face)
        gender_preds = self.gender_net.forward()
        gender = self.gender_list[gender_preds[0].argmax()]
        # age
        self.age_net.setInput(blob_face)
        age_preds = self.age_net.forward()
        age_range = self.age_list[age_preds[0].argmax()]
        return (gender, age_range)

    def analyze_age_gender(self, base_dir, output_dir, similar_groups):
        """
        Run age/gender prediction on all processed images and write two files:
        1) age_gender_details.csv: per image -> file name, group_id, gender, age_range
        2) age_gender_stats.txt: summary counts
        """
        if not self.ensure_age_gender_models(base_dir):
            return
        records = []
        # Map each file name to its group number
        file_to_group = {}
        for idx, g in enumerate(similar_groups, start=1):
            for f in g['files']:
                file_to_group[os.path.basename(f)] = idx
        # Analyze every processed file, whether or not it belongs to a group
        target_files = self.file_paths if self.file_paths else []

        if not target_files:
            print("No images available for age/gender statistics.")
            return
        print("Running age/gender statistics...")
        gender_counter = {}
        age_counter = {}
        for path in tqdm(target_files, desc="Age/Gender"):
            gender, age_range = self.predict_age_gender_for_image(path)
            base_name = os.path.basename(path)
            group_id = file_to_group.get(base_name, 'ungrouped')
            records.append({
                'file': base_name,
                'group_id': group_id,
                'gender': gender if gender else 'Unknown',
                'age_range': age_range if age_range else 'Unknown'
            })
            if gender:
                gender_counter[gender] = gender_counter.get(gender, 0) + 1
            else:
                gender_counter['Unknown'] = gender_counter.get('Unknown', 0) + 1
            if age_range:
                age_counter[age_range] = age_counter.get(age_range, 0) + 1
            else:
                age_counter['Unknown'] = age_counter.get('Unknown', 0) + 1
        # Save per-image details
        os.makedirs(output_dir, exist_ok=True)
        details_path = os.path.join(output_dir, 'age_gender_details.csv')
        pd.DataFrame(records).to_csv(details_path, index=False)
        # Summary
        stats_lines = ["Gender Statistics:"]
        for k,v in gender_counter.items():
            stats_lines.append(f"  {k}: {v}")
        stats_lines.append("\nAge Range Statistics:")
        for k,v in age_counter.items():
            stats_lines.append(f"  {k}: {v}")
        stats_path = os.path.join(output_dir, 'age_gender_stats.txt')
        with open(stats_path, 'w', encoding='utf-8') as fw:
            fw.write('\n'.join(stats_lines))
        print(f"Age/gender statistics done; saved to: {details_path} and {stats_path}")

if __name__ == "__main__":
    # Configuration
    script_dir = os.path.dirname(os.path.abspath(__file__))
    INPUT_DIRECTORY = os.path.join(script_dir, "face_dir")  # replace with your image directory
    OUTPUT_DIRECTORY = os.path.join(script_dir, "face_recognition_results")
    SIMILARITY_THRESHOLD = 0.65  # adjust as needed
    
    # skip=0: reprocess the images | skip=1: skip processing and work from the saved results file
    skip = 1
    
    face_system = FaceRecognitionSystem(similarity_threshold=SIMILARITY_THRESHOLD)
    similar_groups = []

    if skip == 1:
        print("Mode: skip=1. Trying to load existing results...")
        csv_path = os.path.join(OUTPUT_DIRECTORY, "similarity_results.csv")
        if os.path.exists(csv_path):
            df = pd.read_csv(csv_path)
            # Keep only rows that belong to a group
            grouped_df = df[df['group'] != 'ungrouped'].copy()
            
            # The column mixes numbers with 'ungrouped', so group IDs may be read as strings
            grouped_df['group'] = pd.to_numeric(grouped_df['group'])

            for group_id, group_data in grouped_df.groupby('group'):
                # Rebuild full paths from the stored file names
                files = [os.path.join(INPUT_DIRECTORY, fname) for fname in group_data['file']]
                avg_sim = group_data['similarity'].iloc[0]
                similar_groups.append({
                    'files': files,
                    'average_similarity': avg_sim
                })
            print(f"Loaded {len(similar_groups)} similar groups from {csv_path}.")
        else:
            print(f"Error: results file {csv_path} not found.")
            print("Falling back to skip=0 and reprocessing the images.")
            skip = 0  # actually switch modes so the block below runs

    if skip == 0:
        print("Mode: skip=0. Processing images...")
        # Detect faces and extract embeddings
        face_system.process_directory(INPUT_DIRECTORY)
        # Find similar faces
        similar_groups = face_system.find_similar_faces()
        print(f"\nFound {len(similar_groups)} groups that may contain the same person")
        # Save the results
        if similar_groups:
            face_system.save_similar_groups(OUTPUT_DIRECTORY, similar_groups)

    # --- Follow-up actions ---
    if similar_groups:
        # Ask whether to delete duplicates
        choice_delete = input("\nDelete one random photo from each similar group? (y/n): ").lower()
        if choice_delete in ['y', 'yes']:
            face_system.delete_one_duplicate_per_group(similar_groups)
    else:
        print("\nNo similar groups to operate on.")

    # Ask whether to clear the results
    choice_clear = input("\nClear the results generated by this run? (y/n): ").lower()
    if choice_clear in ['y', 'yes']:
        face_system.clear_results(OUTPUT_DIRECTORY)


Summary

By combining classic components such as MTCNN and InsightFace with cosine similarity and clustering, this tool automates the full path from detection and embedding extraction to similarity grouping. Highlights:

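  1. Efficiency: Detection and embedding extraction run in a process pool, with each worker loading its own models; the pool size is set by max_workers in process_directory (2 in the listing above).
  2. Practicality: Face grouping and optional duplicate removal cover typical photo-cleanup workflows. The age/gender statistics are implemented as class methods, but the main function shown here does not call them.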
  3. Extensibility: The layout makes it straightforward to tune thresholds, swap models, or refine clustering.
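
As one example of refining the clustering, the greedy seed-based grouping could be swapped for connected components over the thresholded similarity graph, so that a chain of similar faces (A~B, B~C) ends up in one group even when A and C are not directly similar. This is a sketch of an alternative technique, not what the tool currently does; the function name `connected_component_groups` and the toy matrix are made up for illustration.

```python
import numpy as np

def connected_component_groups(similarity_matrix, threshold):
    """Group indices linked by any chain of above-threshold similarities (union-find)."""
    n = len(similarity_matrix)
    parent = list(range(n))

    def find(x):
        # Follow parent links to the root, compressing the path on the way
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for i in range(n):
        for j in range(i + 1, n):
            if similarity_matrix[i][j] > threshold:
                ri, rj = find(i), find(j)
                if ri != rj:
                    parent[max(ri, rj)] = min(ri, rj)

    components = {}
    for i in range(n):
        components.setdefault(find(i), []).append(i)
    # Drop singletons, matching the tool's convention
    return [members for members in components.values() if len(members) > 1]

# Toy chain: 0~1 and 1~2 are similar, but 0 and 2 are not
chain = np.array([
    [1.0, 0.8, 0.3, 0.1],
    [0.8, 1.0, 0.7, 0.2],
    [0.3, 0.7, 1.0, 0.1],
    [0.1, 0.2, 0.1, 1.0],
])
print(connected_component_groups(chain, threshold=0.65))  # [[0, 1, 2]]
```

On this matrix the greedy approach seeded at face 0 would only collect faces 0 and 1 and leave face 2 ungrouped. The trade-off is that connected components can also chain unrelated people together through a single borderline match, so a stricter threshold may be needed.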

I hope this helps you organize photos faster and gives learners of face recognition a concrete reference implementation.