Running my mouth in the big group chat felt great for a moment; then I actually had to build a ChatBot over internal documents for the company's HR team. It took about two weeks of personal spare time, roughly two working days' worth of effort. Given the scarcity of OpenAI ChatGPT keys, network latency, and the underwhelming performance of open LLMs, I simply dropped the LLM step that would normally polish the output after retrieving related documents.
The system design is shown in the figure above. The data pipeline splits into two parts:
Importing the Excel document is trivial:
import pandas as pd
df = pd.read_excel(example_qa_data, sheet_name='Sheet1')  # example_qa_data: path to the QA spreadsheet
PDF documents are imported with LangChain's PyPDFLoader:
from langchain.document_loaders import PyPDFLoader
loader = PyPDFLoader(data_file)
pages = loader.load()
The Excel document is essentially already formatted content, so handling it is just straightforward column processing, which I won't belabor.
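Still, as a rough sketch of that column handling (the column names are my assumption; the index built later expects city, question, keywords, answer, plus a concatenated qa field):

df = df.fillna("")
for col in ["city", "question", "keywords", "answer"]:
    df[col] = df[col].astype(str).str.strip()
# concatenated question+answer field, embedded later as qa_vector_*
df["qa"] = df["question"] + " " + df["answer"]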
A PDF run through PyPDFLoader, on the other hand, is just a text string: it contains noise (headers, footers) and has no structural information.
The usual approach is to process it with LangChain's RecursiveCharacterTextSplitter:
from langchain.text_splitter import RecursiveCharacterTextSplitter
r_splitter = RecursiveCharacterTextSplitter(
    chunk_size=450,
    chunk_overlap=50,
    separators=["\n\n", "\n", " ", ""]
)
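Had I gone that route, splitting the loaded pages is a single call:

docs = r_splitter.split_documents(pages)  # ~450-char chunks with 50-char overlap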
That works when a ChatGPT-style LLM sits behind the retrieval to filter, summarize, and restructure the output; the LLM is robust enough that the missing structure hardly matters. But I'm not putting an LLM at the back end (no API key), so the matched content is shown to the user directly. Each chunk therefore has to be a cohesive, complete unit, which means extracting structural information (chapters) from the PDF.
Headers and footers sit at fixed positions, so after grabbing each page's content you can simply drop the first or last few lines:
pages[4].page_content.split("\n")[9:]  # e.g. drop the first 9 header lines of page 5
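The get_pages_from_file / get_lines_from_pages helpers used below aren't shown in the original; a minimal sketch under the same assumption (a fixed number of header lines per page, nine in my case, and page_range guessed to be a (start, end) tuple) could be:

def get_pages_from_file(pdf_file, page_range):
    pages = PyPDFLoader(pdf_file).load()
    start, end = page_range
    return pages[start:end]

def get_lines_from_pages(pages, header_lines=9):
    lines = []
    for page in pages:
        # drop the fixed-position header lines at the top of every page
        lines.extend(page.page_content.split("\n")[header_lines:])
    return lines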
# ############### check row type #####################
import re

def is_contain_chinese(check_str):
    # True if the string contains at least one CJK character
    cn_min = u'\u4e00'
    cn_max = u'\u9fff'
    for ch in check_str:
        if cn_min <= ch <= cn_max:
            return True
    return False

def is_title(text):
    # numbered heading ending with Chinese, e.g. "2.1. Annual Leave 年假"
    title_regex = r'^\d+\.[\d+]?[\.\d+]*\s+.*[\u4e00-\u9fa5]+\s*$'
    return re.match(title_regex, text) is not None

def is_empty_line(text):
    return len(text.strip()) == 0  # '' counts as empty too

def get_title_from_line(line):
    # split a heading into (index, English title, Chinese title, level)
    title_reg = r'^(\d+\.[\d+]?[\.\d+]*)\s([/&A-Za-z\s’,-]+)[\s]?([\u4e00-\u9fa5、/\s&]+)\s*$'
    result = re.search(title_reg, line)
    if result is None:
        return None
    title_idx, title_en, title_cn = result.groups()
    title_lvl = len(title_idx[:-1].split(".")) - 1
    return title_idx, title_en, title_cn, title_lvl
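A quick sanity check of the heading regexes on a made-up bilingual heading:

line = "2.1. Annual Leave 年假"
print(is_title(line))             # True
print(get_title_from_line(line))  # ('2.1.', 'Annual Leave ', '年假', 1), i.e. a level-1 subsection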
def get_chapter_struct_content_from_file(pdf_file, page_range):
    pages = get_pages_from_file(pdf_file, page_range)
    lines = get_lines_from_pages(pages)
    df = pd.DataFrame(lines, columns=["text"])
    df['is_title'] = df['text'].apply(is_title)
    df['is_empty_line'] = df['text'].apply(is_empty_line)
    df['contain_chinese'] = df['text'].apply(is_contain_chinese)
    chapter_sessions = []
    cur_session = {}
    passage_lines = []
    cur_lan = None
    for i, row in df.iterrows():
        if row['is_title']:
            result = get_title_from_line(row['text'])
            if result is None:
                print(row['text'])  # heading the regex could not parse
                continue
            title_idx, title_en, title_cn, title_lvl = result
            if title_lvl >= 1:
                # flush the previous session before starting a new one
                if len(cur_session.keys()) > 0:
                    if len(passage_lines) > 0:
                        if cur_lan == 'cn':
                            cur_session['sessions_cn'].append(" ".join(passage_lines))
                        else:
                            cur_session['sessions_en'].append(" ".join(passage_lines))
                    cur_session['text_en_full'] = "\n".join(cur_session['sessions_en'])
                    cur_session['text_cn_full'] = "\n".join(cur_session['sessions_cn'])
                    chapter_sessions.append(cur_session)
                cur_session = {}
                passage_lines = []
                cur_session['title_en'] = title_en
                cur_session['title_cn'] = title_cn
                cur_session['title_idx'] = title_idx
                cur_session['title_lvl'] = title_lvl
                cur_session['sessions_cn'] = []
                cur_session['sessions_en'] = []
                cur_session['text_en_full'] = ""
                cur_session['text_cn_full'] = ""
        elif row['is_empty_line']:
            continue
        elif row['contain_chinese']:
            # language switched en -> cn: close the English paragraph
            if cur_lan == 'en' or cur_lan is None:
                if len(passage_lines) > 0:
                    if 'sessions_en' not in cur_session.keys():
                        cur_session['sessions_en'] = []
                    cur_session['sessions_en'].append(" ".join(passage_lines))
                passage_lines = []
                cur_lan = 'cn'
            passage_lines.append(row['text'])
        else:
            # language switched cn -> en: close the Chinese paragraph
            if cur_lan == 'cn' or cur_lan is None:
                if len(passage_lines) > 0:
                    if 'sessions_cn' not in cur_session.keys():
                        cur_session['sessions_cn'] = []
                    cur_session['sessions_cn'].append(" ".join(passage_lines))
                passage_lines = []
                cur_lan = 'en'
            passage_lines.append(row['text'])
    # flush the final session
    if len(passage_lines) > 0:
        if cur_lan == 'cn':
            if 'sessions_cn' not in cur_session.keys():
                cur_session['sessions_cn'] = []
            cur_session['sessions_cn'].append(" ".join(passage_lines))
        else:
            if 'sessions_en' not in cur_session.keys():
                cur_session['sessions_en'] = []
            cur_session['sessions_en'].append(" ".join(passage_lines))
    cur_session['text_en_full'] = "\n".join(cur_session['sessions_en'])
    cur_session['text_cn_full'] = "\n".join(cur_session['sessions_cn'])
    chapter_sessions.append(cur_session)
    return chapter_sessions
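Each extracted session dict is a cohesive unit (bilingual titles plus the full Chinese and English text), which is exactly what gets shown to users later. A usage sketch (the file name and page range are made up):

chapters = get_chapter_struct_content_from_file("employee_handbook.pdf", (3, 60))
df_hb = pd.DataFrame(chapters)
print(df_hb[['title_idx', 'title_cn', 'title_lvl']].head())

Both the QA pairs and these handbook chapters are then embedded and stored in Redis, with RediSearch indexes holding two embedding variants side by side: OpenAI ada (1536-dim) and M3E (768-dim).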
import pandas as pd
import numpy as np
import redis
from redis.commands.search.indexDefinition import (
IndexDefinition,
IndexType
)
from redis.commands.search.query import Query
from redis.commands.search.field import (
TextField,
VectorField
)
from typing import List, Iterator
def create_qa_index(redis_client: redis.Redis):
    VECTOR_DIM_ADA = 1536       # length of the ada embedding vectors
    VECTOR_DIM_M3E = 768        # length of the m3e embedding vectors
    VECTOR_NUMBER = 1000        # initial number of vectors
    INDEX_NAME = "qa-ada-m3e"   # name of the search index
    PREFIX = "qa"               # prefix for the document keys
    DISTANCE_METRIC = "COSINE"  # distance metric for the vectors (ex. COSINE, IP, L2)

    city = TextField("city")
    question = TextField("question")
    keywords = TextField("keywords")
    answer = TextField("answer")
    qa = TextField("qa")

    # one FLAT vector field per (text field, embedding model) pair
    vector_fields = [
        VectorField(name, "FLAT", {
            "TYPE": "FLOAT32",
            "DIM": dim,
            "DISTANCE_METRIC": DISTANCE_METRIC,
            "INITIAL_CAP": VECTOR_NUMBER,
        })
        for name, dim in [
            ("question_vector_ada", VECTOR_DIM_ADA),
            ("answer_vector_ada", VECTOR_DIM_ADA),
            ("qa_vector_ada", VECTOR_DIM_ADA),
            ("question_vector_m3e", VECTOR_DIM_M3E),
            ("answer_vector_m3e", VECTOR_DIM_M3E),
            ("qa_vector_m3e", VECTOR_DIM_M3E),
        ]
    ]
    fields = [city, question, keywords, answer, qa] + vector_fields

    try:
        redis_client.ft(INDEX_NAME).info()
        print("Index already exists")
    except Exception:
        # Create RediSearch Index
        redis_client.ft(INDEX_NAME).create_index(
            fields=fields,
            definition=IndexDefinition(prefix=[PREFIX], index_type=IndexType.HASH)
        )
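Creating the index only needs a client pointed at a Redis instance with the RediSearch module loaded (host and port below are placeholders):

redis_client = redis.Redis(host="localhost", port=6379, db=0)
create_qa_index(redis_client)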
def index_qa_documents(client: redis.Redis, prefix: str, documents: pd.DataFrame):
    records = documents.to_dict("records")
    vector_cols = ["question_vector_ada", "answer_vector_ada", "qa_vector_ada",
                   "question_vector_m3e", "answer_vector_m3e", "qa_vector_m3e"]
    for doc in records:
        key = f"{prefix}:{str(doc['city'])}:{str(doc['question'])}"
        # replace each list of floats with a FLOAT32 byte vector, as RediSearch expects
        for col in vector_cols:
            doc[col] = np.array(doc[col], dtype=np.float32).tobytes()
        client.hset(key, mapping=doc)
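Before indexing, the vector columns have to be filled. My embedding code isn't shown here; a minimal sketch of the M3E side, assuming the sentence-transformers package and the moka-ai/m3e-base checkpoint (whose 768-dim output matches VECTOR_DIM_M3E), with df_qa standing in for the QA DataFrame built from the Excel sheet:

from sentence_transformers import SentenceTransformer

m3e = SentenceTransformer("moka-ai/m3e-base")  # 768-dim embeddings

for col in ["question", "answer", "qa"]:
    # one embedding per row, stored as a list of floats; index_qa_documents converts to bytes
    df_qa[f"{col}_vector_m3e"] = list(m3e.encode(df_qa[col].tolist()))

index_qa_documents(redis_client, "qa", df_qa)

The ada columns would be filled the same way from OpenAI's text-embedding-ada-002 (1536-dim), given an API key.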
def create_hb_index(redis_client: redis.Redis):
    ...  # analogous to create_qa_index, for the handbook chapter index

def index_hb_documents(client: redis.Redis, prefix: str, documents: pd.DataFrame):
    ...  # analogous to index_qa_documents
def search_redis(
    redis_client: redis.Redis,
    user_query: str,
    index_name: str = "qa-ada-m3e",
    vector_field: str = "question_vector_m3e",
    return_fields: list = ['city', 'question', 'keywords', 'answer', 'qa', 'vector_score'],
    hybrid_fields="*",
    k: int = 3,
    embedding_model='m3e',
    print_results: bool = True,
) -> List[dict]:
    embedded_query = get_query_embeddings(user_query, embedding_model)
    # Prepare the query: optional text filter, then KNN over the chosen vector field
    base_query = f'{hybrid_fields}=>[KNN {k} @{vector_field} $vector AS vector_score]'
    query = (
        Query(base_query)
        .return_fields(*return_fields)
        .sort_by("vector_score")
        .paging(0, k)
        .dialect(2)
    )
    params_dict = {"vector": np.array(embedded_query).astype(dtype=np.float32).tobytes()}
    # perform vector search
    results = redis_client.ft(index_name).search(query, params_dict)
    if print_results:
        for i, article in enumerate(results.docs):
            score = 1 - float(article.vector_score)  # cosine distance -> similarity
            print(f"{i}. {article[return_fields[0]]} : {article[return_fields[1]]} (Score: {round(score, 3)})")
    return results.docs
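search_redis and do_all_search below lean on three pieces not shown above: get_query_embeddings, create_hybrid_field, and the redis_search_params config. Here are sketches under the assumptions already made; the hb index and vector field names in particular are guesses:

def get_query_embeddings(query, embedding_model='m3e'):
    # only the m3e path is sketched (uses the SentenceTransformer loaded earlier);
    # the ada path would call the OpenAI embeddings API instead
    return m3e.encode([query])[0]

def create_hybrid_field(field_name, value):
    # restrict the KNN search to docs whose text field matches, e.g. @city:"Shanghai"
    return f'@{field_name}:"{value}"' if value else "*"

redis_search_params = {
    'qa': {
        'index_name': 'qa-ada-m3e',
        'vector_fields': ['question_vector_m3e', 'answer_vector_m3e'],
        'return_fields': ['city', 'question', 'keywords', 'answer', 'qa', 'vector_score'],
    },
    'hb': {
        'index_name': 'hb-ada-m3e',  # handbook index name: a guess
        'vector_fields': ['title_vector_m3e', 'text_vector_m3e'],  # guesses
        'return_fields': ['title_cn', 'text_cn_full', 'vector_score'],
    },
}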
def do_all_search(query: str, city: str):
    # search the QA index by question vector and by answer vector
    qa_rlt_ls = []
    qa_search_cnt = len(redis_search_params['qa']['vector_fields'])
    for i in range(qa_search_cnt):
        qa_rlt = search_redis(redis_client,
                              query,
                              index_name=redis_search_params['qa']['index_name'],
                              vector_field=redis_search_params['qa']['vector_fields'][i],
                              return_fields=redis_search_params['qa']['return_fields'],
                              hybrid_fields=create_hybrid_field('city', city),
                              k=2)
        qa_rlt_ls.append(qa_rlt)
    # search the handbook index by title vector and by full-text vector
    hb_rlt_ls = []
    hb_search_cnt = len(redis_search_params['hb']['vector_fields'])
    for i in range(hb_search_cnt):
        hb_rlt = search_redis(redis_client,
                              query,
                              index_name=redis_search_params['hb']['index_name'],
                              vector_field=redis_search_params['hb']['vector_fields'][i],
                              return_fields=redis_search_params['hb']['return_fields'],
                              k=2)
        hb_rlt_ls.append(hb_rlt)
    # merge all hits into one DataFrame, highest similarity first
    df_ls = []
    for i in range(len(qa_rlt_ls)):
        df_qa = pd.DataFrame([it.__dict__ for it in qa_rlt_ls[i]])
        df_qa['score'] = df_qa['vector_score'].apply(lambda x: 1 - float(x))
        df_qa['search_by'] = 'question' if i == 0 else 'answer'
        df_qa.rename(columns={'question': 'question_title', 'answer': 'answer_text'}, inplace=True)
        df_ls.append(df_qa[['score', 'id', 'answer_text', 'question_title', 'search_by']])
    for i in range(len(hb_rlt_ls)):
        df_hb = pd.DataFrame([it.__dict__ for it in hb_rlt_ls[i]])
        df_hb['score'] = df_hb['vector_score'].apply(lambda x: 1 - float(x))
        df_hb['search_by'] = 'title' if i == 0 else 'text'
        df_hb.rename(columns={'title_cn': 'question_title', 'text_cn_full': 'answer_text'}, inplace=True)
        df_ls.append(df_hb[['score', 'id', 'answer_text', 'question_title', 'search_by']])
    df = pd.concat(df_ls).sort_values(by='score', ascending=False).reset_index(drop=True)
    return df
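End to end, one call fans out over both indexes and all vector fields, then merges the hits by similarity (query and city are made-up examples):

df_hits = do_all_search("年假有多少天?", "Shanghai")
print(df_hits[['score', 'search_by', 'question_title']].head())

The top answer_text is shown to the user as-is, since there is no LLM rewriting step.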
Add a query-history database plus feedback buttons, and you can collect the questions most users weren't satisfied with, answer them manually, push the answers back, and use them to populate the QA dataset. For later bulk document imports, this fine-grained manual processing wouldn't be needed: with ChatGPT available, satisfactory answers can be extracted and baked into the QA dataset.
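A sketch of what that query-history log could look like, reusing Redis (the schema is my own, not from the running system):

import json, time

def log_query(client, query, answer_id, feedback=None):
    # append-only history; feedback gets filled in when the user clicks a rating button
    record = {"ts": time.time(), "query": query,
              "answer_id": answer_id, "feedback": feedback}
    client.rpush("query_history", json.dumps(record))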