首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Node.js Stream 背压 — 消费端数据积压来不及处理会怎么样?

Stream 在 Node.js 中是一个被广泛应用的模块,流的两端可读流、可写流之间通过管道链接,通常写入磁盘速度是低于读取磁盘速度的,这样管道的两端就会产生压力差,就需要一种平衡的机制,使得平滑顺畅的从一个端流向另一个端...问题来源 “数据是以流的形式从可读流流向可写流的,不会全部读入内存,我想说的是上游流速过快下游来不及消费造成数据积压 即“背压” 问题会怎样” 这个问题来自于「Nodejs技术栈-交流群」一位朋友的疑问...流数据读取->写入示例 先构造一个大文件,我在本地创建了一个 2.2GB 大小的文件,通过大文件能够显著看到处理积压与不处理积压之间的差别。...', err); } })(); write() 源码修改与编译 write(chunk) 方法介绍 可写流对象的 write(chunk) 方法接收一些数据写入流,当内部缓冲区小于创建可写流对象时配置的...因为一旦缓冲区中的数据超过了 highWaterMark 限制,可写流的 write() 方法就会返回 false,处理数据积压的这一机制也会被触发。

1.2K40

Nodejs Stream pipe 的使用与实现原理分析

看下 ondata() 方法里的几个核心实现: dest.write(chunk):接收 chunk 写入数据,如果内部的缓冲小于创建流时配置的 highWaterMark,则返回 true,否则返回...之所以调用 src.pause() 是为了防止读入数据过快来不及写入,什么时候知道来不及写入呢,要看 dest.write(chunk) 什么时候返回 false,是根据创建流时传的 highWaterMark...dest 耗尽时,它将会在可读流对象 source 上减少 awaitDrain 计数器 // 为了确保所有需要缓冲的写入都完成,即 state.awaitDrain === 0 和 src 可读流上的...,执行 dest.end() 方法,表明已没有数据要被写入可写流,进行关闭(关闭可写流的 fd),之后再调用 stream.write() 会导致错误。...第二部分仍以 Nodejs Stream pipe 方法为题,查找它的实现,以及对源码的一个简单分析,其实 pipe 方法核心还是要去监听 data 事件,向可写流写入数据,如果内部缓冲大于创建流时配置的

5.8K41
  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    NodeJS模块研究 - stream

    可读流和可写流都会在内部缓存器存储数据,Duplex 和 Transform 也在内部维护了缓存器。在开发者基于流开发时,可以通过传递highWaterMark参数,来修改默认缓冲的大小。...在这个过程中,如果可写/可读缓冲中的数据总大小超过了 highWaterMark: 可写流的 write()会返回 false,直到缓冲可以继续写入,触发drain事件 可读流会停止从底层资源读取数据...结合前面对缓冲的讲解,在向可写流写入数据的时候,如果超过可写缓存,应该暂停数据读取,等待缓存中数据被消耗完毕后,再继续流动可读流。...pipe 函数实现了以下几个功能: 不断从来源可读流中获得一个指定长度的数据。 将获取到的数据写入目标可写流。 平衡读取和写入速度,防止读取速度大大超过写入速度时,出现大量滞留数据。...在实现自己的类库的时候,可以借助流来处理大容量数据。nodejs 提供给开发者 API,来定制 4 种类型的流。 实现可写流 继承 Writable 类,需要重写_write()方法。

    93930

    Nodejs 中的 Stream

    nodejs 的基础能力,与其它模块以及互相之间依赖关系其只会更复杂。...Writable - 可写入数据的流,可以通过管道写入、但不能通过管道读取的流 Readable - 可读取数据的流,可以通过管道读取、但不能通过管道写入的流 Duplex - 可读又可写的流,可以通过管道写入和读取的流...,基本上相对于是可读流和可写流的组合 Transform - 在读写过程中可以修改或转换数据的 Duplex 流。...2.独立缓冲区: 可读流和可写流都有自己的独立于 V8 堆内存之外的独立缓冲区。...3.字符编码: 我们通常在进行文件读写时,操作的其实是字节流,所以在设置流参数 options 时需要注意编码格式,格式不同 chunk 的内容和大小就会不同。可读流与可写流默认的编码格式不同。

    2.3K10

    NodeJS的Stream

    这里仅做自己在尝试stream中遇到的问题和需要记录的概念与知识. Stream Stream是用来干什么的 Stream是NodeJS提供的一个基于"流"这么一个概念的....概念 分类与使用 Stream在NodeJS中存在这么几个基础分类, 为了方便理解, 我还是打算用水和水池来作比喻: Writable: 可写的流....但是需要注意的是pipe方法默认会把可写流close掉, 因此实际上pipe方法在调用时并不会立即执行而是会被添加到EventLoop中最后执行....NodeJS在10.x版本中提出了一个新东西: pipeline. 虽然它并不能解决我们刚才提到的同时包含了Writable.push和Readable.pipe的异步问题....stdin与stdout 顺带一提, NodeJS中的标准输入和标准输出也都是Stream, 前者是可读流, 后者是可写流.

    66230

    Node.js--Stream 1. 概述2. Readable Stream(可读流)3. Writable Stream(可写流)4. 流模式(objectMode )5. 缓冲(highWa

    Node.js--Stream 1. 概述   流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的 API 。...Writable Stream(可写流) 创建可写流,需要继承Writable,并实现_write()方法。 ① 上游通过调用writable.write(data)将数据写入可写流中。...通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据。如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数。 4. ...缓冲(highWaterMark) Writable 和 Readable 流都会将数据存储到内部的缓冲器(buffer)中。这些缓冲器可以 通过相应的 writable....Duplex 和 Transform(可读写流)、pipe(管道) Duplex   创建可读可写流。 Duplex实际上就是继承了Readable和Writable的一类流。

    1.5K20

    大厂面试题分享:如何让6000万数据包和300万数据包在仅50M内存环境中求交集

    梳理了上面的数据包结构之后,我们就得看看 50M 内存是什么情况了,由于面试在线上进行,只能短时间在本地测试下上面这个数据量在本地会占有有多大空间,那由于限于是场前端面试,所以笔者选用了 NodeJS...主逻辑文件 在不急不慢的分类好目录结构之后,总得再弄点代码给面试官瞧瞧吧o(╥﹏╥)o,不能让人家空等啊 当然既然是面试用 NodeJS 第三方模块解决也不够好,当时是先屡一下用什么原生模块实现比较好,...选项的块读取流,highWaterMark 的默认值为: 64 * 1024(即64KB),我们可以根据需要进行调整,当内部的可读缓冲的总大小达到 highWaterMark 设置的阈值时,流会暂时停止从底层资源读取数据...这里自我感觉有些丢分项,是当时忘记了 fs.createReadStream 里面一些配置项,在现场临时翻阅 NodeJS 的官方 API 文档,这里非常感谢当时面试官的理解(^▽^) 下面,我们就要写最关键的代码了...3百万,所以可以把全部数据放入内存,我们在data-3M.js写入以下代码,这里用Promise封装,方便在外部配合async和await使用: const fs = require('fs'); const

    90230

    Node中的流

    /big.file'); src.pipe(res); });server.listen(8000); 其中pipe方法把可读流的输出(数据源)作为可写流的输入(目标),直接把读文件的输出流作为输入连接到...从中可以消耗数据,如fs.createReadStream Writable 可写流是对可写入数据的目标的抽象,如fs.createWriteStream Duplex(双工) 双工流既可读又可写,如...TCP socket Transform(转换) 转换流本质上是双工流,用于在写入和读取数据时对其进行修改或转换,如zlib.createGzip用gzip压缩数据 转换流看一看做一个输入可写流,输出可读流的函数...P.S.有一种转换流叫(Pass)Through Stream(通过流),类似于FP中的identity = x => x 三.管道 src.pipe(res)要求源必须可读,目标必须可写,所以,如果是对双工流进行管道传输...等价于 a.pipe(b) b.pipe(c) c.pipe(d) # Linux下,等价于 $ a | b | c | d 四.流与事件 事件驱动是Node在设计上的一个重要特点,很多Node原生对象都是基于事件机制

    2.3K10

    探索异步迭代器在 Node.js 中的使用

    上述示例中 chunk 每次接收的值是根据创建可读流时 highWaterMark 这个属性决定的,为了能清晰的看到效果,在创建 readable 对象时我们指定了 highWaterMark 属性为...(chunk); // 0 1 2 3 }); 传送异步迭代器到可写流 使用 pipeline 可以将一系列的流和生成器函数通过管道一起传送,并在管道完成时获取通知。...传送 cursor 到可写流 MongoDB 游标对象本身也是一个可迭代对象(Iterable),结合流模块的 Readable.from() 则可转化为可读流对象,是可以通过流的方式进行写入文件。...但是要注意 MongoDB 中的游标每次返回的是单条文档记录,是一个 Object 类型的,如果直接写入,可写流是会报参数类型错误的,因为可写流默认是一个非对象模式(仅接受 String、Buffer、...Unit8Array),所以才会看到在 pipeline 传输的中间又使用了生成器函数,将每次接收的数据块处理为可写流 Buffer 类型。

    7.5K20

    Node.js 中的这几个场景都可以使用异步迭代器

    上述示例中 chunk 每次接收的值是根据创建可读流时 highWaterMark 这个属性决定的,为了能清晰的看到效果,在创建 readable 对象时我们指定了 highWaterMark 属性为...(chunk); // 0 1 2 3 }); 传送异步迭代器到可写流 使用 pipeline 可以将一系列的流和生成器函数通过管道一起传送,并在管道完成时获取通知。...传送 cursor 到可写流 MongoDB 游标对象本身也是一个可迭代对象(Iterable),结合流模块的 Readable.from() 则可转化为可读流对象,是可以通过流的方式进行写入文件。...但是要注意 MongoDB 中的游标每次返回的是单条文档记录,是一个 Object 类型的,如果直接写入,可写流是会报参数类型错误的,因为可写流默认是一个非对象模式(仅接受 String、Buffer、...Unit8Array),所以才会看到在 pipeline 传输的中间又使用了生成器函数,将每次接收的数据块处理为可写流 Buffer 类型。

    3.8K40

    nodejs可读流源码分析

    可读流是对数据消费的抽象,nodejs中可读流有两种工作模式:流式和暂停式,流式就是有数据的时候就会触发回调,并且把数据传给回调,暂停式就是需要用户自己手动执行读取的操作。...我们先看一下ReadableState,这个对象是表示可读流的一些状态和属性的。...如果我们是以继承的方式使用Readable,那必须实现_read函数。nodejs只是抽象了流的逻辑,具体的操作(比如可读流就是读取数据)是由用户自己实现的,因为读取操作是业务相关的。...下面我们分析一下可读流的操作。 1 可读流从底层资源获取数据 对用户来说,可读流是用户获取数据的地方,但是对可读流来说,他提供数据给用户的前提是他自己得有数据,所以可读流首先需要生产数据。..._read函数的逻辑大概是 const data = getSomeData(); readableStream.push(data); 通过push函数,往可读流里写入数据,然后就可以为用户提供数据,

    87530

    Node.js 流源码解读之可读流

    1. 基本概念 1.1. 流的历史演变 流不是 Node.js 特有的概念。它们是几十年前在 Unix 操作系统中引入的,程序可以通过管道运算符(|)对流进行相互交互。...,它们都是 Node.js 中 EventEmitter 的实例: 可读流(Readable Stream) 可写流(Writable Stream) 可读可写全双工流(Duplex Stream) 转换流...有数据流出时,就会触发可写流的写入事件,从而做到数据传递,实现像管道一样的操作。并且会自动将处于暂停模式的可读流变为流动模式。...总结 Node.js 为了解决内存问题和时间问题,实现了自己的流,从而可以将数据一小块一小块的读到内存里给消费者消费 流并不是 Node.js 特有的概念,它们是几十年前在 Unix 操作系统中引入的...流一共有四种类型:可读流、可写流、可读可写流、转换流,它们都继承了 EventEmiiter 的实例方法和静态方法,都是 EE 的实例 流的底层容器是基于 BufferList 的,这是一种自定义的链表实现

    2.2K10

    巧妙复制一个流

    实现 复制流并不像复制一个对象一样简单与直接,流的使用是一次性的,一旦一个可读流被消费(写入一个Writeable对象中),那么这个可读流就是不可再生的,无法再使用。...,依次存放需要写入的数据。...目的流使用的是cloneReq对象,该对象在实例化的过程中 transform函数直接通过调用next函数将接受到的数据传入到Transform对象的可读流缓存中,同时触发‘readable和data事件...使用Transform流实现clone 可读流的弊端: 上例中,Transfrom流的实例化传入了一个参数 highWaterMark,该参数在Transfrom中的作用 在 上文 深入node之Transform...因此,当要clone的源内容大于highWaterMark时,就无法正常使用这种方式进行clone了,因为由于源内容>highWaterMark,在没有后续消费Transfrom流的情况下就不执行transfrom

    42630

    Node.js Stream - 进阶篇

    该方法在拿到底层数据后,调用push方法将数据交由流处理(立即输出或存入缓存)。 可以结合readable事件和read方法来将数据全部消耗,这是暂停模式的消耗方法。...数据的消耗模式 可以在两种模式下消耗可读流中的数据:暂停模式(paused mode)和流动模式(flowing mode)。 流动模式下,数据会源源不断地生产出来,形成“流动”现象。...首次监听readable事件时,还会触发一次read(0)的调用,从而引起_read和push方法的调用,从而启动循环。 总之,在暂停模式下需要使用readable事件和read方法来消耗流。...Transform中有两个缓存:可写端的缓存和可读端的缓存。 调用transform.write()时,如果可读端缓存未满,数据会经过变换后加入到可读端的缓存中。...所以,上面的transform中实际存储了4个数据,ab在可读端(经过了_transform的处理),cd在可写端(还未经过_transform处理)。

    1.6K62

    Nodejs 中基于 Stream 的多文件合并实现

    本文先从一个 Stream 的基本示例开始,有个初步认识,中间会讲在 Stream 中什么时候会出现内存泄漏,及如何避免最后基于 Nodejs 中的 Stream 实现一个多文件合并为一个文件的例子。...writeable 就是一个可写流对象 options: end:读取结束时终止写入流,默认值是 true readable.pipe(destination[, options]) 默认情况下我们是不需要手动调用写入流的...现在我们改一下,设置 end 为 false 写入的目标流将会一直处于打开状态, 此时就需要监听可读流的 end 事件,结束之后手动调用可写流的 end 事件。...,则写入的目标流将不会关闭,例如:process.stderr 和 process.stdout 可写流在 Nodejs 进程退出前将永远不会关闭,所以需要监听错误事件,手动关闭可写流,防止内存泄漏。...Linux 下一切皆文件,为了测试,在创建可读流时,你可以不创建 test1.txt 文件,让可读流自动触发 error 事件并且将 writeable 的 close 方法注释掉,通过 linux 命令

    2.6K30

    深入node之Transform

    Transform流特性 在开发中直接接触Transform流的情况不是很多,往往是使用相对成熟的模块或者封装的API来完成流的处理,最为特殊的莫过于through2模块和gulp流操作。...从名称上说,Transform意为处理,类似于生产流水线上的每一道工序,每道工序针对到来的产品作相应的处理;从结构上看,Transform是一个双工流,通俗的解释它既可以作为可读流,也可作为可写流。...Transform流由于包含了Readable和Writeable特性,因此Transform在实际使用中有着多种方式:它既可以只作为消费者消费数据,也可同时作为生产者和消费者完成数据中间处理。...传统意义的流(即Readable和Writeable)的实现者都需要实现对应的内部函数_read()和_write(),对于Readable实例而言,_read函数用于准备从源文件中获取数据并添加到读缓冲中...transform作为消费者,会在其write函数中消费数据,在node中的Stream文中介绍了write函数的实现细节,通过内部调用_write函数实现数据的写入。

    1.4K50

    Node理论笔记:理解Buffer

    在utf8编码下,中文占3个字符,字母和半角符号占用1个字符。 类似Array,length属性可以返回Buffer长度,通过下标可以访问元素。...通过下标可以为Buffer赋值,但仅限数字型,且遵循以下几个原则: 如果小于0,就将该值逐次加256,直到得到一个0到255之间的值 如果大于255,就将该值逐次减256,直到得到一个0到255之间的值...语法:buf.write(str,[offset],[length],[encoding]) 注意默认offset为0,所以重复写入后边会覆盖前边,而不是自动写入空余位置。...四、Buffer与性能 Buffer在文件I/O和网络I/O中运用广泛,特别是网络传输中。在应用中,通常操作的是字符串,但是在网络中传输则都要转化为Buffer,以进行二进制数据传输。...fs模块的createReadStream()方法可以创建一个文件读取流,其工作方式是在内存中准备一段Buffer,然后逐步从磁盘中将字节复制到Buffer中。

    1.4K30
    领券