
LLM Application Development with MindSpore NLP - Part 5

Original · by 用户10497140 · Last modified 2025-01-19 14:54:23

Introduction to LLMs

Core: the Transformer architecture

1. What is RAG: the principles and practice of Retrieval-Augmented Generation (RAG)

Workflow

2. Embedding

3. Knowledge base design: commonly used vector databases

4. Large language models: prompt design

5. Rerank

Building on the basic RAG demo, we add a Rerank mechanism: after the vector store retrieves its top-k candidate chunks, a reranker re-scores them against the query and keeps only the most relevant few. A sketch of the flow follows below.
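Conceptually, the two-stage pipeline looks like this (a minimal sketch; vector_store, embedding_model, and reranker stand for the objects defined in the full code further down):

# Two-stage retrieval: coarse vector search first, then fine-grained reranking
candidates = vector_store.query(question, embedding_model, k=5)  # recall top-5 chunks
best = reranker.rerank(question, candidates, k=3)                # keep the best 3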

Below is the complete code: the contents of Rerank.py and app.py, including the changes to app.py that wire in the Rerank mechanism.

Rerank.py

# Rerank.py
from typing import List
import numpy as np
from mindnlp.sentence import SentenceTransformer

class BaseReranker:
    """
    Base class for reranker
    """
    def __init__(self, path: str) -> None:
        self.path = path

    def rerank(self, text: str, content: List[str], k: int) -> List[str]:
        raise NotImplementedError

class MindNLPReranker(BaseReranker):
    """
    class for MindNLP reranker
    """
    def __init__(self, path: str = 'BAAI/bge-reranker-base') -> None:
        super().__init__(path)
        self._model = self.load_model(path)

    def rerank(self, text: str, content: List[str], k: int) -> List[str]:
        query_embedding = self._model.encode(text, normalize_embeddings=True)
        sentences_embedding = self._model.encode(sentences=content, normalize_embeddings=True)
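        # Both sides are encoded with normalize_embeddings=True (L2-normalized),
        # so the dot product below equals cosine similarity.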
        similarity = query_embedding @ sentences_embedding.T
        ranked_indices = np.argsort(similarity)[::-1]  # sort indices by similarity, descending
        top_k_sentences = [content[i] for i in ranked_indices[:k]]
        return top_k_sentences

    def load_model(self, path: str):
        model = SentenceTransformer(path)
        return model
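A quick standalone check of the reranker (a minimal sketch; it assumes the BAAI/bge-reranker-base weights can be downloaded and that Rerank.py is on the import path):

# Hypothetical smoke test for MindNLPReranker
from Rerank import MindNLPReranker

reranker = MindNLPReranker('BAAI/bge-reranker-base')
candidates = [
    "MindSpore是一个深度学习框架。",
    "Rerank根据相关性对检索结果重新排序。",
    "今天的天气很好。",
]
print(reranker.rerank("什么是Rerank?", candidates, k=2))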

app.py

# app.py
import gradio as gr
from typing import List, Dict
import os
import PyPDF2
from mindspore import Tensor
from copy import copy
import numpy as np
from tqdm import tqdm
import json
import markdown
from bs4 import BeautifulSoup
import re
import tiktoken
from datetime import datetime
from Rerank import MindNLPReranker  # import the Rerank module

# Initialize the tiktoken encoder (used only to count tokens when chunking)
enc = tiktoken.get_encoding("cl100k_base")

class ReadFiles:
    """
    class to read files
    """
    def __init__(self, path: str) -> None:
        self._path = path
        self.file_list = self.get_files()

    @classmethod
    def read_pdf(cls, file_path: str):
        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            text = ""
            for page_num in range(len(reader.pages)):
                text += reader.pages[page_num].extract_text()
            return text

    @classmethod
    def read_markdown(cls, file_path: str):
        with open(file_path, 'r', encoding='utf-8') as file:
            md_text = file.read()
            html_text = markdown.markdown(md_text)
            soup = BeautifulSoup(html_text, 'html.parser')
            plain_text = soup.get_text()
            text = re.sub(r'http\S+', '', plain_text)
            return text

    @classmethod
    def read_text(cls, file_path: str):
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    def get_files(self):
        file_list = []
        for filepath, dirnames, filenames in os.walk(self._path):
            for filename in filenames:
                if filename.endswith(".md"):
                    file_list.append(os.path.join(filepath, filename))
                elif filename.endswith(".txt"):
                    file_list.append(os.path.join(filepath, filename))
                elif filename.endswith(".pdf"):
                    file_list.append(os.path.join(filepath, filename))
        return file_list

    def get_content(self, max_token_len: int = 600, cover_content: int = 150):
        docs = []
        for file in self.file_list:
            content = self.read_file_content(file)
            chunk_content = self.get_chunk(
                content, max_token_len=max_token_len, cover_content=cover_content)
            docs.extend(chunk_content)
        return docs

    @classmethod
    def get_chunk(cls, text: str, max_token_len: int = 600, cover_content: int = 150):
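        # Greedily pack lines into chunks of at most max_token_len tokens;
        # each new chunk starts with the last cover_content characters of
        # the previous one, so context overlaps across chunk boundaries.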
        chunk_text = []
        curr_len = 0
        curr_chunk = ''
        token_len = max_token_len - cover_content
        lines = text.splitlines()

        for line in lines:
            line = line.replace(' ', '')
            line_len = len(enc.encode(line))
            if line_len > max_token_len:
                # An over-long line is cut into token_len-sized slices
                # (character offsets are used here as an approximation of
                # token offsets), each prefixed with the overlap carried
                # over from the previous chunk.
                num_chunks = (line_len + token_len - 1) // token_len
                for i in range(num_chunks):
                    start = i * token_len
                    if start >= len(line):
                        break
                    end = min(start + token_len, len(line))
                    curr_chunk = curr_chunk[-cover_content:] + line[start:end]
                    chunk_text.append(curr_chunk)
                # Start a fresh chunk after the long line has been emitted.
                curr_chunk = ''
                curr_len = 0
                continue
            if curr_len + line_len <= token_len:
                curr_chunk += line
                curr_chunk += '\n'
                curr_len += line_len
                curr_len += 1
            else:
                chunk_text.append(curr_chunk)
                curr_chunk = curr_chunk[-cover_content:] + line
                curr_len = line_len + cover_content

        if curr_chunk:
            chunk_text.append(curr_chunk)

        return chunk_text

    @classmethod
    def read_file_content(cls, file_path: str):
        if file_path.endswith('.pdf'):
            return cls.read_pdf(file_path)
        elif file_path.endswith('.md'):
            return cls.read_markdown(file_path)
        elif file_path.endswith('.txt'):
            return cls.read_text(file_path)
        else:
            raise ValueError("Unsupported file type")


class BaseEmbeddings:
    """
    Base class for embeddings
    """
    def __init__(self, path: str, is_api: bool) -> None:
        self.path = path
        self.is_api = is_api

    def get_embedding(self, text: str, model: str) -> List[float]:
        raise NotImplementedError

    @classmethod
    def cosine_similarity(cls, vector1: List[float], vector2: List[float]) -> float:
        dot_product = np.dot(vector1, vector2)
        magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        if not magnitude:
            return 0
        return dot_product / magnitude


class MindNLPEmbedding(BaseEmbeddings):
    """
    class for MindNLP embeddings
    """
    def __init__(self, path: str = 'BAAI/bge-base-zh-v1.5', is_api: bool = False) -> None:
        super().__init__(path, is_api)
        self._model = self.load_model(path)

    def get_embedding(self, text: str):
        sentence_embedding = self._model.encode([text], normalize_embeddings=True)
        return sentence_embedding

    def load_model(self, path: str):
        from mindnlp.sentence import SentenceTransformer
        model = SentenceTransformer(path)
        return model

    @classmethod
    def cosine_similarity(cls, sentence_embedding_1, sentence_embedding_2):
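        # Embeddings are produced with normalize_embeddings=True,
        # so this dot product is exactly cosine similarity.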
        similarity = sentence_embedding_1 @ sentence_embedding_2.T
        return similarity


class VectorStore:
    def __init__(self, document: List[str] = ['']) -> None:
        self.document = document

    def get_vector(self, EmbeddingModel: BaseEmbeddings):
        self.vectors = []
        for doc in tqdm(self.document, desc="Calculating embeddings"):
            self.vectors.append(EmbeddingModel.get_embedding(doc))
        return self.vectors

    def persist(self, path: str = 'storage'):
        if not os.path.exists(path):
            os.makedirs(path)
        with open(f"{path}/document.json", 'w', encoding='utf-8') as f:
            json.dump(self.document, f, ensure_ascii=False)
        if self.vectors:
            vectors_list = [vector.tolist() for vector in self.vectors]
            with open(f"{path}/vectors.json", 'w', encoding='utf-8') as f:
                json.dump(vectors_list, f)

    def load_vector(self, EmbeddingModel: BaseEmbeddings, path: str = 'storage'):
        with open(f"{path}/vectors.json", 'r', encoding='utf-8') as f:
            vectors_list = json.load(f)
        with open(f"{path}/document.json", 'r', encoding='utf-8') as f:
            self.document = json.load(f)

        if isinstance(EmbeddingModel, MindNLPEmbedding):
            self.vectors = [np.array(vector) for vector in vectors_list]
        else:
            self.vectors = vectors_list

    def get_similarity(self, vector1, vector2, EmbeddingModel: BaseEmbeddings):
        return EmbeddingModel.cosine_similarity(vector1, vector2)

    def query(self, query: str, EmbeddingModel: BaseEmbeddings, k: int = 1):
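        # Brute-force search: embed the query, score it against every stored
        # vector, and return the k documents with the highest similarity.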
        query_vector = EmbeddingModel.get_embedding(query)
        similarities = [self.get_similarity(query_vector, vector, EmbeddingModel) for vector in self.vectors]
        results = []
        for similarity, vector, document in zip(similarities, self.vectors, self.document):
            results.append({
                'similarity': similarity,
                'vector': vector,
                'document': document
            })
        results.sort(key=lambda x: x['similarity'], reverse=True)
        top_k_documents = [result['document'] for result in results[:k]]
        return top_k_documents


class BaseModel:
    def __init__(self, path: str = '') -> None:
        self.path = path

    def chat(self, prompt: str, history: List[dict], content: str) -> str:
        raise NotImplementedError

    def load_model(self):
        raise NotImplementedError


class MindNLPChat(BaseModel):
    def __init__(self, path: str = '') -> None:
        super().__init__(path)
        self.load_model()

    def chat(self, prompt: str, history: List = [], content: str = '') -> str:
        prompt = PROMPT_TEMPLATE['MindNLP_PROMPT_TEMPLATE'].format(question=prompt, context=content)
        response, history = self.model.chat(self.tokenizer, prompt, history, max_length=512)
        return response

    def load_model(self):
        import mindspore
        from mindnlp.transformers import AutoTokenizer, AutoModelForCausalLM
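        # Weights are pulled from the Hugging Face hub (mirror="huggingface");
        # float16 halves memory use at a small cost in precision.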
        self.tokenizer = AutoTokenizer.from_pretrained(self.path, mirror="huggingface")
        self.model = AutoModelForCausalLM.from_pretrained(self.path, ms_dtype=mindspore.float16, mirror="huggingface")


PROMPT_TEMPLATE = dict(
    RAG_PROMPT_TEMPLATE="""使用以下上下文来回答用户的问题。如果你不知道答案,请输出我不知道。总是使用中文回答。
        问题: {question}
        可参考的上下文:
        ···
        {context}
        ···
        如果给定的上下文无法让你做出回答,请回答数据库中没有这个内容,你不知道。
        有用的回答:""",
    MindNLP_PROMPT_TEMPLATE="""先对上下文进行内容总结,再使用上下文来回答用户的问题。如果你不知道答案,请输出我不知道。总是使用中文回答。
        问题: {question}
        可参考的上下文:
        ···
        {context}
        ···
        如果给定的上下文无法让你做出回答,请回答数据库中没有这个内容,你不知道。
        有用的回答:"""
)


def rag_retrieval(query: str, embedding_model: 'MindNLPEmbedding', vector_store: 'VectorStore', reranker: 'MindNLPReranker') -> List[Dict]:
    vector_store.get_vector(embedding_model)
    vector_store.persist(path='storage')
    retrieved_documents = vector_store.query(query, embedding_model, k=5)
    top_k_documents = reranker.rerank(query, retrieved_documents, k=3)
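    # MindNLPReranker returns documents only, so attach a fixed placeholder
    # score of 1.0 to keep the result structure uniform.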
    retrieved_results = [{"content": doc, "score": 1.0} for doc in top_k_documents]
    print(retrieved_results)
    return retrieved_results


def generate_response(query: str, retrieved_results: List[Dict], chat_model: 'MindNLPChat') -> str:
    context = "\n".join([result["content"] for result in retrieved_results])
    response = chat_model.chat(query, [], context)
    return response


def process_uploaded_files(files: List[str]) -> None:
    current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    save_dir = os.path.join("./data", current_time)
    os.makedirs(save_dir, exist_ok=True)

    for file_path in files:
        file_name = os.path.basename(file_path)
        save_path = os.path.join(save_dir, file_name)
        with open(file_path, 'rb') as src_file, open(save_path, 'wb') as dst_file:
            dst_file.write(src_file.read())

    documents = ReadFiles('./data').get_content(max_token_len=600, cover_content=150)
    return documents


def rag_app(query: str, files: List[str]) -> str:
    documents = process_uploaded_files(files)
    embedding_model = MindNLPEmbedding("BAAI/bge-base-zh-v1.5")
    vector_store = VectorStore(documents)
    chat_model = MindNLPChat(path='openbmb/MiniCPM-2B-dpo-bf16')
    reranker = MindNLPReranker(path='BAAI/bge-reranker-base')
    retrieved_results = rag_retrieval(query, embedding_model, vector_store, reranker)
    response = generate_response(query, retrieved_results, chat_model)
    return response


interface = gr.Interface(
    fn=rag_app,
    inputs=[
        gr.Textbox(label="请输入你的问题"),
        gr.Files(label="上传文件(支持 .md, .txt, .pdf)")
    ],
    outputs=gr.Textbox(label="生成的回复"),
    title="RAG 应用",
    description="上传文件并提问,系统将基于文件内容生成回复。"
)

interface.launch(share=True)
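To try the app end to end, run python app.py and open the Gradio URL it prints. To exercise the pipeline without the UI, a minimal sketch (assuming the interface.launch(...) line is commented out, the model weights can be downloaded, and ./docs/intro.md is a hypothetical file):

# Hypothetical direct call, bypassing the Gradio UI
answer = rag_app("什么是Rerank?", ["./docs/intro.md"])
print(answer)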

Notes

  1. Rerank.py: defines the MindNLPReranker class, which reranks the retrieved results.
  2. app.py:
    • imports the MindNLPReranker class;
    • modifies the rag_retrieval function to add the Rerank step;
    • modifies the rag_app function to initialize a reranker object and pass it to rag_retrieval.

With this, a Rerank mechanism is added on top of the existing RAG demo, so the documents most relevant to the user's query can be selected more precisely.

Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, contact cloudcommunity@tencent.com for removal.
