在Python的FastAPI框架中,你可以使用websockets来建立双向通信
首先,确保你已经安装了FastAPI和Uvicorn,以及websockets库。如果没有安装,可以使用以下命令进行安装:
pip install fastapi uvicorn websockets
然后,你可以创建一个FastAPI应用,并使用websockets来并行发送/接收消息。以下是一个简单的示例:
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
import asyncio
import websockets
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<div id="messages"></div>
<input id="message-input" type="text" placeholder="Type a message...">
<button id="send-message">Send</button>
<script>
var ws = new WebSocket("ws://localhost/ws");
ws.onmessage = function(event) {
var messages = document.getElementById("messages");
var message = document.createElement("div");
message.textContent = event.data;
messages.appendChild(message);
};
document.getElementById("send-message").onclick = function() {
var input = document.getElementById("message-input");
ws.send(input.value);
input.value = "";
};
</script>
</div>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
connected_clients = set()
async def websocket_handler(websocket: websockets.WebSocketServerProtocol, _: str):
await websocket.accept()
connected_clients.add(websocket)
try:
while True:
message = await websocket.recv()
for client in connected_clients:
if client != websocket:
await client.send(message)
except websockets.exceptions.ConnectionClosed:
connected_clients.remove(websocket)
@app.websocket("/ws")
async def websocket_endpoint(websocket: websockets.WebSocketServerProtocol):
await websocket_handler(websocket, "")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
在这个示例中,我们创建了一个简单的聊天应用。当用户访问根URL时,会返回一个包含HTML页面的响应,该页面包含用于发送和接收消息的JavaScript代码。
websocket_handler
函数处理与客户端的连接、接收消息以及将消息广播给其他已连接的客户端。我们使用connected_clients
集合来跟踪所有已连接的客户端。
最后,我们定义了一个WebSocket端点/ws
,它将处理传入的WebSocket连接并调用websocket_handler
函数。
要运行此示例,请保存代码到一个Python文件(例如main.py
),然后在终端中运行以下命令:
uvicorn main:app --reload
领取专属 10元无门槛券
手把手带您无忧上云