
在数字隐写术的发展历程中,视频隐写作为一种高级数据隐藏技术,因其大容量、强隐蔽性和实时性特点而备受关注。与静态图像隐写相比,视频隐写利用了视频特有的时间维度和冗余特性,提供了更大的数据隐藏空间和更强的抗检测能力。本指南将深入剖析视频隐写的基本原理、核心技术和实现方法,并通过详细的Python代码示例,帮助读者全面掌握视频隐写的技术要点,从而在实际安全工作和多媒体数据保护中能够准确应用这一技术。
视频隐写技术是CTF竞赛中的高级题型,也是多媒体安全研究的重要方向。通过本指南的学习,读者将能够系统地掌握视频隐写的实现方法,理解其技术优势和局限性,并在实际应用中灵活运用各种视频隐写策略。
视频隐写技术对比:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 空间域隐写 │ │ 变换域隐写 │ │ 编码域隐写 │
├─────────────┤ ├─────────────┤ ├─────────────┤
│ 帧像素直接修改 │ │ DCT/DWT变换嵌入 │ │ 压缩编码参数调整 │
│ 简单易实现 │ │ 抗压缩性强 │ │ 大容量高隐蔽性 │
│ 易被检测 │ │ 计算复杂度高 │ │ 实现难度大 │
└─────────────┘ └─────────────┘ └─────────────┘
视频文件是一种复杂的多媒体容器,包含了视频流、音频流以及元数据等多种信息。理解视频文件的基本结构对于实现视频隐写至关重要。
视频编码是将原始视频数据压缩为更小体积的过程,主要编码标准包括:
视频隐写的核心思想是利用视频数据中的冗余信息,将秘密数据嵌入到视频载体中,同时保持视频的感官质量不变。与图像隐写相比,视频隐写具有以下独特优势:
视频中可用于隐写的空间主要包括:
视频隐写的理论容量可以通过以下公式估算:
C = F × R × B × Q
其中:
实际应用中,隐写容量还受到以下因素限制:
评估视频隐写质量的常用指标包括:
在实现视频隐写之前,我们需要掌握使用OpenCV处理视频的基本操作。下面是一个使用OpenCV读取、处理和保存视频的示例代码:
import cv2
import numpy as np
def process_video(input_path, output_path, processing_function):
    """Apply ``processing_function`` to every frame of a video file.

    Args:
        input_path: Path of the input video file.
        output_path: Path of the output video file.
        processing_function: Callable taking a single frame and returning
            the processed frame (same size/dtype so the writer accepts it).

    Returns:
        bool: True on success, False if the input could not be opened.
    """
    # Open the video file.
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print("无法打开视频文件")
        return False
    out = None
    try:
        # Query source properties so the output matches the input.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"视频属性: {width}x{height}, {fps} FPS, {frame_count} 帧")
        # Create the writer (MP4 container).
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        # Process every frame.
        processed_frames = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            out.write(processing_function(frame))
            processed_frames += 1
            if processed_frames % 100 == 0:
                print(f"已处理 {processed_frames}/{frame_count} 帧")
        print(f"视频处理完成,共处理 {processed_frames} 帧")
        return True
    finally:
        # Bug fix: release handles even when processing_function raises,
        # so the capture/writer are never leaked.
        cap.release()
        if out is not None:
            out.release()
# 示例:简单的灰度处理函数
def grayscale_transform(frame):
    """Return *frame* converted to grayscale but kept as 3-channel BGR."""
    single_channel = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # Video writers expect 3 channels, so expand back to BGR before returning.
    return cv2.cvtColor(single_channel, cv2.COLOR_GRAY2BGR)
# 使用示例
# process_video('input.mp4', 'output_gray.mp4', grayscale_transform)
### 2.2 基于LSB的视频帧隐写实现
LSB(最低有效位)隐写是一种简单有效的隐写方法,同样适用于视频帧隐写。下面是基于LSB的视频隐写实现代码:
```python
import cv2
import numpy as np
import os
def text_to_bits(text):
    """Convert *text* to a flat list of bits, 8 bits per UTF-8 byte (MSB first).

    Bug fixed: the original used ``bin(ord(char)).zfill(8)`` per character,
    which emits MORE than 8 bits for any non-ASCII character (e.g. Chinese),
    breaking the fixed 8-bit framing assumed by ``bits_to_text``. Encoding to
    UTF-8 first guarantees every unit is exactly one byte.
    """
    bits = []
    for byte in text.encode('utf-8'):
        # Most-significant bit first, matching the original ASCII layout.
        bits.extend((byte >> shift) & 1 for shift in range(7, -1, -1))
    return bits
def bits_to_text(bits):
    """Convert a flat bit list (8 bits per byte, MSB first) back to text.

    Bytes are decoded as UTF-8 — the encoding produced by ``text_to_bits`` —
    instead of the original per-byte ``chr()``, which could not reconstruct
    multi-byte (non-ASCII) characters. A trailing group of fewer than 8 bits
    is ignored.
    """
    data = bytearray()
    # Only complete 8-bit groups are converted.
    for i in range(0, len(bits) - len(bits) % 8, 8):
        data.append(int(''.join(str(int(bit)) for bit in bits[i:i + 8]), 2))
    # 'replace' keeps extraction robust when some bits were corrupted.
    return data.decode('utf-8', errors='replace')
def embed_lsb_frame(frame, bits, start_index=0):
    """Embed LSB steganography data into a single frame.

    Args:
        frame: Input video frame (numpy array).
        bits: Bit sequence to embed.
        start_index: Flat-array index at which embedding starts.

    Returns:
        tuple: (stego frame, number of bits embedded, end index).

    NOTE(review): ``start_index`` is also incremented INSIDE the loop every
    3 positions, so write positions stride irregularly and the returned end
    index over-counts relative to the last position actually written.
    ``extract_lsb_frame`` mirrors exactly the same arithmetic, so the pair
    stays in sync — never change one without the other.
    """
    # Flatten so every channel value can be addressed with one index.
    flat_frame = frame.flatten()
    frame_size = len(flat_frame)
    bits_len = len(bits)
    embedded_bits = 0
    # Capacity heuristic: only 1 bit per 3 channel values is used.
    max_embed_bits = min(bits_len, (frame_size - start_index) // 3)
    for i in range(max_embed_bits):
        # Position currently being written.
        pixel_pos = start_index + i
        if pixel_pos >= frame_size:
            break
        pixel_value = flat_frame[pixel_pos]
        # Clear the least significant bit with & 0xFE, then OR in the
        # secret bit.
        flat_frame[pixel_pos] = (pixel_value & 0xFE) | bits[embedded_bits]
        embedded_bits += 1
        # Skip one extra position after every 3 channels to spread writes.
        if (i + 1) % 3 == 0:
            start_index += 1
    # Restore the original frame shape.
    stego_frame = flat_frame.reshape(frame.shape)
    return stego_frame, embedded_bits, start_index + embedded_bits
def extract_lsb_frame(frame, num_bits, start_index=0):
    """Extract LSB steganography data from a single frame.

    Args:
        frame: Frame that may carry hidden data.
        num_bits: Number of bits to extract.
        start_index: Flat-array index at which extraction starts.

    Returns:
        tuple: (extracted bit list, number of bits extracted, end index).

    NOTE(review): the index arithmetic — including the in-loop increment of
    ``start_index`` every 3 positions — deliberately mirrors
    ``embed_lsb_frame``; keep the two functions in lock-step.
    """
    # Flatten so every channel value can be addressed with one index.
    flat_frame = frame.flatten()
    frame_size = len(flat_frame)
    extracted_bits = []
    extracted_count = 0
    # Same capacity heuristic as the embedder: 1 bit per 3 channel values.
    max_extract_bits = min(num_bits, (frame_size - start_index) // 3)
    for i in range(max_extract_bits):
        # Position currently being read.
        pixel_pos = start_index + i
        if pixel_pos >= frame_size:
            break
        # The payload lives in the least significant bit.
        bit = flat_frame[pixel_pos] & 1
        extracted_bits.append(bit)
        extracted_count += 1
        # Skip one extra position after every 3 channels, matching embedding.
        if (i + 1) % 3 == 0:
            start_index += 1
    return extracted_bits, extracted_count, start_index + extracted_count
def embed_video_lsb(input_video_path, output_video_path, secret_text):
    """Embed *secret_text* into a video using per-frame LSB steganography.

    Args:
        input_video_path: Path of the carrier video.
        output_video_path: Path of the stego video to write.
        secret_text: Text to hide.

    Returns:
        dict: Embedding statistics, or None on error.

    NOTE(review): the output is re-encoded with the lossy 'mp4v' codec,
    which may destroy the embedded LSBs — verify the round trip, or use a
    lossless codec if extraction fails.
    """
    # Validate the input path before opening anything.
    if not os.path.exists(input_video_path):
        print(f"错误:输入文件 '{input_video_path}' 不存在")
        return None
    # Convert the secret text into a bit sequence.
    secret_bits = text_to_bits(secret_text)
    # Append the end marker (eight 1-bits) expected by the extractor.
    secret_bits.extend([1] * 8)
    secret_bits_len = len(secret_bits)
    print(f"要嵌入的比特数: {secret_bits_len}")
    # Open the carrier video.
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print("错误:无法打开输入视频")
        return None
    # Mirror the source video's geometry and frame rate.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"视频属性: {width}x{height}, {fps} FPS, {frame_count} 帧")
    # Theoretical capacity: 1 bit per 3 channel values, per frame.
    max_capacity = (width * height * 3) // 3 * frame_count
    print(f"理论最大嵌入容量: {max_capacity} 比特 ({max_capacity // 8} 字节)")
    if secret_bits_len > max_capacity:
        print("警告:秘密数据过大,无法完全嵌入")
        # Truncate what cannot fit.
        secret_bits = secret_bits[:max_capacity]
        secret_bits_len = len(secret_bits)
        print(f"已截断为: {secret_bits_len} 比特")
    # Create the output writer.
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    # Embedding state: next bit to write, and the flat pixel cursor that is
    # threaded across frames so extraction can follow the same positions.
    current_bit_index = 0
    start_pixel_index = 0
    frame_num = 0
    while cap.isOpened() and current_bit_index < secret_bits_len:
        ret, frame = cap.read()
        if not ret:
            break
        frame_num += 1
        print(f"处理帧 {frame_num}/{frame_count}, 当前比特位置: {current_bit_index}/{secret_bits_len}")
        # Bits still pending for this frame.
        remaining_bits = secret_bits_len - current_bit_index
        current_frame_bits = secret_bits[current_bit_index:current_bit_index + remaining_bits]
        # Embed into this frame; the returned cursor carries into the next
        # frame. NOTE(review): the cursor grows monotonically across frames,
        # so per-frame capacity shrinks over time — confirm this is intended.
        stego_frame, embedded_bits, start_pixel_index = embed_lsb_frame(
            frame, current_frame_bits, start_pixel_index
        )
        # Advance past the bits just embedded.
        current_bit_index += embedded_bits
        # Write the stego frame.
        out.write(stego_frame.astype(np.uint8))
    # Copy any remaining frames through untouched.
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        out.write(frame)
    # Release resources.
    cap.release()
    out.release()
    # Payload bytes relative to total carrier bytes, as a percentage.
    embedding_rate = (current_bit_index / 8) / (width * height * frame_count * 3 / 8) * 100
    result = {
        'input_video': input_video_path,
        'output_video': output_video_path,
        'secret_text_length': len(secret_text),
        'embedded_bits': current_bit_index,
        'embedded_bytes': current_bit_index // 8,
        'embedding_rate': embedding_rate,
        'frames_processed': frame_num,
        # Success means everything but (at most) the end marker fit.
        'success': current_bit_index >= secret_bits_len - 8
    }
    print("\n嵌入完成!")
    print(f"已嵌入 {current_bit_index} 比特 ({current_bit_index // 8} 字节)")
    print(f"嵌入率: {embedding_rate:.2f}%")
    print(f"成功: {result['success']}")
    return result
def extract_video_lsb(input_video_path):
    """Extract LSB-embedded text from a video.

    Args:
        input_video_path: Path of the video that may carry hidden data.

    Returns:
        str: The extracted secret text, or None on error.
    """
    # Validate the input path before opening anything.
    if not os.path.exists(input_video_path):
        print(f"错误:输入文件 '{input_video_path}' 不存在")
        return None
    # Open the stego video.
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print("错误:无法打开输入视频")
        return None
    # Report the video properties.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"视频属性: {width}x{height}, {fps} FPS, {frame_count} 帧")
    # Extraction state; the pixel cursor mirrors the embedder's.
    extracted_bits = []
    start_pixel_index = 0
    frame_num = 0
    found_end_marker = False
    end_marker = [1, 1, 1, 1, 1, 1, 1, 1]  # eight 1-bits terminate the payload
    while cap.isOpened() and not found_end_marker:
        ret, frame = cap.read()
        if not ret:
            break
        frame_num += 1
        # Pull up to 800 bits (100 bytes) per frame; tune as needed.
        extract_bits_count = 800
        # Extract this frame's share of the payload.
        bits, extracted_count, start_pixel_index = extract_lsb_frame(
            frame, extract_bits_count, start_pixel_index
        )
        extracted_bits.extend(bits)
        # Scan the accumulated bits for the end marker.
        # NOTE(review): this rescans the WHOLE buffer on every frame
        # (O(n^2) overall), and the window is not byte-aligned, so eight
        # 1-bits straddling two payload bytes can false-positive as the
        # marker and truncate the message.
        for i in range(len(extracted_bits) - 7):
            if extracted_bits[i:i+8] == end_marker:
                found_end_marker = True
                # Keep only the data before the marker.
                extracted_bits = extracted_bits[:i]
                print(f"在帧 {frame_num} 中找到结束标记,提取完成")
                break
    # Release resources.
    cap.release()
    # Decode the recovered bit stream into text.
    secret_text = bits_to_text(extracted_bits)
    print(f"\n提取完成!")
    print(f"提取的比特数: {len(extracted_bits)}")
    print(f"提取的文本长度: {len(secret_text)} 字符")
    print(f"提取的文本: {secret_text}")
    return secret_text
def test_video_steganography(input_video_path, test_message="这是一段测试视频隐写的秘密消息!"):
    """Run the full embed → extract → verify round trip on a video.

    Args:
        input_video_path: Path of the carrier video.
        test_message: Message used for the round-trip check.

    Returns:
        bool: True when the extracted message equals the original.
    """
    print("===== 开始视频隐写测试 =====")
    # Temporary output written next to the working directory.
    stego_path = "temp_stego_video.mp4"
    print("\n[1] 嵌入隐写数据...")
    embed_result = embed_video_lsb(input_video_path, stego_path, test_message)
    if not embed_result or not embed_result['success']:
        print("嵌入失败!")
        return False
    print("\n[2] 提取隐写数据...")
    extracted_message = extract_video_lsb(stego_path)
    if extracted_message is None:
        print("提取失败!")
        return False
    print("\n[3] 验证结果...")
    print(f"原始消息: {test_message}")
    print(f"提取消息: {extracted_message}")
    is_success = test_message == extracted_message
    print(f"测试{'成功' if is_success else '失败'}: {is_success}")
    # Remove the temporary stego file.
    if os.path.exists(stego_path):
        os.remove(stego_path)
        print(f"已清理临时文件: {stego_path}")
    print("\n===== 视频隐写测试结束 =====")
    return is_success
# 使用示例
# test_video_steganography('input_video.mp4')
下面是基于帧差的自适应视频隐写技术的实现代码:
```python
def calculate_frame_difference(frame1, frame2):
    """Return the absolute per-pixel difference of two frames.

    Args:
        frame1: First frame (BGR or grayscale).
        frame2: Second frame (BGR or grayscale).

    Returns:
        numpy.ndarray: Grayscale frame-difference image.
    """
    def _as_gray(img):
        # Color frames are reduced to one channel before differencing.
        if len(img.shape) == 3:
            return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        return img.copy()

    return cv2.absdiff(_as_gray(frame1), _as_gray(frame2))
def create_embedding_mask(diff, high_threshold=50, medium_threshold=20):
    """Map a frame-difference image to per-pixel embedding capacities.

    Args:
        diff: Frame-difference image.
        high_threshold: Differences above this are "high activity".
        medium_threshold: Differences above this (up to high) are "medium".

    Returns:
        numpy.ndarray: Capacity mask — 0 (skip), 1 (one bit) or 2 (two bits)
        per pixel.
    """
    high = diff > high_threshold
    medium = (diff > medium_threshold) & ~high
    # Low-activity pixels stay 0: changes in static areas are easy to spot.
    mask = np.zeros_like(diff, dtype=np.uint8)
    mask[high] = 2
    mask[medium] = 1
    return mask
def embed_adaptive_frame(frame, diff_mask, bits, start_index=0):
    """Adaptively embed data bits into a frame, guided by a capacity mask.

    Bug fixed: the original always indexed ``frame[i, j, c]``, which raises
    IndexError for 2-D grayscale frames even though ``channels`` was
    computed for that case; the index is now channel-aware.

    Args:
        frame: Input video frame (2-D grayscale or 3-D color array).
        diff_mask: Per-pixel capacity mask (0, 1 or 2 bits), shape (H, W).
        bits: Bit sequence to embed.
        start_index: Index into *bits* at which embedding starts.

    Returns:
        tuple: (stego frame, number of bits embedded, end index into bits).
    """
    height, width = diff_mask.shape
    channels = 1 if len(frame.shape) == 2 else frame.shape[2]
    # Work on a copy so the caller's frame is untouched.
    stego_frame = frame.copy()
    bit_index = start_index
    embedded_bits = 0
    for i in range(height):
        for j in range(width):
            if bit_index >= len(bits):
                break
            capacity = diff_mask[i, j]
            # Zero-capacity pixels are skipped entirely.
            if capacity == 0:
                continue
            for c in range(min(channels, 3)):  # at most 3 channels
                if bit_index >= len(bits):
                    break
                # Grayscale frames are 2-D: index without a channel axis.
                idx = (i, j) if channels == 1 else (i, j, c)
                if capacity == 1:
                    # One bit into the least significant bit.
                    stego_frame[idx] = (stego_frame[idx] & 0xFE) | bits[bit_index]
                    bit_index += 1
                    embedded_bits += 1
                elif capacity == 2 and bit_index + 1 < len(bits):
                    # Two bits into the two least significant bits
                    # (first bit in position 1, second in position 0).
                    stego_frame[idx] = (stego_frame[idx] & 0xFC) | (bits[bit_index] << 1) | bits[bit_index + 1]
                    bit_index += 2
                    embedded_bits += 2
        if bit_index >= len(bits):
            break
    return stego_frame, embedded_bits, bit_index
def extract_adaptive_frame(frame, diff_mask, num_bits, start_index=0):
    """Adaptively extract data bits from a frame, guided by a capacity mask.

    Bug fixed: the original always indexed ``frame[i, j, c]``, which raises
    IndexError for 2-D grayscale frames; the index is now channel-aware,
    matching the fix in ``embed_adaptive_frame``.

    Args:
        frame: Frame that may carry hidden data (2-D or 3-D array).
        diff_mask: Per-pixel capacity mask (0, 1 or 2 bits), shape (H, W).
        num_bits: Number of bits to extract.
        start_index: Starting value for the returned bit index counter.

    Returns:
        tuple: (extracted bit list, number of bits extracted, end index).
    """
    height, width = diff_mask.shape
    channels = 1 if len(frame.shape) == 2 else frame.shape[2]
    extracted_bits = []
    bit_index = start_index
    extracted_count = 0
    for i in range(height):
        for j in range(width):
            if extracted_count >= num_bits:
                break
            capacity = diff_mask[i, j]
            # Zero-capacity pixels carry no data.
            if capacity == 0:
                continue
            for c in range(min(channels, 3)):  # at most 3 channels
                if extracted_count >= num_bits:
                    break
                # Grayscale frames are 2-D: index without a channel axis.
                idx = (i, j) if channels == 1 else (i, j, c)
                if capacity == 1:
                    # One bit from the least significant bit.
                    extracted_bits.append(frame[idx] & 1)
                    bit_index += 1
                    extracted_count += 1
                elif capacity == 2:
                    # Two bits from the two least significant bits,
                    # high bit first — mirroring the embedder's order.
                    two = frame[idx] & 3  # 0b11
                    extracted_bits.append((two >> 1) & 1)
                    extracted_count += 1
                    bit_index += 1
                    if extracted_count < num_bits:
                        extracted_bits.append(two & 1)
                        extracted_count += 1
                        bit_index += 1
        if extracted_count >= num_bits:
            break
    return extracted_bits, extracted_count, bit_index
def adaptive_video_steganography(input_video_path, output_video_path, secret_text):
    """Frame-difference-adaptive video steganography.

    High-motion regions (per ``create_embedding_mask``) carry more bits, so
    modifications concentrate in visually busy areas.

    Bug fixed: the original passed the global ``current_bit_index`` as the
    start index into ``embed_adaptive_frame`` even though the bit slice was
    ALREADY offset by ``current_bit_index`` — from the second embedding
    frame onward the first bits of every slice were silently skipped and
    lost. The slice now always starts at index 0 and progress is tracked
    via the embedded-bit count.

    Args:
        input_video_path: Path of the carrier video.
        output_video_path: Path of the stego video to write.
        secret_text: Text to hide.

    Returns:
        dict: Embedding statistics, or None on error.
    """
    # Validate the input path before opening anything.
    if not os.path.exists(input_video_path):
        print(f"错误:输入文件 '{input_video_path}' 不存在")
        return None
    # Bit payload plus the eight-ones end marker.
    secret_bits = text_to_bits(secret_text)
    secret_bits.extend([1] * 8)
    secret_bits_len = len(secret_bits)
    print(f"要嵌入的比特数: {secret_bits_len}")
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print("错误:无法打开输入视频")
        return None
    # Mirror the source video's geometry and frame rate.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"视频属性: {width}x{height}, {fps} FPS, {frame_count} 帧")
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    # The first frame is only a motion reference; it carries no data.
    ret, prev_frame = cap.read()
    if not ret:
        print("错误:无法读取视频的第一帧")
        cap.release()
        return None
    out.write(prev_frame)
    current_bit_index = 0
    frame_num = 1  # the first frame is already handled
    while cap.isOpened() and current_bit_index < secret_bits_len:
        ret, current_frame = cap.read()
        if not ret:
            break
        frame_num += 1
        print(f"处理帧 {frame_num}/{frame_count}, 当前比特位置: {current_bit_index}/{secret_bits_len}")
        # Motion map -> per-pixel capacity mask.
        diff = calculate_frame_difference(prev_frame, current_frame)
        mask = create_embedding_mask(diff)
        # Remaining payload, starting at the next unembedded bit.
        current_frame_bits = secret_bits[current_bit_index:]
        # Start at index 0 of the slice; track progress via embedded_bits.
        stego_frame, embedded_bits, _ = embed_adaptive_frame(
            current_frame, mask, current_frame_bits, 0
        )
        current_bit_index += embedded_bits
        out.write(stego_frame)
        # The unmodified frame is the motion reference for the next pass.
        prev_frame = current_frame.copy()
    # Pass remaining frames through untouched.
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        out.write(frame)
    cap.release()
    out.release()
    # Payload bytes relative to total carrier bytes, as a percentage.
    embedding_rate = (current_bit_index / 8) / (width * height * frame_count * 3 / 8) * 100
    result = {
        'input_video': input_video_path,
        'output_video': output_video_path,
        'secret_text_length': len(secret_text),
        'embedded_bits': current_bit_index,
        'embedded_bytes': current_bit_index // 8,
        'embedding_rate': embedding_rate,
        'frames_processed': frame_num,
        # Success means everything but (at most) the end marker fit.
        'success': current_bit_index >= secret_bits_len - 8
    }
    print("\n嵌入完成!")
    print(f"已嵌入 {current_bit_index} 比特 ({current_bit_index // 8} 字节)")
    print(f"嵌入率: {embedding_rate:.2f}%")
    print(f"成功: {result['success']}")
    return result
# 使用示例
# adaptive_video_steganography('input_video.mp4', 'adaptive_stego_video.mp4', '这是一段使用自适应视频隐写的秘密消息!')
对于视频隐写技术的评估,主要关注两个方面:嵌入容量和视觉质量。下面我们实现几种常用的视频质量评估指标:
```python
import numpy as np
import cv2
from skimage.metrics import structural_similarity as ssim
from skimage.metrics import peak_signal_noise_ratio as psnr
import os
def calculate_psnr(original_frame, stego_frame):
    """Peak signal-to-noise ratio (PSNR) between two frames, in dB.

    Bug fixed: the original converted to float64 only when the two dtypes
    DIFFERED, so two uint8 frames were subtracted as uint8 and the
    difference wrapped around modulo 256, silently corrupting the MSE.
    Both inputs are now always promoted to float64 first.

    Args:
        original_frame: Reference frame.
        stego_frame: Distorted (stego) frame.

    Returns:
        float: PSNR in dB; ``inf`` when the frames are identical.
    """
    a = original_frame.astype(np.float64)
    b = stego_frame.astype(np.float64)
    # Mean squared error over all pixels/channels.
    mse = np.mean((a - b) ** 2)
    # Identical frames: infinite PSNR rather than a division by zero.
    if mse == 0:
        return float('inf')
    max_pixel = 255.0  # assumes 8-bit pixel range — TODO confirm for other depths
    return 20 * np.log10(max_pixel / np.sqrt(mse))
def calculate_ssim(original_frame, stego_frame):
    """Structural similarity index (SSIM) between two frames.

    Args:
        original_frame: Reference frame.
        stego_frame: Distorted (stego) frame.

    Returns:
        float: SSIM in the range 0–1.
    """
    # SSIM is computed on a single luminance channel.
    if len(original_frame.shape) == 3:
        reference = cv2.cvtColor(original_frame, cv2.COLOR_BGR2GRAY)
        candidate = cv2.cvtColor(stego_frame, cv2.COLOR_BGR2GRAY)
    else:
        reference = original_frame.copy()
        candidate = stego_frame.copy()
    # The window must be odd and fit inside the image; shrink it for tiny
    # frames and clamp at 1.
    window = min(7, reference.shape[0] // 2, reference.shape[1] // 2)
    if window % 2 == 0:
        window -= 1
    window = max(1, window)
    return ssim(reference, candidate, win_size=window, data_range=255)
def calculate_vif(original_frame, stego_frame):
    """Simplified visual information fidelity (VIF) between two frames.

    Note: this is a simplified single-scale approximation, not the full
    wavelet-domain VIF formulation.

    Args:
        original_frame: Reference frame.
        stego_frame: Distorted (stego) frame.

    Returns:
        float: Simplified VIF score.
    """
    # Work on a single luminance channel.
    if len(original_frame.shape) == 3:
        original_gray = cv2.cvtColor(original_frame, cv2.COLOR_BGR2GRAY)
        stego_gray = cv2.cvtColor(stego_frame, cv2.COLOR_BGR2GRAY)
    else:
        original_gray = original_frame.copy()
        stego_gray = stego_frame.copy()
    # Float math avoids integer overflow in the squared terms below.
    original_gray = original_gray.astype(np.float64)
    stego_gray = stego_gray.astype(np.float64)
    # 5x5 Gaussian window used for all local statistics.
    sigma = 2.0
    gaussian_filter = cv2.getGaussianKernel(5, sigma)
    gaussian_filter = gaussian_filter @ gaussian_filter.T
    # Local means.
    mu1 = cv2.filter2D(original_gray, -1, gaussian_filter)
    mu2 = cv2.filter2D(stego_gray, -1, gaussian_filter)
    # Local variances and covariance via the E[x^2] - E[x]^2 form.
    sigma1_sq = cv2.filter2D(original_gray**2, -1, gaussian_filter) - mu1**2
    sigma2_sq = cv2.filter2D(stego_gray**2, -1, gaussian_filter) - mu2**2
    sigma12 = cv2.filter2D(original_gray*stego_gray, -1, gaussian_filter) - mu1*mu2
    # Small epsilon keeps the division below well-defined.
    sigma1_sq = sigma1_sq + 1e-10
    sigma2_sq = sigma2_sq + 1e-10
    # Simplified VIF: mean normalized squared covariance over all pixels.
    vif = np.sum((sigma12**2 + 1e-10**2) / (sigma1_sq * sigma2_sq + 1e-10**2)) / sigma1_sq.size
    return vif
def calculate_frame_histogram(original_frame, stego_frame):
    """Histogram correlation between two frames (1.0 = identical histograms).

    Args:
        original_frame: Reference frame.
        stego_frame: Distorted (stego) frame.

    Returns:
        float: Correlation coefficient between the two gray-level histograms.
    """
    def _normalized_hist(img):
        # Histograms are compared on the luminance channel only.
        if len(img.shape) == 3:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        hist = cv2.calcHist([img], [0], None, [256], [0, 256])
        return cv2.normalize(hist, hist).flatten()

    hist_ref = _normalized_hist(original_frame)
    hist_test = _normalized_hist(stego_frame)
    # HISTCMP_CORREL: values close to 1 mean very similar distributions.
    return cv2.compareHist(hist_ref, hist_test, cv2.HISTCMP_CORREL)
def evaluate_video_quality(original_video_path, stego_video_path, sample_frames=10):
    """Compare a stego video against its original on sampled frames.

    Args:
        original_video_path: Path of the original video.
        stego_video_path: Path of the stego video.
        sample_frames: Number of frames to sample for the metrics.

    Returns:
        dict: Averages and per-frame values for PSNR, SSIM, simplified VIF
        and histogram correlation, or None on error.
    """
    # Validate both inputs up front.
    if not os.path.exists(original_video_path):
        print(f"错误:原始视频文件 '{original_video_path}' 不存在")
        return None
    if not os.path.exists(stego_video_path):
        print(f"错误:隐写视频文件 '{stego_video_path}' 不存在")
        return None
    # Open both videos.
    original_cap = cv2.VideoCapture(original_video_path)
    stego_cap = cv2.VideoCapture(stego_video_path)
    if not original_cap.isOpened():
        print("错误:无法打开原始视频")
        return None
    if not stego_cap.isOpened():
        print("错误:无法打开隐写视频")
        return None
    # Frame counts may differ after re-encoding; warn but continue.
    original_frame_count = int(original_cap.get(cv2.CAP_PROP_FRAME_COUNT))
    stego_frame_count = int(stego_cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if original_frame_count != stego_frame_count:
        print(f"警告:两个视频的帧数不匹配!原始视频: {original_frame_count} 帧, 隐写视频: {stego_frame_count} 帧")
    # Evaluate only as many frames as both videos share.
    frame_count = min(original_frame_count, stego_frame_count)
    if sample_frames > frame_count:
        sample_frames = frame_count
    # Short videos are evaluated in full; longer ones are sampled uniformly.
    if frame_count <= 20:
        sample_frames = frame_count
        sample_indices = list(range(frame_count))
    else:
        step = frame_count / sample_frames
        sample_indices = [int(i * step) for i in range(sample_frames)]
    # Per-frame metric accumulators.
    psnr_values = []
    ssim_values = []
    vif_values = []
    hist_correlations = []
    # Read cursor over both streams, advanced in lock-step.
    current_frame_idx = 0
    processed_frames = 0
    print(f"开始评估视频质量,共采样 {sample_frames} 帧")
    while original_cap.isOpened() and stego_cap.isOpened() and processed_frames < sample_frames:
        # Read matching frames from both videos.
        original_ret, original_frame = original_cap.read()
        stego_ret, stego_frame = stego_cap.read()
        if not original_ret or not stego_ret:
            break
        # Score only the sampled indices.
        if current_frame_idx in sample_indices:
            print(f"评估第 {current_frame_idx} 帧")
            # PSNR
            psnr_val = calculate_psnr(original_frame, stego_frame)
            psnr_values.append(psnr_val)
            # SSIM
            ssim_val = calculate_ssim(original_frame, stego_frame)
            ssim_values.append(ssim_val)
            # Simplified VIF
            vif_val = calculate_vif(original_frame, stego_frame)
            vif_values.append(vif_val)
            # Histogram correlation
            hist_corr = calculate_frame_histogram(original_frame, stego_frame)
            hist_correlations.append(hist_corr)
            processed_frames += 1
        current_frame_idx += 1
    # Release resources.
    original_cap.release()
    stego_cap.release()
    # Averages (0 when nothing was evaluated).
    avg_psnr = np.mean(psnr_values) if psnr_values else 0
    avg_ssim = np.mean(ssim_values) if ssim_values else 0
    avg_vif = np.mean(vif_values) if vif_values else 0
    avg_hist_corr = np.mean(hist_correlations) if hist_correlations else 0
    # Map the average PSNR onto a human-readable verdict.
    psnr_quality = ""  # initialized before the threshold cascade
    if avg_psnr >= 40:
        psnr_quality = "优秀 (难以察觉差异)"
    elif avg_psnr >= 35:
        psnr_quality = "良好 (极小差异)"
    elif avg_psnr >= 30:
        psnr_quality = "一般 (轻微差异)"
    elif avg_psnr >= 25:
        psnr_quality = "较差 (明显差异)"
    else:
        psnr_quality = "差 (严重差异)"
    # Same mapping for the average SSIM.
    ssim_quality = ""  # initialized before the threshold cascade
    if avg_ssim >= 0.95:
        ssim_quality = "优秀 (几乎完全相同)"
    elif avg_ssim >= 0.90:
        ssim_quality = "良好 (非常相似)"
    elif avg_ssim >= 0.80:
        ssim_quality = "一般 (轻微失真)"
    elif avg_ssim >= 0.70:
        ssim_quality = "较差 (明显失真)"
    else:
        ssim_quality = "差 (严重失真)"
    result = {
        'original_video': original_video_path,
        'stego_video': stego_video_path,
        'frames_evaluated': processed_frames,
        'avg_psnr': avg_psnr,
        'psnr_quality': psnr_quality,
        'avg_ssim': avg_ssim,
        'ssim_quality': ssim_quality,
        'avg_vif': avg_vif,
        'avg_histogram_correlation': avg_hist_corr,
        'detailed_results': {
            'psnr_values': psnr_values,
            'ssim_values': ssim_values,
            'vif_values': vif_values,
            'histogram_correlations': hist_correlations
        }
    }
    print("\n视频质量评估完成!")
    print(f"平均PSNR: {avg_psnr:.2f} dB ({psnr_quality})")
    print(f"平均SSIM: {avg_ssim:.4f} ({ssim_quality})")
    print(f"平均VIF: {avg_vif:.4f}")
    print(f"平均直方图相关性: {avg_hist_corr:.4f}")
    return result
# 使用示例
# evaluate_video_quality('original_video.mp4', 'stego_video.mp4')
## 第三章 视频编码域隐写技术
视频编码域隐写是在视频压缩编码过程中进行数据隐藏的技术。与空间域隐写相比,编码域隐写具有更高的隐蔽性和效率。下面介绍几种常见的视频编码域隐写技术。
#### 3.1.1 基于宏块类型的H.264隐写
在H.264编码中,宏块(Macroblock)是编码的基本单位。宏块可以有多种类型,如I宏块、P宏块、B宏块等。我们可以通过微调宏块的编码参数来隐藏信息。下面是基于FFmpeg和x264的H.264编码域隐写实现代码:
```python
import os
import subprocess
import numpy as np
import struct
def text_to_binary(text):
    """Convert *text* to a binary string (8 bits per UTF-8 byte) plus marker.

    Bug fixed: the original used ``format(ord(char), '08b')``, which emits
    more than 8 bits for non-ASCII characters and breaks the 8-bit framing.
    Encoding to UTF-8 first also makes the '11111111' (0xFF) terminator
    unambiguous, because 0xFF can never occur as a byte of valid UTF-8.
    """
    binary = ''.join(format(byte, '08b') for byte in text.encode('utf-8'))
    # Append the end marker (eight 1-bits).
    return binary + '11111111'
def binary_to_text(binary):
    """Decode a binary string produced by ``text_to_binary`` back to text.

    Bytes are read on 8-bit boundaries and decoding stops at the first
    0xFF byte (the end marker). Scanning byte-aligned fixes the original
    ``str.find('11111111')`` bug, where eight 1-bits straddling two payload
    bytes could be mistaken for the marker and truncate the message.
    """
    data = bytearray()
    # Only complete 8-bit groups are read; a trailing partial byte is ignored.
    for i in range(0, len(binary) - len(binary) % 8, 8):
        byte = int(binary[i:i + 8], 2)
        if byte == 0xFF:  # end marker — never a valid UTF-8 byte
            break
        data.append(byte)
    # 'replace' keeps extraction robust when some bits were corrupted.
    return data.decode('utf-8', errors='replace')
def modify_x264_param(param_file, binary_data):
    """Embed *binary_data* into an x264 parameter file via ±1 value tweaks.

    Only macroblock-related integer parameters are touched (bit 1 → +1,
    bit 0 → −1). Bug fixed: the original consumed a data bit even when the
    value could NOT move (e.g. decrementing 0), which desynchronized
    embedding from extraction because ``extract_from_x264_param`` only
    emits a bit when a value actually changed. A bit is now consumed only
    on a real change; otherwise it is deferred to a later parameter.

    Args:
        param_file: Path of the original parameter file (name ends '.txt').
        binary_data: Bit string such as '10110…'.

    Returns:
        str: Path of the modified parameter file.
    """
    # Read the original parameter file.
    with open(param_file, 'r') as f:
        params = f.readlines()
    # Sibling file that will carry the tweaked parameters.
    modified_param_file = param_file.replace('.txt', '_stego.txt')
    binary_index = 0  # next bit of binary_data to embed
    with open(modified_param_file, 'w') as f:
        for param_line in params:
            # Comment lines are copied verbatim.
            if param_line.strip().startswith('#'):
                f.write(param_line)
                continue
            if '=' in param_line and binary_index < len(binary_data):
                # Only macroblock-related parameters carry data.
                if any(keyword in param_line.lower() for keyword in ['mb_tree', 'subme', 'me', 'partitions']):
                    param_name, param_value = param_line.split('=', 1)
                    param_value = param_value.strip()
                    try:
                        value = int(param_value)
                        bit = int(binary_data[binary_index])
                        # Nudge the value by ±1 and consume the bit ONLY
                        # when the nudge is possible.
                        if bit == 1 and value < 100:
                            value += 1
                            binary_index += 1
                        elif bit == 0 and value > 0:
                            value -= 1
                            binary_index += 1
                        f.write(f"{param_name.strip()} = {value}\n")
                    except ValueError:
                        # Non-numeric parameter: copy unchanged.
                        f.write(param_line)
                else:
                    # Not a parameter we use for embedding.
                    f.write(param_line)
            else:
                # No '=' or all data embedded already.
                f.write(param_line)
    # Report partial embedding.
    if binary_index < len(binary_data):
        print(f"警告:无法嵌入所有数据!仅嵌入了 {binary_index}/{len(binary_data)} 位")
    return modified_param_file
def extract_from_x264_param(original_param_file, stego_param_file, expected_length=None):
    """Recover the hidden bit string by diffing two x264 parameter files.

    Args:
        original_param_file: Path of the untouched parameter file.
        stego_param_file: Path of the file produced by ``modify_x264_param``.
        expected_length: Optional cap on the number of bits to recover.

    Returns:
        str: The recovered bit string ('' on error).
    """
    with open(original_param_file, 'r') as f:
        original_params = f.readlines()
    with open(stego_param_file, 'r') as f:
        stego_params = f.readlines()
    # The files must line up line-for-line to be comparable.
    if len(original_params) != len(stego_params):
        print("错误:两个参数文件的行数不同")
        return ""
    recovered = []
    keywords = ('mb_tree', 'subme', 'me', 'partitions')
    for orig_line, stego_line in zip(original_params, stego_params):
        # Comments never carry data.
        if orig_line.strip().startswith('#'):
            continue
        # Stop once enough bits were recovered.
        if expected_length and len(recovered) >= expected_length:
            break
        if '=' not in orig_line or '=' not in stego_line:
            continue
        # Only macroblock-related parameters carry data.
        if not any(key in orig_line.lower() for key in keywords):
            continue
        try:
            before = int(orig_line.split('=', 1)[1].strip())
            after = int(stego_line.split('=', 1)[1].strip())
        except ValueError:
            continue  # non-numeric parameter — skip
        # +1 encodes a 1, -1 encodes a 0; unchanged values carry nothing.
        if after > before:
            recovered.append('1')
        elif after < before:
            recovered.append('0')
    return ''.join(recovered)
def h264_macroblock_steganography(input_video, output_video, secret_text, x264_param_file):
    """Hide *secret_text* by tweaking x264 macroblock encoding parameters.

    Args:
        input_video: Path of the input video.
        output_video: Path of the stego video to produce.
        secret_text: Text to hide.
        x264_param_file: Path of the baseline x264 parameter file.

    Returns:
        dict: Embedding result info, or None on error.
    """
    # Validate both inputs up front.
    if not os.path.exists(input_video):
        print(f"错误:输入视频文件 '{input_video}' 不存在")
        return None
    if not os.path.exists(x264_param_file):
        print(f"错误:x264参数文件 '{x264_param_file}' 不存在")
        return None
    # Payload as a bit string (with end marker).
    binary_data = text_to_binary(secret_text)
    print(f"要嵌入的二进制数据长度: {len(binary_data)} 位")
    # Write a tweaked copy of the parameter file carrying the data.
    modified_param_file = modify_x264_param(x264_param_file, binary_data)
    # Re-encode with libx264 using the modified parameters.
    ffmpeg_cmd = [
        'ffmpeg', '-y', '-i', input_video,
        '-c:v', 'libx264', '-x264-params', f'file={modified_param_file}',
        '-c:a', 'copy',  # copy the audio stream unchanged
        output_video
    ]
    print(f"开始编码视频,使用修改后的x264参数...")
    try:
        # Run FFmpeg; check=True raises on a non-zero exit code.
        subprocess.run(ffmpeg_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(f"视频编码完成!输出文件: {output_video}")
    except subprocess.CalledProcessError as e:
        print(f"FFmpeg执行失败: {e}")
        return None
    # Count packets (≈ frames) via ffprobe for the rate estimate.
    probe_cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0',
    '-count_packets', '-show_entries', 'stream=nb_read_packets',
    '-of', 'csv=p=0', input_video]
    try:
        result = subprocess.run(probe_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        frame_count = int(result.stdout.decode('utf-8').strip())
    except Exception as e:
        print(f"获取视频信息失败: {e}")
        frame_count = 0
    # Rough embedding rate; assumes ~64 pixels per macroblock.
    if frame_count > 0:
        embedding_rate = (len(secret_text) * 8) / (frame_count * 64) * 100
    else:
        embedding_rate = 0
    # NOTE(review): ``result`` is rebound here from the ffprobe
    # CompletedProcess to the returned summary dict.
    result = {
        'input_video': input_video,
        'output_video': output_video,
        'secret_text_length': len(secret_text),
        'binary_data_length': len(binary_data),
        'modified_param_file': modified_param_file,
        'embedding_rate': embedding_rate,
        'frame_count': frame_count,
        'success': os.path.exists(output_video)
    }
    print(f"\n嵌入完成!")
    print(f"嵌入文本长度: {len(secret_text)} 字符")
    print(f"嵌入数据长度: {len(binary_data)} 位")
    print(f"估计嵌入率: {embedding_rate:.4f}%")
    print(f"成功: {result['success']}")
    return result
# 创建示例x264参数文件
def create_sample_x264_param_file(file_path):
    """Write a sample x264 parameter file to *file_path* and return the path."""
    # The file content is assembled as one string; the bytes written are
    # identical to the original line list.
    content = (
        "# x264编码参数文件\n"
        "# 基础参数\n"
        "preset = medium\n"
        "tune = film\n"
        "crf = 23\n"
        "# 宏块相关参数\n"
        "me = hex\n"
        "subme = 7\n"
        "partitions = p8x8,b8x8,i8x8,i4x4\n"
        "me_range = 16\n"
        "trellis = 1\n"
        "8x8dct = 1\n"
        "cqm = flat\n"
        "deadzone_inter = 21\n"
        "deadzone_intra = 11\n"
        "mb_tree = 1\n"
        "# 其他参数\n"
        "chroma_qp_offset = -2\n"
        "threads = 0\n"
        "thread_type = 1\n"
        "sliced_threads = 0\n"
        "nr = 0\n"
        "decimate = 1\n"
        "interlaced = 0\n"
        "constrained_intra = 0\n"
        "bframes = 3\n"
        "b_pyramid = 2\n"
        "b_adapt = 2\n"
        "b_bias = 0\n"
        "direct = 1\n"
        "weightb = 1\n"
        "open_gop = 0\n"
        "weightp = 2\n"
        "keyint_min = 25\n"
        "scenecut = 40\n"
        "intra_refresh = 0\n"
        "rc_lookahead = 40\n"
        "rc = crf\n"
        "mbtree = 1\n"
        "qcomp = 0.60\n"
        "qpmin = 0\n"
        "qpmax = 69\n"
        "qpstep = 4\n"
        "ip_ratio = 1.40\n"
        "pb_ratio = 1.30\n"
        "aq = 1\n"
        "aq_strength = 1.0\n"
    )
    with open(file_path, 'w') as f:
        f.write(content)
    print(f"已创建示例x264参数文件: {file_path}")
    return file_path
# 使用示例
# x264_param_file = create_sample_x264_param_file('x264_params.txt')
# h264_macroblock_steganography('input.mp4', 'output_stego.mp4', '这是一段隐藏在H.264编码参数中的秘密消息!', x264_param_file)
#### 3.2.1 运动矢量隐写实现
下面是一个基于FFmpeg和OpenCV的运动矢量隐写实现示例:
```python
import os
import subprocess
import numpy as np
import cv2
import struct
def text_to_bits(text):
    """Convert *text* to a bit list (8 bits per UTF-8 byte) plus end marker.

    Bug fixed: the original ``bin(ord(char)).zfill(8)`` emits more than
    8 bits for non-ASCII characters, breaking the 8-bit framing. Encoding
    to UTF-8 also means the eight-ones terminator (0xFF) can never collide
    with a payload byte, since 0xFF is not a valid UTF-8 byte.
    """
    result = []
    for byte in text.encode('utf-8'):
        # Most-significant bit first, matching the original ASCII layout.
        result.extend((byte >> shift) & 1 for shift in range(7, -1, -1))
    # Append the end marker (eight 1-bits).
    result.extend([1, 1, 1, 1, 1, 1, 1, 1])
    return result
def bits_to_text(bits):
    """Decode a bit list produced by ``text_to_bits`` back to text.

    Bytes are read on 8-bit boundaries and decoding stops at the first
    0xFF byte (the end marker). Bug fixed: the original slid an unaligned
    window over the whole stream, so eight 1-bits straddling two payload
    bytes could be mistaken for the marker and truncate the message.
    """
    data = bytearray()
    # Only complete 8-bit groups are read; a trailing partial byte is ignored.
    for i in range(0, len(bits) - len(bits) % 8, 8):
        byte = 0
        for bit in bits[i:i + 8]:
            byte = (byte << 1) | int(bit)
        if byte == 0xFF:  # end marker — never a valid UTF-8 byte
            break
        data.append(byte)
    # 'replace' keeps extraction robust when some bits were corrupted.
    return data.decode('utf-8', errors='replace')
def extract_motion_vectors(video_path, output_file):
    """Dump motion-vector visualization data from a video via FFmpeg.

    Args:
        video_path: Path of the input video.
        output_file: Path where the raw video dump is written.

    Returns:
        bool: True when FFmpeg succeeded, False otherwise.
    """
    # codecview=mv draws P/B-frame motion vectors into the raw output.
    command = [
        'ffmpeg', '-y', '-i', video_path,
        '-vf', 'codecview=mv=pf+bf+bb', '-f', 'rawvideo', output_file
    ]
    print(f"开始提取运动矢量信息...")
    try:
        subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        print(f"FFmpeg执行失败: {e}")
        print(f"错误输出: {e.stderr.decode('utf-8')}")
        return False
    print(f"运动矢量信息提取完成,保存到: {output_file}")
    return True
def analyze_motion_vectors(video_path):
    """Probe a video and estimate how many bits its motion vectors could hide.

    Args:
        video_path: Path of the video to analyze.

    Returns:
        dict: Video properties plus capacity estimates, or None on failure.
    """
    import json
    probe = [
        'ffprobe', '-v', 'error', '-select_streams', 'v:0',
        '-show_entries', 'stream=width,height,r_frame_rate,nb_frames',
        '-of', 'json', video_path
    ]
    try:
        completed = subprocess.run(probe, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stream = json.loads(completed.stdout.decode('utf-8'))['streams'][0]
        width = int(stream['width'])
        height = int(stream['height'])
        frame_rate = stream['r_frame_rate']
        frame_count = int(stream['nb_frames'])
        print(f"视频信息:")
        print(f" 分辨率: {width}x{height}")
        print(f" 帧率: {frame_rate}")
        print(f" 总帧数: {frame_count}")
        # Conservative estimate: at most 1 bit per 8x8 macroblock, and only
        # 1/16 of the macroblocks are considered safe to touch.
        macroblocks_per_frame = (width // 8) * (height // 8)
        estimated_capacity_per_frame = macroblocks_per_frame // 16
        total_capacity = estimated_capacity_per_frame * frame_count
        print(f" 每帧宏块数: {macroblocks_per_frame}")
        print(f" 每帧估计可嵌入位数: {estimated_capacity_per_frame}")
        print(f" 总估计可嵌入位数: {total_capacity}")
        print(f" 总估计可嵌入字符数: {total_capacity // 8}")
        return {
            'width': width,
            'height': height,
            'frame_rate': frame_rate,
            'frame_count': frame_count,
            'macroblocks_per_frame': macroblocks_per_frame,
            'estimated_capacity_per_frame': estimated_capacity_per_frame,
            'total_capacity': total_capacity,
            'estimated_char_capacity': total_capacity // 8
        }
    except Exception as e:
        # Any failure (missing tool, bad file, malformed JSON) yields None.
        print(f"分析视频失败: {e}")
        return None
def motion_vector_steganography(video_path, output_path, secret_text, strength=0.05):
    """Simulated motion-vector video steganography.

    The secret text becomes a bit stream which is embedded by slightly
    scaling luma (Y) samples at evenly spaced positions of each decoded
    frame — a stand-in for modulating real motion-vector magnitudes.  The
    pixel work is delegated to a generated helper script run in a
    subprocess, and the modified raw YUV stream is re-encoded with x264.

    Args:
        video_path: input (cover) video file path.
        output_path: output (stego) video file path.
        secret_text: text to hide.
        strength: relative luma modification strength in (0, 1); default 0.05.

    Returns:
        dict | None: embedding summary, or None on any failure.
    """
    if not os.path.exists(video_path):
        print(f"错误:输入视频文件 '{video_path}' 不存在")
        return None
    # Probe the clip and derive the conservative embedding capacity.
    video_info = analyze_motion_vectors(video_path)
    if not video_info:
        return None
    bits = text_to_bits(secret_text)
    bit_count = len(bits)
    # Truncate the payload when it exceeds the estimated capacity.
    if bit_count > video_info['total_capacity']:
        print(f"警告:隐藏数据过大!需要 {bit_count} 位,但视频只能容纳 {video_info['total_capacity']} 位")
        bits = bits[:video_info['total_capacity']]
        bit_count = len(bits)
        print(f"已截断数据至 {bit_count} 位")
    print(f"要嵌入的二进制数据长度: {bit_count} 位")
    # Scratch files; removed in the finally block below no matter where
    # the pipeline stops (previously only the encode step was covered).
    temp_file = 'temp_video.yuv'
    modified_temp_file = 'modified_temp_video.yuv'
    filter_script = 'motion_vector_filter.py'
    try:
        # Decode the whole clip to raw YUV420p so samples can be edited.
        extract_cmd = [
            'ffmpeg', '-y', '-i', video_path,
            '-pix_fmt', 'yuv420p', '-f', 'rawvideo', temp_file
        ]
        try:
            subprocess.run(extract_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            print(f"已提取YUV数据到临时文件: {temp_file}")
        except subprocess.CalledProcessError as e:
            print(f"提取YUV数据失败: {e}")
            return None
        # (The previous version also read the entire YUV file into memory
        # here, but never used it — the helper script re-reads the file.)
        # Generate the helper script.  Single braces are filled in now;
        # doubled braces ({{...}}) survive this f-string and become the
        # helper's own f-string placeholders.  BUGFIX: the original wrote
        # "{width}x{height}" unescaped, which raised NameError while
        # building this string because those names only exist inside the
        # generated script.
        with open(filter_script, 'w') as f:
            f.write(f'''
import numpy as np
import sys
# 解析参数
if len(sys.argv) < 4:
    print("用法: python motion_vector_filter.py <输入文件> <输出文件> <隐藏数据>")
    sys.exit(1)
input_file = sys.argv[1]
output_file = sys.argv[2]
secret_bits_str = sys.argv[3]
# 将字符串转换回比特列表
secret_bits = [int(bit) for bit in secret_bits_str]
bit_index = 0
# 视频参数(需要根据实际情况修改)
width = {video_info['width']}
height = {video_info['height']}
frame_size = width * height * 3 // 2
strength = {strength}
print(f"处理视频: {{width}}x{{height}}")
print(f"嵌入数据长度: {{len(secret_bits)}} 位")
# 读取输入YUV数据
with open(input_file, 'rb') as f:
    yuv_data = bytearray(f.read())
# 为了演示,我们简化处理:在Y分量的特定位置嵌入数据
# 注意:真实的运动矢量修改需要更复杂的FFmpeg过滤器
# 这里我们使用一个简化的模型来模拟效果
for i in range(0, len(yuv_data), frame_size):
    # 处理一帧
    frame_data = yuv_data[i:i+frame_size]
    # 获取Y分量(亮度)
    y_component = frame_data[:width*height]
    # 计算每帧需要嵌入的位数
    bits_per_frame = min(len(secret_bits) - bit_index, {video_info['estimated_capacity_per_frame']})
    if bits_per_frame <= 0:
        break
    # 选择嵌入位置(均匀分布在帧中)
    step = (width * height) // (bits_per_frame + 1)
    # 嵌入数据
    for j in range(bits_per_frame):
        if bit_index >= len(secret_bits):
            break
        # 计算嵌入位置
        pos = (j + 1) * step
        if pos >= len(y_component):
            continue
        # 读取当前像素值
        pixel_value = y_component[pos]
        # 修改像素值来模拟运动矢量的变化效果
        # 在真实实现中,这里应该修改运动矢量
        if secret_bits[bit_index] == 1:
            # 增加像素值(模拟运动矢量幅度增加)
            if pixel_value < 255:
                y_component[pos] = min(255, int(pixel_value * (1 + strength)))
        else:
            # 减少像素值(模拟运动矢量幅度减少)
            if pixel_value > 0:
                y_component[pos] = max(0, int(pixel_value * (1 - strength)))
        bit_index += 1
    # 将修改后的Y分量放回
    frame_data[:width*height] = y_component
    yuv_data[i:i+frame_size] = frame_data
# 写入修改后的数据
with open(output_file, 'wb') as f:
    f.write(yuv_data)
print(f"成功嵌入 {{bit_index}} 位数据")
''')
        # Run the helper to embed the bits into the raw YUV stream.
        bits_str = ''.join(str(bit) for bit in bits)
        filter_cmd = [
            'python', filter_script,
            temp_file, modified_temp_file, bits_str
        ]
        try:
            subprocess.run(filter_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            print(f"已处理YUV数据,模拟了运动矢量的修改")
        except subprocess.CalledProcessError as e:
            print(f"处理YUV数据失败: {e}")
            return None
        # Re-encode the modified raw stream back into a playable video.
        encode_cmd = [
            'ffmpeg', '-y', '-f', 'rawvideo', '-pix_fmt', 'yuv420p',
            '-s', f'{video_info["width"]}x{video_info["height"]}', '-i', modified_temp_file,
            '-c:v', 'libx264', '-preset', 'medium', '-crf', '23',
            '-c:a', 'copy',  # copy the audio stream (if any)
            output_path
        ]
        try:
            subprocess.run(encode_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            print(f"视频编码完成!输出文件: {output_path}")
        except subprocess.CalledProcessError as e:
            print(f"编码视频失败: {e}")
            return None
    finally:
        # Clean up the scratch files.
        for file in [temp_file, modified_temp_file, filter_script]:
            if os.path.exists(file):
                try:
                    os.remove(file)
                    print(f"已清理临时文件: {file}")
                except OSError:
                    pass
    # Embedding rate relative to the total macroblock budget.
    embedding_rate = (bit_count / (video_info['frame_count'] * video_info['macroblocks_per_frame'])) * 100
    result = {
        'input_video': video_path,
        'output_video': output_path,
        'secret_text_length': len(secret_text),
        'embedded_bits': bit_count,
        'embedding_rate': embedding_rate,
        'strength': strength,
        'success': os.path.exists(output_path)
    }
    print(f"\n嵌入完成!")
    print(f"嵌入文本长度: {len(secret_text)} 字符")
    print(f"实际嵌入位数: {bit_count}")
    print(f"嵌入率: {embedding_rate:.4f}%")
    print(f"修改强度: {strength}")
    print(f"成功: {result['success']}")
    return result
def extract_from_motion_vector(video_path, original_path, expected_length=None):
    """Recover text hidden by motion_vector_steganography.

    Decodes both clips to raw YUV420p with FFmpeg and compares luma samples
    at the same evenly spaced per-frame positions used during embedding:
    a brighter stego sample decodes as bit 1, otherwise bit 0.

    Args:
        video_path: stego video file path.
        original_path: original (cover) video file path used as reference.
        expected_length: optional cap on the number of bits to read.

    Returns:
        str: the decoded text, or '' on failure.
    """
    # Re-probe the stego clip for geometry and the capacity layout.
    video_info = analyze_motion_vectors(video_path)
    if not video_info:
        return ""
    # Scratch files holding the decoded sample planes.
    stego_temp = 'stego_temp.yuv'
    orig_temp = 'orig_temp.yuv'
    # Decode both videos to YUV420p so their samples line up byte-for-byte.
    for src, dst in [(video_path, stego_temp), (original_path, orig_temp)]:
        cmd = [
            'ffmpeg', '-y', '-i', src,
            '-pix_fmt', 'yuv420p', '-f', 'rawvideo', dst
        ]
        try:
            subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            print(f"已提取YUV数据: {dst}")
        except subprocess.CalledProcessError as e:
            print(f"提取YUV数据失败: {e}")
            return ""
    # One YUV420p frame = width*height luma bytes plus half as much chroma.
    frame_size = video_info['width'] * video_info['height'] * 3 // 2
    try:
        with open(stego_temp, 'rb') as f:
            stego_data = f.read()
        with open(orig_temp, 'rb') as f:
            orig_data = f.read()
    except Exception as e:
        print(f"读取YUV数据失败: {e}")
        return ""
    # Trim both streams to the common length in case the decodes differ.
    min_len = min(len(stego_data), len(orig_data))
    stego_data = stego_data[:min_len]
    orig_data = orig_data[:min_len]
    extracted_bits = []
    bit_index = 0
    # Same per-frame bit budget the embedder used.
    bits_per_frame = video_info['estimated_capacity_per_frame']
    # Walk the streams frame by frame.
    for i in range(0, min_len, frame_size):
        stego_frame = stego_data[i:i+frame_size]
        orig_frame = orig_data[i:i+frame_size]
        # Luma (Y) plane of each frame.
        stego_y = stego_frame[:video_info['width']*video_info['height']]
        orig_y = orig_frame[:video_info['width']*video_info['height']]
        # Same spacing formula as the embedder.
        step = (video_info['width'] * video_info['height']) // (bits_per_frame + 1)
        for j in range(bits_per_frame):
            if expected_length and len(extracted_bits) >= expected_length:
                break
            pos = (j + 1) * step
            if pos >= len(stego_y) or pos >= len(orig_y):
                continue
            # Brightened sample => embedded 1; unchanged/darkened => 0.
            if stego_y[pos] > orig_y[pos]:
                extracted_bits.append(1)
            else:
                extracted_bits.append(0)
            bit_index += 1
        if expected_length and len(extracted_bits) >= expected_length:
            break
    # Remove the decoded scratch files (best effort).
    for file in [stego_temp, orig_temp]:
        if os.path.exists(file):
            try:
                os.remove(file)
            except:
                pass
    extracted_text = bits_to_text(extracted_bits)
    print(f"提取完成!")
    print(f"提取的比特数: {len(extracted_bits)}")
    print(f"提取的文本: '{extracted_text}'")
    return extracted_text
# 使用示例
# motion_vector_steganography('input.mp4', 'output_stego.mp4', '这是一段隐藏在运动矢量中的秘密消息!')
# extract_from_motion_vector('output_stego.mp4', 'input.mp4')
## 第四章 音频隐写与多模态隐写技术
视频隐写不仅可以隐藏在视频本身中,还可以结合音频隐写,实现多模态的数据隐藏。这一章我们将探讨音频隐写技术以及如何将视频隐写与音频隐写结合使用。
### 4.1 音频隐写技术
#### 4.1.1 音频隐写原理与技术
音频隐写利用音频信号的冗余性和人耳的感知特性来隐藏信息。常见的音频隐写技术包括:
1. **波形域隐写**:直接修改音频采样值
2. **变换域隐写**:在频域(如DCT、DWT)中隐藏信息
3. **压缩域隐写**:在音频压缩编码(如MP3、AAC)过程中隐藏信息
下面是基于PyDub和numpy的音频隐写实现代码:
```python
import numpy as np
from pydub import AudioSegment
import os
def text_to_bits(text):
    """Convert text to a bit list terminated by an end-of-message marker.

    The text is encoded as UTF-8 so non-ASCII characters (e.g. the Chinese
    sample messages used throughout this guide) survive the round trip; the
    previous per-code-point encoding emitted more than eight bits for
    characters above U+00FF and broke the byte alignment.  Eight 1-bits
    (0xFF) are appended as the marker — that byte never occurs in valid
    UTF-8 data, so it cannot collide with the payload.

    Args:
        text: the string to encode.

    Returns:
        list[int]: 0/1 bits, most significant bit first, plus the marker.
    """
    result = []
    for byte in text.encode('utf-8'):
        bits = bin(byte)[2:].zfill(8)
        result.extend(int(bit) for bit in bits)
    # End marker (8 ones).
    result.extend([1, 1, 1, 1, 1, 1, 1, 1])
    return result
def bits_to_text(bits):
    """Convert a bit list produced by text_to_bits back to text.

    The 8-ones end marker is searched at byte-aligned positions only: the
    previous version scanned every bit offset and could truncate inside a
    byte whenever two adjacent payload bytes happened to form a run of
    eight 1-bits (e.g. 0x07 followed by 0xF8).  The payload bytes are
    decoded as UTF-8, with a Latin-1 fallback that reproduces the legacy
    one-code-point-per-byte behaviour for old bit streams.

    Args:
        bits: list of 0/1 bits, possibly containing the end marker.

    Returns:
        str: the decoded text (may be '' for empty/marker-only input).
    """
    # Locate the end marker on byte boundaries only.
    for i in range(0, len(bits) - 7, 8):
        if all(bits[i:i + 8]):
            bits = bits[:i]
            break
    # Drop any trailing partial byte.
    if len(bits) % 8 != 0:
        bits = bits[:-(len(bits) % 8)]
    # Pack the bits into bytes.
    data = bytes(
        int(''.join(str(bit) for bit in bits[i:i + 8]), 2)
        for i in range(0, len(bits), 8)
    )
    try:
        return data.decode('utf-8')
    except UnicodeDecodeError:
        # Legacy streams stored one code point per byte (Latin-1 compatible).
        return data.decode('latin-1')
def lsb_audio_steganography(input_file, output_file, secret_text):
    """Hide text in an audio file by overwriting sample LSBs.

    Each leading PCM sample carries one bit of the stream produced by
    text_to_bits.  The stego audio is written out as WAV in the cover's
    original sample format.

    Args:
        input_file: path of the cover audio file.
        output_file: path of the stego audio file to write (WAV).
        secret_text: text to hide.

    Returns:
        dict | None: embedding summary, or None on any failure.
    """
    # Refuse to continue when the cover file is missing.
    if not os.path.exists(input_file):
        print(f"错误:输入音频文件 '{input_file}' 不存在")
        return None
    # Load the cover audio and report its basic properties.
    try:
        audio = AudioSegment.from_file(input_file)
        print(f"已加载音频文件: {input_file}")
        print(f"音频长度: {len(audio)} ms")
        print(f"声道数: {audio.channels}")
        print(f"采样率: {audio.frame_rate} Hz")
        print(f"样本宽度: {audio.sample_width} bytes")
    except Exception as e:
        print(f"加载音频文件失败: {e}")
        return None
    # Interleaved PCM samples as a numpy array.
    samples = np.array(audio.get_array_of_samples())
    # Message bit stream (terminated by the 8-ones marker).
    bits = text_to_bits(secret_text)
    bit_count = len(bits)
    # Capacity: one bit per sample.
    # NOTE(review): the original comment claimed only the first channel is
    # used, but len(samples) counts every interleaved channel sample.
    max_bits = len(samples)
    if bit_count > max_bits:
        print(f"警告:隐藏数据过大!需要 {bit_count} 位,但音频只能容纳 {max_bits} 位")
        # Truncate the payload to fit.
        bits = bits[:max_bits]
        bit_count = len(bits)
        print(f"已截断数据至 {bit_count} 位")
    print(f"要嵌入的二进制数据长度: {bit_count} 位")
    # Embed: clear each sample's least-significant bit, then OR in the bit.
    modified_samples = samples.copy()
    for i in range(bit_count):
        modified_samples[i] = modified_samples[i] & ~1
        modified_samples[i] = modified_samples[i] | bits[i]
    # Embedding rate relative to samples per second of audio.
    duration_seconds = len(audio) / 1000
    embedding_rate = bit_count / (duration_seconds * audio.frame_rate) * 100
    # Rebuild an AudioSegment with the same format as the cover.
    if audio.sample_width == 1:
        # 8-bit audio.
        modified_samples = np.clip(modified_samples, -128, 127)
        new_audio = AudioSegment(
            modified_samples.astype(np.int8).tobytes(),
            frame_rate=audio.frame_rate,
            sample_width=audio.sample_width,
            channels=audio.channels
        )
    elif audio.sample_width == 2:
        # 16-bit audio.
        modified_samples = np.clip(modified_samples, -32768, 32767)
        new_audio = AudioSegment(
            modified_samples.astype(np.int16).tobytes(),
            frame_rate=audio.frame_rate,
            sample_width=audio.sample_width,
            channels=audio.channels
        )
    elif audio.sample_width == 3:
        # 24-bit audio: numpy has no 3-byte dtype, so pack bytes manually.
        print("警告:24位音频处理可能不完美")
        byte_data = b''
        for sample in modified_samples:
            # Map negative samples to their 24-bit two's-complement value.
            if sample < 0:
                sample += 16777216
            byte_data += sample.to_bytes(3, byteorder='little', signed=False)
        new_audio = AudioSegment(
            byte_data,
            frame_rate=audio.frame_rate,
            sample_width=audio.sample_width,
            channels=audio.channels
        )
    elif audio.sample_width == 4:
        # 32-bit audio.
        modified_samples = np.clip(modified_samples, -2147483648, 2147483647)
        new_audio = AudioSegment(
            modified_samples.astype(np.int32).tobytes(),
            frame_rate=audio.frame_rate,
            sample_width=audio.sample_width,
            channels=audio.channels
        )
    else:
        print(f"不支持的音频样本宽度: {audio.sample_width}")
        return None
    # Write the stego audio as WAV.
    try:
        new_audio.export(output_file, format="wav")
        print(f"音频导出完成!输出文件: {output_file}")
    except Exception as e:
        print(f"导出音频失败: {e}")
        return None
    result = {
        'input_file': input_file,
        'output_file': output_file,
        'secret_text_length': len(secret_text),
        'embedded_bits': bit_count,
        'embedding_rate': embedding_rate,
        'audio_duration': duration_seconds,
        'audio_channels': audio.channels,
        'audio_sample_rate': audio.frame_rate,
        'success': os.path.exists(output_file)
    }
    print(f"\n嵌入完成!")
    print(f"嵌入文本长度: {len(secret_text)} 字符")
    print(f"实际嵌入位数: {bit_count}")
    print(f"嵌入率: {embedding_rate:.4f}%")
    print(f"成功: {result['success']}")
    return result
def extract_from_lsb_audio(stego_file, expected_length=None):
    """Read back text hidden in an audio file's sample LSBs.

    Collects the least-significant bit of each sample until either the
    8-ones end marker appears or *expected_length* bits have been read,
    then decodes the bits with bits_to_text.

    Args:
        stego_file: path of the stego audio file.
        expected_length: optional cap on the number of bits to read.

    Returns:
        str: the decoded text, or '' on failure.
    """
    if not os.path.exists(stego_file):
        print(f"错误:音频文件 '{stego_file}' 不存在")
        return ""
    try:
        audio = AudioSegment.from_file(stego_file)
        print(f"已加载音频文件: {stego_file}")
    except Exception as e:
        print(f"加载音频文件失败: {e}")
        return ""
    # Interleaved PCM samples as a numpy array.
    samples = np.array(audio.get_array_of_samples())
    # How many bits to read at most.
    limit = min(expected_length, len(samples)) if expected_length else len(samples)
    extracted_bits = []
    for sample in samples[:limit]:
        extracted_bits.append(int(sample) & 1)
        # Stop once the last eight bits form the all-ones end marker.
        if len(extracted_bits) >= 8 and all(extracted_bits[-8:]):
            break
    extracted_text = bits_to_text(extracted_bits)
    print(f"提取完成!")
    print(f"提取的比特数: {len(extracted_bits)}")
    print(f"提取的文本: '{extracted_text}'")
    return extracted_text
# Frequency-domain audio steganography based on the Fourier transform.
def frequency_domain_steganography(input_file, output_file, secret_text, alpha=0.05):
    """Hide text in an audio file by modulating FFT magnitudes.

    Each message bit scales one mid-band FFT coefficient: bit 1 multiplies
    it by (1 + alpha), bit 0 by (1 - alpha).  Extraction requires the
    original audio for comparison (see extract_from_frequency_audio).

    Args:
        input_file: path of the cover audio file.
        output_file: path of the stego audio file to write (WAV).
        secret_text: text to hide.
        alpha: embedding strength in (0, 1); default 0.05.

    Returns:
        dict | None: embedding summary, or None on any failure.
    """
    import numpy as np
    from scipy.fft import rfft, irfft
    # Refuse to continue when the cover file is missing.
    if not os.path.exists(input_file):
        print(f"错误:输入音频文件 '{input_file}' 不存在")
        return None
    try:
        audio = AudioSegment.from_file(input_file)
        print(f"已加载音频文件: {input_file}")
    except Exception as e:
        print(f"加载音频文件失败: {e}")
        return None
    # Interleaved PCM samples as a numpy array.
    samples = np.array(audio.get_array_of_samples())
    # Message bit stream (terminated by the 8-ones marker).
    bits = text_to_bits(secret_text)
    bit_count = len(bits)
    # Forward real FFT of the whole signal.
    fft_data = rfft(samples)
    # Capacity: only part of the spectrum is used for embedding.
    max_bits = len(fft_data) // 2
    if bit_count > max_bits:
        print(f"警告:隐藏数据过大!需要 {bit_count} 位,但频域只能容纳 {max_bits} 位")
        # Truncate the payload to fit.
        bits = bits[:max_bits]
        bit_count = len(bits)
        print(f"已截断数据至 {bit_count} 位")
    print(f"要嵌入的二进制数据长度: {bit_count} 位")
    # Work on a copy of the spectrum.
    modified_fft = fft_data.copy()
    # Mid-band embedding: skip the lowest quarter of the spectrum.
    start_freq = len(fft_data) // 4
    end_freq = start_freq + bit_count
    for i in range(bit_count):
        freq_index = start_freq + i
        if freq_index >= len(modified_fft):
            break
        # Current coefficient magnitude.
        # NOTE(review): this value is computed but never used below.
        magnitude = np.abs(modified_fft[freq_index])
        # Scale the coefficient up for bit 1, down for bit 0.
        if bits[i] == 1:
            modified_fft[freq_index] = modified_fft[freq_index] * (1 + alpha)
        else:
            modified_fft[freq_index] = modified_fft[freq_index] * (1 - alpha)
    # Inverse FFT back to the time domain, cast to the original dtype.
    modified_samples = irfft(modified_fft).astype(samples.dtype)
    # Rebuild an AudioSegment with the same format as the cover.
    if audio.sample_width == 1:
        modified_samples = np.clip(modified_samples, -128, 127)
        new_audio = AudioSegment(
            modified_samples.astype(np.int8).tobytes(),
            frame_rate=audio.frame_rate,
            sample_width=audio.sample_width,
            channels=audio.channels
        )
    elif audio.sample_width == 2:
        modified_samples = np.clip(modified_samples, -32768, 32767)
        new_audio = AudioSegment(
            modified_samples.astype(np.int16).tobytes(),
            frame_rate=audio.frame_rate,
            sample_width=audio.sample_width,
            channels=audio.channels
        )
    elif audio.sample_width == 3:
        # 24-bit audio: numpy has no 3-byte dtype, so pack bytes manually.
        print("警告:24位音频处理可能不完美")
        byte_data = b''
        for sample in modified_samples:
            # Map negative samples to their 24-bit two's-complement value.
            if sample < 0:
                sample += 16777216
            byte_data += sample.to_bytes(3, byteorder='little', signed=False)
        new_audio = AudioSegment(
            byte_data,
            frame_rate=audio.frame_rate,
            sample_width=audio.sample_width,
            channels=audio.channels
        )
    elif audio.sample_width == 4:
        modified_samples = np.clip(modified_samples, -2147483648, 2147483647)
        new_audio = AudioSegment(
            modified_samples.astype(np.int32).tobytes(),
            frame_rate=audio.frame_rate,
            sample_width=audio.sample_width,
            channels=audio.channels
        )
    else:
        print(f"不支持的音频样本宽度: {audio.sample_width}")
        return None
    # Write the stego audio as WAV.
    try:
        new_audio.export(output_file, format="wav")
        print(f"音频导出完成!输出文件: {output_file}")
    except Exception as e:
        print(f"导出音频失败: {e}")
        return None
    # Embedding rate relative to samples per second of audio.
    duration_seconds = len(audio) / 1000
    embedding_rate = bit_count / (duration_seconds * audio.frame_rate) * 100
    result = {
        'input_file': input_file,
        'output_file': output_file,
        'secret_text_length': len(secret_text),
        'embedded_bits': bit_count,
        'embedding_rate': embedding_rate,
        'alpha': alpha,
        'audio_duration': duration_seconds,
        'success': os.path.exists(output_file)
    }
    print(f"\n嵌入完成!")
    print(f"嵌入文本长度: {len(secret_text)} 字符")
    print(f"实际嵌入位数: {bit_count}")
    print(f"嵌入率: {embedding_rate:.4f}%")
    print(f"嵌入强度: {alpha}")
    print(f"成功: {result['success']}")
    return result
def extract_from_frequency_audio(stego_file, original_file, expected_length=None):
    """Recover text hidden by frequency_domain_steganography.

    Both audio files are transformed with an rFFT; a stego coefficient whose
    magnitude exceeds the original's decodes as bit 1, otherwise bit 0.

    Args:
        stego_file: path of the stego audio file.
        original_file: path of the original (cover) audio file used as
            reference.
        expected_length: optional cap on the number of bits to read.

    Returns:
        str: the decoded text, or '' on failure.
    """
    import numpy as np
    from scipy.fft import rfft
    # Both files must exist.
    for file_path in [stego_file, original_file]:
        if not os.path.exists(file_path):
            print(f"错误:音频文件 '{file_path}' 不存在")
            return ""
    try:
        stego_audio = AudioSegment.from_file(stego_file)
        orig_audio = AudioSegment.from_file(original_file)
        print(f"已加载音频文件")
    except Exception as e:
        print(f"加载音频文件失败: {e}")
        return ""
    # Interleaved PCM samples of both files.
    stego_samples = np.array(stego_audio.get_array_of_samples())
    orig_samples = np.array(orig_audio.get_array_of_samples())
    # Trim to the common length so the spectra are comparable.
    min_len = min(len(stego_samples), len(orig_samples))
    stego_samples = stego_samples[:min_len]
    orig_samples = orig_samples[:min_len]
    # Forward real FFT of both signals.
    stego_fft = rfft(stego_samples)
    orig_fft = rfft(orig_samples)
    extracted_bits = []
    # How many bits to read at most.
    if expected_length:
        bit_count = min(expected_length, len(stego_fft) // 4)
    else:
        bit_count = len(stego_fft) // 4  # default: a quarter of the spectrum
    # Mid-band start index — must match the embedder.
    start_freq = len(stego_fft) // 4
    for i in range(bit_count):
        freq_index = start_freq + i
        if freq_index >= len(stego_fft) or freq_index >= len(orig_fft):
            break
        # Magnitude comparison: boosted coefficient => bit 1.
        stego_magnitude = np.abs(stego_fft[freq_index])
        orig_magnitude = np.abs(orig_fft[freq_index])
        if stego_magnitude > orig_magnitude:
            extracted_bits.append(1)
        else:
            extracted_bits.append(0)
        # Stop once the last eight bits form the all-ones end marker.
        if len(extracted_bits) >= 8 and all(extracted_bits[-8:]):
            break
    extracted_text = bits_to_text(extracted_bits)
    print(f"提取完成!")
    print(f"提取的比特数: {len(extracted_bits)}")
    print(f"提取的文本: '{extracted_text}'")
    return extracted_text
# 使用示例
# lsb_audio_steganography('input.wav', 'output_stego.wav', '这是一段隐藏在音频中的秘密消息!')
# extract_from_lsb_audio('output_stego.wav')
# frequency_domain_steganography('input.wav', 'output_freq_stego.wav', '这是一段隐藏在音频频域中的秘密消息!')
# extract_from_frequency_audio('output_freq_stego.wav', 'input.wav')
### 4.2 多模态隐写技术
多模态隐写技术结合了不同媒体类型(如图像、视频、音频、文本等)进行协同数据隐藏,提供了更高的安全性和更大的隐藏容量。下面实现一个基于视频和音频的多模态隐写系统。
#### 4.2.1 多模态隐写原理与技术
多模态隐写的关键优势:
1. **更大的隐藏容量**:同时利用多种媒体的冗余空间
2. **更高的安全性**:攻击者需要同时破解多种媒体的隐写技术
3. **更好的鲁棒性**:即使一种媒体被破坏,信息仍可能通过其他媒体恢复
4. **更隐蔽的隐写痕迹**:分散在多种媒体中,降低检测概率
下面是一个完整的多模态隐写系统实现,结合视频帧隐写和音频隐写:
```python
import os
import numpy as np
import cv2
from pydub import AudioSegment
import tempfile
import subprocess
def split_data_for_multimodal(secret_data, modalities=2):
    """Split secret text into roughly equal parts, one per modality.

    The split happens on characters rather than on UTF-8 bytes: the
    previous byte-level split could cut a multi-byte character in half
    and, with errors='ignore', silently drop it, so the reassembled
    message lost characters (e.g. "中文字" split in two lost "文").
    For pure-ASCII input the result is identical to the old behaviour.

    Args:
        secret_data: the string to split.
        modalities: number of parts to produce.

    Returns:
        list[str]: *modalities* substrings whose concatenation equals
        *secret_data*; earlier parts receive the extra characters.
    """
    base, remainder = divmod(len(secret_data), modalities)
    parts = []
    pos = 0
    for i in range(modalities):
        # The first `remainder` parts get one extra character.
        size = base + (1 if i < remainder else 0)
        parts.append(secret_data[pos:pos + size])
        pos += size
    return parts
def multimodal_steganography(video_path, audio_path, output_path, secret_data,
                             video_method='lsb', audio_method='lsb',
                             video_params=None, audio_params=None):
    """Multi-modal steganography: split the secret across video and audio.

    The secret data is split in two; one half is embedded into the video
    frames and the other into the audio track, then the two stego streams
    are remuxed into a single output file with FFmpeg.

    Args:
        video_path: input video file path.
        audio_path: input audio file path, or None to use the video's own
            audio track.
        output_path: output file path.
        secret_data: text to hide.
        video_method: video embedding method ('lsb' only is implemented;
            'dct'/'dwt' are rejected).
        audio_method: audio embedding method ('lsb' or 'frequency').
        video_params: extra keyword arguments for the video embedder.
        audio_params: extra keyword arguments for the audio embedder.

    Returns:
        dict | None: embedding summary, or None on any failure.
    """
    # Avoid mutable default arguments.
    if video_params is None:
        video_params = {}
    if audio_params is None:
        audio_params = {}
    if not os.path.exists(video_path):
        print(f"错误:视频文件 '{video_path}' 不存在")
        return None
    # All intermediate files live in a self-cleaning temporary directory.
    with tempfile.TemporaryDirectory() as temp_dir:
        video_temp = os.path.join(temp_dir, "temp_video.mp4")
        audio_temp = os.path.join(temp_dir, "temp_audio.wav")
        stego_video = os.path.join(temp_dir, "stego_video.mp4")
        stego_audio = os.path.join(temp_dir, "stego_audio.wav")
        # Without a separate audio file, demux the video's own audio track.
        if audio_path is None:
            try:
                # Audio track as 16-bit PCM WAV.
                cmd = [
                    "ffmpeg", "-i", video_path, "-vn", "-acodec", "pcm_s16le",
                    "-ar", "44100", "-ac", "2", audio_temp, "-y"
                ]
                subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                print(f"已从视频中提取音频到 {audio_temp}")
                # Video-only copy (audio stripped).
                cmd = [
                    "ffmpeg", "-i", video_path, "-an", video_temp, "-y"
                ]
                subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                print(f"已提取纯视频到 {video_temp}")
                audio_path = audio_temp
            except Exception as e:
                print(f"分离视频和音频失败: {e}")
                return None
        else:
            # Copy the video stream into the temp dir unchanged.
            cmd = ["ffmpeg", "-i", video_path, "-c:v", "copy", video_temp, "-y"]
            subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Split the payload: first half to video, second half to audio.
        split_data = split_data_for_multimodal(secret_data, modalities=2)
        video_data, audio_data = split_data
        # Embed into the video stream.
        print(f"\n开始在视频中嵌入数据,方法: {video_method}")
        video_result = None
        if video_method == 'lsb':
            # Delegates to the video LSB embedder defined earlier in the file.
            video_result = embed_video_lsb(video_temp, stego_video, video_data, **video_params)
        else:
            print(f"不支持的视频隐写方法: {video_method}")
            return None
        if not video_result or not os.path.exists(stego_video):
            print("视频隐写失败")
            return None
        # Embed into the audio stream.
        print(f"\n开始在音频中嵌入数据,方法: {audio_method}")
        audio_result = None
        if audio_method == 'lsb':
            audio_result = lsb_audio_steganography(audio_path, stego_audio, audio_data, **audio_params)
        elif audio_method == 'frequency':
            audio_result = frequency_domain_steganography(audio_path, stego_audio, audio_data, **audio_params)
        else:
            print(f"不支持的音频隐写方法: {audio_method}")
            return None
        if not audio_result or not os.path.exists(stego_audio):
            print("音频隐写失败")
            return None
        # Remux the stego video and stego audio into the final container.
        print(f"\n合并隐写后的视频和音频到 {output_path}")
        try:
            cmd = [
                "ffmpeg", "-i", stego_video, "-i", stego_audio,
                "-c:v", "copy", "-c:a", "aac", "-strict", "experimental",
                output_path, "-y"
            ]
            subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            print(f"多模态隐写完成!输出文件: {output_path}")
        except Exception as e:
            print(f"合并失败: {e}")
            return None
        # Summary of how many bytes went into each modality.
        total_embedded = len(video_data.encode('utf-8')) + len(audio_data.encode('utf-8'))
        result = {
            'video_method': video_method,
            'audio_method': audio_method,
            'total_embedded_bytes': total_embedded,
            'video_embedded_bytes': len(video_data.encode('utf-8')),
            'audio_embedded_bytes': len(audio_data.encode('utf-8')),
            'output_file': output_path,
            'success': os.path.exists(output_path)
        }
        print(f"\n多模态隐写统计:")
        print(f"视频隐写方法: {video_method}")
        print(f"音频隐写方法: {audio_method}")
        print(f"视频中嵌入: {result['video_embedded_bytes']} 字节")
        print(f"音频中嵌入: {result['audio_embedded_bytes']} 字节")
        print(f"总计嵌入: {result['total_embedded_bytes']} 字节")
        print(f"成功: {result['success']}")
        return result
def multimodal_steganography_extract(stego_path, output_video=None, output_audio=None,
                                     video_method='lsb', audio_method='lsb',
                                     video_params=None, audio_params=None):
    """Recover the secret from a file made by multimodal_steganography.

    Demuxes the stego file into its video and audio streams, extracts each
    half of the payload with the matching method, and concatenates them
    (video part first — the order used at embedding time).

    Args:
        stego_path: path of the multi-modal stego file.
        output_video: optional path for the demuxed video; a temp file is
            used when None.
        output_audio: optional path for the demuxed audio; a temp file is
            used when None.
        video_method: video extraction method ('lsb' only is implemented).
        audio_method: audio extraction method ('lsb' or 'frequency').
        video_params: extra keyword arguments for the video extractor.
        audio_params: extra keyword arguments for the audio extractor.

    Returns:
        str: the recovered text, or '' on failure.
    """
    # Avoid mutable default arguments.
    if video_params is None:
        video_params = {}
    if audio_params is None:
        audio_params = {}
    if not os.path.exists(stego_path):
        print(f"错误:隐写文件 '{stego_path}' 不存在")
        return ""
    # All intermediate files live in a self-cleaning temporary directory.
    with tempfile.TemporaryDirectory() as temp_dir:
        if output_video is None:
            output_video = os.path.join(temp_dir, "extracted_video.mp4")
        if output_audio is None:
            output_audio = os.path.join(temp_dir, "extracted_audio.wav")
        # Demux the stego file into separate video and audio streams.
        try:
            # Video-only stream.
            cmd = ["ffmpeg", "-i", stego_path, "-an", output_video, "-y"]
            subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # Audio stream as 16-bit PCM WAV.
            cmd = ["ffmpeg", "-i", stego_path, "-vn", "-acodec", "pcm_s16le",
                   "-ar", "44100", "-ac", "2", output_audio, "-y"]
            subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            print(f"已从隐写文件中提取视频和音频")
        except Exception as e:
            print(f"提取视频和音频失败: {e}")
            return ""
        # Extract the video half of the payload.
        print(f"\n从视频中提取数据,方法: {video_method}")
        video_data = ""
        if video_method == 'lsb':
            # Delegates to the video LSB extractor defined earlier in the file.
            video_data = extract_video_lsb(output_video, **video_params)
        else:
            print(f"不支持的视频隐写方法: {video_method}")
            return ""
        # Extract the audio half of the payload.
        print(f"\n从音频中提取数据,方法: {audio_method}")
        audio_data = ""
        if audio_method == 'lsb':
            audio_data = extract_from_lsb_audio(output_audio, **audio_params)
        elif audio_method == 'frequency':
            # Frequency-domain extraction needs the ORIGINAL audio as a
            # reference; a silent dummy is used here, so results degrade.
            print("警告:频域提取需要原始音频作为参考,结果可能不准确")
            dummy_audio = os.path.join(temp_dir, "dummy.wav")
            silent_audio = AudioSegment.silent(duration=1000)  # 1 s of silence
            silent_audio.export(dummy_audio, format="wav")
            audio_data = extract_from_frequency_audio(output_audio, dummy_audio, **audio_params)
        else:
            print(f"不支持的音频隐写方法: {audio_method}")
            return ""
        # Reassemble in the same order used at embedding time.
        extracted_data = video_data + audio_data
        print(f"\n多模态提取完成:")
        print(f"从视频中提取: {len(video_data)} 字符")
        print(f"从音频中提取: {len(audio_data)} 字符")
        print(f"总计提取: {len(extracted_data)} 字符")
        print(f"提取的文本: '{extracted_data}'")
        return extracted_data
def adaptive_multimodal_steganography(video_path, output_path, secret_data, complexity_analysis=True):
    """Adaptive multi-modal steganography driven by media complexity.

    Splits the payload between the video and audio tracks proportionally to
    their measured complexity scores, picks a stronger embedding method for
    high-complexity media, and remuxes the stego streams into one file.

    Args:
        video_path: input video file path (its own audio track is used).
        output_path: output file path.
        secret_data: text to hide.
        complexity_analysis: flag for complexity-driven placement.
            NOTE(review): currently unused — complexity is always analyzed.

    Returns:
        dict | None: embedding summary, or None on any failure.
    """
    # All intermediate files live in a self-cleaning temporary directory.
    with tempfile.TemporaryDirectory() as temp_dir:
        video_temp = os.path.join(temp_dir, "temp_video.mp4")
        audio_temp = os.path.join(temp_dir, "temp_audio.wav")
        stego_video = os.path.join(temp_dir, "stego_video.mp4")
        stego_audio = os.path.join(temp_dir, "stego_audio.wav")
        try:
            # Demux the audio track as 16-bit PCM WAV.
            cmd = [
                "ffmpeg", "-i", video_path, "-vn", "-acodec", "pcm_s16le",
                "-ar", "44100", "-ac", "2", audio_temp, "-y"
            ]
            subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # Video-only copy (audio stripped).
            cmd = ["ffmpeg", "-i", video_path, "-an", video_temp, "-y"]
            subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # Score both media on a 0-1 complexity scale.
            video_complexity = analyze_video_complexity(video_temp)
            audio_complexity = analyze_audio_complexity(audio_temp)
            print(f"\n媒体复杂度分析:")
            print(f"视频复杂度得分: {video_complexity}")
            print(f"音频复杂度得分: {audio_complexity}")
            # Payload share is proportional to each medium's complexity.
            total_complexity = video_complexity + audio_complexity
            if total_complexity > 0:
                video_ratio = video_complexity / total_complexity
                audio_ratio = audio_complexity / total_complexity
            else:
                video_ratio = 0.5
                audio_ratio = 0.5
            # Split the payload accordingly.
            data_length = len(secret_data)
            video_length = int(data_length * video_ratio)
            video_data = secret_data[:video_length]
            audio_data = secret_data[video_length:]
            print(f"\n数据分配:")
            print(f"视频数据比例: {video_ratio:.2f} ({len(video_data)} 字符)")
            print(f"音频数据比例: {audio_ratio:.2f} ({len(audio_data)} 字符)")
            # Method choice: stronger embedding for high-complexity media.
            # NOTE(review): video_result/audio_result are never checked for
            # failure before the merge step.
            if video_complexity > 0.7:  # high-complexity video
                video_result = embed_adaptive_frame_video(video_temp, stego_video, video_data)
            else:  # low-complexity video — plain LSB
                video_result = embed_video_lsb(video_temp, stego_video, video_data)
            if audio_complexity > 0.7:  # high-complexity audio
                audio_result = frequency_domain_steganography(audio_temp, stego_audio, audio_data, alpha=0.08)
            else:  # low-complexity audio — plain LSB
                audio_result = lsb_audio_steganography(audio_temp, stego_audio, audio_data)
            # Remux the stego video and stego audio into the final container.
            cmd = [
                "ffmpeg", "-i", stego_video, "-i", stego_audio,
                "-c:v", "copy", "-c:a", "aac", "-strict", "experimental",
                output_path, "-y"
            ]
            subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # Quality metric: mean per-frame PSNR against the cover video.
            video_psnr = compare_videos(video_temp, stego_video)
            result = {
                'video_complexity': video_complexity,
                'audio_complexity': audio_complexity,
                'video_ratio': video_ratio,
                'audio_ratio': audio_ratio,
                'video_data_length': len(video_data),
                'audio_data_length': len(audio_data),
                'video_psnr': video_psnr,
                'output_file': output_path,
                'success': os.path.exists(output_path)
            }
            print(f"\n自适应多模态隐写完成:")
            print(f"视频PSNR: {video_psnr:.2f} dB")
            print(f"成功: {result['success']}")
            return result
        except Exception as e:
            print(f"自适应多模态隐写失败: {e}")
            return None
def analyze_video_complexity(video_path):
    """Score video complexity on a 0-1 scale.

    Samples roughly 10 frames and combines three cues: Canny edge density,
    Laplacian-variance texture, and Farneback optical-flow motion between
    consecutive sampled frames.

    Args:
        video_path: path of the video to analyze.

    Returns:
        float: complexity in [0, 1]; 0.5 when the video cannot be read.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return 0.5  # default to medium complexity
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    step = max(1, frame_count // 10)  # sample about 10 frames
    edge_density_scores = []
    texture_scores = []
    motion_scores = []
    # Seed the motion comparison with the first frame.
    ret, prev_frame = cap.read()
    if not ret:
        cap.release()
        return 0.5
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    for i in range(1, frame_count, step):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i)
        ret, frame = cap.read()
        if not ret:
            continue
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # Edge density: fraction of Canny edge pixels.
        edges = cv2.Canny(gray, 50, 150)
        edge_density = np.sum(edges > 0) / (gray.shape[0] * gray.shape[1])
        edge_density_scores.append(edge_density)
        # Texture: variance of the Laplacian response.
        laplacian = cv2.Laplacian(gray, cv2.CV_64F)
        texture = np.var(laplacian)
        # Normalize to the 0-1 range (10000 chosen as the saturation point).
        texture_normalized = min(1.0, texture / 10000.0)
        texture_scores.append(texture_normalized)
        # Motion: mean optical-flow magnitude against the previous sample.
        if i > 1:
            flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
            magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
            motion_intensity = np.mean(magnitude)
            # Normalize to the 0-1 range (5.0 chosen as the saturation point).
            motion_normalized = min(1.0, motion_intensity / 5.0)
            motion_scores.append(motion_normalized)
        prev_gray = gray.copy()
    cap.release()
    # Average each cue (0 when no samples were collected).
    edge_density_avg = np.mean(edge_density_scores) if edge_density_scores else 0
    texture_avg = np.mean(texture_scores) if texture_scores else 0
    motion_avg = np.mean(motion_scores) if motion_scores else 0
    # Weighted blend of the three cues (weights are tunable).
    complexity = 0.4 * edge_density_avg + 0.3 * texture_avg + 0.3 * motion_avg
    # Clamp to [0, 1].
    return max(0.0, min(1.0, complexity))
def analyze_audio_complexity(audio_path):
    """Score audio complexity on a 0-1 scale.

    Analyzes the signal in 1-second segments and combines three cues:
    spectral entropy, amplitude variation, and the spectral centroid.

    Fixes over the previous version: (1) the spectral entropy was computed
    with scipy's default natural log but normalized by ``np.log2`` — an
    inconsistent base that scaled every score by ln 2; it now uses
    ``entropy(..., base=2)``.  (2) silent (all-zero) audio divided by a
    zero peak and produced NaN; the normalization is now guarded.

    Args:
        audio_path: path of the audio file to analyze.

    Returns:
        float: complexity in [0, 1]; 0.5 when analysis fails.
    """
    try:
        from scipy.fft import rfft
        from scipy.stats import entropy
        audio = AudioSegment.from_file(audio_path)
        samples = np.array(audio.get_array_of_samples())
        # Normalize amplitude, guarding against an all-zero (silent) signal.
        peak = np.max(np.abs(samples)) if len(samples) else 0
        if peak > 0:
            samples = samples / peak
        # Analyze the signal in 1-second segments.
        segment_length = audio.frame_rate
        segments = []
        for i in range(0, len(samples) - segment_length, segment_length):
            segments.append(samples[i:i+segment_length])
        if not segments:
            return 0.5
        entropy_scores = []
        amplitude_scores = []
        frequency_scores = []
        for segment in segments:
            # Spectral entropy over the magnitude spectrum.
            fft_values = np.abs(rfft(segment))
            if np.sum(fft_values) > 0:
                # Normalize to a probability distribution.
                fft_normalized = fft_values / np.sum(fft_values)
                # Drop zero bins to avoid log(0).
                fft_normalized = fft_normalized[fft_normalized > 0]
                # Base 2 to match the log2 maximum-entropy normalizer below.
                spectral_entropy = entropy(fft_normalized, base=2)
                max_entropy = np.log2(len(fft_normalized))
                if max_entropy > 0:
                    entropy_scores.append(spectral_entropy / max_entropy)
            # Amplitude variation: spread of sample-to-sample differences.
            amplitude_changes = np.abs(np.diff(segment))
            amplitude_scores.append(np.std(amplitude_changes))
            # Frequency distribution via the spectral centroid.
            freq_indices = np.arange(len(fft_values))
            if np.sum(fft_values) > 0:
                spectral_centroid = np.sum(freq_indices * fft_values) / np.sum(fft_values)
                # Normalize to the 0-1 range.
                frequency_scores.append(spectral_centroid / len(fft_values))
        # Average each cue (0 when no samples were collected).
        entropy_avg = np.mean(entropy_scores) if entropy_scores else 0
        amplitude_avg = np.mean(amplitude_scores) if amplitude_scores else 0
        # Amplitude-variation normalization (10x chosen as the saturation).
        amplitude_normalized = min(1.0, amplitude_avg * 10.0)
        frequency_avg = np.mean(frequency_scores) if frequency_scores else 0
        # Weighted blend, clamped to [0, 1].
        complexity = 0.4 * entropy_avg + 0.4 * amplitude_normalized + 0.2 * frequency_avg
        return max(0.0, min(1.0, complexity))
    except Exception as e:
        print(f"音频复杂度分析失败: {e}")
        return 0.5
def compare_videos(original_video, stego_video):
    """Average per-frame PSNR between two videos (stego quality metric).

    Frames are read in lockstep; when a stego frame's size differs it is
    resized to the reference frame before cv2.PSNR is applied.

    Args:
        original_video: path of the reference (cover) video.
        stego_video: path of the stego video to compare.

    Returns:
        float: mean PSNR in dB over the compared frames, 0.0 on failure.
    """
    try:
        import cv2
        ref_cap = cv2.VideoCapture(original_video)
        test_cap = cv2.VideoCapture(stego_video)
        if not (ref_cap.isOpened() and test_cap.isOpened()):
            print("无法打开视频文件")
            return 0.0
        scores = []
        while True:
            ok_ref, ref_frame = ref_cap.read()
            ok_test, test_frame = test_cap.read()
            if not (ok_ref and ok_test):
                break
            # Match frame geometry before comparing.
            if ref_frame.shape != test_frame.shape:
                test_frame = cv2.resize(test_frame, (ref_frame.shape[1], ref_frame.shape[0]))
            scores.append(cv2.PSNR(ref_frame, test_frame))
        ref_cap.release()
        test_cap.release()
        return np.mean(scores) if scores else 0.0
    except Exception as e:
        print(f"视频比较失败: {e}")
        return 0.0
def embed_adaptive_frame_video(input_video, output_video, secret_text):
    """Frame-complexity-adaptive LSB video steganography.

    Two-pass scheme: pass 1 buffers every frame and scores its visual
    complexity (edge density + Laplacian texture); pass 2 distributes the
    secret bits across frames proportionally to that complexity and flips
    pixel LSBs, preferring edge regions where changes are less visible.

    Args:
        input_video: path of the cover video.
        output_video: path of the stego video to write (mp4v codec).
        secret_text: text to embed (converted via text_to_bits).

    Returns:
        dict with 'success', 'embedded_bits', 'total_bits',
        'frames_processed'; None on any error.

    NOTE(review): pixel positions are chosen with np.random.shuffle and
    never recorded, so a matching extractor cannot reproduce them —
    presumably this function is illustrative; confirm before real use.
    """
    try:
        cap = cv2.VideoCapture(input_video)
        if not cap.isOpened():
            print(f"无法打开视频文件: {input_video}")
            return None
        # Video properties needed to build an identical-geometry writer.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))
        # Secret payload as a bit sequence.
        bits = text_to_bits(secret_text)
        bit_index = 0
        total_bits = len(bits)
        complexities = []
        frames = []
        # Pass 1: buffer all frames in memory and score each one.
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame.copy())
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # Fraction of pixels that are Canny edges.
            edges = cv2.Canny(gray, 50, 150)
            edge_density = np.sum(edges > 0) / (gray.shape[0] * gray.shape[1])
            # Laplacian variance as a texture measure, capped at 1.0.
            laplacian = cv2.Laplacian(gray, cv2.CV_64F)
            texture = np.var(laplacian)
            texture_normalized = min(1.0, texture / 10000.0)
            # Equal-weight blend of the two cues.
            complexity = 0.5 * edge_density + 0.5 * texture_normalized
            complexities.append(complexity)
        cap.release()
        # NOTE(review): sorted_frames is computed but never used below.
        sorted_frames = sorted(enumerate(complexities), key=lambda x: x[1], reverse=True)
        # Allocate per-frame bit budgets proportional to complexity.
        embedding_capacity = {}
        total_complexity = sum(complexities)
        if total_complexity > 0:
            for idx, complexity in enumerate(complexities):
                capacity = int(total_bits * (complexity / total_complexity))
                embedding_capacity[idx] = max(1, capacity)  # at least 1 bit
        else:
            # All-flat video: split the payload evenly.
            avg_capacity = max(1, total_bits // len(frames))
            for idx in range(len(frames)):
                embedding_capacity[idx] = avg_capacity
        # Pass 2: embed into the buffered frames and write them out.
        for idx, frame in enumerate(frames):
            if idx in embedding_capacity and embedding_capacity[idx] > 0 and bit_index < total_bits:
                embed_count = min(embedding_capacity[idx], total_bits - bit_index)
                frame_array = np.array(frame, dtype=np.uint8)
                pixels_needed = embed_count
                height, width, channels = frame_array.shape
                total_pixels = height * width * channels
                # Never ask for more positions than the frame has.
                if pixels_needed > total_pixels:
                    pixels_needed = total_pixels
                # Prefer edge pixels (high local complexity).
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                edges = cv2.Canny(gray, 50, 150)
                edge_pixels = np.argwhere(edges > 0)
                if len(edge_pixels) < pixels_needed:
                    # Too few edges: fall back to random pixels anywhere.
                    all_pixels = np.argwhere(np.ones_like(gray))
                    np.random.shuffle(all_pixels)
                    selected_pixels = all_pixels[:pixels_needed]
                else:
                    np.random.shuffle(edge_pixels)
                    selected_pixels = edge_pixels[:pixels_needed]
                # Write one payload bit per chosen pixel position,
                # cycling through the colour channels.
                for i in range(min(pixels_needed, embed_count)):
                    if bit_index >= total_bits:
                        break
                    y, x = selected_pixels[i]
                    channel = i % channels
                    # Clear the LSB, then set it to the payload bit.
                    frame_array[y, x, channel] = (frame_array[y, x, channel] & ~1) | bits[bit_index]
                    bit_index += 1
                frame = frame_array
            out.write(frame)
        out.release()
        result = {
            'success': os.path.exists(output_video),
            'embedded_bits': bit_index,
            'total_bits': total_bits,
            'frames_processed': len(frames)
        }
        print(f"自适应帧隐写完成!")
        print(f"嵌入位数: {bit_index}/{total_bits}")
        print(f"处理帧数: {len(frames)}")
        return result
    except Exception as e:
        print(f"自适应帧隐写失败: {e}")
        return None
# 多模态隐写系统测试函数
def test_multimodal_steganography(test_video, test_audio=None):
    """End-to-end smoke test of the multimodal steganography pipeline.

    Embeds a fixed Chinese test message with LSB video+audio hiding,
    extracts it back, scores the round trip with LCS similarity, then
    runs the adaptive multimodal variant.  Results are printed only;
    nothing is returned.

    Args:
        test_video: path to an existing test video (required).
        test_audio: optional path to an existing test audio file.
    """
    print("=== 多模态隐写系统测试 ===")
    # Validate inputs before producing any output files.
    if not os.path.exists(test_video):
        print(f"错误:测试视频 '{test_video}' 不存在")
        return
    if test_audio and not os.path.exists(test_audio):
        print(f"错误:测试音频 '{test_audio}' 不存在")
        return
    # Fixed payload used for every sub-test.
    test_data = "这是一段用于测试多模态隐写系统的秘密消息。该系统可以同时在视频和音频中隐藏数据,提高安全性和容量。"
    # Step 1: standard LSB embedding in both modalities.
    print("\n1. 测试标准多模态隐写 (LSB + LSB)")
    output_file = "multimodal_stego.mp4"
    result = multimodal_steganography(
        test_video, test_audio, output_file, test_data,
        video_method='lsb', audio_method='lsb'
    )
    if result and result['success']:
        # Step 2: extract with the same method pair.
        print("\n2. 测试提取隐藏数据")
        extracted_data = multimodal_steganography_extract(
            output_file,
            video_method='lsb', audio_method='lsb'
        )
        if extracted_data:
            # Step 3: LCS similarity; >80% counts as a successful round trip.
            print(f"\n3. 验证提取结果")
            similarity = calculate_string_similarity(test_data, extracted_data)
            print(f"原始数据: '{test_data[:50]}...'")
            print(f"提取数据: '{extracted_data[:50]}...'")
            print(f"相似度: {similarity:.2f}%")
            if similarity > 80:
                print("✓ 提取成功")
            else:
                print("✗ 提取失败")
    # Step 4: adaptive multimodal variant with complexity analysis on.
    print("\n4. 测试自适应多模态隐写")
    adaptive_output = "adaptive_multimodal_stego.mp4"
    adaptive_result = adaptive_multimodal_steganography(
        test_video, adaptive_output, test_data,
        complexity_analysis=True
    )
    if adaptive_result and adaptive_result['success']:
        print("\n自适应多模态隐写测试完成")
        print(f"视频PSNR: {adaptive_result['video_psnr']:.2f} dB")
# 辅助函数:计算字符串相似度
def calculate_string_similarity(s1, s2):
    """Similarity of two strings as a percentage, via longest common subsequence.

    Defined as len(LCS(s1, s2)) / max(len(s1), len(s2)) * 100.

    Args:
        s1: first string.
        s2: second string.

    Returns:
        float: similarity in [0.0, 100.0]; 0.0 if either string is empty.
    """
    if not s1 or not s2:
        return 0.0
    m, n = len(s1), len(s2)
    # Rolling two-row DP: O(n) memory instead of the full O(m*n) table,
    # same recurrence and therefore the same LCS length.
    prev = [0] * (n + 1)
    for i in range(1, m + 1):
        curr = [0] * (n + 1)
        c1 = s1[i - 1]
        for j in range(1, n + 1):
            if c1 == s2[j - 1]:
                curr[j] = prev[j - 1] + 1
            else:
                curr[j] = max(prev[j], curr[j - 1])
        prev = curr
    lcs_length = prev[n]
    # Normalise by the longer string so insertions also lower the score.
    return (lcs_length / max(m, n)) * 100
# 使用示例
# test_multimodal_steganography('input_video.mp4')
## 第五章 高级隐写技术与隐写分析防护
随着隐写技术的不断发展,隐写分析技术也在进步。本章我们将探讨一些高级隐写技术和如何提高隐写的隐蔽性,防止被隐写分析检测到。
### 5.1 抗检测隐写技术
#### 5.1.1 基于统计特征的抗检测隐写
传统LSB隐写会破坏载体的统计特性,使得隐写内容容易被检测。我们可以通过保持统计特征来提高隐写的隐蔽性。
```python
import numpy as np
import cv2
import os
def statistical_aware_steganography(video_path, output_path, secret_text, embedding_rate=0.2):
    """Anti-detection LSB video steganography that tries to preserve
    per-channel intensity histograms.

    Bits are embedded preferentially at Canny-edge pixels; an LSB flip is
    only applied when it would not inflate the target histogram bin by
    more than 50% relative to the source bin.

    Args:
        video_path: input (cover) video path.
        output_path: output (stego) video path, mp4v codec.
        secret_text: secret text to embed.
        embedding_rate: fraction (0-1) scaling the per-frame bit budget.

    Returns:
        dict with 'success', 'total_bits', 'embedded_bits',
        'embedding_efficiency'; None if the video cannot be opened.

    NOTE(review): when the histogram check rejects a flip, bit_index still
    advances, so that payload bit is silently dropped; an extractor has no
    way to know which bits were skipped — confirm this is intentional.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return None
    # Writer with the same geometry/rate as the source.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    bits = text_to_bits(secret_text)
    total_bits = len(bits)
    # Average bits per frame; x3 because each pixel has three channels.
    bits_per_frame = max(1, int(total_bits / frame_count * embedding_rate * 3))
    bit_index = 0
    embedded_bits = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        stego_frame = frame.copy()
        # Only do work while payload bits remain.
        if bit_index < total_bits:
            # Per-channel 256-bin histograms used by the statistics check.
            b_hist, _ = np.histogram(stego_frame[:,:,0].flatten(), bins=256, range=(0,256))
            g_hist, _ = np.histogram(stego_frame[:,:,1].flatten(), bins=256, range=(0,256))
            r_hist, _ = np.histogram(stego_frame[:,:,2].flatten(), bins=256, range=(0,256))
            # Prefer busy (edge) regions where LSB flips are less visible.
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            edges = cv2.Canny(gray, 50, 150)
            edge_pixels = np.argwhere(edges > 0)
            frame_bits_needed = min(bits_per_frame, total_bits - bit_index)
            if len(edge_pixels) < frame_bits_needed:
                # Not enough edge pixels: fall back to random positions.
                h, w = frame.shape[:2]
                y_indices = np.random.randint(0, h, size=frame_bits_needed)
                x_indices = np.random.randint(0, w, size=frame_bits_needed)
                channel_indices = np.random.randint(0, 3, size=frame_bits_needed)
            else:
                # Random subset of the edge pixels; channel chosen at random.
                np.random.shuffle(edge_pixels)
                selected_pixels = edge_pixels[:frame_bits_needed]
                y_indices = selected_pixels[:, 0]
                x_indices = selected_pixels[:, 1]
                channel_indices = np.random.randint(0, 3, size=frame_bits_needed)
            # Embed while keeping the histogram roughly balanced.
            for i in range(frame_bits_needed):
                if bit_index >= total_bits:
                    break
                y, x = y_indices[i], x_indices[i]
                channel = channel_indices[i]
                original_value = stego_frame[y, x, channel]
                target_bit = bits[bit_index]
                if (original_value & 1) == target_bit:
                    # LSB already carries the payload bit: nothing to change.
                    pass
                else:
                    new_value = original_value ^ 1  # flip the LSB
                    # Look up how common the old and new values are.
                    if channel == 0:
                        orig_freq = b_hist[original_value]
                        new_freq = b_hist[new_value]
                    elif channel == 1:
                        orig_freq = g_hist[original_value]
                        new_freq = g_hist[new_value]
                    else:
                        orig_freq = r_hist[original_value]
                        new_freq = r_hist[new_value]
                    # Apply the flip only if it keeps the distribution
                    # roughly balanced (simple 1.5x threshold heuristic).
                    if new_freq < orig_freq * 1.5:
                        stego_frame[y, x, channel] = new_value
                        embedded_bits += 1
                bit_index += 1
        out.write(stego_frame)
    cap.release()
    out.release()
    result = {
        'success': os.path.exists(output_path),
        'total_bits': total_bits,
        'embedded_bits': embedded_bits,
        'embedding_efficiency': embedded_bits / total_bits if total_bits > 0 else 0
    }
    print(f"统计感知隐写完成!")
    print(f"总位数: {total_bits}")
    print(f"嵌入位数: {embedded_bits}")
    print(f"嵌入效率: {result['embedding_efficiency']:.2f}")
    return result
def extract_statistical_steganography(video_path, total_bits):
    """Extract data hidden by statistical_aware_steganography.

    Re-runs the same edge-based position selection as the embedder and
    reads pixel LSBs at those positions.

    Args:
        video_path: stego video path.
        total_bits: expected number of payload bits.

    Returns:
        str: recovered text (via bits_to_text), or "" on failure.

    NOTE(review): both embedder and extractor use unseeded
    np.random.shuffle / randint, so the positions generally do NOT match —
    recovery is best-effort only, as the inline warning below admits.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return ""
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # Per-frame bit budget; mirrors the embedder's formula.
    bits_per_frame = max(1, int(total_bits / frame_count * 3))
    extracted_bits = []
    bit_index = 0
    while True:
        ret, frame = cap.read()
        if not ret or bit_index >= total_bits:
            break
        # Recompute candidate positions with the embedder's strategy.
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        edge_pixels = np.argwhere(edges > 0)
        frame_bits_needed = min(bits_per_frame, total_bits - bit_index)
        if len(edge_pixels) < frame_bits_needed:
            # Random fallback cannot match the embedder's random positions;
            # a real system would need an explicit position record.
            print("警告:边缘像素不足,提取可能不准确")
            h, w = frame.shape[:2]
            y_indices = np.random.randint(0, h, size=frame_bits_needed)
            x_indices = np.random.randint(0, w, size=frame_bits_needed)
            channel_indices = np.random.randint(0, 3, size=frame_bits_needed)
        else:
            np.random.shuffle(edge_pixels)
            selected_pixels = edge_pixels[:frame_bits_needed]
            y_indices = selected_pixels[:, 0]
            x_indices = selected_pixels[:, 1]
            channel_indices = np.random.randint(0, 3, size=frame_bits_needed)
        # Read the LSB at each selected position.
        for i in range(frame_bits_needed):
            if bit_index >= total_bits:
                break
            y, x = y_indices[i], x_indices[i]
            channel = channel_indices[i]
            extracted_bit = frame[y, x, channel] & 1
            extracted_bits.append(extracted_bit)
            bit_index += 1
    cap.release()
    # Decode the collected bit stream back to text.
    if extracted_bits:
        secret_text = bits_to_text(extracted_bits)
        return secret_text
    else:
return ""内容感知隐写根据载体媒体的内容特性,自适应地调整嵌入策略,只在感知上不重要的区域嵌入数据。
import numpy as np
import cv2
import os
def content_aware_steganography(video_path, output_path, secret_text, quality_factor=0.8):
    """Content-aware adaptive LSB video steganography.

    Each frame is divided into 8x8 blocks; a visual-saliency map decides
    which blocks may carry data.  Only blocks whose mean saliency falls
    below (1 - quality_factor) are used, so a higher quality_factor hides
    less data but in less noticeable regions.

    Args:
        video_path: input (cover) video path.
        output_path: output (stego) video path, mp4v codec.
        secret_text: secret text to embed.
        quality_factor: 0-1; higher = fewer, less salient embedding blocks.

    Returns:
        dict with 'success', 'total_bits', 'embedded_bits',
        'embedding_ratio'; None if the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return None
    # Writer with the same geometry/rate as the source.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    bits = text_to_bits(secret_text)
    total_bits = len(bits)
    bit_index = 0
    embedded_bits = 0
    block_size = 8  # frames are processed as 8x8 pixel blocks
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        stego_frame = frame.copy()
        if bit_index < total_bits:
            # Per-pixel saliency, normalised to [0, 1].
            saliency_map = calculate_saliency(frame)
            saliency_map = cv2.normalize(saliency_map, None, 0, 1, cv2.NORM_MINMAX)
            h, w = frame.shape[:2]
            # Visit every block in raster order.
            for i in range(0, h, block_size):
                for j in range(0, w, block_size):
                    if bit_index >= total_bits:
                        break
                    # Clamp the block to the frame boundary.
                    block_h_end = min(i + block_size, h)
                    block_w_end = min(j + block_size, w)
                    block_saliency = np.mean(saliency_map[i:block_h_end, j:block_w_end])
                    # Embed only in blocks below the saliency threshold;
                    # the threshold tightens as quality_factor rises.
                    if block_saliency < (1.0 - quality_factor):
                        # Capacity grows as the block gets less salient.
                        capacity = max(1, int(block_size * block_size * (1.0 - block_saliency - (1.0 - quality_factor)) * 0.1))
                        # Enumerate the block's pixel coordinates.
                        block_pixels = []
                        for y in range(i, block_h_end):
                            for x in range(j, block_w_end):
                                block_pixels.append((y, x))
                        # Randomise the embedding order within the block.
                        np.random.shuffle(block_pixels)
                        pixels_to_embed = min(capacity, len(block_pixels), total_bits - bit_index)
                        # One payload bit per pixel, cycling channels.
                        for k in range(pixels_to_embed):
                            if bit_index >= total_bits:
                                break
                            y, x = block_pixels[k]
                            channel = k % 3
                            # Clear the LSB, then set it to the payload bit.
                            stego_frame[y, x, channel] = (stego_frame[y, x, channel] & ~1) | bits[bit_index]
                            embedded_bits += 1
                            bit_index += 1
        out.write(stego_frame)
    cap.release()
    out.release()
    result = {
        'success': os.path.exists(output_path),
        'total_bits': total_bits,
        'embedded_bits': embedded_bits,
        'embedding_ratio': embedded_bits / total_bits if total_bits > 0 else 0
    }
    print(f"内容感知隐写完成!")
    print(f"总位数: {total_bits}")
    print(f"嵌入位数: {embedded_bits}")
    print(f"嵌入比例: {result['embedding_ratio']:.2f}")
    return result
def calculate_saliency(frame):
    """Estimate a per-pixel visual saliency map for a BGR frame.

    Uses a simple centre-surround scheme in LAB colour space: each
    channel's saliency is the absolute difference between the channel and
    its Gaussian-blurred version, then the three maps are blended.

    Args:
        frame: input BGR image/frame.

    Returns:
        numpy.ndarray: saliency map (same spatial size as the frame).
    """
    # LAB separates lightness (L) from the two colour axes (a, b).
    lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)
    # "Surround" signal: heavy Gaussian blur of each channel.
    l_blur = cv2.GaussianBlur(l, (15, 15), 0)
    a_blur = cv2.GaussianBlur(a, (15, 15), 0)
    b_blur = cv2.GaussianBlur(b, (15, 15), 0)
    # Centre-surround contrast per channel.
    l_saliency = cv2.absdiff(l, l_blur)
    a_saliency = cv2.absdiff(a, a_blur)
    b_saliency = cv2.absdiff(b, b_blur)
    # Lightness contrast dominates the blend.
    saliency = l_saliency * 0.5 + a_saliency * 0.25 + b_saliency * 0.25
    # Light smoothing to consolidate salient regions.
    saliency = cv2.GaussianBlur(saliency, (7, 7), 0)
    return saliency

隐写分析是检测媒体中是否隐藏了秘密信息的技术。下面实现几种常用的视频隐写检测方法。
import numpy as np
import cv2
import os
from scipy import stats
def video_steganalysis_detection(video_path, feature_extraction_method='histogram', threshold=0.5):
    """Detect possible steganography in a video by thresholding a
    handcrafted feature score.

    Samples up to 50 evenly-spaced frames, extracts one feature vector per
    frame with the chosen method, averages them, and scores the average
    with the matching analyze_* heuristic.

    Args:
        video_path: video to analyse.
        feature_extraction_method: 'histogram', 'statistics' or 'wavelet'.
        threshold: score in (0, 1) above which the video is flagged.

    Returns:
        dict with 'is_stego', 'confidence', 'method', 'frames_analyzed',
        'threshold_used'; None on error or unknown method.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return None
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frames_to_analyze = min(50, frame_count)  # cap the work at 50 frames
    step = max(1, frame_count // frames_to_analyze)
    features = []
    frame_index = 0
    while True:
        ret, frame = cap.read()
        if not ret or frame_index >= frames_to_analyze:
            break
        # One feature vector per sampled frame.
        if feature_extraction_method == 'histogram':
            frame_feature = extract_histogram_features(frame)
        elif feature_extraction_method == 'statistics':
            frame_feature = extract_statistical_features(frame)
        elif feature_extraction_method == 'wavelet':
            frame_feature = extract_wavelet_features(frame)
        else:
            print(f"不支持的特征提取方法: {feature_extraction_method}")
            cap.release()
            return None
        features.append(frame_feature)
        # Jump ahead so samples are spread over the whole video.
        frame_index += 1
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index * step)
    cap.release()
    # Average feature vector across the sampled frames.
    avg_features = np.mean(features, axis=0)
    # Simple threshold decision; a trained classifier would go here in a
    # production system.
    if feature_extraction_method == 'histogram':
        lsb_analysis_score = analyze_lsb_histogram(avg_features)
        is_stego = lsb_analysis_score > threshold
    elif feature_extraction_method == 'statistics':
        stat_analysis_score = analyze_statistical_anomalies(avg_features)
        is_stego = stat_analysis_score > threshold
    else:  # wavelet
        wavelet_analysis_score = analyze_wavelet_anomalies(avg_features)
        is_stego = wavelet_analysis_score > threshold
    result = {
        'is_stego': is_stego,
        # The conditional expression only evaluates the branch that was
        # actually computed above, so the other names stay unused.
        'confidence': lsb_analysis_score if feature_extraction_method == 'histogram'
        else stat_analysis_score if feature_extraction_method == 'statistics'
        else wavelet_analysis_score,
        'method': feature_extraction_method,
        'frames_analyzed': frames_to_analyze,
        'threshold_used': threshold
    }
    print(f"隐写分析完成!")
    print(f"检测方法: {feature_extraction_method}")
    print(f"分析帧数: {frames_to_analyze}")
    print(f"置信度: {result['confidence']:.4f}")
    print(f"是否包含隐写: {'是' if is_stego else '否'}")
    return result
def extract_histogram_features(frame):
    """Extract LSB-histogram features from a BGR frame.

    The 18-element feature vector is, in order:
    [b_lsb0, b_lsb1, g_lsb0, g_lsb1, r_lsb0, r_lsb1] followed by a 4-bin
    adjacent-LSB-pair histogram for each of B, G, R (horizontal plus
    vertical neighbour pairs, pair code = lsb(p1) | lsb(p2) << 1).
    The vector is normalised to sum to 1.

    Fixes vs. the original: three 256-bin intensity histograms were
    computed but never used (dead work), and the pair codes were
    accumulated in Python lists — both replaced with vectorised numpy.

    Args:
        frame: BGR image array of shape (H, W, 3), uint8.

    Returns:
        numpy.ndarray: normalised feature vector of length 18.
    """
    # Channel views (BGR order); indexing avoids the copies cv2.split makes.
    channels = (frame[:, :, 0], frame[:, :, 1], frame[:, :, 2])
    lsb_counts = []
    pair_hists = []
    for channel in channels:
        lsb = channel & 1
        # Counts of pixels whose LSB is 0 / 1.
        lsb_counts.append(np.sum(lsb == 0))
        lsb_counts.append(np.sum(lsb == 1))
        # 2-bit codes for horizontally and vertically adjacent LSB pairs.
        horiz = lsb[:, :-1].ravel() | (lsb[:, 1:].ravel() << 1)
        vert = lsb[:-1].ravel() | (lsb[1:].ravel() << 1)
        pairs = np.concatenate([horiz, vert])
        pair_hists.append(np.bincount(pairs, minlength=4))
    features = np.concatenate([np.array(lsb_counts)] + pair_hists)
    # Normalise to a probability-like vector (total is > 0 for any
    # non-empty frame).
    total = np.sum(features)
    if total > 0:
        features = features / total
    return features
def extract_statistical_features(frame):
    """Extract per-channel statistical features from a BGR frame.

    For each channel (B, G, R) the eight features are:
    LSB 0/1 ratio, mean, std, skewness, kurtosis, mean gradient
    magnitude, std of gradient magnitude, histogram entropy.

    Args:
        frame: BGR image array.

    Returns:
        numpy.ndarray: feature vector of length 24 (3 channels x 8).
    """
    feature_vector = []
    for channel in cv2.split(frame):
        flat = channel.flatten()
        # LSB balance: how many zero-LSBs per one-LSB (epsilon guards /0).
        zeros = np.sum((channel & 1) == 0)
        ones = np.sum((channel & 1) == 1)
        lsb_balance = zeros / (ones + 1e-10)
        # First- through fourth-order moments of the pixel values.
        mean_px = np.mean(channel)
        std_px = np.std(channel)
        skew_px = stats.skew(flat)
        kurt_px = stats.kurtosis(flat)
        # Sobel gradient magnitude statistics.
        gx = cv2.Sobel(channel, cv2.CV_64F, 1, 0, ksize=3)
        gy = cv2.Sobel(channel, cv2.CV_64F, 0, 1, ksize=3)
        grad_mag = np.sqrt(gx**2 + gy**2)
        # Histogram entropy of the channel.
        ent = calculate_entropy(channel)
        feature_vector.extend([
            lsb_balance, mean_px, std_px, skew_px, kurt_px,
            np.mean(grad_mag), np.std(grad_mag), ent
        ])
    return np.array(feature_vector)
def extract_wavelet_features(frame):
    """Extract Haar-wavelet subband features from a frame.

    Performs one level of 2-D DWT on the grayscale frame and, for each of
    the four subbands (cA, cH, cV, cD), records four moments plus the
    first 10 bins of a normalised 64-bin coefficient histogram over
    [-100, 100).  Falls back to extract_statistical_features when pywt is
    not installed.

    Args:
        frame: BGR image array.

    Returns:
        numpy.ndarray: feature vector (4 subbands x 14 entries).
    """
    try:
        import pywt
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # One-level DWT: approximation + three detail subbands.
        cA, (cH, cV, cD) = pywt.dwt2(gray, 'haar')
        feats = []
        for band in (cA, cH, cV, cD):
            flat = band.flatten()
            # Four moments of the subband coefficients.
            feats.extend([np.mean(band), np.std(band),
                          stats.skew(flat), stats.kurtosis(flat)])
            # Coarse, normalised coefficient histogram.
            counts, _ = np.histogram(flat, bins=64, range=(-100, 100))
            total = np.sum(counts)
            if total > 0:
                counts = counts / total
            feats.extend(counts[:10])  # keep only the first 10 bins
        return np.array(feats)
    except ImportError:
        print("警告: pywt库未安装,使用替代特征")
        # Degrade gracefully to the plain statistical features.
        return extract_statistical_features(frame)
def analyze_lsb_histogram(features):
    """Score LSB-histogram features for steganography likelihood.

    Args:
        features: vector from extract_histogram_features — 6 LSB counts
            (B0, B1, G0, G1, R0, R1) followed by three 4-bin
            adjacent-LSB-pair histograms.

    Returns:
        float: suspicion score in [0, 1]; higher = more likely stego.
    """
    # Per-channel LSB balance: smaller/larger count, ~1.0 for natural
    # images; LSB embedding pushes it away from 1.
    balances = []
    for k in range(3):
        zeros = features[2 * k]
        ones = features[2 * k + 1]
        balances.append(min(zeros, ones) / max(zeros, ones + 1e-10))
    # Entropy of each channel's 4-bin pair histogram, normalised by the
    # maximum possible entropy log2(4).
    max_entropy = np.log2(4)
    norm_entropies = [
        calculate_entropy_from_hist(features[6 + 4 * k:10 + 4 * k]) / max_entropy
        for k in range(3)
    ]
    # Low balance and low pair entropy both point toward embedding;
    # 1.0 means certainly stego, 0.0 certainly clean.
    channel_scores = [
        1.0 - (bal * 0.5 + ent * 0.5)
        for bal, ent in zip(balances, norm_entropies)
    ]
    final_score = sum(channel_scores) / 3.0
    # Clamp to the valid range.
    return max(0.0, min(1.0, final_score))
def analyze_statistical_anomalies(features):
    """Score per-channel statistical features for steganography likelihood.

    Args:
        features: vector from extract_statistical_features — 3 channels x
            8 entries (lsb_ratio, mean, std, skewness, kurtosis,
            mean_grad, std_grad, entropy).

    Returns:
        float: suspicion score in [0, 1]; higher = more likely stego.
    """
    # Heuristic expectations for natural frames (placeholder values; a
    # real detector would calibrate them on a training corpus).
    expected_std_min = 10.0      # pixel std should not be tiny
    expected_entropy_min = 5.0   # histogram entropy should not be tiny
    scores = []
    for channel in range(3):
        offset = channel * 8
        # LSB ratio: distance from the ideal balance of 1.0.
        lsb_ratio = features[offset + 0]
        # BUG FIX: the original computed 1.0 / lsb_ratio unconditionally
        # and raised ZeroDivisionError when the ratio was 0 (all LSBs
        # equal).  A zero ratio is maximally unbalanced, so score it 1.0.
        if lsb_ratio > 0:
            lsb_score = 1.0 - min(lsb_ratio, 1.0 / lsb_ratio)
        else:
            lsb_score = 1.0
        # Small standard deviation is suspicious.
        std_val = features[offset + 2]
        std_score = 1.0 - min(std_val / expected_std_min, 1.0)
        # Small entropy is suspicious.
        entropy = features[offset + 7]
        entropy_score = 1.0 - min(entropy / expected_entropy_min, 1.0)
        # Large |skew| / |kurtosis| may indicate tampering.
        skewness = features[offset + 3]
        kurtosis = features[offset + 4]
        skew_score = min(abs(skewness) / 5.0, 1.0)
        kurt_score = min(abs(kurtosis) / 10.0, 1.0)
        # Weighted blend; LSB balance carries the most weight.
        channel_score = (lsb_score * 0.4 + std_score * 0.2 +
                         entropy_score * 0.2 + skew_score * 0.1 +
                         kurt_score * 0.1)
        scores.append(channel_score)
    final_score = sum(scores) / len(scores)
    # Clamp to the valid range.
    return max(0.0, min(1.0, final_score))
def analyze_wavelet_anomalies(features):
    """Score wavelet-subband features for steganography likelihood.

    Args:
        features: vector from extract_wavelet_features — 4 subbands
            (cA, cH, cV, cD) x 14 entries each (mean, std, skewness,
            kurtosis, then 10 histogram bins).

    Returns:
        float: suspicion score in [0, 1]; higher = more likely stego.
    """
    # (Removed an unused `subbands` name list present in the original.)
    scores = []
    for subband_idx in range(4):
        # 14 features per subband: 4 moments + 10 histogram bins.
        offset = subband_idx * 14
        mean_val = features[offset + 0]
        std_val = features[offset + 1]
        skewness = features[offset + 2]
        kurtosis = features[offset + 3]
        if subband_idx > 0:
            # Detail subbands (cH, cV, cD): embedding tends to inflate
            # the coefficient spread and distort the higher moments.
            std_score = min(std_val / 50.0, 1.0)
            skew_score = min(abs(skewness), 1.0)
            kurt_score = min(abs(kurtosis) / 10.0, 1.0)
            subband_score = (std_score * 0.5 + skew_score * 0.25 + kurt_score * 0.25)
        else:
            # Approximation subband cA: only mean drift away from 0 is
            # checked, as it changes little under embedding.
            subband_score = min(abs(mean_val) / 10.0, 1.0)
        scores.append(subband_score)
    # Detail subbands carry most of the evidence, so they weigh more.
    weights = [0.1, 0.3, 0.3, 0.3]
    final_score = sum(s * w for s, w in zip(scores, weights))
    # Clamp to the valid range.
    return max(0.0, min(1.0, final_score))
def calculate_entropy(image):
    """Shannon entropy (in bits) of an image's 256-bin intensity histogram.

    Args:
        image: input image array (any shape; flattened internally).

    Returns:
        float: entropy value, 0 for a single-intensity image up to 8 for
        a perfectly uniform distribution.
    """
    # Intensity histogram over the full 8-bit range.
    counts, _ = np.histogram(image.flatten(), bins=256, range=(0, 256))
    # Turn counts into a probability distribution.
    probs = counts / np.sum(counts)
    # Only non-zero bins contribute (log2(0) is undefined).
    nonzero = probs[probs > 0]
    return -np.sum(nonzero * np.log2(nonzero))
def calculate_entropy_from_hist(histogram):
    """Shannon entropy (in bits) of a histogram.

    The histogram is first normalised into a probability distribution;
    an all-zero histogram yields 0.

    Args:
        histogram: array of non-negative bin counts.

    Returns:
        float: entropy of the normalised distribution.
    """
    total = np.sum(histogram)
    # Normalise only when there is mass; otherwise keep the zeros.
    probs = histogram / total if total > 0 else histogram
    mask = probs > 0  # skip empty bins: log2(0) is undefined
    return -np.sum(probs[mask] * np.log2(probs[mask]))
def run_multiple_detections(video_path):
    """Run all three steganalysis methods on a video and fuse the results.

    Calls video_steganalysis_detection with the 'histogram',
    'statistics' and 'wavelet' feature methods, then combines them by
    majority vote with a high-confidence override.

    Args:
        video_path: video to analyse.

    Returns:
        dict with 'is_stego', 'avg_confidence', 'method_results',
        'stego_votes', 'total_methods'; None if every method failed.
    """
    print(f"对视频 '{video_path}' 运行多方法检测...")
    # One detection per feature-extraction method.
    hist_result = video_steganalysis_detection(video_path, 'histogram')
    stat_result = video_steganalysis_detection(video_path, 'statistics')
    wavelet_result = video_steganalysis_detection(video_path, 'wavelet')
    # Keep only the methods that produced a result.
    results = [r for r in [hist_result, stat_result, wavelet_result] if r is not None]
    if not results:
        print("所有检测方法都失败了")
        return None
    # Mean confidence over the successful methods.
    avg_confidence = np.mean([r['confidence'] for r in results])
    # Final decision: majority vote, overridden by very high confidence.
    stego_count = sum(1 for r in results if r['is_stego'])
    is_stego = stego_count >= len(results) / 2 or avg_confidence > 0.7
    final_result = {
        'is_stego': is_stego,
        'avg_confidence': avg_confidence,
        'method_results': results,
        'stego_votes': stego_count,
        'total_methods': len(results)
    }
    print(f"\n综合检测结果:")
    print(f"平均置信度: {avg_confidence:.4f}")
    print(f"隐写检测投票: {stego_count}/{len(results)}")
    print(f"最终判断: {'可能包含隐写内容' if is_stego else '未检测到隐写内容'}")
    return final_result

将隐写与数字水印技术结合,可以提高数据隐藏的鲁棒性和安全性。
import numpy as np
import cv2
import os
from cryptography.fernet import Fernet
def steganography_with_watermark(video_path, output_path, secret_data, watermark_text,
                                 encryption_key=None, embedding_strength=0.1):
    """Combine encrypted steganography with a robust digital watermark.

    The secret data is Fernet-encrypted, then hidden (via
    embed_adaptive_frame) in a random half of the frames, while the
    watermark is embedded into every frame via embed_watermark.

    Args:
        video_path: input (cover) video path.
        output_path: output (stego) video path, mp4v codec.
        secret_data: secret text to encrypt and hide.
        watermark_text: watermark text embedded in every frame.
        encryption_key: Fernet key; a new one is generated when None.
        embedding_strength: watermark embedding strength.

    Returns:
        dict with the key, embedding counters and frame statistics;
        None if the video cannot be opened.
    """
    # Generate a key when the caller did not supply one.
    if encryption_key is None:
        encryption_key = Fernet.generate_key()
    cipher = Fernet(encryption_key)
    # Encrypt the payload; the base64 token is what gets embedded.
    encrypted_data = cipher.encrypt(secret_data.encode())
    encrypted_text = encrypted_data.decode('utf-8')
    print(f"数据已加密,密钥: {encryption_key.decode()}")
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return None
    # Writer with the same geometry/rate as the source.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    # Watermark bits are cycled over all frames.
    watermark_bits = text_to_bits(watermark_text)
    watermark_len = len(watermark_bits)
    # Encrypted payload bits.
    secret_bits = text_to_bits(encrypted_text)
    secret_len = len(secret_bits)
    # The secret goes into a random subset of frames (at least 10, at
    # most half of the video); the watermark goes into every frame.
    secret_frames = set(np.random.choice(range(frame_count),
                                         min(frame_count, max(10, frame_count // 2)),
                                         replace=False))
    frame_idx = 0
    secret_bit_idx = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        stego_frame = frame.copy()
        # Watermark every frame, rotating the bit offset per frame.
        stego_frame = embed_watermark(stego_frame, watermark_bits,
                                      frame_idx % watermark_len,
                                      embedding_strength)
        # Hide the next chunk of secret bits in the selected frames.
        if frame_idx in secret_frames and secret_bit_idx < secret_len:
            # Throttle the per-frame payload to 0.1% of the pixel count.
            frame_capacity = int(width * height * 0.001)
            bits_to_embed = min(frame_capacity, secret_len - secret_bit_idx)
            secret_chunk = secret_bits[secret_bit_idx:secret_bit_idx + bits_to_embed]
            stego_frame = embed_adaptive_frame(stego_frame, secret_chunk)
            secret_bit_idx += bits_to_embed
        out.write(stego_frame)
        frame_idx += 1
    cap.release()
    out.release()
    result = {
        'success': os.path.exists(output_path),
        'encryption_key': encryption_key.decode(),
        'secret_data_embedded': secret_bit_idx,
        'secret_data_total': secret_len,
        'watermark_embedded': True,
        'frames_processed': frame_idx,
        'secret_frames_count': len(secret_frames)
    }
    print(f"\n隐写与水印结合完成!")
    print(f"输出文件: {output_path}")
    print(f"嵌入秘密数据位数: {secret_bit_idx}/{secret_len}")
    print(f"嵌入水印: {'成功'}")
    print(f"处理帧数: {frame_idx}")
    print(f"用于嵌入秘密数据的帧数: {len(secret_frames)}")
    return result
def embed_watermark(frame, watermark_bits, start_index, strength=0.1):
    """Embed a robust watermark into one frame via Haar DWT coefficients.

    For each BGR channel the horizontal-detail subband cH is shifted up
    (bit 1) or down (bit 0) by strength x mean(|cH|), then the channel is
    reconstructed with the inverse DWT.  Falls back to an FFT-based
    watermark when pywt is not installed.

    Args:
        frame: input BGR frame.
        watermark_bits: watermark bit sequence (cycled as needed).
        start_index: starting offset into the watermark bits.
        strength: embedding strength factor.

    Returns:
        numpy.ndarray: watermarked frame, uint8.

    NOTE(review): the extractor (extract_watermark_from_frame) compares
    coefficients against the subband median rather than inverting this
    additive shift — verify the pair actually round-trips.
    """
    try:
        import pywt
        # Work in float so coefficient arithmetic does not clip.
        frame_float = frame.astype(np.float32)
        watermarked_frame = np.zeros_like(frame_float)
        for c in range(3):  # BGR channels
            # One-level 2-D Haar DWT of this channel.
            coeffs2 = pywt.dwt2(frame_float[:, :, c], 'haar')
            cA, (cH, cV, cD) = coeffs2
            # Embed into cH (horizontal detail, mid-frequency content).
            h, w = cH.shape
            watermark_len = len(watermark_bits)
            # Cycle the watermark to cover every cH coefficient.
            embed_bits = []
            for i in range(h * w):
                bit_idx = (start_index + i) % watermark_len
                embed_bits.append(watermark_bits[bit_idx])
            embed_pattern = np.array(embed_bits).reshape(h, w)
            # Bits {0,1} -> {-1,+1}; shift scaled by the mean |cH|.
            cH_watermarked = cH + strength * np.mean(np.abs(cH)) * (2 * embed_pattern - 1)
            # Rebuild the channel from the modified coefficients.
            coeffs_watermarked = (cA, (cH_watermarked, cV, cD))
            watermarked_channel = pywt.idwt2(coeffs_watermarked, 'haar')
            # idwt2 may pad by one pixel; crop back to the frame size.
            watermarked_channel = watermarked_channel[:frame.shape[0], :frame.shape[1]]
            watermarked_frame[:, :, c] = watermarked_channel
        # Back to displayable uint8 range.
        watermarked_frame = np.clip(watermarked_frame, 0, 255).astype(np.uint8)
        return watermarked_frame
    except ImportError:
        print("警告: pywt库未安装,使用简单的频率域水印替代")
        # FFT-phase fallback when pywt is unavailable.
        return embed_simple_watermark(frame, watermark_bits, start_index, strength)
def embed_simple_watermark(frame, watermark_bits, start_index, strength=0.1):
    """Embed a simple watermark into a frame's FFT phase.

    For each BGR channel the shifted 2-D FFT is taken, and the phase of a
    size x size cell group near the spectrum centre is nudged up/down
    according to the (cycled) watermark bits, scaled by the region's peak
    magnitude.

    Args:
        frame: input BGR frame, uint8.
        watermark_bits: watermark bit sequence (cycled as needed).
        start_index: starting offset into the watermark bits.
        strength: embedding strength factor.

    Returns:
        numpy.ndarray: watermarked frame, uint8, same shape as input.
    """
    watermarked_frame = frame.copy()
    for c in range(3):  # BGR channels
        # Forward FFT, shifted so the DC component sits at the centre.
        f_transform = np.fft.fft2(watermarked_frame[:, :, c])
        f_shift = np.fft.fftshift(f_transform)
        rows, cols = watermarked_frame.shape[:2]
        crow, ccol = rows // 2, cols // 2
        size = 10  # watermark pattern size
        # Build the size x size bit pattern, cycling the watermark.
        watermark_len = len(watermark_bits)
        watermark_pattern = np.zeros((size, size))
        for i in range(size):
            for j in range(size):
                idx = (start_index + i * size + j) % watermark_len
                watermark_pattern[i, j] = watermark_bits[idx]
        # Mid-frequency block around the centre (DC itself not special-cased
        # beyond being at the block centre).
        region = f_shift[crow - size:crow + size, ccol - size:ccol + size]
        magnitude = np.abs(region)
        phase = np.angle(region)
        # Map bits {0,1} -> {-1,+1}, scale by the region's peak magnitude.
        phase_shift = strength * (2 * watermark_pattern - 1) * np.max(magnitude) * 0.1
        # BUG FIX: the original did `phase += phase_shift`, adding a
        # (size, size) array to the (2*size, 2*size) phase block — a
        # broadcasting ValueError on every call.  Apply the shift to the
        # top-left size x size sub-block, which is exactly the cell range
        # extract_simple_watermark_from_frame samples (idx < size*size).
        phase[:size, :size] += phase_shift
        # Recombine magnitude and modified phase.
        f_shift[crow - size:crow + size, ccol - size:ccol + size] = \
            magnitude * np.exp(1j * phase)
        # Inverse FFT back to the spatial domain.
        f_ishift = np.fft.ifftshift(f_shift)
        img_back = np.abs(np.fft.ifft2(f_ishift))
        watermarked_frame[:, :, c] = np.clip(img_back, 0, 255).astype(np.uint8)
    return watermarked_frame
def extract_watermark(video_path, watermark_length, start_frame=0, frames_to_use=20):
    """Extract a watermark from a video by voting across frames.

    Each frame votes on every watermark bit via
    extract_watermark_from_frame; a bit is 1 when more than half of its
    votes are 1.

    Args:
        video_path: watermarked video path.
        watermark_length: expected watermark length in bits.
        start_frame: index of the first frame to use.
        frames_to_use: number of frames to aggregate (more = more robust).

    Returns:
        str: watermark text via bits_to_text, or "" on failure.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return ""
    # Seek to the requested starting frame.
    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    available_frames = min(frames_to_use, frame_count - start_frame)
    if available_frames <= 0:
        print("没有足够的帧用于水印提取")
        cap.release()
        return ""
    # Per-bit tallies: number of 1-votes and number of votes cast.
    bit_votes = np.zeros(watermark_length)
    bit_counts = np.zeros(watermark_length)
    for frame_idx in range(available_frames):
        ret, frame = cap.read()
        if not ret:
            break
        # frame_idx doubles as the per-frame watermark offset.
        extracted_bits = extract_watermark_from_frame(frame, watermark_length, frame_idx)
        if extracted_bits:
            for i in range(len(extracted_bits)):
                bit_votes[i] += extracted_bits[i]
                bit_counts[i] += 1
    cap.release()
    # Majority vote per bit position; positions with no votes default to 0.
    final_bits = []
    for i in range(watermark_length):
        if bit_counts[i] > 0:
            final_bit = 1 if bit_votes[i] > bit_counts[i] / 2 else 0
            final_bits.append(final_bit)
        else:
            final_bits.append(0)
    watermark_text = bits_to_text(final_bits)
    print(f"水印提取完成!")
    print(f"使用帧数: {available_frames}")
    print(f"提取的水印文本: '{watermark_text}'")
    return watermark_text
def extract_watermark_from_frame(frame, watermark_length, frame_offset):
    """Extract watermark bits from a single frame via Haar DWT sampling.

    For each BGR channel the horizontal-detail subband cH is sampled at
    positions derived from frame_offset; a coefficient above the subband
    median reads as bit 1.  The three channels are fused by majority
    vote.  Falls back to the FFT extractor when pywt is missing.

    Args:
        frame: input BGR frame.
        watermark_length: expected number of watermark bits.
        frame_offset: offset used to align sampling with the embedder.

    Returns:
        list: extracted bit sequence of length watermark_length.
    """
    try:
        import pywt
        extracted_bits = []
        channel_bits = []
        for c in range(3):  # BGR channels
            # One-level Haar DWT; only cH (horizontal detail) is used.
            coeffs2 = pywt.dwt2(frame[:, :, c], 'haar')
            _, (cH, _, _) = coeffs2
            h, w = cH.shape
            # Cannot extract more bits than cH has coefficients.
            bits_per_channel = min(watermark_length, h * w)
            bits = []
            for i in range(bits_per_channel):
                # Walk cH in raster order, offset to match the embedder.
                pos = (frame_offset + i) % (h * w)
                row = pos // w
                col = pos % w
                # Above-median coefficient -> 1 (embedding raised it),
                # below-median -> 0.
                bit = 1 if cH[row, col] > np.median(cH) else 0
                bits.append(bit)
            channel_bits.append(bits)
        # Majority vote across the three channels per bit position;
        # channels that produced fewer bits vote 0.
        for i in range(watermark_length):
            votes = [channel[i] if i < len(channel) else 0 for channel in channel_bits]
            bit = 1 if sum(votes) > len(votes) / 2 else 0
            extracted_bits.append(bit)
        return extracted_bits
    except ImportError:
        # FFT-phase fallback matching embed_simple_watermark.
        return extract_simple_watermark_from_frame(frame, watermark_length, frame_offset)
def extract_simple_watermark_from_frame(frame, watermark_length, frame_offset):
    """Extract a simple watermark from one frame via FFT phase analysis.

    Args:
        frame: Input BGR frame (H x W x 3 array).
        watermark_length: Expected watermark length in bits.
        frame_offset: Frame offset used to rotate the sampling positions.

    Returns:
        list: Extracted bit sequence (majority vote over the three channels).
    """
    height, width = frame.shape[:2]
    center_r, center_c = height // 2, width // 2
    region = 10  # half-size of the square analysis window around the spectrum center

    per_channel = []
    for chan in range(3):  # B, G, R
        # Shifted 2-D FFT places the DC component at the image center.
        spectrum = np.fft.fftshift(np.fft.fft2(frame[:, :, chan]))
        # Phase of the central (low/mid frequency) window.
        window_phase = np.angle(
            spectrum[center_r - region:center_r + region,
                     center_c - region:center_c + region]
        )
        threshold = np.mean(window_phase)
        bits = []
        for i in range(watermark_length):
            # Map the bit index into the sampling window, rotated by the offset.
            flat_idx = (frame_offset + i) % (region * region)
            r, c = divmod(flat_idx, region)
            # Phase above the window mean encodes a 1-bit.
            bits.append(1 if window_phase[r, c] > threshold else 0)
        per_channel.append(bits)

    # Majority vote across channels; absent entries count as 0.
    result = []
    for i in range(watermark_length):
        votes = [bits[i] if i < len(bits) else 0 for bits in per_channel]
        result.append(1 if sum(votes) > len(votes) / 2 else 0)
    return result
def extract_secret_with_watermark(stego_video_path, encryption_key, frames_to_scan=100):
    """Extract and decrypt secret data from a watermarked stego video.

    Args:
        stego_video_path: Path to the stego video.
        encryption_key: Fernet key (str or bytes).
        frames_to_scan: Number of frames to scan for embedded data.

    Returns:
        str: Decrypted secret text, or "" on failure.
    """
    # Open the video.
    cap = cv2.VideoCapture(stego_video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {stego_video_path}")
        return ""
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frames_to_process = min(frames_to_scan, frame_count)
    # Scan the frames that may carry secret data.
    all_extracted_bits = []
    for frame_idx in range(frames_to_process):
        ret, frame = cap.read()
        if not ret:
            break
        # Reuse the adaptive (edge-based LSB) extractor implemented earlier.
        extracted_bits = extract_adaptive_bits_from_frame(frame)
        all_extracted_bits.extend(extracted_bits)
    cap.release()
    # Bail out when less than one byte of data was recovered.
    if len(all_extracted_bits) < 8:
        print("未能提取到足够的数据")
        return ""
    # Decode the bit stream into text.
    extracted_text = bits_to_text(all_extracted_bits)
    # Attempt to decrypt the recovered payload.
    try:
        cipher = Fernet(encryption_key.encode() if isinstance(encryption_key, str) else encryption_key)
        # Locate the encrypted payload among possible garbage; Fernet tokens
        # conventionally start with 'gAAAAA'.
        import re
        pattern = r'gAAAAA[\w+/=]+'
        matches = re.findall(pattern, extracted_text)
        if matches:
            # Try the longest candidate first.
            longest_match = max(matches, key=len)
            try:
                decrypted_data = cipher.decrypt(longest_match.encode())
                decrypted_text = decrypted_data.decode('utf-8')
                print(f"秘密数据提取成功!")
                print(f"解密后的文本: '{decrypted_text[:100]}...'" if len(decrypted_text) > 100 else f"解密后的文本: '{decrypted_text}'")
                return decrypted_text
            except Exception as e:
                print(f"解密失败: {e}")
                # Fall back to the remaining candidates.
                for match in matches[1:]:
                    try:
                        decrypted_data = cipher.decrypt(match.encode())
                        decrypted_text = decrypted_data.decode('utf-8')
                        print(f"使用另一个匹配项解密成功!")
                        return decrypted_text
                    except:
                        continue
        else:
            print("未找到有效的加密数据格式")
    except Exception as e:
        print(f"解密过程出错: {e}")
    return ""
# Assumes the text_to_bits and bits_to_text helpers defined earlier are available.
# Assumes the extract_adaptive_bits_from_frame defined earlier is available
# (simplified version below).
def extract_adaptive_bits_from_frame(frame, max_bits=1000):
    """Extract adaptively embedded bits from a frame (simplified version).

    Args:
        frame: Input BGR frame.
        max_bits: Maximum number of bits to extract.

    Returns:
        list: Extracted bit sequence.
    """
    # Convert to grayscale to detect edges (the candidate embedding sites).
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 50, 150)
    # Edge pixels are the possible embedding positions.
    edge_pixels = np.argwhere(edges > 0)
    # Cap the number of positions visited.
    bits_to_extract = min(max_bits, len(edge_pixels))
    # Collect LSBs from each edge pixel.
    extracted_bits = []
    for i in range(bits_to_extract):
        y, x = edge_pixels[i]
        # Check all color channels of this pixel.
        for c in range(3):
            if len(extracted_bits) >= max_bits:
                break
            # Least significant bit of the channel value.
            bit = frame[y, x, c] & 1
            extracted_bits.append(bit)
    return extracted_bits
# Advanced protected-steganography system test
def test_secure_steganography(video_path):
    """Exercise the protected-steganography pipeline end to end:
    embed -> extract watermark -> extract secret -> steganalysis check.

    Args:
        video_path: Path to the carrier test video.
    """
    print("=== 高级防护隐写系统测试 ===")
    if not os.path.exists(video_path):
        print(f"错误:视频文件 '{video_path}' 不存在")
        return
    # Test payloads.
    secret_data = "这是一段受到高级防护的秘密信息,结合了加密、隐写和数字水印技术。"
    watermark_text = "SecureStego2025"
    # 1. Run encrypted steganography with an embedded watermark.
    output_path = "secure_stego_output.mp4"
    result = steganography_with_watermark(
        video_path, output_path, secret_data, watermark_text,
        embedding_strength=0.05
    )
    if not result or not result['success']:
        print("隐写失败!")
        return
    encryption_key = result['encryption_key']
    print(f"\n保存的加密密钥: {encryption_key}")
    # 2. Extract the watermark.
    print("\n=== 提取水印测试 ===")
    watermark_bits = len(text_to_bits(watermark_text))
    extracted_watermark = extract_watermark(
        output_path, watermark_bits,
        frames_to_use=30
    )
    # Verify the watermark against the original.
    if extracted_watermark:
        similarity = calculate_string_similarity(watermark_text, extracted_watermark)
        print(f"原始水印: '{watermark_text}'")
        print(f"提取水印: '{extracted_watermark}'")
        print(f"水印相似度: {similarity:.2f}%")
    # 3. Extract and decrypt the secret data.
    print("\n=== 提取秘密数据测试 ===")
    extracted_secret = extract_secret_with_watermark(
        output_path, encryption_key,
        frames_to_scan=100
    )
    # Verify the secret data against the original.
    if extracted_secret:
        similarity = calculate_string_similarity(secret_data, extracted_secret)
        print(f"原始数据长度: {len(secret_data)} 字符")
        print(f"提取数据长度: {len(extracted_secret)} 字符")
        print(f"相似度: {similarity:.2f}%")
        if similarity > 90:
            print("✓ 秘密数据提取成功!")
        else:
            print("⚠ 秘密数据部分提取")
    # 4. Run steganalysis detection.
    print("\n=== 隐写分析检测测试 ===")
    detection_result = run_multiple_detections(output_path)
    # Assess the covertness of the embedding.
    if detection_result:
        print(f"隐写检测结果: {'被检测到' if detection_result['is_stego'] else '未被检测到'}")
        print(f"平均置信度: {detection_result['avg_confidence']:.4f}")
        if not detection_result['is_stego']:
            print("✓ 隐写具有良好的隐蔽性!")
        else:
            print("⚠ 隐写可能被检测到")
# Usage example
# test_secure_steganography('input_video.mp4')
## 第六章 视频隐写的实际应用与挑战
在本章中,我们将探讨视频隐写技术的实际应用场景、面临的挑战以及未来发展方向。
### 6.1 实际应用场景
#### 6.1.1 版权保护与数字权益管理
视频隐写技术在版权保护方面有着重要应用。通过在视频中嵌入不可见的数字水印,可以证明版权归属,追踪盗版来源,以及控制数字内容的使用权限。
```python
import cv2
import numpy as np
import os
from datetime import datetime
def digital_rights_management(video_path, output_path, owner_info, license_info,
                              embedding_strength=0.05):
    """Video DRM: embed owner/license/timestamp info as a repeated watermark.

    Args:
        video_path: Input video path.
        output_path: Output video path.
        owner_info: Owner information string.
        license_info: License information string.
        embedding_strength: Watermark embedding strength.

    Returns:
        dict: Operation result, or None when the video cannot be opened.
    """
    # Build the watermark payload: owner | license | timestamp.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    watermark_data = f"OWNER:{owner_info}|LICENSE:{license_info}|TIME:{timestamp}"
    print(f"准备嵌入的权益信息: {watermark_data}")
    # Open the source video.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return None
    # Source properties drive the writer configuration.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # Output writer (MP4 container).
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    # Convert the payload to a bit sequence.
    watermark_bits = text_to_bits(watermark_data)
    watermark_len = len(watermark_bits)
    # Repeat the watermark throughout the video: once every N frames
    # (roughly 10 repetitions across the whole clip).
    repeat_interval = max(1, frame_count // 10)
    embedded_frames = 0
    for frame_idx in range(frame_count):
        ret, frame = cap.read()
        if not ret:
            break
        # Work on a copy of the frame.
        stego_frame = frame.copy()
        # Embed the full watermark on every repeat boundary.
        if frame_idx % repeat_interval == 0:
            stego_frame = embed_drm_watermark(stego_frame, watermark_bits,
                                              embedding_strength)
            embedded_frames += 1
        # Write the (possibly watermarked) frame.
        out.write(stego_frame)
        # Progress report.
        if frame_idx % 100 == 0 or frame_idx == frame_count - 1:
            print(f"处理进度: {frame_idx+1}/{frame_count} ({(frame_idx+1)/frame_count*100:.1f}%)")
    # Release resources.
    cap.release()
    out.release()
    # Derive a short verification code from the payload.
    verification_code = generate_verification_code(watermark_data)
    result = {
        'success': os.path.exists(output_path),
        'owner_info': owner_info,
        'license_info': license_info,
        'timestamp': timestamp,
        'verification_code': verification_code,
        'embedded_frames': embedded_frames,
        'total_frames': frame_count
    }
    print(f"\n数字权益管理标记完成!")
    print(f"输出文件: {output_path}")
    print(f"所有者: {owner_info}")
    print(f"许可证: {license_info}")
    print(f"时间戳: {timestamp}")
    print(f"验证代码: {verification_code}")
    print(f"嵌入帧数: {embedded_frames}/{frame_count}")
    # Persist the verification metadata next to the video.
    save_verification_info(output_path + ".verify", result)
    return result
def embed_drm_watermark(frame, watermark_bits, strength=0.05):
    """Embed a DRM watermark into one frame via a multi-level DWT.

    Args:
        frame: Input BGR frame.
        watermark_bits: Watermark bit sequence.
        strength: Base embedding strength (scaled by the band's spread).

    Returns:
        numpy.ndarray: Watermarked frame (uint8). Falls back to a DCT
        watermark when PyWavelets is unavailable.
    """
    try:
        import pywt
        # Work in float to avoid quantization during the transforms.
        frame_float = frame.astype(np.float32)
        watermarked_frame = np.zeros_like(frame_float)
        for c in range(3):  # process each BGR channel separately
            # 3-level 2-D wavelet decomposition (Daubechies-4).
            coeffs = pywt.wavedec2(frame_float[:, :, c], 'db4', level=3)
            # Embed into a mid-frequency band: the level-2 detail coefficients.
            cH2, cV2, cD2 = coeffs[2]
            # Watermark length in bits.
            wm_len = len(watermark_bits)
            # Match the watermark to the coefficient grid size.
            h, w = cH2.shape
            # Total number of coefficients available.
            coeff_count = h * w
            # Tile the watermark so it covers every coefficient.
            extended_watermark = np.tile(watermark_bits, (coeff_count // wm_len) + 1)[:coeff_count]
            # Reshape into the coefficient layout.
            wm_pattern = extended_watermark.reshape(h, w)
            # Adaptive strength: scale by the band's standard deviation.
            coeff_std = np.std(cH2)
            adaptive_strength = strength * coeff_std
            # Additively embed into the horizontal detail band
            # (bits are mapped 0/1 -> -1/+1).
            cH2_watermarked = cH2 + adaptive_strength * (2 * wm_pattern - 1)
            # Put the modified band back.
            coeffs[2] = (cH2_watermarked, cV2, cD2)
            # Inverse transform.
            watermarked_channel = pywt.waverec2(coeffs, 'db4')
            # waverec2 may pad by a row/column; crop to the original size.
            h_orig, w_orig = frame.shape[:2]
            watermarked_channel = watermarked_channel[:h_orig, :w_orig]
            watermarked_frame[:, :, c] = watermarked_channel
        # Back to uint8 with range clipping.
        watermarked_frame = np.clip(watermarked_frame, 0, 255).astype(np.uint8)
        return watermarked_frame
    except ImportError:
        print("警告: pywt库未安装,使用频率域水印替代")
        # DCT-based watermark as a fallback.
        return embed_dct_watermark(frame, watermark_bits, strength)
def embed_dct_watermark(frame, watermark_bits, strength=0.05):
    """Embed a watermark by perturbing one mid-frequency DCT coefficient
    per 8x8 block.

    Args:
        frame: Input BGR frame.
        watermark_bits: Watermark bit sequence (repeated cyclically).
        strength: Relative perturbation strength.

    Returns:
        numpy.ndarray: Watermarked frame (uint8).
    """
    watermarked_frame = frame.copy()
    h, w = frame.shape[:2]
    # DCT block size.
    block_size = 8
    # Cursor into the watermark bit sequence.
    wm_idx = 0
    wm_len = len(watermark_bits)
    # Visit every DCT block.
    for i in range(0, h, block_size):
        for j in range(0, w, block_size):
            # Skip partial blocks at the image borders.
            if i + block_size > h or j + block_size > w:
                continue
            # Process each color channel.
            for c in range(3):
                # Extract the block.
                block = watermarked_frame[i:i+block_size, j:j+block_size, c].astype(np.float32)
                # Forward DCT.
                dct_block = cv2.dct(block)
                # Mid-frequency position (avoids the DC term and the high
                # frequencies); (3,3) here, adjustable as needed.
                u, v = 3, 3
                # Embed one watermark bit.
                if wm_idx < wm_len:
                    # Nudge the coefficient up for a 1-bit and down for a
                    # 0-bit, proportionally to its magnitude.
                    dct_block[u, v] += strength * np.abs(dct_block[u, v]) * (2 * watermark_bits[wm_idx] - 1)
                    wm_idx += 1
                    # Wrap around so the watermark repeats across the frame.
                    if wm_idx >= wm_len:
                        wm_idx = 0
                # Inverse DCT.
                idct_block = cv2.idct(dct_block)
                # Write the block back.
                watermarked_frame[i:i+block_size, j:j+block_size, c] = np.clip(idct_block, 0, 255).astype(np.uint8)
    return watermarked_frame
def extract_drm_watermark(video_path, expected_length=100, frames_to_use=20):
    """Extract the DRM watermark from a video.

    Args:
        video_path: Video path.
        expected_length: Expected watermark length in bits.
        frames_to_use: How many frames to sample for extraction.

    Returns:
        str: Extracted watermark text ("" on failure).
    """
    # Open the video.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return ""
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # Evenly spaced sampling interval.
    interval = max(1, frame_count // frames_to_use)
    # Per-frame bit sequences.
    all_extracted_bits = []
    # Sample key frames across the clip.
    for frame_idx in range(0, frame_count, interval):
        # Seek to the sampling position.
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ret, frame = cap.read()
        if not ret:
            break
        # Extract this frame's watermark bits.
        extracted_bits = extract_watermark_from_drm_frame(frame, expected_length)
        if extracted_bits:
            all_extracted_bits.append(extracted_bits)
        # Stop early once enough frames contributed.
        if len(all_extracted_bits) >= frames_to_use:
            break
    cap.release()
    # Nothing recovered from any frame.
    if not all_extracted_bits:
        print("未能从任何帧中提取水印")
        return ""
    # Combine the per-frame sequences by majority vote.
    final_bits = majority_voting(all_extracted_bits)
    # Decode to text.
    extracted_text = bits_to_text(final_bits)
    # Try to interpret the text as DRM metadata.
    parsed_info = parse_drm_info(extracted_text)
    if parsed_info:
        print("\n成功提取DRM信息:")
        for key, value in parsed_info.items():
            print(f"{key}: {value}")
        return extracted_text
    else:
        print("\n提取的水印无法解析为DRM信息:")
        print(f"原始文本: '{extracted_text[:100]}...'" if len(extracted_text) > 100 else f"原始文本: '{extracted_text}'")
        return extracted_text
def extract_watermark_from_drm_frame(frame, expected_length):
    """Extract watermark bits from a single DRM-watermarked frame.

    Args:
        frame: Input BGR frame.
        expected_length: Expected watermark length in bits.

    Returns:
        list: Extracted bit sequence (DCT-based fallback without PyWavelets).
    """
    try:
        import pywt
        # One bit sequence per channel.
        channel_bits = []
        for c in range(3):  # BGR channels
            # 3-level 2-D wavelet decomposition, matching the embedder.
            coeffs = pywt.wavedec2(frame[:, :, c], 'db4', level=3)
            # Level-2 horizontal detail band (where the bits were embedded).
            cH2, _, _ = coeffs[2]
            # Band geometry.
            h, w = cH2.shape
            # Number of coefficients available for sampling.
            coeff_count = h * w
            # Bits recovered from this channel.
            extracted = []
            # Flatten the band for linear indexing.
            cH2_flat = cH2.flatten()
            # Median threshold separates raised (1) from lowered (0) coefficients.
            median = np.median(cH2_flat)
            # Sample bits, cycling through the coefficients as needed.
            for i in range(expected_length):
                coeff_idx = i % coeff_count
                bit = 1 if cH2_flat[coeff_idx] > median else 0
                extracted.append(bit)
            channel_bits.append(extracted)
        # Majority vote across the three channels.
        final_bits = []
        for i in range(expected_length):
            votes = [channel[i] for channel in channel_bits]
            bit = 1 if sum(votes) > len(votes) / 2 else 0
            final_bits.append(bit)
        return final_bits
    except ImportError:
        # PyWavelets missing: use the DCT-based extractor instead.
        return extract_dct_watermark_from_frame(frame, expected_length)
def extract_dct_watermark_from_frame(frame, expected_length):
    """Extract watermark bits from a frame watermarked by embed_dct_watermark.

    Args:
        frame: Input BGR frame.
        expected_length: Expected watermark length in bits.

    Returns:
        list: Extracted bit sequence ([] when no full block is available).
    """
    h, w = frame.shape[:2]
    block_size = 8
    # Recovered bits.
    extracted_bits = []
    # Coefficient position used by the embedder.
    u, v = 3, 3
    # Collected (3,3) DCT coefficients, in embed order.
    coefficients = []
    # Visit every DCT block.
    for i in range(0, h, block_size):
        for j in range(0, w, block_size):
            # Skip partial blocks at the image borders.
            if i + block_size > h or j + block_size > w:
                continue
            # Each channel contributed one coefficient.
            for c in range(3):
                # Extract the block.
                block = frame[i:i+block_size, j:j+block_size, c].astype(np.float32)
                # Forward DCT.
                dct_block = cv2.dct(block)
                # Record the watermark coefficient.
                coefficients.append(dct_block[u, v])
    # No usable coefficients.
    if not coefficients:
        return []
    # Median threshold separates 1-bits (raised) from 0-bits (lowered).
    median = np.median(coefficients)
    # Sample bits, cycling through the coefficients when needed.
    for i in range(expected_length):
        coeff_idx = i % len(coefficients)
        bit = 1 if coefficients[coeff_idx] > median else 0
        extracted_bits.append(bit)
    return extracted_bits
def majority_voting(bit_lists):
    """Combine several bit sequences by per-position majority vote.

    Args:
        bit_lists: List of bit sequences (lists of 0/1 values).

    Returns:
        list: Voted bit sequence, truncated to the shortest input length
        (empty when no sequences are given). Ties resolve to 0.
    """
    if not bit_lists:
        return []
    # Only positions present in every sequence can be voted on.
    shortest = min(map(len, bit_lists))
    half = len(bit_lists) / 2
    return [
        1 if sum(1 for seq in bit_lists if seq[pos] == 1) > half else 0
        for pos in range(shortest)
    ]
def parse_drm_info(text):
    """Parse DRM fields out of an extracted watermark string.

    Expected layout: ``OWNER:<owner>|LICENSE:<license>|TIME:YYYYMMDD_HHMMSS``.

    Args:
        text: Text possibly containing DRM information.

    Returns:
        dict: Parsed info with 'owner', 'license' and optionally 'timestamp',
        or None when the mandatory fields are absent.
    """
    try:
        import re
        owner = re.search(r'OWNER:(.*?)\|', text)
        licence = re.search(r'LICENSE:(.*?)\|', text)
        stamp = re.search(r'TIME:(\d{8}_\d{6})', text)
        # Owner and license are both mandatory.
        if not (owner and licence):
            return None
        info = {
            'owner': owner.group(1).strip(),
            'license': licence.group(1).strip(),
        }
        if stamp:
            info['timestamp'] = stamp.group(1)
        return info
    except Exception as e:
        print(f"解析DRM信息时出错: {e}")
        return None
def generate_verification_code(data):
    """Derive a short verification code from the given data.

    Args:
        data: Input string.

    Returns:
        str: Upper-cased first 16 hex characters of the SHA-256 digest.
    """
    import hashlib
    # Full SHA-256 digest, shortened for human-friendly display.
    digest = hashlib.sha256(data.encode()).hexdigest()
    return digest[:16].upper()
def save_verification_info(file_path, info):
    """Persist verification info as pretty-printed UTF-8 JSON.

    Args:
        file_path: Destination file path.
        info: JSON-serializable verification data.
    """
    try:
        import json
        with open(file_path, 'w', encoding='utf-8') as fh:
            # Keep non-ASCII characters readable in the output file.
            json.dump(info, fh, ensure_ascii=False, indent=2)
        print(f"验证信息已保存到: {file_path}")
    except Exception as e:
        print(f"保存验证信息时出错: {e}")
def verify_content_ownership(video_path, owner_info=None, verification_file=None):
    """Verify the ownership of a video via its embedded DRM watermark.

    Args:
        video_path: Video path.
        owner_info: Expected owner information (optional).
        verification_file: Saved verification file path (optional).

    Returns:
        dict: Verification result with 'verified', 'message',
        'extracted_info' and (when extraction succeeded) 'extracted_text'.
    """
    print(f"验证视频 '{video_path}' 的所有权...")
    # Load reference info from the verification file, if provided.
    expected_info = None
    if verification_file and os.path.exists(verification_file):
        try:
            import json
            with open(verification_file, 'r', encoding='utf-8') as f:
                expected_info = json.load(f)
            print("已加载验证文件信息")
        except Exception as e:
            print(f"读取验证文件时出错: {e}")
    # Extract the DRM watermark from the video.
    # 500 bits is assumed enough to hold a typical DRM payload.
    extracted_text = extract_drm_watermark(video_path, expected_length=500, frames_to_use=30)
    if not extracted_text:
        return {
            'verified': False,
            'message': "未能从视频中提取任何DRM信息",
            'extracted_info': None
        }
    # Parse the extracted payload.
    parsed_info = parse_drm_info(extracted_text)
    # Perform the verification.
    verified = False
    message = ""
    if parsed_info:
        # Direct owner comparison when an expected owner is given.
        if owner_info:
            if 'owner' in parsed_info and parsed_info['owner'] == owner_info:
                verified = True
                message = f"验证成功!视频归属于'{owner_info}'"
            else:
                message = f"验证失败!预期所有者'{owner_info}',实际所有者'{parsed_info.get('owner', '未知')}'"
        # Fuller check against the verification file.
        elif expected_info:
            # Owner must match.
            if 'owner' in parsed_info and 'owner_info' in expected_info:
                owner_match = parsed_info['owner'] == expected_info['owner_info']
            else:
                owner_match = False
            # Timestamp must match.
            if 'timestamp' in parsed_info and 'timestamp' in expected_info:
                timestamp_match = parsed_info['timestamp'] == expected_info['timestamp']
            else:
                timestamp_match = False
            # Combined verdict.
            verified = owner_match and timestamp_match
            if verified:
                message = "验证成功!视频信息与验证文件匹配"
            else:
                message = "验证失败!视频信息与验证文件不匹配"
        else:
            message = "已提取DRM信息,但没有提供验证标准"
    else:
        message = "未能解析提取的水印数据为有效DRM信息"
    result = {
        'verified': verified,
        'message': message,
        'extracted_info': parsed_info,
        'extracted_text': extracted_text
    }
    print(f"验证结果: {'成功' if verified else '失败'}")
    print(f"消息: {message}")
    return result
# DRM system usage example
def drm_workflow_demo(input_video):
    """Demonstrate the DRM workflow: embed, verify, then robustness-test.

    Args:
        input_video: Input video path.
    """
    if not os.path.exists(input_video):
        print(f"错误:视频文件 '{input_video}' 不存在")
        return
    # 1. Embed the DRM information.
    output_video = "protected_video.mp4"
    result = digital_rights_management(
        input_video, output_video,
        owner_info="ContentStudio Inc.",
        license_info="Commercial Use - Non Redistributable",
        embedding_strength=0.05
    )
    if not result or not result['success']:
        print("DRM嵌入失败!")
        return
    print("\n" + "="*60)
    # 2. Extract and verify the DRM information.
    print("\n提取并验证DRM信息:")
    verify_result = verify_content_ownership(
        output_video,
        owner_info="ContentStudio Inc."
    )
    # 3. Robustness test after mild recompression.
    print("\n" + "="*60)
    print("\n测试水印鲁棒性(轻微压缩后):")
    # Produce a compressed copy of the protected video.
    compressed_video = "compressed_protected_video.mp4"
    compress_video(output_video, compressed_video)
    # Try to extract the watermark from the compressed copy.
    if os.path.exists(compressed_video):
        robustness_result = extract_drm_watermark(compressed_video, expected_length=500)
        if robustness_result:
            print("水印在压缩后仍然可提取!")
        else:
            print("警告:压缩后未能提取水印")
def compress_video(input_path, output_path, crf=28):
    """Compress a video with ffmpeg (used to test watermark robustness).

    Args:
        input_path: Input video path.
        output_path: Output video path.
        crf: Constant rate factor (higher = more compression, lower quality).
    """
    try:
        # Uses ffmpeg for the compression.
        # NOTE: requires ffmpeg to be installed on the system.
        import subprocess
        cmd = [
            'ffmpeg', '-i', input_path, '-c:v', 'libx264',
            '-crf', str(crf), '-preset', 'medium',
            '-c:a', 'aac', '-b:a', '128k',
            '-y', output_path
        ]
        print(f"正在压缩视频(CRF={crf})...")
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(f"压缩完成: {output_path}")
    except Exception as e:
        print(f"视频压缩失败: {e}")

视频隐写技术在安全通信领域也有重要应用。政府机构、军队和情报部门可以利用视频隐写来安全地传输敏感信息,而不引起注意。
import cv2
import numpy as np
import os
import time
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import base64
def secure_communication_system(video_path, output_path, secret_message,
                                password, stego_method='adaptive'):
    """Secure communication system: deliver a secret message via video stego.

    Args:
        video_path: Carrier video path.
        output_path: Output video path.
        secret_message: Secret message text.
        password: Encryption password.
        stego_method: Steganography method ('adaptive', 'statistical', 'frequency').

    Returns:
        dict: Operation result, or None on failure.
    """
    start_time = time.time()
    print(f"安全通信系统启动...")
    print(f"隐写方法: {stego_method}")
    # 1. Preprocess and encrypt the message.
    print("\n1. 消息预处理与加密...")
    # Derive the AES key and IV from the password.
    key, iv = derive_key_and_iv(password)
    # Encrypt the message.
    encrypted_message = encrypt_message(secret_message, key, iv)
    # Base64-encode so the payload survives text-oriented embedding.
    encoded_message = base64.b64encode(encrypted_message).decode('utf-8')
    # Length header tells the receiver where the payload ends.
    message_length = len(encrypted_message)
    header = f"LENGTH:{message_length:010d}|"
    full_message = header + encoded_message
    print(f"原始消息长度: {len(secret_message)} 字符")
    print(f"加密后长度: {message_length} 字节")
    print(f"完整嵌入数据: {len(full_message)} 字符")
    # 2. Run the selected steganography method.
    print("\n2. 执行隐写...")
    if stego_method == 'adaptive':
        stego_result = content_aware_steganography(
            video_path, output_path, full_message,
            quality_factor=0.9  # high quality factor for better covertness
        )
    elif stego_method == 'statistical':
        stego_result = statistical_aware_steganography(
            video_path, output_path, full_message,
            embedding_rate=0.1  # low rate to limit statistical anomalies
        )
    elif stego_method == 'frequency':
        stego_result = frequency_domain_steganography(
            video_path, output_path, full_message
        )
    else:
        print(f"不支持的隐写方法: {stego_method}")
        return None
    if not stego_result or not stego_result.get('success', False):
        print("隐写失败!")
        return None
    # 3. Validate the embedding result.
    print("\n3. 验证嵌入结果...")
    # PSNR between the original and the stego video.
    psnr_value = calculate_psnr(video_path, output_path)
    # Quick steganalysis self-check.
    detection_attempt = quick_steganalysis_check(output_path)
    # 4. Generate transport metadata.
    print("\n4. 生成传输信息...")
    # Random transport identifier.
    import random
    import string
    transport_id = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
    # Timestamp.
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    # 5. Generate the receiver guide.
    print("\n5. 生成接收方指南...")
    receiver_instructions = generate_receiver_instructions(
        transport_id, timestamp, stego_method, password_hint=password[:3] + '*' * (len(password) - 3)
    )
    # Persist the guide next to the video.
    guide_file = output_path + ".guide"
    with open(guide_file, 'w', encoding='utf-8') as f:
        f.write(receiver_instructions)
    # Total processing time.
    total_time = time.time() - start_time
    result = {
        'success': True,
        'transport_id': transport_id,
        'timestamp': timestamp,
        'stego_method': stego_method,
        'message_length': message_length,
        'psnr': psnr_value,
        'detection_risk': "低" if not detection_attempt['detected'] else "高",
        'detection_confidence': detection_attempt['confidence'],
        'processing_time': f"{total_time:.2f} 秒",
        'output_file': output_path,
        'guide_file': guide_file,
        'receiver_hint': password[:3] + '*' * (len(password) - 3)
    }
    print("\n" + "="*60)
    print("安全通信系统操作完成!")
    print(f"传输ID: {transport_id}")
    print(f"时间戳: {timestamp}")
    print(f"输出文件: {output_path}")
    print(f"接收指南: {guide_file}")
    print(f"PSNR值: {psnr_value:.2f} dB")
    print(f"检测风险: {result['detection_risk']}")
    print(f"处理时间: {result['processing_time']}")
    print("="*60)
    return result
def receive_secure_message(stego_video_path, password, stego_method='adaptive'):
    """Receive and decrypt a secure message from a stego video.

    Args:
        stego_video_path: Stego video path.
        password: Decryption password.
        stego_method: Steganography method ('adaptive', 'statistical', 'frequency').

    Returns:
        str: Decrypted message ("" on failure).
    """
    print(f"接收安全消息...")
    print(f"使用隐写方法: {stego_method}")
    # 1. Extract raw data from the stego video.
    print("\n1. 从视频中提取数据...")
    if stego_method == 'adaptive':
        # Adaptive stego: pull a generous amount of data.
        extracted_data = extract_from_content_aware_stego(stego_video_path, max_bytes=100000)
    elif stego_method == 'statistical':
        # Statistical stego: estimate the maximum possible size.
        extracted_data = extract_statistical_steganography(stego_video_path, total_bits=800000)  # assume at most ~100KB
    elif stego_method == 'frequency':
        extracted_data = extract_frequency_domain_stego(stego_video_path)
    else:
        print(f"不支持的隐写方法: {stego_method}")
        return ""
    if not extracted_data:
        print("未能从视频中提取数据")
        return ""
    print(f"提取到的数据长度: {len(extracted_data)} 字符")
    # 2. Parse the length header.
    print("\n2. 解析头部信息...")
    import re
    length_match = re.search(r'LENGTH:(\d{10})\|', extracted_data)
    if not length_match:
        print("无法找到消息长度头部")
        return ""
    try:
        message_length = int(length_match.group(1))
        print(f"消息长度: {message_length} 字节")
        # Payload starts immediately after the header.
        encoded_message = extracted_data[length_match.end():]
    except Exception as e:
        print(f"解析头部信息时出错: {e}")
        return ""
    # 3. Decode and decrypt the payload.
    print("\n3. 解码和解密消息...")
    try:
        # Base64 decode.
        encrypted_message = base64.b64decode(encoded_message)
        # Re-derive the key and IV from the password.
        key, iv = derive_key_and_iv(password)
        # Decrypt.
        decrypted_message = decrypt_message(encrypted_message, key, iv)
        print(f"解密成功!消息长度: {len(decrypted_message)} 字符")
        print(f"解密后消息: '{decrypted_message[:100]}...'" if len(decrypted_message) > 100 else f"解密后消息: '{decrypted_message}'")
        return decrypted_message
    except Exception as e:
        print(f"解码或解密时出错: {e}")
        print("可能是密码错误或数据损坏")
        return ""
def derive_key_and_iv(password, key_length=32, iv_length=16, iterations=100000):
    """Derive an AES key and IV from a password via PBKDF2-HMAC-SHA256.

    Args:
        password: Input password string.
        key_length: Key size in bytes.
        iv_length: Initialization-vector size in bytes.
        iterations: PBKDF2 iteration count.

    Returns:
        tuple: (key, iv) as bytes.
    """
    import hashlib
    # NOTE: a fixed salt keeps the derivation deterministic for both parties;
    # production code should use a random, negotiated salt instead.
    salt = b'secure_stego_salt_2025'
    # Derive key material for both the key and the IV in one pass.
    material = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode(),
        salt,
        iterations,
        dklen=key_length + iv_length,
    )
    # First bytes become the key, the remainder the IV.
    return material[:key_length], material[key_length:key_length + iv_length]
def encrypt_message(message, key, iv):
    """Encrypt a message with AES-256-CBC.

    Args:
        message: Message to encrypt (str or bytes).
        key: 32-byte AES key.
        iv: 16-byte initialization vector.

    Returns:
        bytes: Ciphertext.
    """
    # Normalize to bytes.
    if isinstance(message, str):
        message_bytes = message.encode('utf-8')
    else:
        message_bytes = message
    # AES-256-CBC via the `cryptography` package.
    backend = default_backend()
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=backend)
    encryptor = cipher.encryptor()
    # PKCS7 padding: pad up to a multiple of the 16-byte AES block size
    # (each pad byte encodes the pad length).
    padding_length = 16 - (len(message_bytes) % 16)
    padded_message = message_bytes + (chr(padding_length) * padding_length).encode()
    # Encrypt.
    ciphertext = encryptor.update(padded_message) + encryptor.finalize()
    return ciphertext
def decrypt_message(ciphertext, key, iv):
    """Decrypt an AES-256-CBC encrypted message.

    Args:
        ciphertext: Encrypted bytes.
        key: 32-byte AES key.
        iv: 16-byte initialization vector.

    Returns:
        str: Decrypted plaintext (UTF-8).
    """
    # AES-256-CBC via the `cryptography` package.
    backend = default_backend()
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=backend)
    decryptor = cipher.decryptor()
    # Decrypt.
    padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()
    # Strip the PKCS7 padding (the last byte encodes the pad length).
    padding_length = padded_plaintext[-1]
    plaintext = padded_plaintext[:-padding_length]
    # Decode to text.
    return plaintext.decode('utf-8')
def frequency_domain_steganography(video_path, output_path, secret_text):
    """Frequency-domain steganography: hide text in mid-band DCT coefficients.

    Args:
        video_path: Input video path.
        output_path: Output video path.
        secret_text: Secret text to embed.

    Returns:
        dict: Embedding result, or None when the video cannot be opened.
    """
    # Open the video.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return None
    # Source properties.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # Output writer.
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    # Payload as a bit sequence.
    bits = text_to_bits(secret_text)
    total_bits = len(bits)
    bit_index = 0
    embedded_bits = 0
    # Process every frame.
    processed_frames = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Work on a copy of the frame.
        stego_frame = frame.copy()
        # Process each color channel.
        for c in range(3):  # BGR channels
            # Current channel as float.
            channel = stego_frame[:, :, c].astype(np.float32)
            # Full-frame discrete cosine transform (DCT).
            dct = cv2.dct(channel)
            # Restrict to the mid-frequency band (avoids the DC term and
            # high-frequency noise).
            h, w = dct.shape
            start_row, end_row = max(0, h // 4), min(h, h * 3 // 4)
            start_col, end_col = max(0, w // 4), min(w, w * 3 // 4)
            # Embed bits into the mid-band coefficients.
            for i in range(start_row, end_row, 2):  # every other position to limit distortion
                for j in range(start_col, end_col, 2):
                    if bit_index >= total_bits:
                        break
                    # DCT coefficients may be negative: embed into the
                    # magnitude and preserve the sign.
                    coeff_abs = abs(dct[i, j])
                    coeff_sign = 1 if dct[i, j] >= 0 else -1
                    # Only coefficients large enough to survive the edit are
                    # used (mirrors the extractor's filter).
                    if coeff_abs > 1:
                        # Set the magnitude's least significant bit.
                        new_abs = (int(coeff_abs) & ~1) | bits[bit_index]
                        dct[i, j] = float(new_abs) * coeff_sign
                        embedded_bits += 1
                        bit_index += 1
                # Stop once the payload is fully embedded.
                if bit_index >= total_bits:
                    break
            # Inverse DCT.
            idct = cv2.idct(dct)
            # Write the channel back.
            stego_frame[:, :, c] = np.clip(idct, 0, 255).astype(np.uint8)
            # Stop once the payload is fully embedded.
            if bit_index >= total_bits:
                break
        # Write the stego frame.
        out.write(stego_frame)
        processed_frames += 1
        # Progress report.
        if processed_frames % 100 == 0:
            print(f"处理进度: {processed_frames}/{frame_count}")
        # Once everything is embedded, copy the remaining frames verbatim.
        if bit_index >= total_bits:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                out.write(frame)
                processed_frames += 1
            break
    # Release resources.
    cap.release()
    out.release()
    result = {
        'success': os.path.exists(output_path),
        'total_bits': total_bits,
        'embedded_bits': embedded_bits,
        'embedding_ratio': embedded_bits / total_bits if total_bits > 0 else 0,
        'frames_processed': processed_frames
    }
    print(f"频域隐写完成!")
    print(f"嵌入位数: {embedded_bits}/{total_bits}")
    print(f"处理帧数: {processed_frames}")
    return result
def extract_frequency_domain_stego(video_path, expected_bits=800000):
    """Extract data embedded by frequency_domain_steganography.

    Args:
        video_path: Stego video path.
        expected_bits: Upper bound on the number of bits to extract.

    Returns:
        str: Extracted text ("" when nothing was recovered).
    """
    # Open the video.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return ""
    extracted_bits = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Process each color channel.
        for c in range(3):  # BGR channels
            # Current channel as float.
            channel = frame[:, :, c].astype(np.float32)
            # Full-frame DCT, matching the embedder.
            dct = cv2.dct(channel)
            # Same mid-frequency band as the embedder.
            h, w = dct.shape
            start_row, end_row = max(0, h // 4), min(h, h * 3 // 4)
            start_col, end_col = max(0, w // 4), min(w, w * 3 // 4)
            # Read bits back from the mid-band coefficients.
            for i in range(start_row, end_row, 2):
                for j in range(start_col, end_col, 2):
                    if len(extracted_bits) >= expected_bits:
                        break
                    # The LSB of the coefficient magnitude carries the bit.
                    coeff_abs = abs(dct[i, j])
                    if coeff_abs > 1:  # only coefficients the embedder used
                        bit = int(coeff_abs) & 1
                        extracted_bits.append(bit)
                if len(extracted_bits) >= expected_bits:
                    break
            if len(extracted_bits) >= expected_bits:
                break
    # Release resources.
    cap.release()
    # Decode the bit stream back to text.
    if extracted_bits:
        extracted_text = bits_to_text(extracted_bits)
        return extracted_text
    else:
        return ""
def extract_from_content_aware_stego(video_path, max_bytes=100000):
    """Extract data from a content-aware (saliency-guided) stego video.

    Args:
        video_path: Stego video path.
        max_bytes: Maximum number of bytes to extract.

    Returns:
        str: Extracted text ("" when nothing was recovered).
    """
    # Open the video.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return ""
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    max_bits = max_bytes * 8
    extracted_bits = []
    block_size = 8
    for frame_idx in range(frame_count):
        ret, frame = cap.read()
        if not ret:
            break
        # Visual saliency map, normalized to [0, 1]; the embedder hid data
        # only in low-saliency regions.
        saliency_map = calculate_saliency(frame)
        saliency_map = cv2.normalize(saliency_map, None, 0, 1, cv2.NORM_MINMAX)
        h, w = frame.shape[:2]
        # Visit each block.
        for i in range(0, h, block_size):
            for j in range(0, w, block_size):
                if len(extracted_bits) >= max_bits:
                    break
                # Block bounds (clipped at the borders).
                block_h_end = min(i + block_size, h)
                block_w_end = min(j + block_size, w)
                # Mean saliency of the block.
                block_saliency = np.mean(saliency_map[i:block_h_end, j:block_w_end])
                # Low-saliency blocks may carry payload bits
                # (assumes an embedding quality factor around 0.5).
                if block_saliency < 0.5:
                    # Enumerate the block's pixel coordinates.
                    block_pixels = []
                    for y in range(i, block_h_end):
                        for x in range(j, block_w_end):
                            block_pixels.append((y, x))
                    # Read the LSBs from the block.
                    for idx, (y, x) in enumerate(block_pixels):
                        if len(extracted_bits) >= max_bits:
                            break
                        # All three color channels carry bits.
                        for c in range(3):
                            if len(extracted_bits) >= max_bits:
                                break
                            # Least significant bit.
                            bit = frame[y, x, c] & 1
                            extracted_bits.append(bit)
            if len(extracted_bits) >= max_bits:
                break
        if len(extracted_bits) >= max_bits:
            break
    # Release resources.
    cap.release()
    # Decode the bit stream to text.
    if extracted_bits:
        extracted_text = bits_to_text(extracted_bits)
        return extracted_text
    else:
        return ""
def calculate_psnr(original_video, stego_video):
    """Compute the average PSNR (peak signal-to-noise ratio) between two videos.

    Args:
        original_video: Original video path.
        stego_video: Stego video path.

    Returns:
        float: PSNR in dB (inf when the frames are identical, 0.0 on error).
    """
    # Open both videos.
    cap1 = cv2.VideoCapture(original_video)
    cap2 = cv2.VideoCapture(stego_video)
    if not cap1.isOpened() or not cap2.isOpened():
        print("无法打开视频文件进行PSNR计算")
        return 0.0
    # Frame counts.
    frame_count1 = int(cap1.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_count2 = int(cap2.get(cv2.CAP_PROP_FRAME_COUNT))
    # Compare only the overlapping frames.
    min_frames = min(frame_count1, frame_count2)
    total_mse = 0.0
    processed_frames = 0
    # Accumulate the per-frame MSE.
    for i in range(min_frames):
        ret1, frame1 = cap1.read()
        ret2, frame2 = cap2.read()
        if not ret1 or not ret2:
            break
        # Mean squared error of the frame pair.
        mse = np.mean((frame1.astype(np.float64) - frame2.astype(np.float64)) ** 2)
        total_mse += mse
        processed_frames += 1
    # Release resources.
    cap1.release()
    cap2.release()
    if processed_frames == 0:
        return 0.0
    # Average MSE over the processed frames.
    avg_mse = total_mse / processed_frames
    # PSNR from the average MSE.
    if avg_mse == 0:
        return float('inf')  # no difference at all
    max_pixel = 255.0
    psnr = 20 * np.log10(max_pixel / np.sqrt(avg_mse))
    return psnr
def quick_steganalysis_check(video_path):
    """Quick LSB-balance steganalysis check over a few sampled frames.

    Args:
        video_path: Video to inspect.

    Returns:
        dict: {'detected': bool, 'confidence': float} plus 'avg_balance'
        when at least one frame was analyzed.
    """
    # Open the video.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return {'detected': False, 'confidence': 0.0}
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frames_to_analyze = min(10, frame_count)  # only 10 frames, for speed
    step = max(1, frame_count // frames_to_analyze)
    # Per-frame LSB balance statistics.
    lsb_stats = []
    for i in range(frames_to_analyze):
        # Seek to the sampled frame.
        cap.set(cv2.CAP_PROP_POS_FRAMES, i * step)
        ret, frame = cap.read()
        if not ret:
            continue
        # Per-channel LSB statistics.
        b, g, r = cv2.split(frame)
        # Count LSB=0 and LSB=1 pixels for each channel.
        lsb0_b = np.sum((b & 1) == 0)
        lsb1_b = np.sum((b & 1) == 1)
        lsb0_g = np.sum((g & 1) == 0)
        lsb1_g = np.sum((g & 1) == 1)
        lsb0_r = np.sum((r & 1) == 0)
        lsb1_r = np.sum((r & 1) == 1)
        # LSB balance per channel (1.0 = perfectly uniform; the epsilon
        # guards against division by zero).
        balance_b = min(lsb0_b, lsb1_b) / max(lsb0_b, lsb1_b + 1e-10)
        balance_g = min(lsb0_g, lsb1_g) / max(lsb0_g, lsb1_g + 1e-10)
        balance_r = min(lsb0_r, lsb1_r) / max(lsb0_r, lsb1_r + 1e-10)
        # Average balance across the channels.
        avg_balance = (balance_b + balance_g + balance_r) / 3.0
        lsb_stats.append(avg_balance)
    # Release resources.
    cap.release()
    if not lsb_stats:
        return {'detected': False, 'confidence': 0.0}
    # Mean balance over the sampled frames.
    avg_balance_all = np.mean(lsb_stats)
    # LSB embedding tends to push the LSB distribution toward uniformity
    # (balance close to 1), so an unusually high balance is suspicious.
    # A simple threshold like this can false-positive on complex content
    # and will miss more advanced embedding schemes.
    detected = avg_balance_all > 0.98  # "too balanced" heuristic
    confidence = min(1.0, abs(avg_balance_all - 0.5) * 2.0)  # farther from 0.5 -> higher confidence
    return {
        'detected': detected,
        'confidence': confidence,
        'avg_balance': avg_balance_all
    }
def generate_receiver_instructions(transport_id, timestamp, method, password_hint):
    """Render the receiver-side guide for a secure transmission.

    Args:
        transport_id: Transport identifier.
        timestamp: Creation timestamp.
        method: Steganography method used.
        password_hint: Partially masked password hint.

    Returns:
        str: The formatted guide text.
    """
    # The template text is user-facing and must stay exactly as shipped.
    return f"""=== 安全通信接收指南 ===
传输ID: {transport_id}
创建时间: {timestamp}
重要提示:
此文件包含接收安全消息所需的关键信息。请妥善保管,不要与视频文件分开传输。
接收步骤:
1. 使用配套的视频文件和以下信息解密消息
2. 确保您使用正确的接收程序版本
3. 输入完整密码(不是提示)进行解密
解密参数:
- 隐写方法: {method}
- 密码提示: {password_hint}
安全警告:
- 请勿将此指南与视频文件通过相同渠道传输
- 请在消息接收后删除此指南
- 如遇解密失败,请确认密码是否正确,并重试
=== 结束 ==="""
def secure_communication_demo(input_video):
    """Demonstrate the secure communication system (sender and receiver sides).

    Args:
        input_video: Input video path.
    """
    if not os.path.exists(input_video):
        print(f"错误:视频文件 '{input_video}' 不存在")
        return
    # Sender-side demonstration.
    print("===== 发送方演示 =====")
    # The secret message.
    secret_message = "这是一条使用视频隐写技术传输的高度机密消息。该技术结合了先进的加密算法和隐写术,确保信息在传输过程中的安全性和隐蔽性。只有掌握正确密钥的接收方才能提取并阅读此消息。"
    # Encryption password.
    password = "SecurePass2025!"
    # Output file.
    output_video = "secure_communication_video.mp4"
    # Run the secure communication pipeline.
    result = secure_communication_system(
        input_video, output_video, secret_message,
        password=password,
        stego_method='adaptive'
    )
    if not result or not result['success']:
        print("发送演示失败!")
        return
    print("\n" + "="*60)
    print("\n===== 接收方演示 =====")
    print("模拟接收方接收消息...")
    # Receiver-side demonstration: extract and decrypt the message.
    if os.path.exists(output_video):
        print(f"正在从 '{output_video}' 接收消息...")
        # Test 1: correct password.
        print("\n测试1: 使用正确密码")
        received_message = receive_secure_message(output_video, password, stego_method='adaptive')
        # Test 2: wrong password.
        print("\n测试2: 使用错误密码")
        wrong_password = "WrongPass123!"
        wrong_received = receive_secure_message(output_video, wrong_password, stego_method='adaptive')
        # Summarize the results.
        print("\n" + "="*60)
        print("安全通信系统演示总结:")
        print(f"消息传输 {'成功' if received_message == secret_message else '失败'}")
        print(f"安全验证: {'通过' if wrong_received == '' else '失败(错误密码也能解密)'}")
# 使用示例
# secure_communication_demo('input_video.mp4')

本文系统梳理了视频隐写的关键思想、实现路径与从帧到容器的多通道嵌入示例(含接收端提取与校验流程)。为保证文档结尾清晰、可复现,给出如下技术总结与操作指引:
# secure_communication_demo('input_video.mp4')本文系统梳理了视频隐写的关键思想、实现路径与从帧到容器的多通道嵌入示例(含接收端提取与校验流程)。为保证文档结尾清晰、可复现,给出如下技术总结与操作指引: