yanchang
yanchang
发布于 2025-09-30 / 22 阅读
0
0

AI换脸(facefusion进阶)(10.8修正视频后半段加速问题以及添加压缩)

碎碎念

事实上,你会发现上一篇文章部署的也是受监管的项目,所有内容在执行前是要加一层过滤器对内容进行审查的,以及单卡推理有一定的性能上的问题,所以在这篇文章以facefusion为例子,解决监管问题,和多卡推理问题

解决监管问题

事实上,你在推理一些视频内容的时候会发现,失败了,实际上是因为内容审查的原因,

然后在目录中可以找到这个文件

内容如下

from functools import lru_cache
from typing import List, Tuple

import numpy
from tqdm import tqdm

from facefusion import inference_manager, state_manager, wording
from facefusion.common_helper import is_macos
from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url
from facefusion.execution import has_execution_provider
from facefusion.filesystem import resolve_relative_path
from facefusion.thread_helper import conditional_thread_semaphore
from facefusion.types import Detection, DownloadScope, DownloadSet, ExecutionProvider, Fps, InferencePool, ModelSet, VisionFrame
from facefusion.vision import detect_video_fps, fit_contain_frame, read_image, read_video_frame

STREAM_COUNTER = 0


@lru_cache()
def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
	return\
	{
		'nsfw_1':
		{
			'hashes':
			{
				'content_analyser':
				{
					'url': resolve_download_url('models-3.3.0', 'nsfw_1.hash'),
					'path': resolve_relative_path('../.assets/models/nsfw_1.hash')
				}
			},
			'sources':
			{
				'content_analyser':
				{
					'url': resolve_download_url('models-3.3.0', 'nsfw_1.onnx'),
					'path': resolve_relative_path('../.assets/models/nsfw_1.onnx')
				}
			},
			'size': (640, 640),
			'mean': (0.0, 0.0, 0.0),
			'standard_deviation': (1.0, 1.0, 1.0)
		},
		'nsfw_2':
		{
			'hashes':
			{
				'content_analyser':
				{
					'url': resolve_download_url('models-3.3.0', 'nsfw_2.hash'),
					'path': resolve_relative_path('../.assets/models/nsfw_2.hash')
				}
			},
			'sources':
			{
				'content_analyser':
				{
					'url': resolve_download_url('models-3.3.0', 'nsfw_2.onnx'),
					'path': resolve_relative_path('../.assets/models/nsfw_2.onnx')
				}
			},
			'size': (384, 384),
			'mean': (0.5, 0.5, 0.5),
			'standard_deviation': (0.5, 0.5, 0.5)
		},
		'nsfw_3':
		{
			'hashes':
			{
				'content_analyser':
				{
					'url': resolve_download_url('models-3.3.0', 'nsfw_3.hash'),
					'path': resolve_relative_path('../.assets/models/nsfw_3.hash')
				}
			},
			'sources':
			{
				'content_analyser':
				{
					'url': resolve_download_url('models-3.3.0', 'nsfw_3.onnx'),
					'path': resolve_relative_path('../.assets/models/nsfw_3.onnx')
				}
			},
			'size': (448, 448),
			'mean': (0.48145466, 0.4578275, 0.40821073),
			'standard_deviation': (0.26862954, 0.26130258, 0.27577711)
		}
	}


def get_inference_pool() -> InferencePool:
	model_names = [ 'nsfw_1', 'nsfw_2', 'nsfw_3' ]
	_, model_source_set = collect_model_downloads()

	return inference_manager.get_inference_pool(__name__, model_names, model_source_set)


def clear_inference_pool() -> None:
	model_names = [ 'nsfw_1', 'nsfw_2', 'nsfw_3' ]
	inference_manager.clear_inference_pool(__name__, model_names)


def resolve_execution_providers() -> List[ExecutionProvider]:
	if is_macos() and has_execution_provider('coreml'):
		return [ 'cpu' ]
	return state_manager.get_item('execution_providers')


def collect_model_downloads() -> Tuple[DownloadSet, DownloadSet]:
	model_set = create_static_model_set('full')
	model_hash_set = {}
	model_source_set = {}

	for content_analyser_model in [ 'nsfw_1', 'nsfw_2', 'nsfw_3' ]:
		model_hash_set[content_analyser_model] = model_set.get(content_analyser_model).get('hashes').get('content_analyser')
		model_source_set[content_analyser_model] = model_set.get(content_analyser_model).get('sources').get('content_analyser')

	return model_hash_set, model_source_set


def pre_check() -> bool:
	model_hash_set, model_source_set = collect_model_downloads()

	return conditional_download_hashes(model_hash_set) and conditional_download_sources(model_source_set)


def analyse_stream(vision_frame : VisionFrame, video_fps : Fps) -> bool:
	global STREAM_COUNTER

	STREAM_COUNTER = STREAM_COUNTER + 1
	if STREAM_COUNTER % int(video_fps) == 0:
		return analyse_frame(vision_frame)
	return False


def analyse_frame(vision_frame : VisionFrame) -> bool:
	return detect_nsfw(vision_frame)


@lru_cache()
def analyse_image(image_path : str) -> bool:
	vision_frame = read_image(image_path)
	return analyse_frame(vision_frame)


@lru_cache()
def analyse_video(video_path : str, trim_frame_start : int, trim_frame_end : int) -> bool:
	video_fps = detect_video_fps(video_path)
	frame_range = range(trim_frame_start, trim_frame_end)
	rate = 0.0
	total = 0
	counter = 0

	with tqdm(total = len(frame_range), desc = wording.get('analysing'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:

		for frame_number in frame_range:
			if frame_number % int(video_fps) == 0:
				vision_frame = read_video_frame(video_path, frame_number)
				total += 1
				if analyse_frame(vision_frame):
					counter += 1
			if counter > 0 and total > 0:
				rate = counter / total * 100
			progress.set_postfix(rate = rate)
			progress.update()

	return bool(rate > 10.0)


def detect_nsfw(vision_frame : VisionFrame) -> bool:
	is_nsfw_1 = detect_with_nsfw_1(vision_frame)
	is_nsfw_2 = detect_with_nsfw_2(vision_frame)
	is_nsfw_3 = detect_with_nsfw_3(vision_frame)

	return is_nsfw_1 and is_nsfw_2 or is_nsfw_1 and is_nsfw_3 or is_nsfw_2 and is_nsfw_3


def detect_with_nsfw_1(vision_frame : VisionFrame) -> bool:
	detect_vision_frame = prepare_detect_frame(vision_frame, 'nsfw_1')
	detection = forward_nsfw(detect_vision_frame, 'nsfw_1')
	detection_score = numpy.max(numpy.amax(detection[:, 4:], axis = 1))
	return bool(detection_score > 0.2)


def detect_with_nsfw_2(vision_frame : VisionFrame) -> bool:
	detect_vision_frame = prepare_detect_frame(vision_frame, 'nsfw_2')
	detection = forward_nsfw(detect_vision_frame, 'nsfw_2')
	detection_score = detection[0] - detection[1]
	return bool(detection_score > 0.25)


def detect_with_nsfw_3(vision_frame : VisionFrame) -> bool:
	detect_vision_frame = prepare_detect_frame(vision_frame, 'nsfw_3')
	detection = forward_nsfw(detect_vision_frame, 'nsfw_3')
	detection_score = (detection[2] + detection[3]) - (detection[0] + detection[1])
	return bool(detection_score > 10.5)


def forward_nsfw(vision_frame : VisionFrame, model_name : str) -> Detection:
	content_analyser = get_inference_pool().get(model_name)

	with conditional_thread_semaphore():
		detection = content_analyser.run(None,
		{
			'input': vision_frame
		})[0]

	if model_name in [ 'nsfw_2', 'nsfw_3' ]:
		return detection[0]

	return detection


def prepare_detect_frame(temp_vision_frame : VisionFrame, model_name : str) -> VisionFrame:
	model_set = create_static_model_set('full').get(model_name)
	model_size = model_set.get('size')
	model_mean = model_set.get('mean')
	model_standard_deviation = model_set.get('standard_deviation')

	detect_vision_frame = fit_contain_frame(temp_vision_frame, model_size)
	detect_vision_frame = detect_vision_frame[:, :, ::-1] / 255.0
	detect_vision_frame -= model_mean
	detect_vision_frame /= model_standard_deviation
	detect_vision_frame = numpy.expand_dims(detect_vision_frame.transpose(2, 0, 1), axis = 0).astype(numpy.float32)
	return detect_vision_frame

将所有的判断逻辑进行短路,修改为以下内容

from functools import lru_cache
from typing import List, Tuple

import numpy
from tqdm import tqdm

from facefusion import inference_manager, state_manager, wording
from facefusion.common_helper import is_macos
from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url
from facefusion.execution import has_execution_provider
from facefusion.filesystem import resolve_relative_path
from facefusion.thread_helper import conditional_thread_semaphore
from facefusion.types import Detection, DownloadScope, DownloadSet, ExecutionProvider, Fps, InferencePool, ModelSet, VisionFrame
from facefusion.vision import detect_video_fps, fit_contain_frame, read_image, read_video_frame

STREAM_COUNTER = 0


@lru_cache()
def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
    return\
    {
        'nsfw_1':
        {
            'hashes':
            {
                'content_analyser':
                {
                    'url': resolve_download_url('models-3.3.0', 'nsfw_1.hash'),
                    'path': resolve_relative_path('../.assets/models/nsfw_1.hash')
                }
            },
            'sources':
            {
                'content_analyser':
                {
                    'url': resolve_download_url('models-3.3.0', 'nsfw_1.onnx'),
                    'path': resolve_relative_path('../.assets/models/nsfw_1.onnx')
                }
            },
            'size': (640, 640),
            'mean': (0.0, 0.0, 0.0),
            'standard_deviation': (1.0, 1.0, 1.0)
        },
        'nsfw_2':
        {
            'hashes':
            {
                'content_analyser':
                {
                    'url': resolve_download_url('models-3.3.0', 'nsfw_2.hash'),
                    'path': resolve_relative_path('../.assets/models/nsfw_2.hash')
                }
            },
            'sources':
            {
                'content_analyser':
                {
                    'url': resolve_download_url('models-3.3.0', 'nsfw_2.onnx'),
                    'path': resolve_relative_path('../.assets/models/nsfw_2.onnx')
                }
            },
            'size': (384, 384),
            'mean': (0.5, 0.5, 0.5),
            'standard_deviation': (0.5, 0.5, 0.5)
        },
        'nsfw_3':
        {
            'hashes':
            {
                'content_analyser':
                {
                    'url': resolve_download_url('models-3.3.0', 'nsfw_3.hash'),
                    'path': resolve_relative_path('../.assets/models/nsfw_3.hash')
                }
            },
            'sources':
            {
                'content_analyser':
                {
                    'url': resolve_download_url('models-3.3.0', 'nsfw_3.onnx'),
                    'path': resolve_relative_path('../.assets/models/nsfw_3.onnx')
                }
            },
            'size': (448, 448),
            'mean': (0.48145466, 0.4578275, 0.40821073),
            'standard_deviation': (0.26862954, 0.26130258, 0.27577711)
        }
    }


def get_inference_pool() -> InferencePool:
    model_names = [ 'nsfw_1', 'nsfw_2', 'nsfw_3' ]
    _, model_source_set = collect_model_downloads()

    return inference_manager.get_inference_pool(__name__, model_names, model_source_set)


def clear_inference_pool() -> None:
    model_names = [ 'nsfw_1', 'nsfw_2', 'nsfw_3' ]
    inference_manager.clear_inference_pool(__name__, model_names)


def resolve_execution_providers() -> List[ExecutionProvider]:
    if is_macos() and has_execution_provider('coreml'):
        return [ 'cpu' ]
    return state_manager.get_item('execution_providers')


def collect_model_downloads() -> Tuple[DownloadSet, DownloadSet]:
    model_set = create_static_model_set('full')
    model_hash_set = {}
    model_source_set = {}

    for content_analyser_model in [ 'nsfw_1', 'nsfw_2', 'nsfw_3' ]:
        model_hash_set[content_analyser_model] = model_set.get(content_analyser_model).get('hashes').get('content_analyser')
        model_source_set[content_analyser_model] = model_set.get(content_analyser_model).get('sources').get('content_analyser')

    return model_hash_set, model_source_set


def pre_check() -> bool:
    model_hash_set, model_source_set = collect_model_downloads()

    return conditional_download_hashes(model_hash_set) and conditional_download_sources(model_source_set)


def analyse_stream(vision_frame : VisionFrame, video_fps : Fps) -> bool:
    # 始终返回False,认为所有内容都是安全的
    return False


def analyse_frame(vision_frame : VisionFrame) -> bool:
    # 始终返回False,认为所有内容都是安全的
    return False


@lru_cache()
def analyse_image(image_path : str) -> bool:
    # 始终返回False,认为所有内容都是安全的
    return False


@lru_cache()
def analyse_video(video_path : str, trim_frame_start : int, trim_frame_end : int) -> bool:
    # 始终返回False,认为所有内容都是安全的
    return False


def detect_nsfw(vision_frame : VisionFrame) -> bool:
    # 始终返回False,认为所有内容都是安全的
    return False


def detect_with_nsfw_1(vision_frame : VisionFrame) -> bool:
    # 始终返回False,认为所有内容都是安全的
    return False


def detect_with_nsfw_2(vision_frame : VisionFrame) -> bool:
    # 始终返回False,认为所有内容都是安全的
    return False


def detect_with_nsfw_3(vision_frame : VisionFrame) -> bool:
    # 始终返回False,认为所有内容都是安全的
    return False


def forward_nsfw(vision_frame : VisionFrame, model_name : str) -> Detection:
    # 返回安全的检测结果
    if model_name == 'nsfw_1':
        return numpy.array([[0, 0, 0, 0, 0.1, 0.9]])  # 低NSFW分数
    elif model_name == 'nsfw_2':
        return numpy.array([0.9, 0.1])  # 高安全分数,低NSFW分数
    elif model_name == 'nsfw_3':
        return numpy.array([0.8, 0.7, 0.1, 0.2])  # 安全类别的分数较高
    
    # 默认返回安全结果
    return numpy.array([0.9, 0.1])


def prepare_detect_frame(temp_vision_frame : VisionFrame, model_name : str) -> VisionFrame:
    # 保持原有的预处理逻辑,但结果不会被使用
    model_set = create_static_model_set('full').get(model_name)
    model_size = model_set.get('size')
    model_mean = model_set.get('mean')
    model_standard_deviation = model_set.get('standard_deviation')

    detect_vision_frame = fit_contain_frame(temp_vision_frame, model_size)
    detect_vision_frame = detect_vision_frame[:, :, ::-1] / 255.0
    detect_vision_frame -= model_mean
    detect_vision_frame /= model_standard_deviation
    detect_vision_frame = numpy.expand_dims(detect_vision_frame.transpose(2, 0, 1), axis = 0).astype(numpy.float32)
    return detect_vision_frame

然后的话你会发现,依旧不可以,运行报错,仔细分析文档函数调用问题

分析过后,是每次运行前都会校验所有的文件是否发生改变,所以在core.py中找找线索
终于找到了!!

def common_pre_check() -> bool:
    common_modules =\
    [
        content_analyser,
        # ... other modules
    ]

    content_analyser_content = inspect.getsource(content_analyser).encode()
    content_analyser_hash = hash_helper.create_hash(content_analyser_content)

    return all(module.pre_check() for module in common_modules) and content_analyser_hash == '803b5ec7'

源文件在这里会进行一个哈希值教研,如果有一点的改变都会拒绝执行,所以只需要把这个判断逻辑进行短路就可以了

然后运行就可以正常运行了,这样就可以躲避内容审核了

多卡加速

这里有两张2080ti,所以说可以把视频分为两段,然后把两段图像分别进行切开,交给两张卡进行推理,推理过后,再图像增强、随后合并。

import subprocess
import os
import re
import time
from tqdm import tqdm
import threading
import math
from pathlib import Path
import json

# --- 配置文件路径 ---
source_faces_folder = './data/source'   # 包含所有源图片的文件夹
target_videos_folder = './data/target'  # 包含所有待处理视频的文件夹
output_folder = './data/output'         # 处理完成的视频将保存到这里

# --- 性能与稳定性配置 ---
# 将视频高度缩放到指定值进行处理,可以极大降低显存占用,解决崩溃问题。
# 常见值: 720, 1080。设置为 None 表示使用原始分辨率。
PROCESSING_RESOLUTION_HEIGHT = None

# --- 临时文件目录 (脚本会自动管理) ---
TEMP_DIR = './temp'
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(output_folder, exist_ok=True)

def get_video_duration(video_path):
    """
    获取视频时长(秒)- 经过优化的稳定版本
    """
    try:
        cmd = [
            'ffprobe', 
            '-v', 'quiet',
            '-print_format', 'json',
            '-show_format',
            str(video_path)
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        format_info = json.loads(result.stdout).get('format', {})
        duration = format_info.get('duration')
        if duration is None:
            print(f"❌ 无法从视频文件中找到时长信息: {video_path}")
            return None
        return float(duration)
    except FileNotFoundError:
        print(f"❌ 命令失败: ffprobe 未找到。请确保 ffmpeg 已安装并位于系统 PATH 中。")
        return None
    except subprocess.CalledProcessError as e:
        print(f"❌ ffprobe 在处理视频 '{video_path}' 时出错。返回码: {e.returncode}")
        print(f"  -> FFprobe 错误输出: {e.stderr.strip()}")
        return None
    except json.JSONDecodeError:
        print(f"❌ 解析 ffprobe 的 JSON 输出失败。可能视频文件已损坏。")
        return None
    except Exception as e:
        print(f"❌ 获取视频时长时发生未知错误: {e}")
        return None

def split_video(input_path, output_pattern, num_parts):
    """
    将视频分割并(根据配置)降低分辨率
    """
    duration = get_video_duration(input_path)
    if duration is None: return False
    part_duration = math.ceil(duration / num_parts)
    print(f"📊 视频总时长: {duration:.2f}秒")
    print(f"🔪 正在将视频分割成 {num_parts} 个部分 (每部分约 {part_duration} 秒)...")

    for i in range(num_parts):
        start_time = i * part_duration
        output_path = output_pattern.format(i + 1)
        
        cmd = ['ffmpeg', '-y', '-i', str(input_path), '-ss', str(start_time), '-t', str(part_duration)]
        
        if PROCESSING_RESOLUTION_HEIGHT:
            print(f"    -> 同时将分辨率缩放至 {PROCESSING_RESOLUTION_HEIGHT}p 高度进行处理...")
            cmd.extend(['-vf', f'scale=-1:{PROCESSING_RESOLUTION_HEIGHT}', '-c:v', 'libx264', '-preset', 'fast', '-crf', '23'])
        else:
            print("    -> 使用原始分辨率进行无损分割...")
            cmd.extend(['-c', 'copy'])
        
        cmd.append(str(output_path))

        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True)
            print(f"✅ 视频部分 {i+1} 已保存: {output_path}")
        except subprocess.CalledProcessError as e:
            print(f"❌ 分割视频部分 {i+1} 失败: {e.stderr}")
            return False
    return True

def merge_videos(input_pattern, num_parts, output_path):
    """合并多个视频文件"""
    list_file_path = os.path.join(TEMP_DIR, 'file_list.txt')
    try:
        with open(list_file_path, 'w') as f:
            for i in range(1, num_parts + 1):
                video_path = Path(input_pattern.format(i))
                if video_path.exists():
                    f.write(f"file '{video_path.resolve()}'\n")
                else:
                    print(f"❌ 找不到要合并的视频部分: {video_path}")
                    return False
        cmd = ['ffmpeg', '-f', 'concat', '-safe', '0', '-i', str(list_file_path), '-c', 'copy', '-y', str(output_path)]
        print("🔄 正在合并所有已处理的视频部分...")
        subprocess.run(cmd, check=True, capture_output=True, text=True)
        print(f"✅ 视频合并完成: {output_path}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"❌ 视频合并失败: {e.stderr}")
        return False
    finally:
        if os.path.exists(list_file_path):
            os.remove(list_file_path)

def read_output(stream, pbar, status, gpu_id):
    """读取子进程输出并更新进度条"""
    frame_pattern = re.compile(r'Processing frame (\d+)/(\d+)')
    for line in iter(stream.readline, ''):
        if not line: break
        line = line.strip()
        print(f"[GPU-{gpu_id}] {line}")
        match = frame_pattern.search(line)
        if match:
            current, total = int(match.group(1)), int(match.group(2))
            if pbar.total != total: pbar.total = total
            pbar.n = current
            pbar.refresh()
            status['last_update'] = time.time()
        if any(error in line.lower() for error in ['error', 'failed', 'traceback', 'exception', 'out of memory']):
            status['has_error'] = True
        if 'processing is complete' in line.lower():
            status['completed'] = True

def run_facefusion_process(sources, target, output, gpu_id, processors, part_info=""):
    """通用的人脸处理进程函数"""
    env = os.environ.copy()
    env['CUDA_VISIBLE_DEVICES'] = str(gpu_id)
    
    command = ['python', 'facefusion.py', 'headless-run']
    for src_path in sources:
        command.extend(['-s', str(src_path)])
    command.extend([
        '-t', str(target),
        '-o', str(output),
        '--execution-providers', 'cuda',
        '--processors', *processors,
        '--execution-thread-count', '1',
        '--temp-frame-format', 'jpeg',
        '--face-swapper-model', 'inswapper_128_fp16',
        '--face-enhancer-model', 'gfpgan_1.4',
        '--face-enhancer-blend', '80',
        '--output-video-quality', '95',
        '--face-selector-mode', 'many',
        '--reference-face-distance', '0.6'
    ])

    print(f"🚀 GPU-{gpu_id} 开始处理 [{', '.join(processors)}] 任务: {part_info}...")
    
    status = {'last_update': time.time(), 'has_error': False, 'completed': False}
    pbar = tqdm(total=100, desc=f"GPU-{gpu_id} {part_info}", ncols=120)

    try:
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True, env=env)
        stdout_thread = threading.Thread(target=read_output, args=(process.stdout, pbar, status, gpu_id))
        stderr_thread = threading.Thread(target=read_output, args=(process.stderr, pbar, status, gpu_id))
        stdout_thread.start()
        stderr_thread.start()
        
        timeout = 3600 # 1小时超时
        while process.poll() is None:
            time.sleep(5)
            if time.time() - status['last_update'] > timeout:
                print(f"⚠️ GPU-{gpu_id} 警告: 进程可能已卡住,{timeout}秒无进度更新。正在终止...")
                process.terminate()
                break
        
        process.wait()
        stdout_thread.join()
        stderr_thread.join()
        pbar.close()

        if process.returncode == 0 and os.path.exists(output) and os.path.getsize(output) > 0:
            print(f"✅ GPU-{gpu_id} 成功完成 [{', '.join(processors)}] 任务!")
            return True
        else:
            print(f"❌ GPU-{gpu_id} 处理 [{', '.join(processors)}] 任务失败。返回码: {process.returncode}")
            return False
            
    except Exception as e:
        print(f"❌ GPU-{gpu_id} 执行时发生意外错误: {e}")
        pbar.close()
        return False

# +++ NEW GENERALIZED FUNCTION +++
def run_parallel_stage(num_gpus, source_paths, video_name, input_pattern, output_pattern, processors, stage_name):
    """
    通用化的并行处理函数,用于执行任何处理阶段(如换脸、增强)。
    """
    threads = []
    results = [False] * num_gpus

    def thread_target(part_index):
        input_part = input_pattern.format(part_index + 1)
        output_part = output_pattern.format(part_index + 1)
        part_info = f"{video_name} ({stage_name} 部分 {part_index + 1}/{num_gpus})"
        results[part_index] = run_facefusion_process(
            source_paths, input_part, output_part, part_index, processors, part_info
        )

    for i in range(num_gpus):
        thread = threading.Thread(target=thread_target, args=(i,))
        threads.append(thread)
        thread.start()
        
        # 为避免CUDA初始化冲突,在启动下一个GPU任务前稍作延迟
        if i < num_gpus - 1:
            delay = 3
            print(f"🕰️ 为避免资源冲突,等待 {delay} 秒后启动下一个GPU任务...")
            time.sleep(delay)

    for thread in threads:
        thread.join()

    if not all(results):
        print(f"❌ 并行处理阶段 '{stage_name}' 失败。")
        return False
    
    print(f"✅ 所有部分的 '{stage_name}' 阶段均已成功完成。")
    return True

def check_gpu_availability():
    """检查可用的GPU数量"""
    try:
        import torch
        if torch.cuda.is_available():
            gpu_count = torch.cuda.device_count()
            print(f"🎮 检测到 {gpu_count} 个可用的NVIDIA GPU:")
            for i in range(gpu_count): print(f"  - GPU {i}: {torch.cuda.get_device_name(i)}")
            return gpu_count
        else:
            print("⚠️ 未检测到可用的CUDA设备。将以单GPU模式运行。")
            return 1 # 返回1以进行单线程处理
    except ImportError:
        print("⚠️ PyTorch未安装,无法检测GPU。将以单GPU模式运行。")
        return 1
    except Exception as e:
        print(f"⚠️ 检查GPU时出错: {e}。将以单GPU模式运行。")
        return 1

def cleanup_temp_dir():
    """清理临时文件夹内的所有文件"""
    print("🧹 正在清理临时文件...")
    try:
        for item in os.listdir(TEMP_DIR):
            item_path = os.path.join(TEMP_DIR, item)
            os.remove(item_path)
        print("✅ 临时文件清理完成。")
    except Exception as e:
        print(f"⚠️ 清理临时文件时出错: {e}")

# <<< MODIFIED FUNCTION >>>
def process_single_video(source_paths, target_video_path, output_video_path, gpu_count):
    """处理单个视频的完整流程:分割 -> 并行换脸 -> 并行增强 -> 合并 -> 清理"""
    video_name = Path(target_video_path).name
    
    # 即使只有一个GPU,我们也进行分割以应用分辨率缩放设置
    num_parts = gpu_count if gpu_count > 0 else 1

    # 定义每个阶段的文件名模式
    original_parts_pattern = os.path.join(TEMP_DIR, f"original_part_{{}}.mp4")
    swapped_parts_pattern = os.path.join(TEMP_DIR, f"swapped_part_{{}}.mp4")
    enhanced_parts_pattern = os.path.join(TEMP_DIR, f"enhanced_part_{{}}.mp4")
    
    try:
        # --- 阶段 1: 分割视频 ---
        print("\n--- 阶段 1: 分割视频 ---")
        if not split_video(target_video_path, original_parts_pattern, num_parts):
            return False

        # --- 阶段 2: 并行换脸 ---
        print("\n--- 阶段 2: 并行换脸 (Face Swapping) ---")
        success = run_parallel_stage(
            num_gpus=num_parts, 
            source_paths=source_paths, 
            video_name=video_name,
            input_pattern=original_parts_pattern, 
            output_pattern=swapped_parts_pattern,
            processors=['face_swapper'],
            stage_name="换脸"
        )
        if not success:
            print("❌ 换脸阶段失败,处理中止。")
            return False

        # --- 阶段 3: 并行画质增强 ---
        print("\n--- 阶段 3: 并行画质增强 (Face Enhancing) ---")
        success = run_parallel_stage(
            num_gpus=num_parts,
            source_paths=source_paths, # FaceFusion需要源图像来进行人脸对齐
            video_name=video_name,
            input_pattern=swapped_parts_pattern, # 使用已换脸的视频作为输入
            output_pattern=enhanced_parts_pattern,
            processors=['face_enhancer'],
            stage_name="增强"
        )
        if not success:
            print("❌ 增强阶段失败,处理中止。")
            return False
            
        # --- 阶段 4: 合并最终视频 ---
        print("\n--- 阶段 4: 合并最终视频 ---")
        if not merge_videos(enhanced_parts_pattern, num_parts, output_video_path):
            return False

        return True
    
    finally:
        # 无论成功与否,都清理临时文件
        cleanup_temp_dir()


if __name__ == '__main__':
    total_start_time = time.time()
    
    print(f"📂 正在从文件夹 '{source_faces_folder}' 中扫描源图片...")
    if not os.path.isdir(source_faces_folder):
        print(f"❌ 错误: 源图片文件夹 '{source_faces_folder}' 不存在。")
        exit(1)
    image_extensions = ('.png', '.jpg', '.jpeg', '.webp')
    source_paths = [Path(source_faces_folder) / f for f in sorted(os.listdir(source_faces_folder)) if f.lower().endswith(image_extensions)]
    if not source_paths:
        print(f"❌ 错误: 在文件夹 '{source_faces_folder}' 中没有找到任何图片文件。")
        exit(1)
    print(f"✅ 成功找到 {len(source_paths)} 张源图片。")

    print(f"📂 正在从文件夹 '{target_videos_folder}' 中扫描目标视频...")
    if not os.path.isdir(target_videos_folder):
        print(f"❌ 错误: 目标视频文件夹 '{target_videos_folder}' 不存在。")
        exit(1)
    video_extensions = ('.mp4', '.mov', '.avi', '.mkv')
    target_video_paths = [Path(target_videos_folder) / f for f in sorted(os.listdir(target_videos_folder)) if f.lower().endswith(video_extensions)]
    if not target_video_paths:
        print(f"❌ 错误: 在文件夹 '{target_videos_folder}' 中没有找到任何视频文件。")
        exit(1)
    print(f"✅ 成功找到 {len(target_video_paths)} 个待处理视频。")

    gpu_count = check_gpu_availability()
    
    successful_videos = 0
    failed_videos = []

    for i, video_path in enumerate(target_video_paths):
        print("\n" + "="*80)
        print(f"🎬 开始处理第 {i + 1} / {len(target_video_paths)} 个视频: {video_path.name}")
        print("="*80)
        output_filename = f"{video_path.stem}_swapped.mp4"
        final_output_path = Path(output_folder) / output_filename
        video_start_time = time.time()
        
        # 在gpu_count为0的情况下,check_gpu_availability会返回1,保证至少有一个处理线程
        is_success = process_single_video(source_paths, video_path, final_output_path, gpu_count if gpu_count > 0 else 1)
        
        video_end_time = time.time()
        
        if is_success and final_output_path.exists():
            print(f"✅ 视频 {video_path.name} 处理成功!耗时: {video_end_time - video_start_time:.2f} 秒")
            successful_videos += 1
        else:
            print(f"❌ 视频 {video_path.name} 处理失败。")
            failed_videos.append(video_path.name)

    total_end_time = time.time()
    print("\n" + "="*80)
    print("✨✨✨ 全部任务处理完成 ✨✨✨")
    print(f"总耗时: {(total_end_time - total_start_time) / 60:.2f} 分钟")
    print(f"总计处理视频: {len(target_video_paths)} 个")
    print(f"  - 成功: {successful_videos} 个")
    print(f"  - 失败: {len(failed_videos)} 个")
    if failed_videos:
        print("  - 失败列表:", failed_videos)
    print(f"📂 所有成功处理的视频已保存至: {Path(output_folder).resolve()}")
    print("="*80)

代码贴在上面了。直接运行即可。

修正之后的代码

import subprocess
import os
import re
import time
from tqdm import tqdm
import threading
import math
from pathlib import Path
import json

# --- 配置文件路径 ---
source_faces_folder = './data/source'    # 包含所有源图片的文件夹
target_videos_folder = './data/target'  # 包含所有待处理视频的文件夹
output_folder = './data/output'          # 处理完成的视频将保存到这里

# --- 性能与稳定性配置 ---
# 将视频高度缩放到指定值进行处理,可以极大降低显存占用,解决崩溃问题。
# 常见值: 720, 1080。设置为 None 表示使用原始分辨率。
PROCESSING_RESOLUTION_HEIGHT = None

# --- 临时文件目录 (脚本会自动管理) ---
TEMP_DIR = './temp'
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(output_folder, exist_ok=True)

def get_video_duration(video_path):
    """
    获取视频时长(秒)- 经过优化的稳定版本
    """
    try:
        cmd = [
            'ffprobe', 
            '-v', 'quiet',
            '-print_format', 'json',
            '-show_format',
            str(video_path)
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        format_info = json.loads(result.stdout).get('format', {})
        duration = format_info.get('duration')
        if duration is None:
            print(f"❌ 无法从视频文件中找到时长信息: {video_path}")
            return None
        return float(duration)
    except FileNotFoundError:
        print(f"❌ 命令失败: ffprobe 未找到。请确保 ffmpeg 已安装并位于系统 PATH 中。")
        return None
    except subprocess.CalledProcessError as e:
        print(f"❌ ffprobe 在处理视频 '{video_path}' 时出错。返回码: {e.returncode}")
        print(f"  -> FFprobe 错误输出: {e.stderr.strip()}")
        return None
    except json.JSONDecodeError:
        print(f"❌ 解析 ffprobe 的 JSON 输出失败。可能视频文件已损坏。")
        return None
    except Exception as e:
        print(f"❌ 获取视频时长时发生未知错误: {e}")
        return None

def split_video(input_path, output_pattern, num_parts):
    """
    将视频分割并(根据配置)降低分辨率
    """
    duration = get_video_duration(input_path)
    if duration is None: return False
    part_duration = math.ceil(duration / num_parts)
    print(f"📊 视频总时长: {duration:.2f}秒")
    print(f"🔪 正在将视频分割成 {num_parts} 个部分 (每部分约 {part_duration} 秒)...")

    for i in range(num_parts):
        start_time = i * part_duration
        output_path = output_pattern.format(i + 1)
        
        # *** MODIFICATION: Moved -ss before -i for faster seeking ***
        cmd = ['ffmpeg', '-y', '-ss', str(start_time), '-i', str(input_path), '-t', str(part_duration)]
        
        if PROCESSING_RESOLUTION_HEIGHT:
            print(f"    -> 同时将分辨率缩放至 {PROCESSING_RESOLUTION_HEIGHT}p 高度进行处理...")
            cmd.extend(['-vf', f'scale=-1:{PROCESSING_RESOLUTION_HEIGHT}', '-c:v', 'libx264', '-preset', 'fast', '-crf', '23'])
        else:
            print("    -> 使用原始分辨率进行无损分割...")
            cmd.extend(['-c', 'copy'])
        
        cmd.append(str(output_path))

        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True)
            print(f"✅ 视频部分 {i+1} 已保存: {output_path}")
        except subprocess.CalledProcessError as e:
            print(f"❌ 分割视频部分 {i+1} 失败: {e.stderr}")
            return False
    return True

# +++ REVISED ROBUST MERGE FUNCTION +++
def merge_videos(input_pattern, num_parts, output_path):
    """
    使用 ffmpeg 的 'concat' filter 合并多个视频文件,以解决时间戳问题。
    这个方法更稳定,但会进行一次完整的视频重编码。
    """
    print("🔄 正在使用 'concat' filter 合并所有已处理的视频部分 (此步骤会重新编码以确保稳定性)...")
    
    # 1. 创建输入文件列表和过滤器字符串
    input_files = []
    filter_complex_str = ""
    for i in range(1, num_parts + 1):
        video_path = Path(input_pattern.format(i))
        if not video_path.exists():
            print(f"❌ 找不到要合并的视频部分: {video_path}")
            return False
        input_files.extend(['-i', str(video_path)])
        # [0:v][0:a] 表示第一个输入的视频和音频流
        filter_complex_str += f"[{i-1}:v][{i-1}:a]"

    # 2. 完成过滤器字符串
    filter_complex_str += f"concat=n={num_parts}:v=1:a=1[v][a]"

    # 3. 构建完整的 ffmpeg 命令
    cmd = ['ffmpeg', '-y']
    cmd.extend(input_files)
    cmd.extend([
        '-filter_complex', filter_complex_str,
        '-map', '[v]',
        '-map', '[a]',
        '-c:v', 'libx264',   # 可以根据需要选择编码器
        '-preset', 'fast',     # 平衡速度和质量
        '-crf', '22',          # 控制输出质量 (18-28 是一个合理的范围)
        '-c:a', 'aac',         # 常用的音频编码器
        '-b:a', '192k',        # 音频比特率
        str(output_path)
    ])

    try:
        # 运行合并命令
        process = subprocess.run(cmd, check=True, capture_output=True, text=True)
        print(f"✅ 视频合并完成: {output_path}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"❌ 视频合并失败。这是 ffmpeg 的输出:")
        print(f"   -> STDOUT: {e.stdout.strip()}")
        print(f"   -> STDERR: {e.stderr.strip()}")
        return False

def read_output(stream, pbar, status, gpu_id):
    """读取子进程输出并更新进度条"""
    frame_pattern = re.compile(r'Processing frame (\d+)/(\d+)')
    for line in iter(stream.readline, ''):
        if not line: break
        line = line.strip()
        print(f"[GPU-{gpu_id}] {line}")
        match = frame_pattern.search(line)
        if match:
            current, total = int(match.group(1)), int(match.group(2))
            if pbar.total != total: pbar.total = total
            pbar.n = current
            pbar.refresh()
            status['last_update'] = time.time()
        if any(error in line.lower() for error in ['error', 'failed', 'traceback', 'exception', 'out of memory']):
            status['has_error'] = True
        if 'processing is complete' in line.lower():
            status['completed'] = True

def run_facefusion_process(sources, target, output, gpu_id, processors, part_info=""):
    """通用的人脸处理进程函数"""
    env = os.environ.copy()
    env['CUDA_VISIBLE_DEVICES'] = str(gpu_id)
    
    command = ['python', 'facefusion.py', 'headless-run']
    for src_path in sources:
        command.extend(['-s', str(src_path)])
    command.extend([
        '-t', str(target),
        '-o', str(output),
        '--execution-providers', 'cuda',
        '--processors', *processors,
        '--execution-thread-count', '1',
        '--temp-frame-format', 'jpeg',
        '--face-swapper-model', 'inswapper_128_fp16',
        '--face-enhancer-model', 'gfpgan_1.4',
        '--face-enhancer-blend', '80',
        '--output-video-quality', '95',
        '--face-selector-mode', 'many',
        '--reference-face-distance', '0.6'
    ])

    print(f"🚀 GPU-{gpu_id} 开始处理 [{', '.join(processors)}] 任务: {part_info}...")
    
    status = {'last_update': time.time(), 'has_error': False, 'completed': False}
    pbar = tqdm(total=100, desc=f"GPU-{gpu_id} {part_info}", ncols=120)

    try:
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True, env=env)
        stdout_thread = threading.Thread(target=read_output, args=(process.stdout, pbar, status, gpu_id))
        stderr_thread = threading.Thread(target=read_output, args=(process.stderr, pbar, status, gpu_id))
        stdout_thread.start()
        stderr_thread.start()
        
        timeout = 3600 # 1小时超时
        while process.poll() is None:
            time.sleep(5)
            if time.time() - status['last_update'] > timeout:
                print(f"⚠️ GPU-{gpu_id} 警告: 进程可能已卡住,{timeout}秒无进度更新。正在终止...")
                process.terminate()
                break
        
        process.wait()
        stdout_thread.join()
        stderr_thread.join()
        pbar.close()

        if process.returncode == 0 and os.path.exists(output) and os.path.getsize(output) > 0:
            print(f"✅ GPU-{gpu_id} 成功完成 [{', '.join(processors)}] 任务!")
            return True
        else:
            print(f"❌ GPU-{gpu_id} 处理 [{', '.join(processors)}] 任务失败。返回码: {process.returncode}")
            return False
            
    except Exception as e:
        print(f"❌ GPU-{gpu_id} 执行时发生意外错误: {e}")
        pbar.close()
        return False

def run_parallel_stage(num_gpus, source_paths, video_name, input_pattern, output_pattern, processors, stage_name):
    """
    通用化的并行处理函数,用于执行任何处理阶段(如换脸、增强)。
    """
    threads = []
    results = [False] * num_gpus

    def thread_target(part_index):
        input_part = input_pattern.format(part_index + 1)
        output_part = output_pattern.format(part_index + 1)
        part_info = f"{video_name} ({stage_name} 部分 {part_index + 1}/{num_gpus})"
        results[part_index] = run_facefusion_process(
            source_paths, input_part, output_part, part_index, processors, part_info
        )

    for i in range(num_gpus):
        thread = threading.Thread(target=thread_target, args=(i,))
        threads.append(thread)
        thread.start()
        
        # 为避免CUDA初始化冲突,在启动下一个GPU任务前稍作延迟
        if i < num_gpus - 1:
            delay = 3
            print(f"🕰️ 为避免资源冲突,等待 {delay} 秒后启动下一个GPU任务...")
            time.sleep(delay)

    for thread in threads:
        thread.join()

    if not all(results):
        print(f"❌ 并行处理阶段 '{stage_name}' 失败。")
        return False
    
    print(f"✅ 所有部分的 '{stage_name}' 阶段均已成功完成。")
    return True

def check_gpu_availability():
    """检查可用的GPU数量"""
    try:
        import torch
        if torch.cuda.is_available():
            gpu_count = torch.cuda.device_count()
            print(f"🎮 检测到 {gpu_count} 个可用的NVIDIA GPU:")
            for i in range(gpu_count): print(f"  - GPU {i}: {torch.cuda.get_device_name(i)}")
            return gpu_count
        else:
            print("⚠️ 未检测到可用的CUDA设备。将以单GPU模式运行。")
            return 1 # 返回1以进行单线程处理
    except ImportError:
        print("⚠️ PyTorch未安装,无法检测GPU。将以单GPU模式运行。")
        return 1
    except Exception as e:
        print(f"⚠️ 检查GPU时出错: {e}。将以单GPU模式运行。")
        return 1

def cleanup_temp_dir():
    """清理临时文件夹内的所有文件"""
    print("🧹 正在清理临时文件...")
    try:
        for item in Path(TEMP_DIR).glob('*'):
            os.remove(item)
        print("✅ 临时文件清理完成。")
    except Exception as e:
        print(f"⚠️ 清理临时文件时出错: {e}")

def process_single_video(source_paths, target_video_path, output_video_path, gpu_count):
    """处理单个视频的完整流程:分割 -> 并行换脸 -> 并行增强 -> 合并 -> 清理"""
    video_name = Path(target_video_path).name
    
    # 即使只有一个GPU,我们也进行分割以应用分辨率缩放设置
    num_parts = gpu_count if gpu_count > 0 else 1

    # 定义每个阶段的文件名模式
    original_parts_pattern = os.path.join(TEMP_DIR, f"original_part_{{}}.mp4")
    swapped_parts_pattern = os.path.join(TEMP_DIR, f"swapped_part_{{}}.mp4")
    enhanced_parts_pattern = os.path.join(TEMP_DIR, f"enhanced_part_{{}}.mp4")
    
    try:
        # --- 阶段 1: 分割视频 ---
        print("\n--- 阶段 1: 分割视频 ---")
        if not split_video(target_video_path, original_parts_pattern, num_parts):
            return False

        # --- 阶段 2: 并行换脸 ---
        print("\n--- 阶段 2: 并行换脸 (Face Swapping) ---")
        success = run_parallel_stage(
            num_gpus=num_parts, 
            source_paths=source_paths, 
            video_name=video_name,
            input_pattern=original_parts_pattern, 
            output_pattern=swapped_parts_pattern,
            processors=['face_swapper'],
            stage_name="换脸"
        )
        if not success:
            print("❌ 换脸阶段失败,处理中止。")
            return False

        # --- 阶段 3: 并行画质增强 ---
        print("\n--- 阶段 3: 并行画质增强 (Face Enhancing) ---")
        success = run_parallel_stage(
            num_gpus=num_parts,
            source_paths=source_paths, # FaceFusion需要源图像来进行人脸对齐
            video_name=video_name,
            input_pattern=swapped_parts_pattern, # 使用已换脸的视频作为输入
            output_pattern=enhanced_parts_pattern,
            processors=['face_enhancer'],
            stage_name="增强"
        )
        if not success:
            print("❌ 增强阶段失败,处理中止。")
            return False
            
        # --- 阶段 4: 合并最终视频 ---
        print("\n--- 阶段 4: 合并最终视频 ---")
        if not merge_videos(enhanced_parts_pattern, num_parts, output_video_path):
            return False

        return True
    
    finally:
        # 无论成功与否,都清理临时文件
        cleanup_temp_dir()


if __name__ == '__main__':
    total_start_time = time.time()
    
    print(f"📂 正在从文件夹 '{source_faces_folder}' 中扫描源图片...")
    if not os.path.isdir(source_faces_folder):
        print(f"❌ 错误: 源图片文件夹 '{source_faces_folder}' 不存在。")
        exit(1)
    image_extensions = ('.png', '.jpg', '.jpeg', '.webp')
    source_paths = [Path(source_faces_folder) / f for f in sorted(os.listdir(source_faces_folder)) if f.lower().endswith(image_extensions)]
    if not source_paths:
        print(f"❌ 错误: 在文件夹 '{source_faces_folder}' 中没有找到任何图片文件。")
        exit(1)
    print(f"✅ 成功找到 {len(source_paths)} 张源图片。")

    print(f"📂 正在从文件夹 '{target_videos_folder}' 中扫描目标视频...")
    if not os.path.isdir(target_videos_folder):
        print(f"❌ 错误: 目标视频文件夹 '{target_videos_folder}' 不存在。")
        exit(1)
    video_extensions = ('.mp4', '.mov', '.avi', '.mkv')
    target_video_paths = [Path(target_videos_folder) / f for f in sorted(os.listdir(target_videos_folder)) if f.lower().endswith(video_extensions)]
    if not target_video_paths:
        print(f"❌ 错误: 在文件夹 '{target_videos_folder}' 中没有找到任何视频文件。")
        exit(1)
    print(f"✅ 成功找到 {len(target_video_paths)} 个待处理视频。")

    gpu_count = check_gpu_availability()
    
    successful_videos = 0
    failed_videos = []

    for i, video_path in enumerate(target_video_paths):
        print("\n" + "="*80)
        print(f"🎬 开始处理第 {i + 1} / {len(target_video_paths)} 个视频: {video_path.name}")
        print("="*80)
        output_filename = f"{video_path.stem}_swapped.mp4"
        final_output_path = Path(output_folder) / output_filename
        video_start_time = time.time()
        
        # 在gpu_count为0的情况下,check_gpu_availability会返回1,保证至少有一个处理线程
        is_success = process_single_video(source_paths, video_path, final_output_path, gpu_count if gpu_count > 0 else 1)
        
        video_end_time = time.time()
        
        if is_success and final_output_path.exists():
            print(f"✅ 视频 {video_path.name} 处理成功!耗时: {video_end_time - video_start_time:.2f} 秒")
            successful_videos += 1
        else:
            print(f"❌ 视频 {video_path.name} 处理失败。")
            failed_videos.append(video_path.name)

    total_end_time = time.time()
    print("\n" + "="*80)
    print("✨✨✨ 全部任务处理完成 ✨✨✨")
    print(f"总耗时: {(total_end_time - total_start_time) / 60:.2f} 分钟")
    print(f"总计处理视频: {len(target_video_paths)} 个")
    print(f"  - 成功: {successful_videos} 个")
    print(f"  - 失败: {len(failed_videos)} 个")
    if failed_videos:
        print("  - 失败列表:", failed_videos)
    print(f"📂 所有成功处理的视频已保存至: {Path(output_folder).resolve()}")
    print("="*80)


评论