
Introduction to LLMs


Core: the Transformer
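The defining operation of the Transformer is scaled dot-product attention. As a reference (standard notation, not specific to this article):

\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\left(\frac{QK^{\top}}{\sqrt{d_k}}\right)V

where Q, K, V are the query, key, and value projections of the token embeddings and d_k is the key dimension.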


1. What is RAG: principles and practice of Retrieval-Augmented Generation (RAG)

Workflow
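As a rough sketch (the helper names here are illustrative, not from the demo below), a typical RAG workflow is: embed the query, retrieve the most similar chunks from the knowledge base, then hand them to the LLM as context:

# Minimal RAG workflow sketch; embed/llm/knowledge_base are hypothetical helpers.
def rag_answer(query, knowledge_base, embed, llm, k=3):
    q_vec = embed(query)                      # 1. embed the user query
    chunks = knowledge_base.search(q_vec, k)  # 2. retrieve top-k similar chunks
    context = "\n".join(chunks)               # 3. stuff them into the prompt
    prompt = f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
    return llm(prompt)                        # 4. generate a grounded answer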



2. Embedding
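A minimal sketch of turning text into vectors, using the same mindnlp SentenceTransformer wrapper and model name that the demo code below relies on (the printed shape assumes bge-base, which should be 768-dimensional):

from mindnlp.sentence import SentenceTransformer

model = SentenceTransformer('BAAI/bge-base-zh-v1.5')
# normalize_embeddings=True makes the dot product equal cosine similarity
emb = model.encode(['RAG结合了检索与生成', '向量检索找到相关文档'],
                   normalize_embeddings=True)
print(emb.shape)          # (2, 768) for bge-base
print(emb[0] @ emb[1].T)  # cosine similarity of the two sentences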

3. Knowledge base design: common vector databases
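The demo below keeps things simple with a JSON-backed VectorStore, but the same idea maps onto a dedicated vector index. Purely as an illustration (FAISS is one common choice, not what this demo uses; the dimensions and vectors here are made up), an exact inner-product index looks like:

import numpy as np
import faiss  # a common vector index library; not used by this demo

dim = 768
index = faiss.IndexFlatIP(dim)             # exact inner-product search
vectors = np.random.rand(100, dim).astype('float32')
faiss.normalize_L2(vectors)                # normalized IP == cosine similarity
index.add(vectors)

query = np.random.rand(1, dim).astype('float32')
faiss.normalize_L2(query)
scores, ids = index.search(query, 5)       # top-5 nearest chunks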

4. Large language models: prompt design
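Prompt design here means wrapping the retrieved context and the user question into a fixed template; the full templates appear in app.py below, but the skeleton is simply:

# minimal version of the templates used later in app.py
RAG_PROMPT = """使用以下上下文来回答用户的问题。如果你不知道答案,请输出我不知道。
问题: {question}
可参考的上下文:
{context}
有用的回答:"""

prompt = RAG_PROMPT.format(question="什么是RAG?", context="...检索到的文档片段...")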

5. Rerank

Adding a Rerank mechanism on top of the basic RAG demo
Below is the complete code for both files, Rerank.py and app.py, including the changes to app.py that add the Rerank mechanism.
# Rerank.py
from typing import List
import numpy as np
from mindnlp.sentence import SentenceTransformer

class BaseReranker:
    """
    Base class for reranker
    """
    def __init__(self, path: str) -> None:
        self.path = path

    def rerank(self, text: str, content: List[str], k: int) -> List[str]:
        raise NotImplementedError

class MindNLPReranker(BaseReranker):
    """
    class for MindNLP reranker
    """
    def __init__(self, path: str = 'BAAI/bge-reranker-base') -> None:
        super().__init__(path)
        self._model = self.load_model(path)

    def rerank(self, text: str, content: List[str], k: int) -> List[str]:
        # embed the query and the candidate documents, then rank the
        # candidates by dot-product similarity to the query
        query_embedding = self._model.encode(text, normalize_embeddings=True)
        sentences_embedding = self._model.encode(sentences=content, normalize_embeddings=True)
        similarity = query_embedding @ sentences_embedding.T
        ranked_indices = np.argsort(similarity)[::-1]  # sort by similarity, descending
        top_k_sentences = [content[i] for i in ranked_indices[:k]]
        return top_k_sentences

    def load_model(self, path: str):
        model = SentenceTransformer(path)
        return model
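A quick usage sketch of the class above (the document list is made up for illustration; the first run will download the model weights):

from Rerank import MindNLPReranker

reranker = MindNLPReranker('BAAI/bge-reranker-base')
docs = ['RAG结合检索与生成', '今天天气不错', '向量数据库存储嵌入']
print(reranker.rerank('什么是RAG?', docs, k=2))  # the two most relevant docs, best first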
# app.py
import gradio as gr
from typing import List, Dict
import os
import PyPDF2
from mindspore import Tensor
from copy import copy
import numpy as np
from tqdm import tqdm
import json
import markdown
from bs4 import BeautifulSoup
import re
import tiktoken
from datetime import datetime
from Rerank import MindNLPReranker  # import the Rerank module

# initialize the tiktoken encoder used for token counting
enc = tiktoken.get_encoding("cl100k_base")
class ReadFiles:
    """
    class to read files
    """
    def __init__(self, path: str) -> None:
        self._path = path
        self.file_list = self.get_files()

    @classmethod
    def read_pdf(cls, file_path: str):
        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            text = ""
            for page_num in range(len(reader.pages)):
                text += reader.pages[page_num].extract_text()
            return text

    @classmethod
    def read_markdown(cls, file_path: str):
        with open(file_path, 'r', encoding='utf-8') as file:
            md_text = file.read()
            html_text = markdown.markdown(md_text)
            soup = BeautifulSoup(html_text, 'html.parser')
            plain_text = soup.get_text()
            # strip URLs left over from markdown links
            text = re.sub(r'http\S+', '', plain_text)
            return text

    @classmethod
    def read_text(cls, file_path: str):
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    def get_files(self):
        file_list = []
        for filepath, dirnames, filenames in os.walk(self._path):
            for filename in filenames:
                if filename.endswith((".md", ".txt", ".pdf")):
                    file_list.append(os.path.join(filepath, filename))
        return file_list
    def get_content(self, max_token_len: int = 600, cover_content: int = 150):
        docs = []
        for file in self.file_list:
            content = self.read_file_content(file)
            chunk_content = self.get_chunk(
                content, max_token_len=max_token_len, cover_content=cover_content)
            docs.extend(chunk_content)
        return docs
    @classmethod
    def get_chunk(cls, text: str, max_token_len: int = 600, cover_content: int = 150):
        # split text into chunks of roughly max_token_len tokens, keeping the
        # trailing cover_content characters as overlap between adjacent chunks
        chunk_text = []
        curr_len = 0
        curr_chunk = ''
        token_len = max_token_len - cover_content
        lines = text.splitlines()
        for line in lines:
            line = line.replace(' ', '')  # strip spaces (suits Chinese text)
            line_len = len(enc.encode(line))
            if line_len > max_token_len:
                # a single line exceeds one chunk: cut it into fixed-size
                # windows, each carrying the overlap from the previous chunk
                num_chunks = (line_len + token_len - 1) // token_len
                for i in range(num_chunks):
                    start = i * token_len
                    end = min(start + token_len, len(line))
                    curr_chunk = curr_chunk[-cover_content:] + line[start:end]
                    chunk_text.append(curr_chunk)
                curr_chunk = ''
                curr_len = 0
            elif curr_len + line_len <= token_len:
                # the line still fits in the current chunk
                curr_chunk += line + '\n'
                curr_len += line_len + 1
            else:
                # the current chunk is full: flush it and start a new one
                chunk_text.append(curr_chunk)
                curr_chunk = curr_chunk[-cover_content:] + line
                curr_len = line_len + cover_content
        if curr_chunk:
            chunk_text.append(curr_chunk)
        return chunk_text
    @classmethod
    def read_file_content(cls, file_path: str):
        if file_path.endswith('.pdf'):
            return cls.read_pdf(file_path)
        elif file_path.endswith('.md'):
            return cls.read_markdown(file_path)
        elif file_path.endswith('.txt'):
            return cls.read_text(file_path)
        else:
            raise ValueError("Unsupported file type")
class BaseEmbeddings:
    """
    Base class for embeddings
    """
    def __init__(self, path: str, is_api: bool) -> None:
        self.path = path
        self.is_api = is_api

    def get_embedding(self, text: str, model: str) -> List[float]:
        raise NotImplementedError

    @classmethod
    def cosine_similarity(cls, vector1: List[float], vector2: List[float]) -> float:
        dot_product = np.dot(vector1, vector2)
        magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        if not magnitude:
            return 0
        return dot_product / magnitude
class MindNLPEmbedding(BaseEmbeddings):
    """
    class for MindNLP embeddings
    """
    def __init__(self, path: str = 'BAAI/bge-base-zh-v1.5', is_api: bool = False) -> None:
        super().__init__(path, is_api)
        self._model = self.load_model(path)

    def get_embedding(self, text: str):
        sentence_embedding = self._model.encode([text], normalize_embeddings=True)
        return sentence_embedding

    def load_model(self, path: str):
        from mindnlp.sentence import SentenceTransformer
        model = SentenceTransformer(path)
        return model

    @classmethod
    def cosine_similarity(cls, sentence_embedding_1, sentence_embedding_2):
        # embeddings are L2-normalized, so the dot product is cosine similarity
        similarity = sentence_embedding_1 @ sentence_embedding_2.T
        return similarity
class VectorStore:
    def __init__(self, document: List[str] = ['']) -> None:
        self.document = document

    def get_vector(self, EmbeddingModel: BaseEmbeddings):
        self.vectors = []
        for doc in tqdm(self.document, desc="Calculating embeddings"):
            self.vectors.append(EmbeddingModel.get_embedding(doc))
        return self.vectors

    def persist(self, path: str = 'storage'):
        if not os.path.exists(path):
            os.makedirs(path)
        with open(f"{path}/document.json", 'w', encoding='utf-8') as f:
            json.dump(self.document, f, ensure_ascii=False)
        if self.vectors:
            vectors_list = [vector.tolist() for vector in self.vectors]
            with open(f"{path}/vectors.json", 'w', encoding='utf-8') as f:
                json.dump(vectors_list, f)

    def load_vector(self, EmbeddingModel: BaseEmbeddings, path: str = 'storage'):
        with open(f"{path}/vectors.json", 'r', encoding='utf-8') as f:
            vectors_list = json.load(f)
        with open(f"{path}/document.json", 'r', encoding='utf-8') as f:
            self.document = json.load(f)
        if isinstance(EmbeddingModel, MindNLPEmbedding):
            self.vectors = [np.array(vector) for vector in vectors_list]
        else:
            self.vectors = vectors_list

    def get_similarity(self, vector1, vector2, EmbeddingModel: BaseEmbeddings):
        return EmbeddingModel.cosine_similarity(vector1, vector2)

    def query(self, query: str, EmbeddingModel: BaseEmbeddings, k: int = 1):
        query_vector = EmbeddingModel.get_embedding(query)
        similarities = [self.get_similarity(query_vector, vector, EmbeddingModel)
                        for vector in self.vectors]
        results = []
        for similarity, vector, document in zip(similarities, self.vectors, self.document):
            results.append({
                'similarity': similarity,
                'vector': vector,
                'document': document
            })
        results.sort(key=lambda x: x['similarity'], reverse=True)
        top_k_documents = [result['document'] for result in results[:k]]
        return top_k_documents
class BaseModel:
    def __init__(self, path: str = '') -> None:
        self.path = path

    def chat(self, prompt: str, history: List[dict], content: str) -> str:
        raise NotImplementedError

    def load_model(self):
        raise NotImplementedError
class MindNLPChat(BaseModel):
    def __init__(self, path: str = '') -> None:
        super().__init__(path)
        self.load_model()

    def chat(self, prompt: str, history: List = [], content: str = '') -> str:
        prompt = PROMPT_TEMPLATE['MindNLP_PROMPT_TEMPLATE'].format(question=prompt, context=content)
        response, history = self.model.chat(self.tokenizer, prompt, history, max_length=512)
        return response

    def load_model(self):
        import mindspore
        from mindnlp.transformers import AutoTokenizer, AutoModelForCausalLM
        self.tokenizer = AutoTokenizer.from_pretrained(self.path, mirror="huggingface")
        self.model = AutoModelForCausalLM.from_pretrained(self.path, ms_dtype=mindspore.float16, mirror="huggingface")
PROMPT_TEMPLATE = dict(
    RAG_PROMPT_TEMPLATE="""使用以下上下文来回答用户的问题。如果你不知道答案,请输出我不知道。总是使用中文回答。
问题: {question}
可参考的上下文:
···
{context}
···
如果给定的上下文无法让你做出回答,请回答数据库中没有这个内容,你不知道。
有用的回答:""",
    MindNLP_PROMPT_TEMPLATE="""先对上下文进行内容总结,再使用上下文来回答用户的问题。如果你不知道答案,请输出我不知道。总是使用中文回答。
问题: {question}
可参考的上下文:
···
{context}
···
如果给定的上下文无法让你做出回答,请回答数据库中没有这个内容,你不知道。
有用的回答:"""
)
def rag_retrieval(query: str, embedding_model: 'MindNLPEmbedding', vector_store: 'VectorStore', reranker: 'MindNLPReranker') -> List[Dict]:
    # note: this recomputes and persists all embeddings on every request,
    # which is fine for a demo but wasteful in production
    vector_store.get_vector(embedding_model)
    vector_store.persist(path='storage')
    # first stage: a generous top-5 by embedding similarity
    retrieved_documents = vector_store.query(query, embedding_model, k=5)
    # second stage: let the reranker pick the 3 most relevant of those
    top_k_documents = reranker.rerank(query, retrieved_documents, k=3)
    # placeholder score of 1.0: the reranker here only returns an ordering
    retrieved_results = [{"content": doc, "score": 1.0} for doc in top_k_documents]
    print(retrieved_results)
    return retrieved_results

def generate_response(query: str, retrieved_results: List[Dict], chat_model: 'MindNLPChat') -> str:
    context = "\n".join([result["content"] for result in retrieved_results])
    response = chat_model.chat(query, [], context)
    return response
def process_uploaded_files(files: List[str]) -> List[str]:
    # copy the uploaded files into a timestamped folder under ./data,
    # then read and chunk everything found there
    current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    save_dir = os.path.join("./data", current_time)
    os.makedirs(save_dir, exist_ok=True)
    for file_path in files:
        file_name = os.path.basename(file_path)
        save_path = os.path.join(save_dir, file_name)
        with open(file_path, 'rb') as src_file, open(save_path, 'wb') as dst_file:
            dst_file.write(src_file.read())
    documents = ReadFiles('./data').get_content(max_token_len=600, cover_content=150)
    return documents
def rag_app(query: str, files: List[str]) -> str:
    documents = process_uploaded_files(files)
    embedding_model = MindNLPEmbedding("BAAI/bge-base-zh-v1.5")
    vector_store = VectorStore(documents)
    chat_model = MindNLPChat(path='openbmb/MiniCPM-2B-dpo-bf16')
    reranker = MindNLPReranker(path='BAAI/bge-reranker-base')
    retrieved_results = rag_retrieval(query, embedding_model, vector_store, reranker)
    response = generate_response(query, retrieved_results, chat_model)
    return response
interface = gr.Interface(
    fn=rag_app,
    inputs=[
        gr.Textbox(label="请输入你的问题"),
        gr.Files(label="上传文件(支持 .md, .txt, .pdf)")
    ],
    outputs=gr.Textbox(label="生成的回复"),
    title="RAG 应用",
    description="上传文件并提问,系统将基于文件内容生成回复。"
)
interface.launch(share=True)

To recap the changes: Rerank.py defines the MindNLPReranker class, which re-ranks retrieval results. In app.py, the rag_retrieval function gains the rerank step, and the rag_app function initializes a reranker object and passes it to rag_retrieval. With this, the existing RAG demo gains a Rerank mechanism that selects the document content most relevant to the user's query more precisely.
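To try it, save Rerank.py and app.py side by side and start the app with python app.py; since launch is called with share=True, Gradio prints a temporary public URL alongside the local one.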