在进行WebSocket协议连接或者WebSocket接口测试的时候,一旦遇到超大连接数量的场景中时,之前使用过的实现 Java-WebSocket
以及 Netty-WebSocket
两种实现就会显示出巨大的性能差距。当然 Netty-WebSocket
就是为了解决性能问题而来的。
so,今天我就来展示一下两个 WebSocket
实现在使用中具体的差异,本文集中在资源占用上,特别是线程占用。
据可靠资料显示,两者的差异主要以在管理 WebSocket
连接时使用的线程数不同,以下是使用org.java_websocket.client.WebSocketClient
创建WebSocket客户端时,它会创建以下几个线程:
WebSocketClient.connect()
方法时,WebSocket客户端会创建一个单独的线程来处理连接建立的过程。这个线程负责建立实际的WebSocket连接。WebSocket.send()
方法发送消息时,消息将被发送到这个线程,然后由该线程负责将消息写入到底层的WebSocket连接中。这些线程的存在使得WebSocket
客户端能够在后台处理连接、发送和接收消息,而不会阻塞主线程。这有助于确保应用程序在与WebSocket服务器进行通信时能够保持响应性。
据资料显示不同版本的实现线程是不一样的,这里我没有找到具体的版本差异,也没有进行测试。
Netty其实并不存在上面这个问题,因为WebSocket连接和线程数并没有强的绑定关系。Netty只有一个处理事件的 io.netty.channel.EventLoopGroup
需要使用线程池设计,其他均没有设置线程和创建线程的设置。
这里我用Go写了一个 WebSocket
的服务端,一来省事儿,二来性能高足以应付接下来的测试。服务端代码如下:
// CreateServer
// @Description: 重建一个WebSocket服务
// @param port 端口
// @param path 路径
func CreateServer(port int, path string) {
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
HandshakeTimeout: 5 * time.Second,
}
http.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
conn, _ := upgrader.Upgrade(w, r, nil)
conn.WriteMessage(websocket.TextMessage, []byte("msg"))
for {
msgType, msg, err := conn.ReadMessage()
if err != nil {
log.Println(err)
return
}
fmt.Printf("%s receive: %s\n", conn.RemoteAddr(), string(msg))
if err = conn.WriteMessage(msgType, msg); err != nil {
log.Println("ffahv")
return
}
}
})
http.ListenAndServe(":"+strconv.Itoa(port), nil)
}
首先测试一下空的Java进行消耗的线程数详情,测试客户端如下:
import com.funtester.frame.SourceCode
class Empty extends SourceCode{
static void main(String[] args) {
waitForKey("按任意键退出")
}
}
运行时,进行监控:
空Java进行
只创建1个WebSocket客户端,测试代码如下:
package com.funtest.websocket
import com.funtester.frame.SourceCode
import com.funtester.socket.WebSocketFunClient
class WebSocket extends SourceCode {
static String url = "ws://localhost:12345/test"
static void main(String[] args) {
def instance = WebSocketFunClient.getInstance(url)
instance.connect()
instance.send("Hello FunTester")
waitForKey("按任意键退出")
}
}
运行线程监控:
WebSocket单线程
逻辑同上,代码如下:
package com.funtest.websocket
import com.funtester.frame.SourceCode
import com.funtester.socket.netty.WebSocketConnector
import groovy.util.logging.Log4j2
@Log4j2
class NettySocket extends SourceCode {
static void main(String[] args) {
String serverIp = "ws://127.0.0.1";
int serverPort = 12345;
def h = {String x ->
log.info("收到消息:{}", x)
}
WebSocketConnector client = new WebSocketConnector(serverIp, serverPort, "/test",h)
client.connect()
client.getHandshakeFuture().get()
client.sendText("Hello FunTester").get()
waitForKey("按任意键退出")
}
}
运行时线程监控:
Netty-WebSocket
Java-WebSocket额外创建了3个线程,而Netty-WebSocket额外创建了1个线程。这里我采取了默认的 io.netty.channel.EventLoopGroup
创建策略。
测试代码如下:
package com.funtest.websocket
import com.funtester.frame.SourceCode
import com.funtester.socket.netty.WebSocketConnector
import groovy.util.logging.Log4j2
@Log4j2
class NettySocket extends SourceCode {
static void main(String[] args) {
String serverIp = "ws://127.0.0.1";
int serverPort = 12345;
def h = {String x ->
log.info("收到消息:{}", x)
}
1000.times {
WebSocketConnector client = new WebSocketConnector(serverIp, serverPort, "/test", h)
client.connect()
client.getHandshakeFuture().get()
client.sendText("Hello FunTester").get()
}
waitForKey("按任意键退出")
}
}
运行时线程监控:
Netty1000连接
由于创建实在太慢了,我测试了100个连接,测试代码如下:
package com.funtest.websocket
import com.funtester.frame.SourceCode
import com.funtester.socket.WebSocketFunClient
class WebSocket extends SourceCode {
static String url = "ws://localhost:12345/test"
static void main(String[] args) {
100.times {
fun {
def instance = WebSocketFunClient.getInstance(url)
instance.connect()
instance.send("Hello FunTester")
}
} waitForKey("按任意键退出")
}
}
运行时线程监控:
WebSocket1000连接
如果我们只是单纯测试连接数量的话,并没有必要创建很多处理WebSocket事件的线程,我们可以直接写死成1个线程。下面是测试结果:
Netty极限1000连接
Netty稳如狗!
在本次的实践中,我对Netty-WebSocket的实现又做了一批更新,主要增加WebSocket接口路径和消息处理闭包功能,并且偷偷修复了BUG。
package com.funtester.socket.netty
import com.funtester.frame.execute.ThreadPoolUtil
import groovy.util.logging.Log4j2
import io.netty.bootstrap.Bootstrap
import io.netty.channel.*
import io.netty.channel.group.ChannelGroup
import io.netty.channel.group.DefaultChannelGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.DefaultHttpHeaders
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.websocketx.*
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.util.concurrent.GlobalEventExecutor
@Log4j2
class WebSocketConnector {
static Bootstrap bootstrap = new Bootstrap()
/**
* 处理事件的线程池
*/
static EventLoopGroup group = new NioEventLoopGroup(ThreadPoolUtil.getFactory("N"))
static {
bootstrap.group(group).channel(NioSocketChannel.class)
}
/**
* 用于记录和管理所有客户端的channel
*/ static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
WebSocketClientHandshaker handShaker
ChannelPromise handshakeFuture
String host
int port
String path
/**
* 网络通道
*/
Channel channel
WebSocketIoHandler handler
/**
* WebSocket协议类型的模拟客户端连接器构造方法
*
* @param serverIp
* @param serverSocketPort
* @param group
*/ WebSocketConnector(String host, int port, String path, Closure closure = null) {
this.host = host
this.port = port
this.path = path
String URL = this.host + ":" + this.port + path
URI uri = new URI(URL)
handler = new WebSocketIoHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()))
if (closure != null) handler.closure = closure
bootstrap.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline()
pipeline.addLast(new HttpClientCodec())
pipeline.addLast(new ChunkedWriteHandler())
pipeline.addLast(new HttpObjectAggregator(1024 * 1024))
pipeline.addLast(handler)
}
})
}
/**
* 连接
*/
void connect() {
try {
try {
ChannelFuture future = bootstrap.connect(this.host - "ws://" - "wss://", this.port).sync()
this.channel = future.channel()
clients.add(channel)
} catch (e) {
log.error("创建channel失败", e)
}
} catch (Exception e) {
log.error("连接服务失败", e)
} finally {
this.handshakeFuture = handler.handshakeFuture()
}
}
/**
* 发送文本消息
*/
ChannelFuture sendText(String msg) {
channel.writeAndFlush(new TextWebSocketFrame(msg))
}
/**
* 发送ping消息
*/
ChannelFuture ping() {
channel.writeAndFlush(new PingWebSocketFrame())
}
/**
* 关闭
*/
void close() {
group.shutdownGracefully()
}
}
package com.funtester.socket.netty
import groovy.util.logging.Log4j2
import io.netty.channel.*
import io.netty.channel.group.ChannelGroup
import io.netty.channel.group.DefaultChannelGroup
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.websocketx.*
import io.netty.handler.timeout.IdleState
import io.netty.handler.timeout.IdleStateEvent
import io.netty.util.concurrent.GlobalEventExecutor
/**
* WebSocket协议类型的模拟客户端IO处理器类
*/
@Log4j2
class WebSocketIoHandler extends SimpleChannelInboundHandler<Object> {
/**
* 用于记录和管理所有客户端的channel
*/ private ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
private final WebSocketClientHandshaker handShaker
Closure closure
private ChannelPromise handshakeFuture
WebSocketIoHandler(WebSocketClientHandshaker handShaker) {
this.handShaker = handShaker
}
ChannelFuture handshakeFuture() {
return handshakeFuture
}
@Override
void handlerAdded(ChannelHandlerContext ctx) {
handshakeFuture = ctx.newPromise()
}
@Override
void channelActive(ChannelHandlerContext ctx) {
handShaker.handshake(ctx.channel());
}
@Override
void channelInactive(ChannelHandlerContext ctx) {
ctx.close()
try {
super.channelInactive(ctx)
} catch (Exception e) {
log.error("channelInactive 异常.", e)
}
log.warn("WebSocket链路与服务器连接已断开.")
}
@Override
void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel()
if (!handShaker.isHandshakeComplete()) {
try {
handShaker.finishHandshake(ch, (FullHttpResponse) msg)
handshakeFuture.setSuccess()
} catch (WebSocketHandshakeException e) {
log.warn("WebSocket Client failed to connect", e)
handshakeFuture.setFailure(e)
}
return
}
WebSocketFrame frame = (WebSocketFrame) msg
if (frame instanceof TextWebSocketFrame) {
if (closure != null) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame
closure(textFrame.text())
}
} else if (frame instanceof CloseWebSocketFrame) {
log.info("WebSocket Client closing")
ch.close()
}
}
@Override
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("WebSocket链路由于发生异常,与服务器连接已断开.", cause)
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause)
}
ctx.close()
super.exceptionCaught(ctx, cause)
}
@Override
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt
// 如果写通道处于空闲状态,就发送心跳命令
if (IdleState.WRITER_IDLE == event.state() || IdleState.READER_IDLE == event.state()) {
// 发送心跳数据
def channel = ctx.channel()
channel.writeAndFlush(new TextWebSocketFrame("dsf"))
}
} else {
super.userEventTriggered(ctx, evt)
}
}
}
FunTester原创专题推荐~