These days, whether you are searching Google or Baidu to learn something or shopping on Taobao or JD, everything revolves around text keyword search. But many platforms and applications now hold huge amounts of video, and some tools work with video directly, such as video editors and automated video-processing pipelines. For these, plain text search is nowhere near enough, and the search experience suffers badly. Which brings us to today's topic:
How to use multimodal RAG to retrieve video content from a text query
RAG (Retrieval-Augmented Generation): RAG is a generation technique that combines the strengths of a retrieval system with those of a large language model. A traditional language model generates content from its own internal knowledge, which can make the output stale or inaccurate. RAG instead retrieves relevant information from an external knowledge base, passes it to the language model together with the user's query, and so produces answers that are more accurate, more relevant and more up to date. Put simply: combine the model with retrieved content, merge the two, and return an answer that is both fresh and correct.
Multimodal RAG: extends the RAG idea to multiple data modalities, including text, images, audio and video. The AI no longer handles only text; it can also understand and process images, audio and other modalities. In plain terms, ordinary RAG supports text only, while multimodal RAG extends to images, audio and video.
Embedding models: an embedding model is a machine-learning model widely used in natural language processing (NLP), computer vision (CV) and related fields. It maps high-dimensional data into a lower-dimensional embedding space while preserving the features and semantics of the original data, which improves both efficiency and accuracy. In short: feature extraction.
Concepts alone are dry, so let's walk through how to build text-to-video retrieval step by step.
(Figure 1: a motorcycle parked across from a herd of livestock)
(Figure 2: a motorcycle on a platform in a garage)
(Figure 3: a cat stretched out near a laptop)
Our goal is to turn these three images into vectors and compute their pairwise similarity.
from PIL import Image
import torch
from transformers import AutoProcessor, BridgeTowerForContrastiveLearning
import numpy as np
from numpy.linalg import norm

def bt_embedding_from_local_pretrained(prompt, image_path):
    # Model used: https://huggingface.co/BridgeTower/bridgetower-large-itm-mlm-itc
    model_name = "BridgeTower/bridgetower-large-itm-mlm-itc"
    processor = AutoProcessor.from_pretrained(model_name)
    model = BridgeTowerForContrastiveLearning.from_pretrained(model_name)
    text_only = False
    # Preprocess the inputs
    if image_path is not None and image_path != '':
        image = Image.open(image_path)
        inputs = processor(images=image, text=prompt, return_tensors="pt")
    else:
        # Text-only case: feed a placeholder image so the processor still works
        image = Image.open("dummy.jpg")
        inputs = processor(images=image, text=prompt, return_tensors="pt")
        text_only = True
    # Run the model
    with torch.no_grad():
        outputs = model(**inputs)
    # Here we take the pooled features
    if text_only:
        embeddings = outputs.text_embeds
    else:
        embeddings = outputs.cross_embeds
    # Convert to a plain Python list
    embeddings_list = embeddings.squeeze().tolist()
    return embeddings_list
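# (Optional tweak, my own suggestion rather than part of the original code:
# as written, the function above reloads the processor and model on every call,
# which is slow. A minimal sketch of loading them once and reusing them:)
from functools import lru_cache

@lru_cache(maxsize=1)
def load_bridgetower(model_name="BridgeTower/bridgetower-large-itm-mlm-itc"):
    processor = AutoProcessor.from_pretrained(model_name)
    model = BridgeTowerForContrastiveLearning.from_pretrained(model_name)
    model.eval()
    return processor, model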
# Image info
url1 = 'http://farm3.staticflickr.com/2519/4126738647_cc436c111b_z.jpg'
cap1 = 'A motorcycle sits parked across from a herd of livestock'
url2 = 'http://farm3.staticflickr.com/2046/2003879022_1b4b466d1d_z.jpg'
cap2 = 'Motorcycle on platform to be worked on in garage'
url3 = 'http://images.cocodataset.org/val2017/000000360943.jpg'
cap3 = 'a cat laying down stretched out near a laptop'
# These correspond to Figures 1, 2 and 3 above
img1 = {
    'flickr_url': url1,
    'caption': cap1,
    'image_path': 'd:/下载/rag/materials/motorcycle_1.jpg'
}
img2 = {
    'flickr_url': url2,
    'caption': cap2,
    'image_path': 'd:/下载/rag/materials/motorcycle_2.jpg'
}
img3 = {
    'flickr_url': url3,
    'caption': cap3,
    'image_path': 'd:/下载/rag/materials/cat_1.jpg'
}

imgs = [img1, img2, img3]
embeddings = []
for img in imgs:
    img_path = img['image_path']
    caption = img['caption']
    # Embed the image together with its caption
    embedding = bt_embedding_from_local_pretrained(caption, img_path)
    embeddings.append(embedding)
    print(embedding)
print("length:", len(embeddings[0]))
Output:
You get one vector per image (a long array of floats) plus the printed vector length.
Different models may produce different vectors with different results, so experiment for yourself.
We use the cosine similarity algorithm:
"The embedding model above gives us vectors, and vectors can be compared for similarity. Using the idea of the cosine of the angle between them, we can measure the distance between vectors in space: the closer two vectors are, the more similar they are. If you know RGB color values this is easy to picture. For example, (255, 0, 0) is pure red, and (255, 10, 10) is also red, just not pure red. Map (255, 0, 0) and (255, 10, 10) into a 3D coordinate space and they sit very close together, while both are far away from pure blue (0, 0, 255), because the reds hug the X axis and blue hugs the Z axis. The vector databases everyone talks about work on roughly this principle, and it is also the foundation of today's popular RAG, retrieval-augmented generation." (quoted from 一念's original article)
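Concretely, the cosine similarity of two vectors $a$ and $b$ is

$$\cos(\theta) = \frac{a \cdot b}{\|a\|\,\|b\|}$$

It ranges from -1 to 1; the closer to 1, the more closely the two vectors point in the same direction.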
# Compute cosine similarity
def cosine_similarity(vec1, vec2):
    similarity = np.dot(vec1, vec2) / (norm(vec1) * norm(vec2))
    return similarity

ex1_embed = np.array(embeddings[0])
ex2_embed = np.array(embeddings[1])
ex3_embed = np.array(embeddings[2])
# Compare image 1 vs 2 and image 1 vs 3
sim_ex1_ex2 = cosine_similarity(ex1_embed, ex2_embed)
sim_ex1_ex3 = cosine_similarity(ex1_embed, ex3_embed)
print(f"Cosine similarity of images 1 & 2: {sim_ex1_ex2}")
print(f"Cosine similarity of images 1 & 3: {sim_ex1_ex3}")
Output:
Cosine similarity of images 1 & 2: 0.4851664642889189
Cosine similarity of images 1 & 3: 0.14224603129566593
The comparison is exactly right: the similarity between the two motorcycle images (1 & 2) is far higher than between the motorcycle and the cat (1 & 3).
To restate the goal of this article: use multimodal RAG to retrieve video content from a text query.
Now we move on to processing video data. We split videos into two categories: those that come with subtitles/transcripts, and those that don't (covered briefly later).
Let's take this video as an example: https://www.youtube.com/watch?v=33bZIOLX4do
Download it locally yourself; there are plenty of online download tools, for example https://yt1d.com/en12/
captions.vtt
WEBVTT
00:00:00.120 --> 00:00:01.360
today we are flying
00:00:01.360 --> 00:00:04.080
to China with tensions between China and the West,
00:00:04.080 --> 00:00:06.840
the lengthy visa process, and the recent pandemic.
00:00:06.840 --> 00:00:09.840
China has become unknown to the outside world,
00:00:09.840 --> 00:00:11.200
and over the next 4 weeks
......more
I'll skip how to extract the subtitles; plenty of tools can do it, for example Volcano Engine's or Microsoft's captioning APIs, and there are free tools too.
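If you'd rather script it, one option (my suggestion, not necessarily the tool used in this article) is yt-dlp, which can fetch both the video and YouTube's auto-generated VTT captions in one call:

from yt_dlp import YoutubeDL

ydl_opts = {
    # output name/location are illustrative; match them to the paths used later in the article
    "outtmpl": "./shared_data/videos/travelcn2024/travel_china2024.%(ext)s",
    "writesubtitles": True,       # creator-provided subtitles, if any
    "writeautomaticsub": True,    # otherwise fall back to auto-generated captions
    "subtitleslangs": ["en"],
    "subtitlesformat": "vtt",
}
with YoutubeDL(ydl_opts) as ydl:
    ydl.download(["https://www.youtube.com/watch?v=33bZIOLX4do"])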
# Extract one frame per caption segment and save the metadata as JSON
import os
import os.path as osp
import json
import cv2       # pip install opencv-python
import webvtt    # pip install webvtt-py
from pathlib import Path
# str2time and maintain_aspect_ratio_resize are small helpers (sketched right after this function)

def extract_and_save_frames_and_metadata(
        path_to_video,
        path_to_transcript,
        path_to_save_extracted_frames,
        path_to_save_metadatas):
    if not os.path.exists(path_to_transcript):
        print("Transcript file does not exist")
        exit()
    # metadatas will store the metadata of all extracted frames
    metadatas = []
    # load video using cv2
    video = cv2.VideoCapture(path_to_video)
    # load transcript using webvtt
    trans = webvtt.read(path_to_transcript)
    # iterate over the transcript file:
    # one iteration per video segment specified in the transcript
    for idx, transcript in enumerate(trans):
        # get the start time and end time in milliseconds
        start_time_ms = str2time(transcript.start)
        end_time_ms = str2time(transcript.end)
        # take the frame exactly in the middle of the caption segment
        mid_time_ms = (end_time_ms + start_time_ms) / 2
        # get the transcript text, remove the newline symbols
        text = transcript.text.replace("\n", ' ')
        # grab the frame at the middle time
        video.set(cv2.CAP_PROP_POS_MSEC, mid_time_ms)
        success, frame = video.read()
        if success:
            # if the frame is extracted successfully, resize it
            image = maintain_aspect_ratio_resize(frame, height=350)
            # save the frame as a JPEG file
            img_fname = f'frame_{idx}.jpg'
            img_fpath = osp.join(
                path_to_save_extracted_frames, img_fname
            )
            cv2.imwrite(img_fpath, image)
            # prepare the metadata
            metadata = {
                'extracted_frame_path': img_fpath,
                'transcript': text,
                'video_segment_id': idx,
                'video_path': path_to_video,
                'mid_time_ms': mid_time_ms,
            }
            metadatas.append(metadata)
        else:
            print(f"ERROR! Cannot extract frame: idx = {idx}")
    # save metadata of all extracted frames
    fn = osp.join(path_to_save_metadatas, 'metadatas.json')
    with open(fn, 'w', encoding='utf-8') as outfile:
        json.dump(metadatas, outfile, ensure_ascii=False, indent=4)
    return metadatas
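# str2time and maintain_aspect_ratio_resize are helper functions not shown in this article;
# rough sketches of what they need to do:
def str2time(timestamp: str) -> float:
    # Convert a WebVTT timestamp such as '00:00:01.360' into milliseconds
    h, m, s = timestamp.split(":")
    return (int(h) * 3600 + int(m) * 60 + float(s)) * 1000

def maintain_aspect_ratio_resize(image, width=None, height=None):
    # Resize an OpenCV image to the given width or height, preserving aspect ratio
    h, w = image.shape[:2]
    if width is None and height is None:
        return image
    if width is None:
        scale = height / float(h)
        dim = (int(w * scale), height)
    else:
        scale = width / float(w)
        dim = (width, int(h * scale))
    return cv2.resize(image, dim, interpolation=cv2.INTER_AREA)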
# Video storage folder
vid1_dir = "./shared_data/videos/travelcn2024"
vid1_filepath = os.path.join(vid1_dir, "travel_china2024.mp4")
vid1_transcript_filepath = os.path.join(vid1_dir, "captions.vtt")
# output paths to save extracted frames and their metadata
extracted_frames_path = osp.join(vid1_dir, 'extracted_frame')
metadatas_path = vid1_dir
# create these output folders if they don't exist
Path(extracted_frames_path).mkdir(parents=True, exist_ok=True)
Path(metadatas_path).mkdir(parents=True, exist_ok=True)

## 1. Generate frames and metadata
# call the function to extract frames and metadata
metadatas = extract_and_save_frames_and_metadata(
    vid1_filepath,
    vid1_transcript_filepath,
    extracted_frames_path,
    metadatas_path,
)
print(metadatas)
The metadata saved in the JSON file:
[
    {
        "extracted_frame_path": "./shared_data/videos/travelcn2024\\extracted_frame\\frame_0.jpg",
        "transcript": "today we are flying",
        "video_segment_id": 0,
        "video_path": "./shared_data/videos/travelcn2024\\travel_china2024.mp4",
        "mid_time_ms": 740.0
    },
    {
        "extracted_frame_path": "./shared_data/videos/travelcn2024\\extracted_frame\\frame_1.jpg",
        "transcript": "to China with tensions between China and the West,",
        "video_segment_id": 1,
        "video_path": "./shared_data/videos/travelcn2024\\travel_china2024.mp4",
        "mid_time_ms": 2720.0
    },
    {
        "extracted_frame_path": "./shared_data/videos/travelcn2024\\extracted_frame\\frame_2.jpg",
        "transcript": "the lengthy visa process, and the recent pandemic.",
        "video_segment_id": 2,
        "video_path": "./shared_data/videos/travelcn2024\\travel_china2024.mp4",
        "mid_time_ms": 5460.0
    }
    ......
]
Next, post-process the metadata so each entry carries surrounding context.
# Post-process the metadata
def preprocess_transcript(vid_metadata_path):
    if not os.path.exists(vid_metadata_path):
        return
    vid_metadata = load_json_file_GBK(vid_metadata_path)
    vid_trans = [vid['transcript'] for vid in vid_metadata]
    vid_img_path = [vid['extracted_frame_path'] for vid in vid_metadata]
    # 1. Turn the raw captions into a new list of the same length,
    #    where each entry is joined with its neighbouring captions
    n = 7  # window size used here
    updated_vid_trans = [
        ' '.join(vid_trans[i-int(n/2) : i+int(n/2)]) if i-int(n/2) >= 0 else
        ' '.join(vid_trans[0 : i + int(n/2)]) for i in range(len(vid_trans))
    ]
    for i in range(len(updated_vid_trans)):
        vid_metadata[i]['transcript'] = updated_vid_trans[i]
    print(f'Segment 6 before:\n"{vid_trans[6]}"')
    print(f'Segment 6 after:\n"{updated_vid_trans[6]}"')
    return vid_trans, updated_vid_trans, vid_img_path, vid_metadata

vid_metadata_path = osp.join(metadatas_path, 'metadatas.json')
# This returns the processed JSON metadata, which we will need to store later
# (load_json_file_GBK is a small helper that reads the JSON file)
vid_trans, updated_vid_trans, vid_img_path, vid_metadata = preprocess_transcript(vid_metadata_path)
Output:
Segment 6 before: "and seeing what it's really like."
Segment 6 after: "China has become unknown to the outside world, and over the next 4 weeks we will be traveling through the country and seeing what it's really like. we're currently in Heathrow Airport and are about to fly to Beijing, China."
If you compare this with the original subtitles, you can see it simply strings each segment together with the few segments before and after it, so neighbouring entries overlap.
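To make the windowing concrete, here is a toy run of the same list-comprehension logic (my own illustration). Note that with n = 7 each window actually joins up to n - 1 = 6 segments, because the upper slice bound is exclusive:

sents = [f"s{i}" for i in range(10)]
n = 7
half = int(n / 2)
windows = [
    ' '.join(sents[i - half : i + half]) if i - half >= 0 else
    ' '.join(sents[0 : i + half]) for i in range(len(sents))
]
print(windows[6])  # -> "s3 s4 s5 s6 s7 s8": segment 6 plus its surrounding segments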
For the other category, videos that don't come with ready-made subtitles, I'm skipping the detailed steps for length reasons; I may cover them in a separate post.
Roughly, the process is: first use a vision model to describe each frame, then turn those descriptions into the same JSON metadata shown above.
The vision model I used is https://github.com/haotian-liu/LLaVA; you could also test with a commercial vision model like ChatGPT-4, but using a commercial model in production isn't realistic: it's too expensive.
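For reference, here is a minimal frame-captioning sketch. I'm assuming the llava-hf/llava-1.5-7b-hf checkpoint on Hugging Face and the transformers API; the LLaVA repo above also ships its own inference scripts, and this is not the exact pipeline I used:

from PIL import Image
from transformers import AutoProcessor, LlavaForConditionalGeneration

llava_id = "llava-hf/llava-1.5-7b-hf"  # assumed checkpoint; use whichever LLaVA variant you can run
llava_processor = AutoProcessor.from_pretrained(llava_id)
llava_model = LlavaForConditionalGeneration.from_pretrained(llava_id)

def caption_frame(image_path):
    # Produce a one-sentence description of a single extracted frame
    image = Image.open(image_path)
    prompt = "USER: <image>\nDescribe this video frame in one sentence. ASSISTANT:"
    inputs = llava_processor(images=image, text=prompt, return_tensors="pt")
    output_ids = llava_model.generate(**inputs, max_new_tokens=64)
    text = llava_processor.decode(output_ids[0], skip_special_tokens=True)
    return text.split("ASSISTANT:")[-1].strip()

The generated captions then take the place of the transcript field in the same metadatas.json structure shown above.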
LanceDB website: https://lancedb.github.io/lancedb/
GitHub: https://github.com/lancedb/lancedb
LanceDB positions itself as a vector database, built specifically to store, manage and search embedding data; its whole design targets AI-model data and applications.
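Before wiring it into our pipeline, here is a tiny standalone example of the LanceDB API (the path and table name are throwaway, just to show the connect / create_table / search flow):

import lancedb

db = lancedb.connect("./shared_data/.lancedb_demo")   # creates the directory if needed
tbl = db.create_table(
    "demo_tbl",
    data=[
        {"vector": [1.0, 0.0], "text": "red"},
        {"vector": [0.0, 1.0], "text": "blue"},
    ],
    mode="overwrite",
)
# nearest-neighbour search on the vector column
print(tbl.search([0.9, 0.1]).limit(1).to_pandas()["text"].tolist())  # -> ['red']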
Create the BridgeTowerEmbeddings class
from typing import List
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import (
    BaseModel,
)
from utils import encode_image, bt_embedding_from_prediction_guard, bt_embedding_from_local_pretrained
from tqdm import tqdm

class BridgeTowerEmbeddings(BaseModel, Embeddings):
    """ BridgeTower embedding model """

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        embeddings = []
        for text in texts:
            embedding = bt_embedding_from_local_pretrained(text, "")
            embeddings.append(embedding)
        return embeddings

    def embed_query(self, text: str) -> List[float]:
        return self.embed_documents([text])[0]

    def embed_image_text_pairs(self, texts: List[str], images: List[str], batch_size=2) -> List[List[float]]:
        # the length of texts must be equal to the length of images
        assert len(texts) == len(images), "the len of captions should be equal to the len of images"
        embeddings = []
        for path_to_img, text in tqdm(zip(images, texts), total=len(texts)):
            embedding = bt_embedding_from_local_pretrained(text, path_to_img)
            embeddings.append(embedding)
        return embeddings
The main job of this class is to embed the data going into LanceDB as well as the query text.
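For example (note that, as written, embed_query and embed_documents fall back to the local dummy.jpg placeholder for text-only inputs, so that file has to exist):

embedder = BridgeTowerEmbeddings()
query_vec = embedder.embed_query("a motorcycle parked on the street")   # text-only embedding
pair_vecs = embedder.embed_image_text_pairs(
    texts=["a cat laying down stretched out near a laptop"],
    images=["d:/下载/rag/materials/cat_1.jpg"],                          # one of the sample images from earlier
)
print(len(query_vec), len(pair_vecs[0]))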
Create the data-access class
from typing import Any, Iterable, List, Optional
from langchain_core.embeddings import Embeddings
import uuid
from langchain_community.vectorstores.lancedb import LanceDB

class MultimodalLanceDB(LanceDB):
    def __init__(
        self,
        connection: Optional[Any] = None,
        embedding: Optional[Embeddings] = None,
        uri: Optional[str] = "/tmp/lancedb",
        vector_key: Optional[str] = "vector",
        id_key: Optional[str] = "id",
        text_key: Optional[str] = "text",
        image_path_key: Optional[str] = "image_path",
        table_name: Optional[str] = "vectorstore",
        api_key: Optional[str] = None,
        region: Optional[str] = None,
        mode: Optional[str] = "append",
    ):
        super(MultimodalLanceDB, self).__init__(connection, embedding, uri, vector_key, id_key, text_key, table_name, api_key, region, mode)
        self._image_path_key = image_path_key

    # Insert text <-> image pair records
    def add_text_image_pairs(
        self,
        texts: Iterable[str],
        image_paths: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        # the length of texts must be equal to the length of images
        assert len(texts) == len(image_paths), "the len of transcripts should be equal to the len of images"
        # Embed texts and create documents
        docs = []
        ids = ids or [str(uuid.uuid4()) for _ in texts]
        embeddings = self._embedding.embed_image_text_pairs(texts=list(texts), images=list(image_paths))  # type: ignore
        for idx, text in enumerate(texts):
            embedding = embeddings[idx]
            metadata = metadatas[idx] if metadatas else {"id": ids[idx]}
            docs.append(
                {
                    self._vector_key: embedding,
                    self._id_key: ids[idx],
                    self._text_key: text,
                    self._image_path_key: image_paths[idx],
                    "metadata": metadata,
                }
            )
        if 'mode' in kwargs:
            mode = kwargs['mode']
        else:
            mode = self.mode
        if self._table_name in self._connection.table_names():
            tbl = self._connection.open_table(self._table_name)
            if self.api_key is None:
                tbl.add(docs, mode=mode)
            else:
                tbl.add(docs)
        else:
            self._connection.create_table(self._table_name, data=docs)
        return ids

    # Class method callers use to insert text -> image embedding records
    @classmethod
    def from_text_image_pairs(
        cls,
        texts: List[str],
        image_paths: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        connection: Any = None,
        vector_key: Optional[str] = "vector",
        id_key: Optional[str] = "id",
        text_key: Optional[str] = "text",
        image_path_key: Optional[str] = "image_path",
        table_name: Optional[str] = "vectorstore",
        **kwargs: Any,
    ):
        instance = MultimodalLanceDB(
            connection=connection,
            embedding=embedding,
            vector_key=vector_key,
            id_key=id_key,
            text_key=text_key,
            image_path_key=image_path_key,
            table_name=table_name,
        )
        instance.add_text_image_pairs(texts, image_paths, metadatas=metadatas, **kwargs)
        return instance
This class mainly handles inserting the data and retrieving it.
Insert the video data we processed earlier
# Initialize the database configuration
import lancedb
LANCEDB_HOST_FILE = "./shared_data/.lancedb_test"
TBL_NAME = "test_tbl"
db = lancedb.connect(LANCEDB_HOST_FILE)
# Get a handle to the table
tbl = db.open_table(TBL_NAME)
# Print the row count before inserting
print(f"Before adding data, there are {tbl.to_pandas().shape[0]} rows in the table")
embedder = BridgeTowerEmbeddings()
dbInstance = MultimodalLanceDB.from_text_image_pairs(
    texts=updated_vid_trans,
    image_paths=vid_img_path,
    embedding=embedder,
    metadatas=vid_metadata,
    connection=db,
    table_name=TBL_NAME,
    mode="append",  # or "overwrite"
)
tbl = db.open_table(TBL_NAME)
print(f"After adding data, there are {tbl.to_pandas().shape[0]} rows in the table")
Output:
Before adding data, there are 0 rows in the table
After adding data, there are 124 rows in the table
At this point we have decomposed the video, embedded it with the BridgeTower model, and inserted the vectors into LanceDB. Next, let's see how to search this video data.
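As a quick sanity check, you can also query the table directly through LanceDB before bringing LangChain into it (this assumes the embedder, db and TBL_NAME objects from the insertion step are still in scope):

tbl = db.open_table(TBL_NAME)
query_vec = embedder.embed_query("people at an airport")
hits = tbl.search(query_vec).limit(3).to_pandas()
print(hits[["text", "image_path"]])

The full retrieval path goes through the LangChain retriever instead: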
from mm_rag.embeddings.bridgetower_embeddings import (
    BridgeTowerEmbeddings
)
from mm_rag.vectorstores.multimodal_lancedb import MultimodalLanceDB
import lancedb
from PIL import Image
from utils import bt_embedding_from_local_pretrained
import webbrowser
import os

# Initialize the query parameters
embedder = BridgeTowerEmbeddings()
LANCEDB_HOST_FILE = "./shared_data/.lancedb_test"
TBL_NAME = "test_tbl"
db = lancedb.connect(LANCEDB_HOST_FILE)
# Create the vector store
vectorstore = MultimodalLanceDB(
    uri=LANCEDB_HOST_FILE,
    embedding=embedder,
    table_name=TBL_NAME
)
tbl = db.open_table(TBL_NAME)

# Search for image frames; size is the number of results to return
def search_by_prompt(prompt, size):
    retriever = vectorstore.as_retriever(
        search_type='similarity',
        search_kwargs={"k": size}
    )
    results = retriever.invoke(prompt)
    print(f"Searching for \"{prompt}\"*********************************************\n")
    # Print the search results
    # display_retrieved_results(results)
    # Open the images that were found
    for i in range(len(results)):
        print(results[i])
        image_path = results[i].metadata['extracted_frame_path']
        Image.open(image_path).show()
Let's try a search
# Search for: a toddler and an adult
search_by_prompt("A toddler and an adult", 2)
This opens the matching frames directly in the system image viewer. I asked for 2 results; they come back ranked by relevance, with the best match first. Judging by the videos I indexed, the results meet expectations, so I won't demo every other query here.
Next, let's write an HTML page, video.html, that plays a specified segment of a video.
<!DOCTYPE html>
<html>
<head>
    <title>Video segment</title>
</head>
<body>
    <div id="app">
        <video id="video1" :src="video_url" controls autoplay muted height="720"
               @timeupdate="timeupdate($event)" @play="checkStartTime($event)"></video>
    </div>
    <script src="https://unpkg.com/vue@3/dist/vue.global.js"></script>
    <script>
        const { createApp } = Vue
        createApp({
            data() {
                return {
                    video_url: "./videos/video2/toddler_in_playground.mp4",
                    start: 2,
                    end: 5
                }
            },
            methods: {
                timeupdate(event) {
                    const videoElement = event.target;
                    if (videoElement.currentTime > this.end) {
                        videoElement.pause();
                    }
                },
                checkStartTime(event) {
                    const videoElement = event.target;
                    if (videoElement.currentTime < this.start) {
                        videoElement.currentTime = this.start;
                    }
                }
            },
            mounted() {
                const urlParams = new URLSearchParams(window.location.search);
                const ms = urlParams.get('ms');
                const vpath = urlParams.get('vpath');
                const vname = urlParams.get('vname');
                this.start = (ms / 1000) - 3
                this.end = ms / 1000 + 3
                if (this.start < 0) {
                    this.start = 0
                }
                console.log(this.start, this.end, vpath, vname);
                this.video_url = `./videos/${vpath}/${vname}`;
            }
        }).mount('#app')
    </script>
</body>
</html>
This HTML page is optional; you could just print the search results and play the segment by hand.
Search for a video segment
# Search for a video segment and play it in the browser once found
def search_segment_by_prompt(prompt):
    retriever = vectorstore.as_retriever(
        search_type='similarity',
        search_kwargs={"k": 1}
    )
    results = retriever.invoke(prompt)
    ms = results[0].metadata['mid_time_ms']
    video_path = results[0].metadata['video_path']
    print(ms, video_path)
    # split the path and keep its last two parts
    normalized_path = os.path.normpath(video_path)
    parts = normalized_path.split(os.sep)
    result = parts[-2:]
    # This opens video.html from above and plays the matching segment
    url = f'http://127.0.0.1:5500/deeplearing_rag/lesson/shared_data/video.html?ms={ms}&vpath={result[0]}&vname={result[1]}'
    webbrowser.open(url)
Let's try it out
# Search for a motorcycle-riding segment
search_segment_by_prompt("riding motorcycle")
This returned a clip of Tom Cruise riding a motorcycle:
Original video: https://www.youtube.com/watch?v=9LPkmeY-0w0
For convenience, the demo in this article is in Python. Nothing here is particularly difficult to use, so hopefully it doesn't cause any confusion.
You've probably also noticed that all the searches are in English. That's mainly a model limitation: its English is decent, but its Chinese support is poor. I'll look for a model that also works well for Chinese later; this article only scratches the surface.
The overall flow isn't complicated: preprocess the video data (split it into transcript and frames) -> build the metadata -> insert the metadata into LanceDB -> retrieve. Since this is written as a learning exercise it is fairly detailed, so bear with me.
This setup is still some way from production use, so keep an eye on the relevant models. Discussion is welcome.
https://mp.weixin.qq.com/s/EfCT6pXBSI3UB2pLziZofA
https://learn.deeplearning.ai/courses/multimodal-rag-chat-with-videos
https://lancedb.github.io/lancedb/