项目上前后端采用websocket通信,但是websocket连接经常会断开,虽然有重连机制,但是在重连的过程中,以及重连失败时,会影响前端数据的即时刷新。我们也不可能每次一出现问题就要求用户重启浏览器。因此需要设计一个websocket降级方案。
前端处理:当前端websocket断开或者超过一定时间没有收到消息时,将会自动切换为轮询,主动查询服务器最近是否有发送给前端的websocket消息。当websocket重连成功并收到消息后,取消轮询。
后端处理:当后端发送websocket请求时,对发送的消息进行缓存,当前端进行查询时,返回发送给该前端的消息。同时将超过一定时长的消息(过期消息),或者前端已查询过的消息(确保已经收到了),从缓存中剔除,避免oom。
这个方案前端的实现比较简单,不再赘述。下面着重写一下后端的实现。
首先我们要定义一个缓存服务接口,他需要实现的基本方法显然有三个,Get,Set,GetAll。
type CacheServer interface {
Get(key string) []messageCache
Set(receiver string, message string, time time.Time) error
GetAll() []messageCache
}
定义服务结构,包括服务名,服务缓存,缓存超时时间。
type cache struct {
Name string
Cache []messageCache
Timeout time.Duration
}
定义缓存结构。包含接收者,消息体,消息发送时间。
type messageCache struct {
receiver string
message string
time time.Time
}
其实定义完接口和结构体,任务就完成了一半了。接下来只要按照接口定义填一些实现。
Set方法实现,这里没有直接传入messageCache 类型的数据,也是因为不想把包内的数据类型扩散出去,外部调用不必知道包内的数据结构。每当读写数据时,先清理掉已经超时的数据。
func (s *cache) Set(receiver string, message string, sendtime time.Time) error {
s.clearOutTimeCache()
value := messageCache{
receiver: receiver,
message: message,
time: sendtime,
}
s.Cache = append(s.Cache, value)
return nil
}
GetAll 方法实现,清理完超时数据后,直接返回剩余的全部缓存。
func (s *cache) GetAll() []messageCache {
s.clearOutTimeCache()
return s.Cache
}
Get方法实现,在GetAll的基础上,加入了key值的判断,并且在用户获取完数据后,清理掉该key值的缓存。
func (s *cache) Get(key string) []messageCache {
s.clearOutTimeCache()
rlt := make([]messageCache, 1)
newCache := make([]messageCache, 1)
for i := 0; i < len(s.Cache); i++ {
if s.Cache[i].receiver == key {
rlt = append(rlt, s.Cache[i])
} else {
newCache = append(newCache, s.Cache[i])
}
}
s.Cache = newCache
return rlt
}
clearOutTimeCache实现,把超时的缓存剔除出去。
func (s *cache) clearOutTimeCache() {
for startindex := 0; startindex < len(s.Cache); startindex++ {
if time.Now().Sub(s.Cache[startindex].time) < s.Timeout {
s.Cache = s.Cache[startindex:]
break
} else if startindex == (len(s.Cache) - 1) {
s.Cache = make([]messageCache, 0)
}
}
}
我们的包还得暴露一个新建实例的方法给外部,一共两个参数,实例名,超时时间。
func NewCache(name string, timeout time.Duration) CacheServer {
return &cache{
Name: name,
Cache: make([]messageCache, 1),
Timeout: timeout,
}
}
好了这样我们的一个简单的cacheServer包就完成了。下面写一个测试代码来看一下效果。
直接把main文件贴上来了。两个goroutine,一个启动服务,一个模拟websocket消息发送。启动的服务三个接口,一个health页面,一个根据key获取缓存message,一个获取所有缓存。好运行下代码看看。
package main
import (
"cacheServer/cacheServer"
"fmt"
"math/rand"
"net/http"
"strconv"
"time"
)
func main() {
MessageCache := cacheServer.NewCache("MessageCache", time.Second*5)
go Start(":8080", MessageCache)
go MockWebSocketMessage(MessageCache)
select {}
}
func MockWebSocketMessage(cache cacheServer.CacheServer) {
rand.Seed(time.Now().UnixNano())
var i int = 0
for {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
var receiver string
if i%2 == 0 {
receiver = "HYC"
} else {
receiver = "ZMN"
}
message := "send message " + strconv.Itoa(i) + " times"
cache.Set(receiver, message, time.Now())
i = i + 1
}
}
func Start(Port string, cache cacheServer.CacheServer) error {
mux := http.NewServeMux()
mux.HandleFunc("/", health)
mux.HandleFunc("/syncCache", syncCache(cache))
mux.HandleFunc("/getCache", getCache(cache))
svr := &http.Server{Addr: Port, Handler: mux}
err := svr.ListenAndServe()
return err
}
func health(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "server work")
}
func getCache(cache cacheServer.CacheServer) func(w http.ResponseWriter, r *http.Request) {
fmt.Printf("getCache Success\n")
return func(w http.ResponseWriter, r *http.Request) {
key := GetUrlArg(r, "key")
fmt.Fprintf(w, "Cache Key:%s,%v", key, cache.Get(key))
}
}
func syncCache(cache cacheServer.CacheServer) func(w http.ResponseWriter, r *http.Request) {
fmt.Printf("syncCache Success\n")
return func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "All Cache:%v \n", cache.GetAll())
}
}
func GetUrlArg(r *http.Request, name string) string {
var arg string
values := r.URL.Query()
arg = values.Get(name)
return arg
}
先查询全部缓存,等一会查询一次,确保缓存数据正常被清除。咱们NewCache时传入的超时时间是5秒,模拟随机发送消息是1-1000毫秒,因此预期查到的缓存数量在8-12条之间。随着刷新可以看到过期的缓存被清除。
再试试根据key查询,因为模拟了两个人在发消息,因此单个人预计在4-6条左右,并且清理掉已查询的数据,快速刷新两次应该第二次只能查到0-2条。
这样一个简单的cacheServer就完成了,测试下来数据也正确。那么还有哪些未完成的工作呢。
首先我们的服务大概率是集群部署的,在服务内部使用缓存不可避免的存在同步问题。之前可能大家会疑惑为啥通过key获取的缓存要清除掉,获取所有的缓存就不用清理掉。因为获取所有缓存的接口是准备留给服务器之间同步用的。我们不会允许用户去获取其他用户收到的消息。
其次是cacheServer里messageCache不是一个良好的定义,receiver和message与websocket消息的含义耦合太紧,可以换为更松散的定义。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有