This tutorial is inspired by the 自塾 large-model API development course and the Zhipu AI official handbook. You must first register with Zhipu AI; all registered users can call the GLM-4-Flash API for free. The tutorial runs on Python 3.10.10, with compute provided by ModelArts AIGallery.
In a Notebook we can install a specific Python version inside a virtual environment; for details, see the guide: ModelArts CodeLab, creating a virtual environment and switching Python.
# Create the virtual environment
!/home/ma-user/anaconda3/bin/conda create -n python-3.10.10 python=3.10.10 -y --override-channels --channel https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
!/home/ma-user/anaconda3/envs/python-3.10.10/bin/pip install ipykernel
# Then register the virtual environment as a Jupyter kernel
import json
import os

data = {
    # Name shown in the kernel list
    "display_name": "python-3.10.10",
    # Path of the virtual environment created above
    "env": {
        "PATH": "/home/ma-user/anaconda3/envs/python-3.10.10/bin:/home/ma-user/anaconda3/envs/python-3.7.10/bin:/modelarts/authoring/notebook-conda/bin:/opt/conda/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/ma-user/modelarts/ma-cli/bin:/home/ma-user/modelarts/ma-cli/bin:/home/ma-user/anaconda3/envs/PyTorch-1.8/bin"
    },
    # Life is short, I use Python
    "language": "python",
    # Launch arguments
    "argv": [
        "/home/ma-user/anaconda3/envs/python-3.10.10/bin/python",
        "-m",
        "ipykernel",
        "-f",
        "{connection_file}"
    ]
}
if not os.path.exists("/home/ma-user/anaconda3/share/jupyter/kernels/python-3.10.10/"):
    os.mkdir("/home/ma-user/anaconda3/share/jupyter/kernels/python-3.10.10/")
with open('/home/ma-user/anaconda3/share/jupyter/kernels/python-3.10.10/kernel.json', 'w') as f:
    json.dump(data, f, indent=4)
# Once created, wait a moment or refresh the page, then pick python-3.10.10 from the kernel selector in the top-right corner
# Then verify that the environment took effect
!python -V
!pip -V
requests is a widely used Python HTTP library, similar to Axios on the front end. We will use requests to call the large-model API.
# Install requests
!pip install requests
# The requests package can call any large-model HTTP API
# API Key: https://bigmodel.cn/usercenter/apikeys
import requests
api_key = 'fd3cfd4bddd068e28e7175104002689b.Lm7aWJUYwMK2P5I9'
url = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
# Put the token in the request header
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {api_key}"
}
# Build the request body
data = {
    "model": "glm-4-flash",
    "messages": [
        {
            "role": "system",
            "content": "you are a helpful assistant"
        },
        {
            "role": "user",
            "content": "can you tell me a joke?"
        }
    ],
    "max_tokens": 8192,
    "temperature": 0.8,
    "stream": False  # set stream to True if you want streaming output
}
# Send the POST request with requests
response = requests.post(url, headers=headers, json=data)
ans = response.json()
ans
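The JSON reply follows the OpenAI-style schema, so the assistant's text sits under choices[0].message.content (the same path the SDK exposes later). For example:
# Pull the assistant's reply text out of the JSON response
print(ans["choices"][0]["message"]["content"])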
For safe and convenient testing, we generally do not hard-code sensitive data such as the api_key and api_secret directly in the code. The usual practice is to keep secrets in a dedicated file in the project, like the .env file common in front-end projects. Here we write the api_key we need into a keys.txt file, for example:
# keys.txt
fd3cfd4bddd068e28e7175104002689b.Lm7aWJUYwMK2P5I9
It is also very easy to use:
def get_api_keys(file_path, line_number=None):
    with open(file_path, 'r', encoding='utf-8') as f:
        # Read the file, split on newlines, and filter out blank lines and comment lines
        keylist = [line.strip() for line in f.read().split('\n') if line.strip() and not line.startswith('#')]
    # If line_number is not given or is out of range, return the whole keylist
    if line_number is None or line_number > len(keylist):
        return keylist
    # Otherwise return the api_key on the given line
    api_key = keylist[line_number - 1]
    return api_key
# Usage example
file_path = 'keys.txt'
line_number = 1  # set this to None if you don't want to pass a line number
# Get the key on line 1; if the line number is out of range, the whole list is returned
result = get_api_keys(file_path, line_number)
result
import requests

# Call the Zhipu API with streaming output
def get_api_keys(file_path, line_number=None):
    with open(file_path, 'r', encoding='utf-8') as f:
        # Read the file, split on newlines, and filter out blank lines and comment lines
        keylist = [line.strip() for line in f.read().split('\n') if line.strip() and not line.startswith('#')]
    # If line_number is not given or is out of range, return the whole keylist
    if line_number is None or line_number > len(keylist):
        return keylist
    # Otherwise return the api_key on the given line
    api_key = keylist[line_number - 1]
    return api_key

# Load the secret keys from the file
keylist = get_api_keys('keys.txt')
# Get a single key
api_key = get_api_keys('keys.txt', 1)
url = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
# Put the token in the request header
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {api_key}"
}
# Build the request body
data = {
    "model": "glm-4-flash",
    "messages": [
        {
            "role": "system",
            "content": "you are a helpful assistant"
        },
        {
            "role": "user",
            "content": "can you tell me a joke?"
        }
    ],
    "max_tokens": 8192,
    "temperature": 0.8,
    "stream": True  # set stream to True to get streaming output
}
# Pass stream=True so requests does not buffer the whole body before returning
response = requests.post(url, headers=headers, json=data, stream=True)
response
# Iterate over the raw SSE lines as they arrive
for chunk in response.iter_lines():
    print(chunk)
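Each streamed line uses the SSE format data: {...}. Below is a minimal sketch of extracting only the text deltas; we re-issue the POST first, since a streamed response can only be consumed once, and the chunk shape assumed here is the OpenAI-compatible one shown above:
import json

response = requests.post(url, headers=headers, json=data, stream=True)
for raw in response.iter_lines():
    if not raw:
        continue  # skip SSE keep-alive blank lines
    line = raw.decode('utf-8')
    if not line.startswith('data:'):
        continue
    payload = line[len('data:'):].strip()
    if payload == '[DONE]':  # sentinel that ends the stream
        break
    chunk = json.loads(payload)
    for choice in chunk.get('choices', []):
        print(choice.get('delta', {}).get('content', ''), end='', flush=True)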
Most developer-facing products ship an SDK to make them easier to call; even when the vendor does not provide one, any frequently used product usually gets one from the open-source community. The SDK documentation can be found on the Zhipu AI developer platform.
!pip install zhipuai
from zhipuai import ZhipuAI

# client = ZhipuAI(api_key="")  # fill in your own API Key
# Load the secret keys from the file
with open('keys.txt', 'r', encoding='utf-8') as f:
    # Read the file, split on newlines, and filter out blank lines and comment lines
    keylist = [line.strip() for line in f.read().split('\n') if line.strip() and not line.startswith('#')]
client = ZhipuAI(api_key=keylist[0])  # fill in your own API Key
response = client.chat.completions.create(
    model="glm-4-0520",  # the model code to call
    messages=[
        {"role": "user", "content": "你好!你叫什么名字"},
    ],
    stream=True,
)
for chunk in response:
print(chunk.choices[0].delta)
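Each chunk carries an incremental delta. To assemble the full reply, concatenate the content fields; a small sketch (delta.content can be None on the final chunk, hence the or ""):
# Re-issue the request and stitch the streamed deltas into one string
response = client.chat.completions.create(
    model="glm-4-0520",
    messages=[{"role": "user", "content": "你好!你叫什么名字"}],
    stream=True,
)
full_reply = ""
for chunk in response:
    full_reply += chunk.choices[0].delta.content or ""
print(full_reply)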
Continuing with more calls:
# A thin wrapper
def gen_gpt_messages(prompt):
    '''
    Build the messages parameter for a chat-model request

    Parameters:
        prompt: the user prompt
    '''
    messages = [{"role": "user", "content": prompt}]
    return messages

def get_completion(prompt, model="glm-4-flash", temperature=0):
    '''
    Get the result of a chat-model call

    Parameters:
        prompt: the prompt
        model: the model to call; defaults to glm-4-flash, but you can pick glm-4-plus or others as needed
        temperature: sampling temperature, controlling how random the output is. The lower it is, the more consistent the output. (Note that Zhipu's documented range is 0~1, unlike OpenAI's 0~2.)
    '''
    response = client.chat.completions.create(
        model=model,
        messages=gen_gpt_messages(prompt),
        temperature=temperature,
    )
    if len(response.choices) > 0:
        return response.choices[0].message.content
    return "generate answer error"
response = get_completion("大模型领域的MCTS是什么意思?给我详细讲讲")
print(response)
Besides regular language-model calls, the Zhipu SDK also supports embedding models. A text-embedding model turns input text into a vector representation, which makes it possible to pair a vector database with the LLM as an external knowledge base and improve the accuracy of its answers. The models support custom vector dimensions; 256, 512, 1024, or 2048 dimensions are recommended.
from zhipuai import ZhipuAI
# client = ZhipuAI(api_key="")  # fill in your own API Key
# Load the secret keys from the file
with open('keys.txt', 'r', encoding='utf-8') as f:
    # Read the file, split on newlines, and filter out blank lines and comment lines
    keylist = [line.strip() for line in f.read().split('\n') if line.strip() and not line.startswith('#')]
client = ZhipuAI(api_key=keylist[0])  # fill in your own API Key
# Embedding-2
response_2 = client.embeddings.create(
    model="embedding-2",  # the model code to call
    input="你好",
)
print(response_2)
# Embedding-3
response_3 = client.embeddings.create(
    model="embedding-3",  # the model code to call
    input=["美食非常美味,服务员也很友好。","这部电影既刺激又令人兴奋。","阅读书籍是扩展知识的好方法。"],
)
print(response_3)
print("------ Embedding-2 ------")
print(response_2.usage.total_tokens)      # total token count for response_2
print(len(response_2.data[0].embedding))  # length of the first embedding in response_2
print(response_2.data[0].embedding[:10])  # first 10 values of the first embedding in response_2
print("------ Embedding-3 ------")
print(response_3.usage.total_tokens)      # total token count for response_3
print(len(response_3.data[0].embedding))  # length of the first embedding in response_3
print(response_3.data[0].embedding[:10])  # first 10 values of the first embedding in response_3
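Since embedding-3 returned one vector per input sentence, we can compare them directly. A small sketch computing pairwise cosine similarity with NumPy (the name vecs is ours):
import numpy as np

# Stack the three embedding-3 vectors into a matrix and L2-normalize each row
vecs = np.array([item.embedding for item in response_3.data], dtype='float32')
vecs /= np.linalg.norm(vecs, axis=1, keepdims=True)
# For normalized vectors, the dot product equals cosine similarity
print(vecs @ vecs.T)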
RAG (Retrieval-Augmented Generation) is a natural-language-processing technique that combines retrieval with generation and is typically used to improve the quality of a model's answers. A RAG system retrieves relevant documents and feeds them to the model so it can produce more accurate, context-aware answers. The pipeline below:
Uses tools such as scikit-learn and scipy for data cleaning, feature extraction, and dimensionality reduction.
Uses faiss-cpu to retrieve the documents most relevant to a query from a document collection.
Fuses the retrieved documents with the query (here, by inserting them into the prompt) to generate a more relevant answer.
Uses a generative model (via zhipuai) to produce the natural-language answer.
# Install the remaining dependencies
!pip install faiss-cpu scikit-learn scipy
# Load the secret keys from the file
with open('keys.txt', 'r', encoding='utf-8') as f:
    # Read the file, split on newlines, and filter out blank lines and comment lines
    keylist = [line.strip() for line in f.read().split('\n') if line.strip() and not line.startswith('#')]
client = ZhipuAI(api_key=keylist[0])  # fill in your own API Key
"""
数据预处理:
对文章进行切分后存入到数据库,将文章分成了 150 个字符一段的小文本块
"""
import numpy as np
import faiss
# 定义要嵌入的文本
embedding_text = """
Multimodal Agent AI systems have many applications. In addition to interactive AI, grounded multimodal models could help drive content generation for bots and AI agents, and assist in productivity applications, helping to re-play, paraphrase, action prediction or synthesize 3D or 2D scenario. Fundamental advances in agent AI help contribute towards these goals and many would benefit from a greater understanding of how to model embodied and empathetic in a simulate reality or a real world. Arguably many of these applications could have positive benefits.
However, this technology could also be used by bad actors. Agent AI systems that generate content can be used to manipulate or deceive people. Therefore, it is very important that this technology is developed in accordance with responsible AI guidelines. For example, explicitly communicating to users that content is generated by an AI system and providing the user with controls in order to customize such a system. It is possible the Agent AI could be used to develop new methods to detect manipulative content - partly because it is rich with hallucination performance of large foundation model - and thus help address another real world problem.
For examples, 1) in health topic, ethical deployment of LLM and VLM agents, especially in sensitive domains like healthcare, is paramount. AI agents trained on biased data could potentially worsen health disparities by providing inaccurate diagnoses for underrepresented groups. Moreover, the handling of sensitive patient data by AI agents raises significant privacy and confidentiality concerns. 2) In the gaming industry, AI agents could transform the role of developers, shifting their focus from scripting non-player characters to refining agent learning processes. Similarly, adaptive robotic systems could redefine manufacturing roles, necessitating new skill sets rather than replacing human workers. Navigating these transitions responsibly is vital to minimize potential socio-economic disruptions.
Furthermore, the agent AI focuses on learning collaboration policy in simulation and there is some risk if directly applying the policy to the real world due to the distribution shift. Robust testing and continual safety monitoring mechanisms should be put in place to minimize risks of unpredictable behaviors in real-world scenarios. Our “VideoAnalytica" dataset is collected from the Internet and considering which is not a fully representative source, so we already go through-ed the ethical review and legal process from both Microsoft and University Washington. Be that as it may, we also need to understand biases that might exist in this corpus. Data distributions can be characterized in many ways. In this workshop, we have captured how the agent level distribution in our dataset is different from other existing datasets. However, there is much more than could be included in a single dataset or workshop. We would argue that there is a need for more approaches or discussion linked to real tasks or topics and that by making these data or system available.
We will dedicate a segment of our project to discussing these ethical issues, exploring potential mitigation strategies, and deploying a responsible multi-modal AI agent. We hope to help more researchers answer these questions together via this paper.
"""
# Size of each text chunk
chunk_size = 150
# Split the text into chunks of chunk_size characters
chunks = [embedding_text[i:i + chunk_size] for i in range(0, len(embedding_text), chunk_size)]
"""
文档检索:
将上述小文本块进行 Embedding,得到一个 1024 维的向量。
然后,我们将这些向量存入到一个向量数据库中,以便后续进行检索
"""
from sklearn.preprocessing import normalize

# Start with an empty list of embeddings
embeddings = []
# Embed every text chunk
for chunk in chunks:
    response = client.embeddings.create(
        model="embedding-2",  # generate the embedding with the specified model
        input=chunk,
    )
    embeddings.append(response.data[0].embedding)  # collect the generated embedding

# Convert the embeddings to a float32 NumPy array and normalize them
normalized_embeddings = normalize(np.array(embeddings).astype('float32'))
# Embedding dimensionality (embedding-2 produces 1024-dimensional vectors)
d = 1024
# Create an inner-product FAISS index (on normalized vectors, inner product equals cosine similarity)
index = faiss.IndexFlatIP(d)
# Add the normalized embeddings to the index
index.add(normalized_embeddings)
# Get the total number of vectors in the index
n_vectors = index.ntotal
# Display the total
n_vectors
"""
文档检索:
将上述小文本块进行 Embedding,得到一个 1024 维的向量。
然后,我们将这些向量存入到一个向量数据库中,以便后续进行检索
"""
from sklearn.preprocessing import normalize # 从 sklearn.preprocessing 模块中导入 normalize 函数
def match_text(input_text, index, chunks, k=2):
k = min(k, len(chunks)) # 将 k 设置为 k 和 chunks 长度的最小值,以确保 k 不超过 chunks 的数量
response = client.embeddings.create(
model="embedding-2",
input=input_text,
) # 调用 client.embeddings.create 方法,使用指定的模型("embedding-2")对 input_text 进行嵌入生成,并将结果存储在 response 中
input_embedding = response.data[0].embedding # 从 response 中提取生成的嵌入向量,并存储在 input_embedding 变量中
input_embedding = normalize(np.array([input_embedding]).astype('float32')) # 将 input_embedding 转换为 NumPy 数组并进行归一化处理
distances, indices = index.search(input_embedding, k) # 使用 index 对象的 search 方法,查找与 input_embedding 最相似的 k 个嵌入向量,并返回它们的距离和索引
for i, idx in enumerate(indices[0]): # 遍历 indices[0] 中的每个索引值
print(f"similarity: {distances[0][i]:.4f}\nmatching text: \n{chunks[idx]}\n") # 打印出相似度和对应的匹配文本块
input_text = "What are the risks of Agent AI systems ?"
matched_texts = match_text(input_text=input_text, index=index, chunks=chunks, k=2)
# Build the question prompt
prompt = f"""
从文档
{matched_texts}
中找问题
{input_text}
的答案,
找到答案就仅使用文档语句回答,找不到答案就用自身知识回答并告诉用户该信息不是来自文档。
不要复述问题,直接开始回答。
"""
# Build the chat engine
def get_completion_stream(prompt):
    response = client.chat.completions.create(
        model="glm-4-flash",  # the model to call
        messages=[
            {"role": "user", "content": prompt},
        ],
        stream=True,
    )
    if response:
        for chunk in response:
            content = chunk.choices[0].delta.content
            print(content, end='', flush=True)

# Streamed answer
get_completion_stream(prompt)
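Putting the pieces together, here is a small convenience wrapper (rag_answer is our own name) that retrieves, builds the same prompt as above, and streams the answer in one call:
def rag_answer(question, k=2):
    # Retrieve the k chunks most similar to the question
    matched = match_text(input_text=question, index=index, chunks=chunks, k=k)
    # Reuse the prompt template from above
    rag_prompt = f"""
从文档
{matched}
中找问题
{question}
的答案,
找到答案就仅使用文档语句回答,找不到答案就用自身知识回答并告诉用户该信息不是来自文档。
不要复述问题,直接开始回答。
"""
    get_completion_stream(rag_prompt)

rag_answer("What are the risks of Agent AI systems ?")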
First, install OpenAI's Python library:
!pip install openai
Next, create a client with Zhipu's api_key and base_url:
from openai import OpenAI

# Load the required keys from the file
with open('keys.txt', 'r', encoding='utf-8') as f:
    # Read the file, split on newlines, and filter out blank lines and comment lines
    keylist = [line.strip() for line in f.read().split('\n') if line.strip() and not line.startswith('#')]
client = OpenAI(
    api_key=keylist[0],
    base_url="https://open.bigmodel.cn/api/paas/v4/"
)
Use the client for a single-turn conversation:
completion = client.chat.completions.create(
    model="glm-4-flash",
    messages=[
        {"role": "system", "content": "你是人工智能助手,擅长中英文对话,提供安全、准确的回答。"},
        {"role": "user", "content": "你好,我叫李雷,1+1等于多少?"}
    ],
    temperature=0.3,
)
print(completion.choices[0].message.content)
Multi-turn conversation works by feeding the model's output back in as part of the next input:
history = [
    {"role": "system", "content": "你是人工智能助手,擅长中英文对话,提供安全、准确的回答。"}
]

def chat(query, history):
    history.append({"role": "user", "content": query})
    completion = client.chat.completions.create(
        model="glm-4-flash",
        messages=history,
        temperature=0.3,
    )
    result = completion.choices[0].message.content
    history.append({"role": "assistant", "content": result})
    return result
print(chat("地球的自转周期是多少?", history))
print(chat("月球呢?", history))
Streaming output can be implemented like this:
def get_completion_stream(prompt):
    response = client.chat.completions.create(
        model="glm-4-flash",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    if response:
        for chunk in response:
            content = chunk.choices[0].delta.content
            print(content, end='', flush=True)
get_completion_stream("大模型领域的MCTS是什么意思?给我详细讲讲")
The output appears with a typewriter effect, flowing out like a stream of water.
Zhipu AI also lets you call the CogView-3 image model through the OpenAI interface:
response_cogview = client.images.generate(
    model="cogview-3",
    prompt="一个城市在水晶瓶中的场景",
)
# Display the image
from IPython.display import display, Image
display(Image(url=response_cogview.data[0].url))
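The returned URL is typically temporary, so you may want to save the image locally. A small sketch with requests (the filename is ours):
import requests

# Download the generated image and write it to disk
img_bytes = requests.get(response_cogview.data[0].url).content
with open('cogview_output.png', 'wb') as f:
    f.write(img_bytes)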
Call the embedding models through the OpenAI SDK:
response_embedding = client.embeddings.create(
    model="embedding-2",
    input="你好",
)
# Print the first 10 dimensions
print(response_embedding.data[0].embedding[:10])  # show only the first 10
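Earlier we noted that embedding-3 supports custom vector dimensions (256, 512, 1024, or 2048 recommended). A hedged sketch passing the dimensions parameter through the OpenAI SDK, assuming Zhipu's OpenAI-compatible endpoint honors it the way its native API does:
response_small = client.embeddings.create(
    model="embedding-3",
    input="你好",
    dimensions=1024,  # one of the recommended sizes: 256, 512, 1024, 2048
)
print(len(response_small.data[0].embedding))  # expected: 1024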
embedding-2 produces 1024-dimensional vectors, while embedding-3 defaults to 2048 dimensions and consumes more tokens.
First, make sure the required libraries are installed:
!pip uninstall jwt -y
!pip install pyjwt
!pip install langchain langchain_community httpx_sse
# If you still see AttributeError: module "jwt" has no attribute "encode"
# restart the kernel
Create a client with the ChatZhipuAI class and run a simple conversation:
from langchain_community.chat_models import ChatZhipuAI
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
import os

# Set the ZHIPU AI API key
with open('keys.txt', 'r', encoding='utf-8') as f:
    # Read the file, split on newlines, and filter out blank lines and comment lines
    keylist = [line.strip() for line in f.read().split('\n') if line.strip() and not line.startswith('#')]
os.environ["ZHIPUAI_API_KEY"] = keylist[0]

chat = ChatZhipuAI(
    model="glm-4-flash",
    temperature=0.5,
)
messages = [
    AIMessage(content="Hi."),
    SystemMessage(content="Your role is a poet."),
    HumanMessage(content="Write a short poem about AI in four lines."),
]
response = chat.invoke(messages)
print(response.content)  # print the AI-generated poem
Use Message objects for a richer conversation:
# Use Message objects for a richer conversation
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

messages = [
    AIMessage(content="你好!"),
    SystemMessage(content="你的角色是一个唐代诗人"),
    HumanMessage(content="写一首七言绝句"),
]
response = chat.invoke(messages)
print(response.content)  # sample output: 江畔孤舟听晚风,柳岸花明醉梦中。诗心一片随流水,明月清风共长空。
To use streaming output, make sure the httpx_sse library is installed, and handle the tokens with a callback:
from langchain_core.callbacks.manager import CallbackManager
from langchain_core.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

streaming_chat = ChatZhipuAI(
    model="glm-4-flash",
    api_key=keylist[0],
    streaming=True,
    temperature=0.5,
    callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]),
)
messages = [
    AIMessage(content="你好!"),
    SystemMessage(content="你的角色是一个唐代诗人"),
    HumanMessage(content="写一首七言律诗"),
]
streaming_chat.invoke(messages)  # invoke() replaces the deprecated direct-call style
Conversations can also be run asynchronously:
# Run the conversation asynchronously
import asyncio
import nest_asyncio

# In Jupyter-like environments, nest_asyncio allows nested event loops
nest_asyncio.apply()

async def main():
    llm = ChatZhipuAI(
        model="glm-4-flash",
        api_key=keylist[0],
        temperature=0.5,
    )
    messages = [
        AIMessage(content="你好!"),
        SystemMessage(content="你的角色是一个唐代诗人"),
        HumanMessage(content="写一首七言绝句"),
    ]
    response = await llm.agenerate([messages])
    print(response)

asyncio.run(main())
Call the embedding model with ZhipuAIEmbeddings:
from langchain_community.embeddings import ZhipuAIEmbeddings

embedding = ZhipuAIEmbeddings(
    model="embedding-3",
    api_key=keylist[0],
)
emb = embedding.embed_query("你好呀呀")
print(len(emb), type(emb))  # output: 2048 <class 'list'>
LlamaIndex is a flexible, powerful data framework for connecting custom data sources to large language models. This part shows how to integrate Zhipu AI with LlamaIndex to build a knowledge base.
First, install LlamaIndex with pip:
!pip install llama-index
Since LlamaIndex does not yet support Zhipu AI directly, we need a custom integration. Define a ZhipuLLM class that inherits from CustomLLM:
# Define the CustomLLM subclass
from typing import Optional, List, Mapping, Any
from llama_index.core import SimpleDirectoryReader, SummaryIndex
from llama_index.core.callbacks import CallbackManager
from llama_index.core.llms import (
    CustomLLM,
    CompletionResponse,
    CompletionResponseGen,
    LLMMetadata,
)
from llama_index.core.llms.callbacks import llm_completion_callback
from llama_index.core import Settings

class ZhipuLLM(CustomLLM):
    context_window: int = 3900
    num_output: int = 256
    model_name: str = "glm-4-flash"
    dummy_response: str = "My response"

    @property
    def metadata(self) -> LLMMetadata:
        """Get LLM metadata."""
        return LLMMetadata(
            context_window=self.context_window,
            num_output=self.num_output,
            model_name=self.model_name,
        )

    @llm_completion_callback()
    def complete(self, prompt: str, **kwargs: Any) -> CompletionResponse:
        # Placeholder: returns a canned string instead of calling the model
        return CompletionResponse(text=self.dummy_response)

    @llm_completion_callback()
    def stream_complete(
        self, prompt: str, **kwargs: Any
    ) -> CompletionResponseGen:
        # Placeholder: streams the canned string one character at a time
        response = ""
        for token in self.dummy_response:
            response += token
            yield CompletionResponse(text=response, delta=token)
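As written, ZhipuLLM still returns the canned dummy_response, mirroring the skeleton in the LlamaIndex docs. A hedged sketch of what an actual integration could look like, subclassing it to call the Zhipu SDK (ZhipuRealLLM is our own name; keylist comes from the earlier cells):
from zhipuai import ZhipuAI

zhipu_client = ZhipuAI(api_key=keylist[0])

class ZhipuRealLLM(ZhipuLLM):
    @llm_completion_callback()
    def complete(self, prompt: str, **kwargs: Any) -> CompletionResponse:
        # Forward the prompt to GLM instead of returning the canned string
        result = zhipu_client.chat.completions.create(
            model=self.model_name,
            messages=[{"role": "user", "content": prompt}],
        )
        return CompletionResponse(text=result.choices[0].message.content)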
Next, implement a ZhipuEmbedding class for text embeddings:
# BaseEmbedding moved into llama_index.core in recent llama-index releases
from llama_index.core.embeddings import BaseEmbedding

class ZhipuEmbedding(BaseEmbedding):
    def __init__(self, instructor_model_name: str = "text_embedding", instruction: str = "Represent a document for semantic search:", **kwargs: Any) -> None:
        self._model = instructor_model_name
        self._instruction = instruction
        super().__init__(**kwargs)

    async def _aget_query_embedding(self, query: str) -> List[float]:
        return self._get_query_embedding(query)

    async def _aget_text_embedding(self, text: str) -> List[float]:
        return self._get_text_embedding(text)

    def _get_query_embedding(self, query: str) -> List[float]:
        return invoke_embedding(query)

    def _get_text_embedding(self, text: str) -> List[float]:
        return invoke_embedding(text)

    def _get_text_embeddings(self, texts: List[str]) -> List[List[float]]:
        return [self._get_text_embedding(text) for text in texts]
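Note that invoke_embedding, called by the methods above, is never defined in the original. A hedged sketch of a matching helper that delegates to the Zhipu embedding API (the name simply matches the calls above):
from zhipuai import ZhipuAI

zhipu_client = ZhipuAI(api_key=keylist[0])

def invoke_embedding(text: str) -> List[float]:
    # Embed the text with Zhipu's embedding-2 model (1024 dimensions)
    response = zhipu_client.embeddings.create(
        model="embedding-2",
        input=text,
    )
    return response.data[0].embedding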
Now combine ZhipuLLM and ZhipuEmbedding to build the knowledge base:
# Configure the LLM and embedding model
llm = ZhipuLLM()
embed_model = ZhipuEmbedding()
# ServiceContext was removed in recent llama-index releases; the global
# Settings object imported above is the current way to register defaults
Settings.llm = llm
Settings.embed_model = embed_model

# Load the data (doc_path should point to your document directory)
from llama_index.core import VectorStoreIndex
documents = SimpleDirectoryReader(doc_path).load_data()
index = VectorStoreIndex.from_documents(documents)

# Query and print the response
query_engine = index.as_query_engine()
response = query_engine.query("广州全面取消限购,请分析原因")
print(response)
# Because of unresolved HuggingFace network issues in this environment, some models could not be downloaded, so this cell has not yet been run successfully
Sometimes we don't want clients to call the model vendor's API directly; instead we expose an API endpoint of our own, which in turn calls the ready-made large-model API. This is quite simple too!
First, make sure the following libraries are installed:
!pip install fastapi uvicorn flask requests
Here is a backend built with FastAPI:
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import requests
import json

# Load the required keys from the file
with open('keys.txt', 'r', encoding='utf-8') as f:
    # Read the file, split on newlines, and filter out blank lines and comment lines
    keylist = [line.strip() for line in f.read().split('\n') if line.strip() and not line.startswith('#')]

BASE_URL = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
app = FastAPI()

@app.get('/stream_chat')
async def stream_chat(param: str = "你好"):
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {keylist[0]}"
    }

    async def generate():
        payload = {
            "model": "glm-4-flash",
            "messages": [{"role": "user", "content": param}],
            "max_tokens": 8192,
            "temperature": 0.8,
            "stream": True
        }
        response = requests.post(BASE_URL, json=payload, headers=headers, stream=True)
        for chunk in response.iter_lines():
            if chunk:
                chunk_str = chunk.decode('utf-8')
                # Each SSE line looks like 'data: {"id": ...}'; locate the JSON part
                json_start_pos = chunk_str.find('{"id"')
                if json_start_pos != -1:
                    json_str = chunk_str[json_start_pos:]
                    json_data = json.loads(json_str)
                    for choice in json_data.get('choices', []):
                        delta = choice.get('delta', {})
                        content = delta.get('content', '')
                        print(content)
                        yield content

    return StreamingResponse(generate(), media_type='text/event-stream')

if __name__ == '__main__':
    config = uvicorn.Config(app, host='0.0.0.0', port=8000)
    server = uvicorn.Server(config)
    # Top-level await works in a notebook cell; in a plain script, use asyncio.run(server.serve()) instead
    await server.serve()
Once it is running, you can reach the API at any of the following addresses:
http://192.168.0.123:8000/stream_chat
http://127.0.0.1:8000/stream_chat
http://localhost:8000/stream_chat
You can query different content by appending ?param=... to the URL.
For example:
!wget "http://127.0.0.1:8000/stream_chat?param=请写一篇一千字左右的文章,分析广州房产不限购"
Here is the same backend built with Flask:
from flask import Flask, request, Response
import requests
import json

BASE_URL = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
app = Flask(__name__)

# Load the required keys from the file
with open('keys.txt', 'r', encoding='utf-8') as f:
    # Read the file, split on newlines, and filter out blank lines and comment lines
    keylist = [line.strip() for line in f.read().split('\n') if line.strip() and not line.startswith('#')]

@app.route('/stream_chat')
def stream_chat():
    msg = request.args.get('param', None)
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {keylist[0]}"
    }

    def generate():
        payload = {
            "model": "glm-4-flash",
            "messages": [{"role": "user", "content": msg}],
            "max_tokens": 8192,
            "temperature": 0.8,
            "stream": True
        }
        response = requests.post(BASE_URL, json=payload, headers=headers, stream=True)
        for chunk in response.iter_lines():
            if chunk:
                chunk_str = chunk.decode('utf-8')
                # Each SSE line looks like 'data: {"id": ...}'; locate the JSON part
                json_start_pos = chunk_str.find('{"id"')
                if json_start_pos != -1:
                    json_str = chunk_str[json_start_pos:]
                    json_data = json.loads(json_str)
                    for choice in json_data.get('choices', []):
                        delta = choice.get('delta', {})
                        content = delta.get('content', '')
                        yield content

    return Response(generate(), mimetype='text/event-stream')

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)
Note
Once it has started, you can access the API with a URL like:
http://127.0.0.1:8000/stream_chat?param=请写一篇一千字左右的文章,分析广州房产不限购
Send a GET request from a browser or the requests library to receive the streamed output.
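A small sketch of consuming the endpoint from Python with requests (assuming the server above is running locally on port 8000):
import requests

# Stream the response from our own endpoint and print it as it arrives
resp = requests.get(
    "http://127.0.0.1:8000/stream_chat",
    params={"param": "你好"},
    stream=True,
)
for piece in resp.iter_content(chunk_size=None):
    print(piece.decode('utf-8'), end='', flush=True)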