在我们的日常工作中,我们往往会遇到客户端需要实时获取服务端最新数据
的场景,例如聊天系统(WeChat/Telegram),股票行情查看软件(同花顺/富途),feed推送系统(Twitter/微博)等等。在实现这些需求的时候,我们的技术方案是有很多的,本文将会给大家介绍四种常见的实时获取服务端数据的方案,它们分别是:短轮询(polling)
,长轮询(long polling)
,长连接(WebSocket)
以及服务器事件推送(Sever-Sent Events, aka SSE)
。本篇文章将会介绍每种方案的基本原理,以及分别使用他们来实现同一个需求:动态事件列表,我们用到的技术栈是React + 原生NodeJS。
先说一下这个动态事件列表的需求:我们的服务器每隔5秒会产生一个新的事件,每个事件都有一个id字段以及timestamp字段,id和timestamp字段都是该事件生成的时间戳,前端会以列表的形式展示目前服务端已产生的所有事件信息,后面当服务器产生新的事件时,前端会获取到最新的事件并添加到页面列表的末尾。
下面是项目的运行效果图:
我相信大多数程序员或多或少都使用过轮询来获取服务端的资源,简单来说轮询就是客户端不停地调用服务端接口以获得最新的数据。下图是一个简单的轮询过程:
在上图中客户端在发起请求后服务端会立即响应,不过因为这时服务端的数据没有更新所以返回了一个空的结果给客户端。客户端在等待了一段时间后(可能是几秒),再次请求服务端的数据,这时由于服务端的数据发生了更新,所以会给客户端返回最新的数据,客户端在拿到数据后等待一下然后继续发送请求,如此反复。
下面就让我们用轮询来实现动态事件列表的需求, 首先是Node代码:
// node/polling.js
const http = require('http')
const url = require('url')
// 时间列表
const events = []
// 最新生成的事件时间
let latestTimestamp = 0
// 事件生产者
const EventProducer = () => {
const event = {
id: Date.now(),
timestamp: Date.now()
}
events.push(event)
latestTimestamp = event.timestamp
}
// 每隔5秒生成一个新的事件
setInterval(() => {
EventProducer()
}, 5000)
const server = http.createServer((req, resp) => {
const urlParsed = url.parse(req.url, true)
resp.setHeader('Access-Control-Allow-Origin', '*')
resp.setHeader('Origin', '*')
if (urlParsed.pathname == '/events') {
// 每次客户端都带上它最后拿到的事件时间戳来获取新的事件
const timestamp = parseInt(urlParsed.query['timestamp'])
// 判断客户端是否拿到最新事件
if (timestamp < latestTimestamp) {
// 将所有没发送过给这个客户端的事件一次性发送出去
resp.write(JSON.stringify(events.filter(event => event.timestamp > timestamp)))
}
resp.end()
}
})
server.listen(8080, () => {
console.log('server is up')
})
上面的代码十分简单,我们实现了一个events
API,前端每次都会带上上一次的时间戳来请求这个时间点后的最新事件。接着再来看一下前端的实现:
// react/Polling.jsx
import { useEffect, useRef, useState } from 'react'
const fetchLatestEvents = async (timestamp) => {
// 获取最新的事件
const body = await fetch(`http://localhost:8080/events?timestamp=${timestamp}`)
if (body.ok) {
const json = await body.json()
return json
} else {
console.error('failed to fetch')
}
}
function App() {
// 记录当前客户端拿到的最新事件的timestamp
const timestampRef = useRef(0)
const [events, setEvents] = useState([])
useEffect(() => {
const timer = setInterval(async () => {
const latestEvents = await fetchLatestEvents(timestampRef.current)
if (latestEvents && latestEvents.length) {
timestampRef.current = latestEvents[latestEvents.length - 1].timestamp
setEvents(events => [...events, ...latestEvents])
}
}, 3000)
return () => {
clearInterval(timer)
}
}, [])
return (
<div className="App">
<h2>event list</h2>
<ol>
{
events.map(event => {
return <li key={event.id}>{`${event.id}`}</li>
})
}
</ol>
</div>
);
}
export default App
打开Chrome的Devtools我们发现,前端每隔3s向后端请求一次,请求得相当频繁,并且在后端没有产生新数据的时候,很多请求的返回值是空的,也就是说大多数的网络资源都被浪费了
。
请求有很多空响应
从上面的代码我们可以看出,短轮询这个技术方案最大的优点就是实现简单
,而它的缺点也很明显:
一般生产级别的应用都不会使用短轮询这个方案,除非你只是写一些给少数人用的系统。
看完了上面关于短轮询的介绍,我们知道了轮询有两个重大的缺陷:一个是无用请求过多,另外一个是数据实时性差
。为了解决这两个问题,某些聪明的程序员就发明了另外一个方案:长轮询。下面是一个简单的长轮询示意图:
在上图中,客户端发起请求后,服务端发现当前没有新的数据,这个时候服务端没有立即返回请求,而是将请求挂起
,在等待一段时间后(一般为30s或者是60s),发现还是没有数据更新的话,就返回一个空结果给客户端。客户端在收到服务端的回复后,立即再次向服务端发送新的请求
。这次服务端在接收到客户端的请求后,同样等待了一段时间,这次好运的是服务端的数据发生了更新,服务端给客户端返回了最新的数据。客户端在拿到结果后再次发送下一个请求,如此反复。
接着就让我们使用长轮询来动态实现事件列表的功能,先看一下后端代码:
// node/long-polling.js
const http = require('http')
const url = require('url')
const events = []
let timers = new Set()
// 当前挂起的请求
let subscribers = new Set()
const EventProducer = () => {
const event = {
id: Date.now(),
timestamp: Date.now()
}
events.push(event)
// 通知所有挂起的请求
subscribers.forEach(subscriber => {
subscriber.resp.write(JSON.stringify(events.filter(event => event.timestamp > subscriber.timestamp)))
subscriber.resp.end()
})
// 重置subscribers
subscribers.clear()
// 取消请求的超时回调
timers.forEach(timer => clearTimeout(timer))
// 重置timers
timers.clear()
}
// 5秒生成一个事件
setInterval(() => {
EventProducer()
}, 5000)
const server = http.createServer((req, resp) => {
const urlParsed = url.parse(req.url, true)
resp.setHeader('Access-Control-Allow-Origin', '*')
resp.setHeader('Origin', '*')
if (urlParsed.pathname == '/list') {
// 发送服务端现存事件
resp.write(JSON.stringify(events))
resp.end()
} else if (urlParsed.pathname == '/subscribe') {
const timestamp = parseInt(urlParsed.query['timestamp'])
const subscriber = {
timestamp,
resp
}
// 新建的连接挂起来
subscribers.add(subscriber)
// 30s超时,自动关闭连接
const timer = setTimeout(() => {
resp.end()
timers.delete(timer)
}, 30000)
// 客户端主动断开连接
req.on('close', () => {
subscribers.delete(subscriber)
clearTimeout(timer)
})
timers.add(timer)
}
})
server.listen(8080, () => {
console.log('server is up')
})
上面的代码中每来一个新的连接我们都会将它挂起来(保存在set里面),然后当有新的事件产生时再将所有该客户端没有获取过的事件返回给它,接着来看一下前端代码的实现:
// react/LongPolling.jsx
import { useEffect, useRef, useState } from 'react'
const fetchLatestEvents = async (timestamp) => {
const body = await fetch(`http://localhost:8080/subscribe?timestamp=${timestamp}`)
if (body.ok) {
const json = await body.json()
return json
} else {
console.error('failed to fetch')
}
}
const listEvents = async () => {
const body = await fetch(`http://localhost:8080/list`)
if (body.ok) {
const json = await body.json()
return json
} else {
console.error('failed to fetch')
}
}
function App() {
const timestampRef = useRef(0)
const eventsRef = useRef([])
const [refresh, setRefresh] = useState(false)
useEffect(() => {
const fetchTask = async () => {
if (timestampRef.current === 0) {
// 初次加载
const currentEvents = await listEvents()
timestampRef.current = currentEvents[currentEvents.length - 1].timestamp
eventsRef.current = [...eventsRef.current, ...currentEvents]
}
const latestEvents = await fetchLatestEvents(timestampRef.current)
if (latestEvents && latestEvents.length) {
timestampRef.current = latestEvents[latestEvents.length - 1].timestamp
eventsRef.current = [...eventsRef.current, ...latestEvents]
}
}
fetchTask()
.catch(err => {
console.error(err)
})
.finally(() => {
// 触发下次加载
setRefresh(refresh => !refresh)
})
}, [refresh])
return (
<div className="App">
<h2>event list</h2>
<ol>
{
eventsRef.current.map(event => {
return <li key={event.id}>{`${event.id}`}</li>
})
}
</ol>
</div>
);
}
export default App
值得注意的是,这个时候,我们打开浏览器的调试工具可以发现浏览器每一次发出的请求都不会立马收到回复,而是pending一段时间后(大概是5秒)才会有结果,并且结果里面都是有数据的。
长轮询很完美地解决了短轮询的问题,首先服务端在没有数据更新的情况下没有给客户端返回数据,所以避免了客户端大量的重复请求。再者客户端在收到服务端的返回后,马上发送下一个请求,这就保证了更好的数据实时性。不过长轮询也不是完美的:
从网上找的资料来看之前的WebQQ和Web微信都是基于长轮询实现的,现在是不是我就不知道了,有兴趣的读者可以自行验证一下。
上面说到长轮询不适用于服务端资源频繁更新的场景,而解决这类问题的一个方案就是WebSocket。用最简单的话来介绍WebSocket就是:客户端和服务器之间建立一个持久的长连接,这个连接是双工的,客户端和服务端都可以实时地给对方发送消息。下面是WebSocket的图示:
在上图中,首先客户端会给服务端发送一个HTTP请求,这个请求的Header会告诉服务端它想基于WebSocket协议通信,如果服务端支持升级协议的话,会给客户端发送一个Switching Protocal的响应,它们之间后面都是基于WebSocket协议来通信了。
我们再来看一下如何使用WebSocket来实现动态事件列表的需求,下面是后端代码:
// node/websocket.js
const WebSocket = require('ws')
const events = []
let latestTimestamp = Date.now()
const clients = new Set()
const EventProducer = () => {
const event = {
id: Date.now(),
timestamp: Date.now()
}
events.push(event)
latestTimestamp = event.timestamp
// 推送给所有连接着的socket
clients.forEach(client => {
client.ws.send(JSON.stringify(events.filter(event => event.timestamp > client.timestamp)))
client.timestamp = latestTimestamp
})
}
// 每5秒生成一个新的事件
setInterval(() => {
EventProducer()
}, 5000)
// 启动socket服务器
const wss = new WebSocket.Server({ port: 8080 })
wss.on('connection', (ws, req) => {
console.log('client connected')
// 首次连接,推送现存事件
ws.send(JSON.stringify(events))
const client = {
timestamp: latestTimestamp,
ws,
}
clients.add(client)
ws.on('close', _ => {
clients.delete(client)
})
})
上面的代码中客户端连接服务端的时候,服务端会记住客户端的时间戳,当新事件产生的时候会给客户端推送所有的新事件。下面是前端代码实现:
// react/LongPolling.jsx
import { useEffect, useRef, useState } from 'react'
function App() {
const timestampRef = useRef(0)
const eventsRef = useRef([])
const [_, setRefresh] = useState(false)
useEffect(() => {
const ws = new WebSocket(`ws://localhost:8080/ws?timestamp=${timestampRef.current}`)
ws.addEventListener('open', e => {
console.log('successfully connected')
})
ws.addEventListener('close', e => {
console.log('socket close')
})
ws.addEventListener('message', (ev) => {
const latestEvents = JSON.parse(ev.data)
if (latestEvents && latestEvents.length) {
timestampRef.current = latestEvents[latestEvents.length - 1].timestamp
eventsRef.current = [...eventsRef.current, ...latestEvents]
setRefresh(refresh => !refresh)
}
})
return () => {
ws.close()
}
}, [])
return (
<div className="App">
<h2>event list</h2>
<ol>
{
eventsRef.current.map(event => {
return <li key={event.id}>{`${event.id}`}</li>
})
}
</ol>
</div>
);
}
export default App
打开Chrome的网络调试工具点击ws,你会发现客户端和服务端只有一个websocket连接,它们所有的通信都是发生在这个连接上面的:
总的来说,我认为WebSocket有下面这些优点:
同样WebSocket也不是完美的,它有下面这些问题:
有状态的
。这就意味着在扩容的时候很麻烦,系统设计也会较复杂。WebSocket的应用场景是一些实时性要求很高的而且需要双工通信
的系统例如IM软件等。
Server-Sent Events简称SSE
,是一个基于HTTP协议的服务端向客户端推送数据的技术
。下面是一个简单的SSE图示:
在上图中,客户端向服务端发起一个持久化的HTTP连接
,服务端接收到请求后,会挂起客户端的请求,有新消息时,再通过这个连接将数据推送给客户端。这里需要指出的是和WebSocket长连接不同,SSE的连接是单向的
,也就是说它不允许客户端向服务端发送消息
。
和上面一样,我们使用SSE来实现一下动态事件列表的需求,先看后端代码:
// node/sse.js
const http = require('http')
const events = []
const clients = new Set()
let latestTimestamp = Date.now()
const headers = {
// 告诉HTTP连接,它是一个event-stream的请求
'Content-Type': 'text/event-stream',
// 保持HTTP连接不断开
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
'Access-Control-Allow-Origin': '*',
"Origin": '*'
}
const EventProducer = () => {
const event = {
id: Date.now(),
timestamp: Date.now()
}
events.push(event)
latestTimestamp = event.timestamp
clients.forEach(client => {
client.resp.write(`id: ${(new Date()).toLocaleTimeString()}\n`)
// 后面的两个\n\n一定要有,可以理解为服务端先客户端推送信息的特殊格式
client.resp.write(`data: ${JSON.stringify(events.filter(event => event.timestamp > client.timestamp))}\n\n`)
client.timestamp = latestTimestamp
})
}
// 每5秒生成一个新的事件
setInterval(() => {
EventProducer()
}, 5000)
const server = http.createServer((req, resp) => {
const urlParsed = url.parse(req.url, true)
if (urlParsed.pathname == '/subscribe') {
resp.writeHead(200, headers)
// 发送现存事件
resp.write(`id: ${(new Date()).toLocaleTimeString()}\n`)
resp.write(`data: ${JSON.stringify(events)}\n\n`)
const client = {
timestamp: latestTimestamp,
resp
}
clients.add(client)
req.on('close', () => {
clients.delete(client)
})
}
})
server.listen(8080, () => {
console.log('server is up')
})
在上面的代码中,每次客户端给服务端发送请求后,服务端先给客户端返回所有的现存事件然后将该请求挂起,在新的事件生成时再给客户端返回所有的新事件。下面是前端代码实现:
// react/SSE.jsx
import { useEffect, useRef, useState } from 'react'
function App() {
const timestampRef = useRef(0)
const eventsRef = useRef([])
const [, setRefresh] = useState(false)
useEffect(() => {
const source = new EventSource(`http://localhost:8080/subscribe?timestamp=${timestampRef.current}`)
source.onopen = () => {
console.log('connected')
}
source.onmessage = event => {
const latestEvents = JSON.parse(event.data)
if (latestEvents.length) {
timestampRef.current = latestEvents[latestEvents.length - 1].timestamp
eventsRef.current = [...eventsRef.current, ...latestEvents]
setRefresh(refresh => !refresh)
}
}
source.addEventListener('error', (e) => {
console.error('Error: ', e);
})
return () => {
source.close()
}
}, [])
return (
<div className="App">
<h2>event list</h2>
<ol>
{
eventsRef.current.map(event => {
return <li key={event.id}>{`${event.id}`}</li>
})
}
</ol>
</div>
);
}
export default App
打开Chrome的网络调试工具,会发现HTTP请求变成了EventStream类型,而且服务端给客户端所有的事件推送都在这个连接上,而无需建立新的连接。
在我看来,SSE的技术有下面的优点:
SSE的问题也很明显:
SSE技术适合一些只需要服务端单向推送事件给客户端的场景,例如股票行情推送软件。
在本篇文章中我通过图解和实际代码给大家介绍了四种不同的和服务端保持数据同步的方案,看完本篇文章后,相信你后面再遇到类似的需求时,除了短轮询你会有更多的方案可以选择。同时这里也还是要强调一下:任何一种技术都不是瑞士军刀,都有自己适用和不适用的场景,一定要根据自己的实际情况进行取舍,从而选择最适合的方案,千万不要为了用某个技术而用某个技术
!