前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >“超越极限 - 如何使用 Netty 高效处理大型数据?“ - 掌握 Netty 技巧,轻松应对海量数据处理!

“超越极限 - 如何使用 Netty 高效处理大型数据?“ - 掌握 Netty 技巧,轻松应对海量数据处理!

作者头像
JavaEdge
修改于 2025-05-18 12:21:26
修改于 2025-05-18 12:21:26
1.2K0
举报
文章被收录于专栏:JavaEdgeJavaEdge

本文已收录在Github关注我,紧跟本系列专栏文章,咱们下篇再续!

  • 🚀 魔都架构师 | 全网30W技术追随者
  • 🔧 大厂分布式系统/数据中台实战专家
  • 🏆 主导交易系统百万级流量调优 & 车联网平台架构
  • 🧠 AIGC应用开发先行者 | 区块链落地实践者
  • 🌍 以技术驱动创新,我们的征途是改变世界!
  • 👉 实战干货:编程严选网

1 写大型数据

因为网络饱和的可能性,如何在异步框架中高效地写大块的数据是特殊问题。由于写操作是非阻塞的,所以即使没有写出所有的数据,写操作也会在完成时返回并通知 ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险。所以在写大型数据时,需要准备好处理到远程节点的连接是慢速连接的情况,这种情况会导致内存释放的延迟。

考虑下将一个文件内容写出到网络。讨论传输(见 4.2 节)的过程中,提到 NIO 的零拷贝,这消除了将文件内容从文件系统移动到网络栈的复制过程。所有的这一切都发生在 Netty 的核心中,所以应用程序所有需要做的就是使用FileRegion接口实现,其在 Netty 的 API 文档中的定义是: “通过支持零拷贝的文件传输的 Channel 来发送的文件区域。”

如下展示如何通过从FileInputStream创建一个DefaultFileRegion,并将其写入Channel(甚至可利用 io.netty.channel.ChannelProgressivePromise实时获取传输的进度),利用零拷贝传输一个文件的内容。

代码语言:java
AI代码解释
复制
package io.netty.example.cp11;

import io.netty.channel.*;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.File;
import java.io.FileInputStream;

/**
 * 使用 FileRegion 传输文件的内容
 */
public class FileRegionWriteHandler extends ChannelInboundHandlerAdapter {
    private static final Channel CHANNEL_FROM_SOMEWHERE = new NioSocketChannel();
    private static final File FILE_FROM_SOMEWHERE = new File("");

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        File file = FILE_FROM_SOMEWHERE;
        Channel channel = CHANNEL_FROM_SOMEWHERE;
        //...
        FileInputStream in = new FileInputStream(file);

        // 以该文件的完整长度创建一个新的 DefaultFileRegion
        FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
        // 发送该 DefaultFileRegion,并注册一个ChannelFutureListener
        channel.writeAndFlush(region).addListener(
                new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            // 处理失败
                            Throwable cause = future.cause();
                            // Do something
                        }
                    }
                });
    }
}

该示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。在需要将数据从文件系统复制到用户内存中时,可以使用 ChunkedWriteHandler,它支持异步写大型数据流,而又不会导致大量内存消耗。

关键是 interface ChunkedInput<B>,类型参数 B 是 readChunk()方法返回的类型。Netty 预置该接口的 4 个实现,如下表ChunkedInput的实现:

名称

描述

ChunkedFile

从文件中逐块获取数据,当你的平台不支持零拷贝或者你需要转换数据时使用

ChunkedNioFile

和 ChunkedFile 类似,只是它使用了 FileChannel

ChunkedStream

从 InputStream 中逐块传输内容

ChunkedNioStream

从 ReadableByteChannel 中逐块传输内容

每个都代表了一个将由 ChunkedWriteHandler 处理的不定长度的数据流。

代码清单 11-12 说明 ChunkedStream 用法,最常用的实现。所示类使用File及SslContext进行实例化。当initChannel()被调用,它将使用所示的 ChannelHandler 链初始化该 Channel。

ChunkedInput的实现:

名称

描述

ChunkedFile

从文件中逐块获取数据,当你的平台不支持零拷贝或者你需要转换数据时使用

ChunkedNioFile

和 ChunkedFile 类似,只是它使用了 FileChannel

ChunkedStream

从 InputStream 中逐块传输内容

ChunkedNioStream

从 ReadableByteChannel 中逐块传输内容

当 Channel 的状态变为活动的时,WriteStreamHandler 将会逐块地把来自文件中的数据作为 ChunkedStream 写入。数据在传输之前将会由 SslHandler 加密。

代码语言:java
AI代码解释
复制
package io.netty.example.cp11;

import io.netty.channel.*;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.io.File;
import java.io.FileInputStream;

/**
 * 11.12 使用 ChunkedStream 传输文件内容
 */
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {

    private final File file;

    private final SslContext sslCtx;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
        this.file = file;
        this.sslCtx = sslCtx;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 将 SslHandler 添加到ChannelPipeline
        pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc())));
        // 添加 ChunkedWriteHandler以处理作为ChunkedInput传入的数据
        pipeline.addLast(new ChunkedWriteHandler());
        // 一旦连接建立,WriteStreamHandler就开始写文件数据
        pipeline.addLast(new WriteStreamHandler());
    }

    public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // 当连接建立时,channelActive()将使用ChunkedInput写文件数据
            super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
        }
    }
}
逐块输入

要使用你自己的 ChunkedInput 实现,请在 ChannelPipeline 中安装一个ChunkedWriteHandler。

本节讨论如何通过使用零拷贝特性来高效地传输文件,以及如何通过使用ChunkedWriteHandler写大型数据而又不必冒OOM风险。下一节研究几种序列化 POJO 方法。

2 序列化数据

JDK提供ObjectOutputStream/ObjectInputStream,用于通过网络对POJO的基本数据类型和图进行序列化/反序列化。可被应用于任何实现java.io.Serializable接口的对象,但性能不是非常高效。那Netty必须为此提供啥呢?

2.1 JDK序列化

若应用须要和用了ObjectOutputStream、ObjectInputStream的远程节点交互,且兼容性也最关心,则JDK序列化是正确选择,下表列出Netty提供的用于和JDK进行互操作的序列化类:

名称

描述

CompatibleObjectDecoder

和使用 JDK 序列化的非基于 Netty 的远程节点进行互操作的解码器

CompatibleObjectEncoder

和使用 JDK 序列化的非基于 Netty 的远程节点进行互操作的编码器

ObjectDecoder

构建于 JDK 序列化之上的使用自定义的序列化来解码的解码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取

ObjectEncoder

构建于 JDK 序列化之上的使用自定义的序列化来编码的编码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取

CompatibleObjectDecoder类已在Netty 3.1废弃,并不存在于Netty 4.x:https://issues.jboss.org/browse/NETTY-136

若能自由使用外部依赖,则JBoss Marshalling是理想选择:比JDK序列化最多快3倍,更紧凑。在JBoss Marshalling官网对其定义:JBoss Marshalling 是一种序列化 API,它修复 JDK 序列化 API 所发现的许多问题,同时保留与 java.io.Serializable 及其相关类兼容性,并添加几个新的可调优参数及额外特性,所有这些都能通过工厂配置(如外部序列化器、类/实例查找表、类解析以及对象替换等)实现可插拔。

2.2 使用 JBoss Marshalling 进行序列化

Netty 通过下表所示的两组解码器/编码器对为 Boss Marshalling 提供支持:

  • 第一组兼容只使用 JDK 序列化的远程节点
  • 第二组提供最大性能,适用于和使用 JBoss Marshalling 的远程节点一起使用

JBoss Marshalling编解码器:

名称

描述

CompatibleMarshallingDecoder CompatibleMarshallingEncoder

与只使用JDK序列化的远程节点兼容

MarshallingDecoder MarshallingEncoder

适用于使用JBoss Marshalling的节点。这些类必须一起使用

使用 MarshallingDecoder/MarshallingEncoder

几乎仅配置ChannelPipeline:

代码语言:java
AI代码解释
复制
package io.netty.example.cp11;

import io.netty.channel.*;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import java.io.Serializable;

/**
 * 使用 JBoss Marshalling
 */
public class MarshallingInitializer extends ChannelInitializer<Channel> {
    private final MarshallerProvider marshallerProvider;
    private final UnmarshallerProvider unmarshallerProvider;

    public MarshallingInitializer(UnmarshallerProvider unmarshallerProvider, MarshallerProvider marshallerProvider) {
        this.marshallerProvider = marshallerProvider;
        this.unmarshallerProvider = unmarshallerProvider;
    }

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        // 添加 MarshallingDecoder 以 将 ByteBuf 转换为 POJO
        pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
        // 添加 MarshallingEncoder 以将POJO 转换为 ByteBuf
        pipeline.addLast(new MarshallingEncoder(marshallerProvider));
        pipeline.addLast(new ObjectHandler());
    }

    // 添加 ObjectHandler,以处理普通的实现了Serializable 接口的 POJO
    public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {

        @Override
        public void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) throws Exception {
            // Do something
        }
    }
}

2.3 通过 Protocol Buffers 序列化

Netty序列化的最后一个解决方案是利用Protocol Buffers(https://protobuf.dev/)的编解码器,由Google开发、现已开源的数据交换格式。可在https://github.com/google/protobuf找到源代码。Protocol Buffers 以紧凑高效方式对结构化的数据进行编解码。它具有许多的编程语言绑定,使得它很适合跨语言项目。表 11-10 展示Netty为支持 protobuf 所提供ChannelHandler 实现。

Protobuf编解码器:

名称

描述

ProtobufDecoder

使用 protobuf 解码消息

ProtobufEncoder

使用 protobuf 编码消息

ProtobufVarint32FrameDecoder

根据消息中的 Google Protocol Buffers 的“Base 128 Varints”整数长度字段值动态地分割所接收到的 ByteBuf

ProtobufVarint32LengthFieldPrepender

向 ByteBuf 前追加一个 Google Protocol Buffers 的“Base 128 Varints”整型的长度字段值

使用 protobuf 只不过是将正确的 ChannelHandler 添加到 ChannelPipeline 中:

代码语言:java
AI代码解释
复制
package io.netty.example.cp11;

import com.google.protobuf.MessageLite;
import io.netty.channel.*;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;

public class ProtoBufInitializer extends ChannelInitializer<Channel> {
    private final MessageLite lite;

    public ProtoBufInitializer(MessageLite lite) {
        this.lite = lite;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 添加 ProtobufVarint32FrameDecoder 以分隔帧
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        // 还需要在当前的 ProtobufEncoder 之前添加一个相应的 ProtobufVarint32LengthFieldPrepender 以编码进帧长度信息
        // 添加 ProtobufEncoder以处理消息的编码
        pipeline.addLast(new ProtobufEncoder());
        // 添加 ProtobufDecoder以解码消息
        pipeline.addLast(new ProtobufDecoder(lite));
        // 加 ObjectHandler 以处理解码消息
        pipeline.addLast(new ObjectHandler());
    }

    public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {

        @Override
        public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Do something with the object
        }
    }
}

这节探讨由 Netty 专门的解码器和编码器所支持的不同的序列化选项:标准JDK序列化、JBoss Marshalling 及 Google 的 Protocol Buffers。

3 总结

Netty 提供的编解码器以及各种 ChannelHandler 可以被组合和扩展,以实现非常广泛的处理方案。此外,它们也是被论证的、健壮的组件,已经被许多的大型系统所使用。

我们只涵盖最常见示例;Netty 的 API 文档提供了更加全面的覆盖。

后文学习另一种先进协议——WebSocket,被开发用以改进 Web 应用程序的性能以及响应性。Netty 提供你将会需要的工具,以便你快速、轻松地利用它强大的功能。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023/05/21 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Netty之Protobuf​编解码框架
Protobuf是一个灵活、高效、结构化的数据序列化框架,相比于传统的XML序列化工具,它更小、更快、更简单。支持结构化数据一次可以到处使用,包括跨语言,通过代码生成工具可以自动生成不同语言版本的源代码,可以在使用不同版本的数据结构进程间进行数据传递,实现数据结构的前向兼容。
Liusy
2020/08/31
7230
Netty 系列七(那些开箱即用的 ChannelHandler).
    Netty 为许多通用协议提供了编解码器和处理器,几乎可以开箱即用, 这减少了你在那些相当繁琐的事务上本来会花费的时间与精力。另外,这篇文章中,就不涉及 Netty 对 WebSocket协议 的支持了,因为涉及的篇幅有点大,会在下一篇文章做一个具体的介绍。
JMCui
2018/09/27
1.9K0
Netty 系列七(那些开箱即用的 ChannelHandler).
netty系列之:netty中的懒人编码解码器
netty之所以强大,是因为它内置了很多非常有用的编码解码器,通过使用这些编码解码器可以很方便的搭建出非常强大的应用程序,今天给大家讲讲netty中最基本的内置编码解码器。
程序那些事
2021/08/20
9110
netty系列之:在netty中使用protobuf协议
netty中有很多适配不同协议的编码工具,对于流行的google出品的protobuf也不例外。netty为其提供了ProtobufDecoder和ProtobufEncoder两个工具还有对应的frame detection,接下来我们会通过一个例子来详细讲解如何在netty中使用protobuf。
程序那些事
2021/08/25
7050
基于netty实现rpc远程调用
Netty中的HTTP处理要Netty内置的HTTP的编解码器来完成解析。现在我们来看自定义协议如何设定。在Netty中完成一个自定义协议其实非常简单,只需要定义一个普通的Java类即可。我们现在手写RPC主要是为了完成对Java代码的远程调用,类似于RMI(Remote Method Invocation,远程方法调用),大家应该都很熟悉了吧。在远程调用Java代码时,哪些内容是必须由网络来传输的呢?譬如,服务名称?需要调用该服务的哪个方法?方法的实参是什么?这些信息都需要通过客户端传送到服务端。
周杰伦本人
2022/10/25
6130
基于netty实现rpc远程调用
netty案例,netty4.1中级拓展篇十一《Netty基于ChunkedStream数据流切块传输》
在Netty这种异步NIO框架的结构下,服务端与客户端通信过程中,高效、频繁、大量的写入大块数据时,因网络传输饱和的可能性就会造成数据处理拥堵、GC频繁、用户掉线的可能性。那么由于写操作是非阻塞的,所以即使没有写出所有的数据,写操作也会在完成时返回并通知ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险。所以在写大块数据时,需要对大块数据进行切割发送处理。
小傅哥
2020/07/14
1.1K0
netty案例,netty4.1中级拓展篇十一《Netty基于ChunkedStream数据流切块传输》
浅析Netty
Netty是JBoss出品的高效的Java NIO开发框架,关于其使用,可参考我的另一篇文章 netty使用初步。本文将主要分析Netty实现方面的东西,由于精力有限,本人并没有对其源码做了极细致的研 究。如果下面的内容有错误或不严谨的地方,也请指正和谅解。对于Netty使用者来说,Netty提供了几个典型的example,并有详尽的API doc和guide doc,本文的一些内容及图示也来自于Netty的文档,特此致谢。
Java架构
2018/10/10
7150
厉害了,Netty 轻松实现文件上传!
今天我们来完成一个使用netty进行文件传输的任务。在实际项目中,文件传输通常采用FTP或者HTTP附件的方式。事实上通过TCP Socket+File的方式进行文件传输也有一定的应用场景,尽管不是主流,但是掌握这种文件传输方式还是比较重要的,特别是针对两个跨主机的JVM进程之间进行持久化数据的相互交换。
Java技术栈
2021/07/16
1.4K0
厉害了,Netty 轻松实现文件上传!
史诗级最强教科书式“NIO与Netty编程”
java.nio全称java non-blocking IO,是指JDK1.4开始提供的新API。从JDK1.4开始,Java提供了一系列改进的输入/输出的新特性,也被称为NIO(既New IO),新增了许多用于处理输入输出的类,这些类都被放在java.nio包及子包下,并且对原java.io包中的很多类进行改写,新增类满足NIO的功能。 NIO和BIO有着相同的目的和作用,但是它们的实现方式完全不同,BIO以流的方式处理数据,而NIO以块的方式处理数据,块I/O的效率比流I/O高很多。另外,NIO是非阻塞式的,这一点跟BIO也很不相同,使用它可以提供非阻塞式的高伸缩性网络。 NIO主要有三大核心部分 :Channel(通道),Buffer(缓冲区),Selector(选择器)。传统的BIO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如 :连接打开,数据到达)。因此使用单个线程就可以监听多个数据管道。
海仔
2019/08/26
9700
史诗级最强教科书式“NIO与Netty编程”
nio与netty编程(二)
Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。 Netty 是一个基于 NIO 的网络编程框架,使用 Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程。 作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
周杰伦本人
2022/10/25
5170
nio与netty编程(二)
Protobuf+Netty
Protobuf编写客户端Netty客户端客户端处理类Protobuf编写服务端netty服务端netty服务端处理类
黑洞代码
2021/01/14
8330
websocket+netty实时视频弹幕交互功能(Java版)
2021年了,还有不支持弹幕的视频网站吗,现在各种弹幕玩法层出不穷,抽奖,ppt都上弹幕玩法了,不整个弹幕都说不过去了,今天笔者就抽空做了一个实时视频弹幕交互功能的实现,不得不说这样的形式为看视频看直播,讲义PPT,抽奖等形式增加了许多乐趣。
肉眼品世界
2021/10/15
6750
websocket+netty实时视频弹幕交互功能(Java版)
Netty一文深入
通过2个位置指针来协助缓冲区的读写,读使用 readerIndex,写使用 writerIndex。
趣学程序-shaofeer
2020/07/17
8090
Netty一文深入
Netty中序列化框架Protobuf的简单实现
  Protobuf是一种平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。
用户4919348
2019/04/22
8230
Netty中序列化框架Protobuf的简单实现
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty是一个高性能、异步的网络应用程序框架,它提供了对TCP、UDP和文件传输的支持。在Netty中,数据的发送和接收都是以字节流的形式进行的,因此需要将对象转换为字节流(编码)以及将字节流转换回对象(解码)。
小小工匠
2023/12/22
3560
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
netty案例,netty4.1中级拓展篇二《Netty使用Protobuf传输数据》
在netty数据传输过程中可以有很多选择,比如;字符串、json、xml、java对象,但为了保证传输的数据具备;良好的通用性、方便的操作性和传输的高性能,我们可以选择protobuf作为我们的数据传输格式。目前protobuf可以支持;C++、C#、Dart、Go、Java、Python等,也可以在JS里使用。知识点;ProtobufDecoder、ProtobufEncoder、ProtobufVarint32FrameDecoder、ProtobufVarint32LengthFieldPrepender。
小傅哥
2020/07/14
6990
netty案例,netty4.1中级拓展篇二《Netty使用Protobuf传输数据》
17-跨语言调用 Google ProtoBuf
定义其他复杂类型参考:https://blog.csdn.net/lijingjingchn/article/details/89466437
彼岸舞
2022/02/18
6400
17-跨语言调用 Google ProtoBuf
结合RPC框架通信谈 netty如何解决TCP粘包问题
因为自己造一个RPC框架的轮子时,需要解决TCP的粘包问题,特此记录,希望方便他人。这是我写的RPC框架的 GitHub地址 https://github.com/yangzhenkun/krpc。 欢迎star,fork。已经写了多篇文章对这个框架的原理进行说明。对原理有兴趣的欢迎交流。
Java高级架构
2018/08/16
9950
netty系列之:protobuf在UDP协议中的使用
netty中提供的protobuf编码解码器可以让我们直接在netty中传递protobuf对象。同时netty也提供了支持UDP协议的channel叫做NioDatagramChannel。如果直接使用NioDatagramChannel,那么我们可以直接从channel中读写UDP对象:DatagramPacket。
程序那些事
2022/05/23
1.4K0
『互联网架构』软件架构-netty粘包分包编码解码(57)
(1)粘包: 1.服务端 原因收到的数据放在系统接收缓冲区,用户进程从该缓冲区取数据 2.客户端 原因TCP为提高传输效率,要收集到足够多的数据后才发送一包数据
IT架构圈
2019/05/31
9120
相关推荐
Netty之Protobuf​编解码框架
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档