Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >netty案例,netty4.1源码分析篇五《一行简单的writeAndFlush的都做了哪些事》

netty案例,netty4.1源码分析篇五《一行简单的writeAndFlush的都做了哪些事》

作者头像
小傅哥
发布于 2020-07-14 07:01:05
发布于 2020-07-14 07:01:05
71000
代码可运行
举报
运行总次数:0
代码可运行

作者:付政委

前言介绍

对于使用netty的小伙伴来说,ctx.writeAndFlush()再熟悉不过了,它可以将我们的消息发送出去。那么它都执行了那些行为呢,是怎么将消息发送出去的呢。

源码分析

1、由一行简单发送消息开始

发送消息的代码非常简单,也是我们非常常用的发送消息的方式ctx.writeAndFlush

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
    //通知客户端链消息发送成功
    String str = "客户端收到[微信公众号:bugstack虫洞栈]:" + new Date() + " " + msg + "\r\n";
    ctx.writeAndFlush(str);
}

2、跟进writeAndFlush | ChannelHandlerContext.writeAndFlush

AbstractChannelHandlerContext.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

@Override
public ChannelPromise newPromise() {
    return new DefaultChannelPromise(channel(), executor());
}

在这段代码中我们可以看到,writeAndFlush方法里提供了一个默认的newPromise()作为参数传递。{promise:v. 许诺;承诺;答应;保证;使很可能;预示}在Netty中发送消息是一个异步操作,那么可以通过往hannelPromise中注册回调监听listener来得到该操作是否成功。

在发送消息时添加监听

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ctx.writeAndFlush("hi 微信公众号:bugstack虫洞栈 | 欢迎关注&获取专题源码", ctx.newProgressivePromise().addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        future.isSuccess();
    }
}));

3、继续向下一层跟进代码 | AbstractChannelHandlerContext.invokeWriteAndFlush

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

3.1、首先通过invokeHandler()判断通道处理器已添加到管道

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
yet. If not return {@code false} and if called or could not detect return {@code true}.
If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.

3.2、执行消息处理 invokeWrite0;首先将消息内容放入输出缓冲区中[ChannelOutboundBuffer] invokeFlush0;然后将输出缓冲区中的数据通过socket发送到网络中

4、分析invokeWrite0执行内容 | AbstractChannelHandlerContext.invokeWrite0

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

((ChannelOutboundHandler) handler()).write是一个出站事件[ChannelOutboundHandler],会由ChannelOutboundHandlerAdapter处理;

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * Calls {@link ChannelHandlerContext#write(Object, ChannelPromise)} to forward
 * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
 *
 * Sub-classes may override this method to change behavior.
 */
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}

接下来会走到ChannelPipeline中,来执行网络数据发送;| DefaultChannelPipeline.write

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    unsafe.write(msg, promise);
}

5、unsafe.write执行分析

unsafe是我们构建NioServerSocketChannel或NioSocketChannel对象时,一并构建一个成员属性,它会完成底层真正的网络操作等。NioServerSocketChannel中持有的unsafe成员变量是NioMessageUnsafe对象,而NioSocketChannel中持有的unsafe成员变量是NioSocketChannelUnsafe对象。这里我们要看的是NioSocketChannel的write流程

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of th
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, newWriteException(initialCloseCause));
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    }
    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    outboundBuffer.addMessage(msg, size, promise);
}

https://github.com/netty/netty/issues/2362

  • 获取该NioSocketChannel的ChannelOutboundBuffer成员属性。(确切地来说ChannelOutboundBuffer是NioSocketChannelUnsafe对象中的成员属性,而NioSocketChannelUnsafe才是NioSocketChannel的成员属性。)每一个NioSocketChannel会维护一个它们自己的ChannelOutboundBuffer,用于存储待出站写请求。 判断该outboundBuffer是否为null,如果为null则说明该NioSocketChannel已经关闭了,那么就会标志该异步写操作为失败完成,并释放写消息后返回。

AbstractNioByteChannel.java | filterOutboundMessage过滤待发送的消息:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }
        return newDirectBuffer(buf);
    }
    if (msg instanceof FileRegion) {
        return msg;
    }
    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

过滤待发送的消息,只有ByteBuf(堆 or 非堆)以及 FileRegion可以进行最终的Socket网络传输,其他类型的数据是不支持的,会抛UnsupportedOperationException异常。并且会把堆ByteBuf转换为一个非堆的ByteBuf返回。也就说,最后会通过socket传输的对象时非堆的ByteBuf和FileRegion。 [size = pipeline.estimatorHandle().size(msg);]估计待发送数据的大小:

DefaultMessageSizeEstimator.java | 通过ByteBuf.readableBytes()判断消息内容大小,估计待发送消息数据的大小,如果是FileRegion的话直接饭0,否则返回ByteBuf中可读取字节数。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private static final class HandleImpl implements Handle {
    private final int unknownSize;
    private HandleImpl(int unknownSize) {
        this.unknownSize = unknownSize;
    }
    @Override
    public int size(Object msg) {
        if (msg instanceof ByteBuf) {
            return ((ByteBuf) msg).readableBytes();
        }
        if (msg instanceof ByteBufHolder) {
            return ((ByteBufHolder) msg).content().readableBytes()
        }
        if (msg instanceof FileRegion) {
            return 0;
        }
        return unknownSize;
    }
}

ChannelOutboundBuffer.java | ChannelOutboundBuffer.addMessage将消息加入outboundBuffer中等待发送

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
 * the message was written.
 */
public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

6、ChannelOutboundBuffer出栈

一个内部的数据结构,被AbstractChannel用于存储它的待出站写请求。 ChannelOutboundBuffer中有两个属性private Entry unflushedEntry、private Entry flushedEntry。它们都是用Entry对象通过next指针来维护的一个单向链表。以及一个private Entry tailEntry;对象表示始终指向最后一个Entry对象(即,最后加入到该ChannelOutboundBuffer中的写请求的数据消息) unflushedEntry表示还未刷新的ByteBuf的链表头;flushedEntry表示调用flush()操作时将会进行刷新的ByteBuf的链表头。

7、Entry对象

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
static final class Entry {
    private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
        @Override
        protected Entry newObject(Handle<Entry> handle) {
            return new Entry(handle);
        }
    };

    private final Handle<Entry> handle;
    Entry next;
    Object msg;
    ByteBuffer[] bufs;
    ByteBuffer buf;
    ChannelPromise promise;
    long progress;
    long total;
    int pendingSize;
    int count = -1;
    boolean cancelled;

    private Entry(Handle<Entry> handle) {
        this.handle = handle;
    }

    static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
        Entry entry = RECYCLER.get();
        entry.msg = msg;
        entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
        entry.total = total;
        entry.promise = promise;
        return entry;
    }

    int cancel() {
        if (!cancelled) {
            cancelled = true;
            int pSize = pendingSize;

            // release message and replace with an empty buffer
            ReferenceCountUtil.safeRelease(msg);
            msg = Unpooled.EMPTY_BUFFER;

            pendingSize = 0;
            total = 0;
            progress = 0;
            bufs = null;
            buf = null;
            return pSize;
        }
        return 0;
    }

    void recycle() {
        next = null;
        bufs = null;
        buf = null;
        msg = null;
        promise = null;
        progress = 0;
        total = 0;
        pendingSize = 0;
        count = -1;
        cancelled = false;
        handle.recycle(this);
    }

    Entry recycleAndGetNext() {
        Entry next = this.next;
        recycle();
        return next;
    }
}

Entry是ChannelOutboundBunffer的一个内部类,它是对真实的写消息数据以及其相关信息的一个封装。大致封装了如下信息: a) pendingSize:记录有该ByteBuf or ByteBufs 中待发送数据大小 和 对象本身内存大小 的累加和; b) promise:该异步写操作的ChannelPromise(用于在完成真是的网络层write后去标识异步操作的完成以及回调已经注册到该promise上的listeners); c) total:待发送数据包的总大小(该属性与pendingSize的区别在于,如果是待发送的是FileRegion数据对象,则pengdingSize中只有对象内存的大小,即真实的数据大小被记录为0;但total属性则是会记录FileRegion中数据大小,并且total属性是不包含对象内存大小,仅仅是对数据本身大小的记录); e) msg:原始消息对象的引用; f) count:写消息数据个数的记录(如果写消息数据是个数组的话,该值会大于1) 这里说明下,pendingSize属性记录的不单单是写请求数据的大小,记录的是这个写请求对象的大小。这是什么意思了?这里做个简单的介绍: 一个对象占用的内存大小除了实例数据(instance data),还包括对象头(header)以及对齐填充(padding)。所以一个对象所占的内存大小为『对象头 + 实例数据 + 对齐填充』,即

CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Assuming a 64-bit JVM:
//  - 16 bytes object header
//  - 8 reference fields
//  - 2 long fields
//  - 2 int fields
//  - 1 boolean field
//  - padding
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
        SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

假设的是64位操作系统下,且没有使用各种压缩选项的情况。对象头的长度占16字节;引用属性占8字节;long类型占8字节;int类型占4字节;boolean类型占1字节。同时,由于HotSpot VM的自动内存管理系统要求对象起始地址必须是8字节的整数倍,也就是说对象的大小必须是8字节的整数倍,如果最终字节数不为8的倍数,则padding会补足至8的倍数。

addMessage方法主要就是将请求写出的数据封装为Entry对象,然后加入到tailEntry和unflushedEntry中。 然后调用『incrementPendingOutboundBytes(entry.pendingSize, false);』对totalPendingSize属性以及unwritable字段做调整。 totalPendingSize字段记录了该ChannelOutboundBuffer中所有带发送Entry对象的占的总内存大小和所有带发送数据的大小。unwritable用来标示当前该Channel要发送的数据是否已经超过了设定 or 默认的WriteBufferWaterMark的high值。如果当前操作导致了待写出的数据(包括Entry对象大小以及真实需要传输数据的大小)超过了设置写缓冲区的高水位,那么将会触发fireChannelWritabilityChanged事件。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-09-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 bugstack虫洞栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
canvas绘制动画原理及案例讲解(绘制小恐龙动画、时钟等)
这期为大家带来的是canvas的动画绘制案例与讲解。不知道大家上一期canvas绘制基本图形的入门教程看的怎么样了,如果已经遗忘了或者还没看的小伙伴建议先去看一下,上一期是学习这一期的基础:
用户9999906
2022/12/22
4.1K0
canvas绘制动画原理及案例讲解(绘制小恐龙动画、时钟等)
❤️创意网页:HTML5 Canvas技术实现绚丽多彩的烟花特效
烟花特效一直以来都是网页设计中的热门元素之一,它能够为用户带来视觉上的愉悦和惊喜。在这篇技术博客中,我们将使用HTML5 Canvas和JavaScript来实现一个绚丽多彩的烟花特效。我们将逐步解释代码的不同部分,介绍如何利用Canvas API和动画效果来创造这个引人注目的效果。
命运之光
2024/03/20
9540
❤️创意网页:HTML5 Canvas技术实现绚丽多彩的烟花特效
从Chrome小恐龙游戏学习2D游戏制作
在chrome浏览器的断网页面,按空格键或者向上键会出现一个小恐龙跑酷小游戏,这个2D小游戏在设计上精致小巧,在代码上也只有三千多行,思路清晰严谨,很有学习价值
异名
2020/10/29
1.8K0
从Chrome小恐龙游戏学习2D游戏制作
神奇canvas 带你实现魔法摄像头
我们用手机的摄像头自拍,很容易实现 简单的自拍效果,如复古、黑白等等。其实我们使用web端的JavaScript也是可以实现的。接下来就带领小伙伴实现一个魔法摄像头。并且提供了截图下载功能。
万少
2025/02/10
2790
神奇canvas 带你实现魔法摄像头
H5 canvas如何载入视频
Canvas可以载入图片,那么Canvas 也可以载入视频。 Canvas加载视频和图片是一样的,使用drawImage,区别就是给一个定 时器不停的抓取每帧的画面,放入Canvas画布里面。 示例1 : 附上demo: <!DOCTYPE html> <html lang="en"> <head> <meta name="viewport" content="width=640,user-scalable=0,target-densitydpi=device-dpi"
White feathe
2021/12/08
1.1K0
H5 canvas如何载入视频
[Canvas前端游戏开发]——FlappyBird详解
一直想自己做点小东西,直到最近看了本《HTML5游戏开发》,才了解游戏开发中的一点点入门知识。 本篇就针对学习的几个样例,自己动手实践,做了个FlappyBird,源码共享在度盘 ;也可以参考git
用户1154259
2018/01/17
8680
[Canvas前端游戏开发]——FlappyBird详解
HTML5 Canvas的简单使用
参考这里 https://www.runoob.com/w3cnote/html5-canvas-intro.html
不愿意做鱼的小鲸鱼
2022/09/24
1.7K0
HTML5 Canvas的简单使用
Canvas之使用图片 原
canvas有比较强的图片操作能力。可以用于动态的图像合成或者作为图形的背景。浏览器支持任意格式如PNG、GIF、或者JPEG,你甚至可以将同一个页面中的其他canvas元素生成的图片作为图片源(toDataURL("image/png"),canvas.toDataURL('image/jpeg', quality))
tianyawhl
2019/04/04
1.2K0
Canvas之使用图片
                                                                            原
canvas学习笔记(八)—- 基本动画
1.用window.setInterVal()、window.setTimeOut()和window.requestAnimationFrame()来定期执行一个指定函数
Java架构师必看
2021/08/19
7850
HTML5(六)——Canvas 高级操作
angle 旋转弧度,如果想使用角度,可以把角度转成弧度,公式为:deg * Path.PI/180。
呆呆
2021/09/29
1.5K0
Canvas如何实现滤镜效果
用过photoshop或者美颜相机,我们都知道滤镜可以帮助我们把图片修缮的更加完美。
terrence386
2022/07/15
1.5K0
Canvas如何实现滤镜效果
像素鸟html与js源码(4节课勉强做完)
《flappy bird》是一款由来自越南的独立游戏开发者Dong Nguyen所开发的作品,游戏于2013年5月24日上线,并在2014年2月突然暴红。2014年2月,《Flappy Bird》被开发者本人从苹果及谷歌应用商店(Google Play)撤下。2014年8月份正式回归App Store,正式加入Flappy迷们期待已久的多人对战模式。游戏中玩家必须控制一只小鸟,跨越由各种不同长度水管所组成的障碍。
红目香薰
2023/02/10
1.3K0
像素鸟html与js源码(4节课勉强做完)
Canvas初实现拍照小游戏
由于实现的比较简单,且在部分机型上会出现点小问题,此处仅作为js代码的记录,暂不打算写相关教程。
WindCoder
2018/09/20
1.1K0
canvas学习和滤镜实现
最近学习了 HTML5 中的重头戏–canvas。利用 canvas,前端人员可以很轻松地、进行图像处理。其 API 繁多,这次主要学习常用的 API,并且完成以下两个代码:
心谭博客
2020/04/20
7960
写了个小游戏,揭秘碰撞检测
checkCollision就是检测两个物体产生碰撞的方法,如果产生了碰撞就返回true,否则返回false,这是一段非常简单的判断碰撞逻辑,基本可以这个万能公式解决碰撞问题
Maic
2025/02/12
2581
写了个小游戏,揭秘碰撞检测
canvas 像素操作
在 canvas 中可以使用 context.drawImage(image,dx,dy) 方法将图片绘制在 canvas 上。将图片绘制上去后,还可以使用 context.getImageData(sx, sy, sw, sh) 方法获取 canvas 区域隐含的像素数据,该方法返回一个 ImageData 对象,里面包含的是 canvas 像素信息。
多云转晴
2020/01/14
2.1K0
canvas 像素操作
【Go 语言社区】HTML5 canvas验证码识别
canvas 的历史这个 HTML 元素是为了客户端矢量图形而设计的。它自己没有行为,但却把一个绘图 API 展现给客户端 JavaScript 以使脚本能够把想绘制的东西都绘制到一块画布上。canvas 标记由 Apple 在 Safari 1.3 Web 浏览器中引入。对 HTML 的这一根本扩展的原因在于,HTML 在 Safari 中的绘图能力也为 Mac OS X 桌面的 Dashboard 组件所使用,并且 Apple 希望有一种方式在 Dashboard 中支持脚本化的图形。Firefox 1
李海彬
2018/03/19
1.8K0
canvas入门实战--邀请卡生成与下载
写了很多的javascript和css3的文章,是时候写一篇canvas的了。canvas是html5提供的一个新的功能!至于作用,就是一个画布。然后画笔就是javascript。canvas的用途非常的广,特别是html5游戏以及数据可视化这两个方面。现在canvas给我的感觉就和css3一样,可以不用太厉害,但是必须要会基础的用法。但是以后对canvas的需求,肯定会越来越大。所以canvas很值得学习,而且学好canvas,就是很好的一个加分项。对于这篇文章,我也是以canvas初学者的角度写的,会有很多改善的地方。如果大家觉得我有什么可以改善的,或者建议,欢迎指点迷津!代码已上传github,需要的欢迎star(downloadImg)。
守候i
2018/08/22
6390
canvas入门实战--邀请卡生成与下载
Canvas基础-粒子动画Part4
在之前的文章 Canvas基础-粒子动画Part2 和 Canvas基础-粒子动画Part3 中分别讲了用图片和文字做粒子动画,今天我们来把代码简单整理一下,封装成一个类,能同时支持用图片和文字做粒子动画,而且有更好的灵活性。 封装类 HTML结构和上一篇的一样,这里从外部引入一个js文件,我们的类就写这里面。 <body> <div class="input-wrap"> <input id="txt" type="text" name="" valu
Bob.Chen
2018/05/02
1.2K0
Canvas基础-粒子动画Part4
【JS】928- 用 Canvas 编辑你的图片
大家好,我是 canvas ,我能让大家通过 canvas 标签,用JavaScript来绘制图形。除此之外,动画,游戏图形,数据可视化,照片处理和实时视频处理都难不倒我噢~
pingan8787
2021/04/26
4.9K0
【JS】928- 用 Canvas 编辑你的图片
相关推荐
canvas绘制动画原理及案例讲解(绘制小恐龙动画、时钟等)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验