前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java-WebSocket vs Netty-WebSocket 资源占用

Java-WebSocket vs Netty-WebSocket 资源占用

作者头像
FunTester
发布2023-10-10 14:11:06
5580
发布2023-10-10 14:11:06
举报
文章被收录于专栏:FunTester

在进行WebSocket协议连接或者WebSocket接口测试的时候,一旦遇到超大连接数量的场景中时,之前使用过的实现 Java-WebSocket 以及 Netty-WebSocket 两种实现就会显示出巨大的性能差距。当然 Netty-WebSocket 就是为了解决性能问题而来的。

so,今天我就来展示一下两个 WebSocket 实现在使用中具体的差异,本文集中在资源占用上,特别是线程占用。

理论差异

Java-WebSocket

据可靠资料显示,两者的差异主要以在管理 WebSocket 连接时使用的线程数不同,以下是使用org.java_websocket.client.WebSocketClient创建WebSocket客户端时,它会创建以下几个线程:

  • 「ConnectThread(连接线程)」:当你调用WebSocketClient.connect()方法时,WebSocket客户端会创建一个单独的线程来处理连接建立的过程。这个线程负责建立实际的WebSocket连接。
  • 「WriteThread(写线程)」:WebSocket客户端还会创建一个单独的线程,用于发送WebSocket消息。当你调用WebSocket.send()方法发送消息时,消息将被发送到这个线程,然后由该线程负责将消息写入到底层的WebSocket连接中。
  • 「ReadThread(读线程)」:WebSocket客户端会创建一个用于接收WebSocket消息的线程。这个线程会持续监听来自WebSocket服务器的消息,并在接收到消息时触发相应的事件处理器。

这些线程的存在使得WebSocket客户端能够在后台处理连接、发送和接收消息,而不会阻塞主线程。这有助于确保应用程序在与WebSocket服务器进行通信时能够保持响应性。

据资料显示不同版本的实现线程是不一样的,这里我没有找到具体的版本差异,也没有进行测试。

Netty-WebSocket

Netty其实并不存在上面这个问题,因为WebSocket连接和线程数并没有强的绑定关系。Netty只有一个处理事件的 io.netty.channel.EventLoopGroup 需要使用线程池设计,其他均没有设置线程和创建线程的设置。

被测服务

这里我用Go写了一个 WebSocket 的服务端,一来省事儿,二来性能高足以应付接下来的测试。服务端代码如下:

代码语言:javascript
复制
// CreateServer  
// @Description: 重建一个WebSocket服务  
// @param port 端口  
// @param path 路径  
func CreateServer(port int, path string) {  
  
   var upgrader = websocket.Upgrader{  
      ReadBufferSize:   1024,  
      WriteBufferSize:  1024,  
      HandshakeTimeout: 5 * time.Second,  
   }  
  
   http.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {  
      conn, _ := upgrader.Upgrade(w, r, nil)  
      conn.WriteMessage(websocket.TextMessage, []byte("msg"))  
  
      for {  
         msgType, msg, err := conn.ReadMessage()  
         if err != nil {  
            log.Println(err)  
            return  
         }  
         fmt.Printf("%s receive: %s\n", conn.RemoteAddr(), string(msg))  
  
         if err = conn.WriteMessage(msgType, msg); err != nil {  
            log.Println("ffahv")  
            return  
         }  
      }  
   })  
  
   http.ListenAndServe(":"+strconv.Itoa(port), nil)  
}

单链接对比

空Java进程

首先测试一下空的Java进行消耗的线程数详情,测试客户端如下:

代码语言:javascript
复制
import com.funtester.frame.SourceCode  
  
class Empty extends SourceCode{  
  
    static void main(String[] args) {  
        waitForKey("按任意键退出")  
    }  
      
  
}

运行时,进行监控:

空Java进行

Java-WebSocket

只创建1个WebSocket客户端,测试代码如下:

代码语言:javascript
复制
package com.funtest.websocket

import com.funtester.frame.SourceCode
import com.funtester.socket.WebSocketFunClient

class WebSocket extends SourceCode {

    static String url = "ws://localhost:12345/test"


    static void main(String[] args) {
        def instance = WebSocketFunClient.getInstance(url)
        instance.connect()
        instance.send("Hello FunTester")
        waitForKey("按任意键退出")
    }
}

运行线程监控:

WebSocket单线程

Netty-WebSocket

逻辑同上,代码如下:

代码语言:javascript
复制
package com.funtest.websocket

import com.funtester.frame.SourceCode
import com.funtester.socket.netty.WebSocketConnector
import groovy.util.logging.Log4j2

@Log4j2
class NettySocket extends SourceCode {

    static void main(String[] args) {
        String serverIp = "ws://127.0.0.1";
        int serverPort = 12345;
        def h = {String x ->
            log.info("收到消息:{}", x)
        }
        WebSocketConnector client = new WebSocketConnector(serverIp, serverPort, "/test",h)
        client.connect()
        client.getHandshakeFuture().get()
        client.sendText("Hello FunTester").get()
        waitForKey("按任意键退出")
    }
}

运行时线程监控:

Netty-WebSocket

结论

Java-WebSocket额外创建了3个线程,而Netty-WebSocket额外创建了1个线程。这里我采取了默认的 io.netty.channel.EventLoopGroup 创建策略。

1000连接

Netty-WebSocket

测试代码如下:

代码语言:javascript
复制
package com.funtest.websocket  
  
import com.funtester.frame.SourceCode  
import com.funtester.socket.netty.WebSocketConnector  
import groovy.util.logging.Log4j2  
  
@Log4j2  
class NettySocket extends SourceCode {  
  
    static void main(String[] args) {  
        String serverIp = "ws://127.0.0.1";  
        int serverPort = 12345;  
        def h = {String x ->  
            log.info("收到消息:{}", x)  
        }  
        1000.times {  
            WebSocketConnector client = new WebSocketConnector(serverIp, serverPort, "/test", h)  
            client.connect()  
            client.getHandshakeFuture().get()  
            client.sendText("Hello FunTester").get()  
        }  
        waitForKey("按任意键退出")  
    }  
}

运行时线程监控:

Netty1000连接

Java-WebSocket

由于创建实在太慢了,我测试了100个连接,测试代码如下:

代码语言:javascript
复制
package com.funtest.websocket  
  
import com.funtester.frame.SourceCode  
import com.funtester.socket.WebSocketFunClient  
  
class WebSocket extends SourceCode {  
  
    static String url = "ws://localhost:12345/test"  
  
  
    static void main(String[] args) {  
        100.times {  
            fun {  
                def instance = WebSocketFunClient.getInstance(url)  
                instance.connect()  
                instance.send("Hello FunTester")  
            }  
        }        waitForKey("按任意键退出")  
    }  
}

运行时线程监控:

WebSocket1000连接

Netty极限

如果我们只是单纯测试连接数量的话,并没有必要创建很多处理WebSocket事件的线程,我们可以直接写死成1个线程。下面是测试结果:

Netty极限1000连接

结论

Netty稳如狗!

代码更新

在本次的实践中,我对Netty-WebSocket的实现又做了一批更新,主要增加WebSocket接口路径和消息处理闭包功能,并且偷偷修复了BUG。

WebSocketConnector

代码语言:javascript
复制
package com.funtester.socket.netty  
  
import com.funtester.frame.execute.ThreadPoolUtil  
import groovy.util.logging.Log4j2  
import io.netty.bootstrap.Bootstrap  
import io.netty.channel.*  
import io.netty.channel.group.ChannelGroup  
import io.netty.channel.group.DefaultChannelGroup  
import io.netty.channel.nio.NioEventLoopGroup  
import io.netty.channel.socket.SocketChannel  
import io.netty.channel.socket.nio.NioSocketChannel  
import io.netty.handler.codec.http.DefaultHttpHeaders  
import io.netty.handler.codec.http.HttpClientCodec  
import io.netty.handler.codec.http.HttpObjectAggregator  
import io.netty.handler.codec.http.websocketx.*  
import io.netty.handler.stream.ChunkedWriteHandler  
import io.netty.util.concurrent.GlobalEventExecutor  
  
@Log4j2  
class WebSocketConnector {  
  
    static Bootstrap bootstrap = new Bootstrap()  
  
    /**  
     * 处理事件的线程池  
     */  
    static EventLoopGroup group = new NioEventLoopGroup(ThreadPoolUtil.getFactory("N"))  
  
    static {  
        bootstrap.group(group).channel(NioSocketChannel.class)  
    }  
  
    /**  
     * 用于记录和管理所有客户端的channel  
     */    static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)  
  
    WebSocketClientHandshaker handShaker  
  
    ChannelPromise handshakeFuture  
  
    String host  
  
    int port  
  
    String path  
  
    /**  
     * 网络通道  
     */  
    Channel channel  
  
    WebSocketIoHandler handler  
  
    /**  
     * WebSocket协议类型的模拟客户端连接器构造方法  
     *  
     * @param serverIp  
     * @param serverSocketPort  
     * @param group  
     */    WebSocketConnector(String host, int port, String path, Closure closure = null) {  
        this.host = host  
        this.port = port  
        this.path = path  
        String URL = this.host + ":" + this.port + path  
        URI uri = new URI(URL)  
        handler = new WebSocketIoHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()))  
        if (closure != null) handler.closure = closure  
        bootstrap.option(ChannelOption.TCP_NODELAY, true)  
                .option(ChannelOption.SO_KEEPALIVE, true)  
                .handler(new ChannelInitializer<SocketChannel>() {  
  
                    @Override  
                    protected void initChannel(SocketChannel ch) throws Exception {  
                        ChannelPipeline pipeline = ch.pipeline()  
                        pipeline.addLast(new HttpClientCodec())  
                        pipeline.addLast(new ChunkedWriteHandler())  
                        pipeline.addLast(new HttpObjectAggregator(1024 * 1024))  
                        pipeline.addLast(handler)  
                    }  
                })  
    }  
  
  
    /**  
     * 连接  
     */  
    void connect() {  
        try {  
            try {  
                ChannelFuture future = bootstrap.connect(this.host - "ws://" - "wss://", this.port).sync()  
                this.channel = future.channel()  
                clients.add(channel)  
            } catch (e) {  
                log.error("创建channel失败", e)  
            }  
        } catch (Exception e) {  
            log.error("连接服务失败", e)  
        } finally {  
            this.handshakeFuture = handler.handshakeFuture()  
        }  
    }  
  
    /**  
     * 发送文本消息  
     */  
    ChannelFuture sendText(String msg) {  
        channel.writeAndFlush(new TextWebSocketFrame(msg))  
    }  
  
    /**  
     * 发送ping消息  
     */  
    ChannelFuture ping() {  
        channel.writeAndFlush(new PingWebSocketFrame())  
    }  
  
    /**  
     * 关闭  
     */  
    void close() {  
        group.shutdownGracefully()  
    }  
  
}

WebSocketIoHandler

代码语言:javascript
复制
package com.funtester.socket.netty  
  
import groovy.util.logging.Log4j2  
import io.netty.channel.*  
import io.netty.channel.group.ChannelGroup  
import io.netty.channel.group.DefaultChannelGroup  
import io.netty.handler.codec.http.FullHttpResponse  
import io.netty.handler.codec.http.websocketx.*  
import io.netty.handler.timeout.IdleState  
import io.netty.handler.timeout.IdleStateEvent  
import io.netty.util.concurrent.GlobalEventExecutor  
  
/**  
 * WebSocket协议类型的模拟客户端IO处理器类  
 */  
@Log4j2  
class WebSocketIoHandler extends SimpleChannelInboundHandler<Object> {  
  
    /**  
     * 用于记录和管理所有客户端的channel  
     */    private ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)  
  
    private final WebSocketClientHandshaker handShaker  
  
    Closure closure  
  
    private ChannelPromise handshakeFuture  
  
    WebSocketIoHandler(WebSocketClientHandshaker handShaker) {  
        this.handShaker = handShaker  
    }  
  
    ChannelFuture handshakeFuture() {  
        return handshakeFuture  
    }  
  
    @Override  
    void handlerAdded(ChannelHandlerContext ctx) {  
        handshakeFuture = ctx.newPromise()  
    }  
  
    @Override  
    void channelActive(ChannelHandlerContext ctx) {  
        handShaker.handshake(ctx.channel());  
    }  
  
    @Override  
    void channelInactive(ChannelHandlerContext ctx) {  
        ctx.close()  
        try {  
            super.channelInactive(ctx)  
        } catch (Exception e) {  
            log.error("channelInactive 异常.", e)  
        }  
        log.warn("WebSocket链路与服务器连接已断开.")  
    }  
  
    @Override  
    void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {  
        Channel ch = ctx.channel()  
        if (!handShaker.isHandshakeComplete()) {  
            try {  
                handShaker.finishHandshake(ch, (FullHttpResponse) msg)  
                handshakeFuture.setSuccess()  
            } catch (WebSocketHandshakeException e) {  
                log.warn("WebSocket Client failed to connect", e)  
                handshakeFuture.setFailure(e)  
            }  
            return  
        }  
  
        WebSocketFrame frame = (WebSocketFrame) msg  
        if (frame instanceof TextWebSocketFrame) {  
            if (closure != null) {  
                TextWebSocketFrame textFrame = (TextWebSocketFrame) frame  
                closure(textFrame.text())  
            }  
        } else if (frame instanceof CloseWebSocketFrame) {  
            log.info("WebSocket Client closing")  
            ch.close()  
        }  
    }  
  
    @Override  
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
        log.error("WebSocket链路由于发生异常,与服务器连接已断开.", cause)  
        if (!handshakeFuture.isDone()) {  
            handshakeFuture.setFailure(cause)  
        }  
        ctx.close()  
        super.exceptionCaught(ctx, cause)  
    }  
  
    @Override  
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
        if (evt instanceof IdleStateEvent) {  
            IdleStateEvent event = (IdleStateEvent) evt  
            // 如果写通道处于空闲状态,就发送心跳命令  
            if (IdleState.WRITER_IDLE == event.state() || IdleState.READER_IDLE == event.state()) {  
                // 发送心跳数据  
                def channel = ctx.channel()  
                channel.writeAndFlush(new TextWebSocketFrame("dsf"))  
            }  
        } else {  
            super.userEventTriggered(ctx, evt)  
        }  
    }  
}

FunTester原创专题推荐~

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-10-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FunTester 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 理论差异
    • Java-WebSocket
      • Netty-WebSocket
      • 被测服务
      • 单链接对比
        • 空Java进程
          • Java-WebSocket
            • Netty-WebSocket
              • 结论
              • 1000连接
                • Netty-WebSocket
                  • Java-WebSocket
                    • Netty极限
                      • 结论
                      • 代码更新
                        • WebSocketConnector
                          • WebSocketIoHandler
                          相关产品与服务
                          云服务器
                          云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档