java的stream流,stream node
什么是溪流?怎么理解流量?下面这篇文章就带你深入了解一下Node中的流,希望对你有所帮助!
node.js速度课程简介:进入学习
最近作者在开发中经常用到管道函数。他只知道它是溪流的管道,却不知道它是如何运作的。于是,抱着一探究竟的心理,他干脆开始从流学习,随便把读过的知识和源代码整理成一篇文章,分享给大家。
流是Nodejs中一个非常基本的概念。很多基础模块都是基于stream实现的,起着非常重要的作用。同时,流量也是一个很难理解的概念,这主要是因为缺乏相关的文献。对于NodeJs的初学者来说,往往需要花费大量的时间去理解流程,才能真正掌握这个概念。幸运的是,对于大多数NodeJs用户来说,它只是用来开发Web应用,对flow的认识不足并不影响他们的使用。但是,理解流程可以帮助你更好地理解NodeJs中的其他模块,在某些情况下,使用流程处理数据会有更好的效果。【相关教程推荐:nodejs视频教程】
如何理解流
对于流的用户来说,流可以看作一个数组。我们只需要专注于从中获取(消费)和写作(生产)。
对于流的开发人员(用流模块创建一个新的实例)来说,他们关心的是如何在流中实现一些方法,通常关注两点:谁是目标资源,如何操作目标资源。确定之后,它们需要根据流的不同状态和事件来操作目标资源。
缓存池
NodeJS中的所有流都有缓冲池。缓冲池的目的是提高流的效率。当数据生产和消费都需要时间时,我们可以提前生产数据,并在下次消费之前将其存储在缓冲池中。然而,缓冲池并不总是在使用中。例如,当缓冲池为空时,数据在生产后不会被放入缓冲池,而是直接被消耗。
如果数据生产的速度快于数据消费的速度,多余的数据就会在某个地方等待。如果数据的生产速度小于过程数据的消耗速度,数据就会在某个地方积累到一定的量,然后被消耗掉。(开发者无法控制数据生产和消费的速度,只能尽量在可能的情况下生产或消费数据)
数据等待,积累数据,然后发生的地方。是缓冲池。缓冲池通常位于计算机的RAM(内存)中。
举一个缓冲区的常见例子。我们在看在线视频的时候,如果你的网速很快,缓冲区总是会马上填满,然后发送到系统播放,然后下一个视频会马上缓冲。在观看的过程中,不会出现卡顿。如果网速慢,你会看到加载,这表明缓冲区正在被填满。当数据被发送到系统后,你可以看到这个视频。
NodeJs流的缓存池是一个缓冲区链表。每当您想要将数据添加到缓存池时,您将重新创建一个缓冲区节点,并将其插入到链表的末尾。
EventEmitter
NodeJs是实现EventEmitter的抽象接口,所以我先简单介绍一下EventEmitter。
EventEmitter是一个实现发布和订阅事件功能的类。几种常用的方法(on、once、off、emit)相信大家都很熟悉,就不一一介绍了。
const { EventEmitter }=require(事件)
const event emitter=new event emitter()
eventA事件的绑定处理程序函数
eventEmitter.on(eventA ,()={
console . log( eventA active 1 );
});
eventB事件的绑定处理程序
eventEmitter.on(eventB ,()={
console . log(“eventB active 1”);
});
eventEmitter.once(eventA ,()={
console . log( eventA active 2 );
});
//触发eventA
eventEmitter.emit(eventA )
//eventA活动1
//事件活动2值得注意的是,事件发射器有两个叫做新闻听众和移除监听器的事件,当你向一个事件对象中添加任何事件监听函数后,都会触发新侦听器(事件发射器。发出(新侦听器),当一个处理函数被移除时同理会触发删除侦听器。
还需要注意的是,曾经绑定的处理函数只会执行一次,删除侦听器将在其执行前被触发,这意味着一次绑定的监听函数是先被移除才被触发的。
const { EventEmitter }=require(事件)
常量事件发射器=新事件发射器()
eventEmitter.on(newListener ,(event,listener)={
console.log(newListener ,event,Listener)
})
事件发射器。on( remove listener ),(event,listener)={
console.log(removeListener ),事件,侦听器)
})
//新侦听器删除侦听器[函数(匿名)]
eventEmitter.on(eventA ,()={
控制台。日志(“eventA active 1”);
});
//新侦听器eventA[函数(匿名)]
函数listenerB(){ console。日志( event b active 1 );}
eventEmitter.on(eventB ,listener b);
//newListener事件b[函数(匿名)]
eventEmitter.once(eventA ,()={
控制台。日志(“eventA active 2”);
});
//新侦听器eventA[函数(匿名)]
eventEmitter.emit(eventA )
//事件活动一
//移除侦听器eventA[Function:bound once wrapper]{ listener:[Function(anonymous)]}
//事件活动2
eventEmitter.off(eventB ,listenerB)
//删除侦听器事件b[函数:听众B]不过这对于我们后面的内容来说并不重要。
Stream
流是在节点。射流研究…中处理流数据的抽象接口溪流。并不是一个实际的接口,而是对所有流的一种统称。实际的接口有可读流、可写流、读写流这几个。
接口可读流扩展事件发射器{
可读:布尔型;
阅读(大小?数字):字符串缓冲区
setEncoding(编码:缓冲编码):这个;
暂停():这个;
简历():这个;
isPaused():boolean;
球管扩展可写流(目的地:T,选项?{ end?布尔值未定义;}):T;
取消固定(目的地?WritableStream):这个;
un shift(chunk:string uint 8数组,编码?缓冲区编码):void
wrap(旧流:可读流):this
【符号。async iterator]():asyncinchangeiteratorstring Buffer;
}
接口可写流扩展事件发射器{
可写:布尔型
write(buffer: Uint8Array string,cb?(呃?Error null)=void):boolean;
写(字符串:字符串,编码?BufferEncoding,cb?(呃?Error null)=void):boolean;
end(cb?()=void):这个;
end(data: string Uint8Array,cb?()=void):这个;
结束(字符串:字符串,编码?BufferEncoding,cb?()=void):这个;
}
接口读写流扩展ReadableStream,WritableStream { }可以看出可读流和可写流都是继承事件发射器类的接口(ts中接口是可以继承类的,因为他们只是在进行类型的合并)。
上面这些接口对应的实现类分别是可读、可写和双层公寓
开发中的流有四种:
易读的可读流(实现ReadableStream)可写可写流(实现可写流)双工可读可写流(继承易读的后实现可写流)转换转换流(继承双工)
背压问题
磁盘写入数据的速度是远低于内存的,我们想象内存和磁盘之间有一个"管道","管道"中是"流",内存的数据流入管道是非常快的,当管道塞满时,内存中就会产生数据背压,数据积压在内存中,占用资源。
开发流的解决办法是为每一个流的缓存池(就是图中写入队列)设置一个浮标值,当其中数据量达到这个浮标值后,往缓存池再次推数据时就会返回假的,表示当前流中缓存池内容已经达到浮标值,不希望再有数据写入了,这时我们应该立即停止数据的生产,防止缓存池过大产生背压。
Readable
可读流是流的一种类型,它有两种模式和三种状态。
两种阅读模式:
流模式:数据将从底层系统读取,并写入缓冲区。当缓冲区已满时,数据会尽快通过EventEmitter自动转移到注册的事件处理程序中。
暂停模式:在这种模式下,不会主动触发EventEmitter传输数据。必须显示Readable.read()方法才能从缓冲区读取数据,read将触发对EventEmitter事件的响应。
三种状态:
ReadableFlowing===null(初始状态)
ReadableFlowing===false(暂停模式)
ReadableFlowing===true(流动模式)
初始流的Readable.readableFlowing为空。
添加数据事件后,该值变为真。当pause()、unpipe()被调用,或者接收到回压或者添加了可读事件时,readableFlowing将被设置为false,在这个状态下,为 data 事件绑定监听器不会使 readableFlowing 切换到 true。
调用resume()将可读流的readableFlowing切换为true。
当缓冲区中有新的可读数据时,将触发事件描述可读(每个想要插入缓存池的节点都将被触发)。每次消耗数据时都会触发数据。参数是这次消耗的数据会比较接近,有错误会触发错误流。方法描述read(size)将用于使用长度为size的数据。如果返回null,则当前数据小于大小;否则,将返回这次消耗的数据。当没有交付size时,意味着缓存池中的所有数据都被消耗const fs=require( fs );
const read streams=fs . create read stream(。/EventEmitter.js ,{
HighWaterMark: 100//缓冲池浮标值
})
readStreams.on(可读,()={
Console.log(缓冲区已满)
ReadStreams.read()//消耗缓存池的所有数据,返回结果并触发数据事件。
})
readStreams.on(data ,(data)={
console.log(data )
})https://github1s . com/nodejs/node/blob/v 16 . 14 . 0/lib/internal/streams/readable . js # L527
当缓存池中的数据长度达到buoy值highWaterMark时,它不会主动请求生产数据,而是等待数据被消耗后再生产。
如果挂起的流不调用read来消耗数据,那么它就不会触发数据,以后也是可读的。调用read进行消费时,会先判断这次消费后剩余的数据长度是否低于buoy值。如果低于buoy值,它将在消费前请求生产数据。这样,read后的逻辑执行完成后,大概率产生了新的数据,然后再次触发readable。这种提前产生下一次消费的数据并存储在缓存池中的机制也是缓存流量快的原因。
流动状态下有两种流动。
当生产速度慢于消费速度时:在这种情况下,每次数据生产后,缓存池中将没有剩余数据。只需将这次产生的数据传递给数据事件(因为不进入缓存池,不需要调用read来消费),然后立即开始产生新的数据。最后一次数据消耗后,会产生新的数据,再次触发数据,一次结束流。当生产速度快于消耗速度时:此时,每次数据生产后,通用缓存池中仍有未消耗的数据。在这种情况下,下一个消费数据将在消费数据时产生。旧数据被消耗后,新数据已经产生并放入缓存池。它们的区别只是数据产生后缓存池中是否有数据。如果有数据,产生的数据将被推送到缓存池供使用。如果没有,数据将直接移交给数据,而不会添加到缓存池中。
值得注意的是,当缓存池中有数据流从暂停模式进入流动模式时,会循环调用read来消耗数据,直到返回null。
暂停模式
在暂停模式下,当创建可读流时,该模式为暂停模式。创建后,会自动调用_read方法将数据从数据源推送到缓冲池,直到缓冲池中的数据达到buoy值。每当数据达到buoy值时,可读流将触发一个“可读”事件,告诉消费者有数据可供进一步使用。
一般来说,“可读”事件表示流中有新内容:要么有新数据,要么到达了流的末尾。因此,在读取数据源的数据之前,会触发一个“可读”事件。
在消费者 readable 事件的处理函数中,缓冲池中的数据是通过stream.read(size)主动消费的。
const { Readable }=require(“stream”)
让计数=1000
const myReadable=new Readable({
高水位线:300,
//参数的read方法将作为流的_read方法获取源数据。
读取(大小){
//假设我们的源数据上有1000个1
让chunk=null
//读取数据的过程一般是异步的,比如IO操作
setTimeout(()={
如果(计数为0) {
let chunkLength=Math.min(计数,大小)
chunk=1 。重复(组块长度)
计数=组块长度
}
this.push(块)
}, 500)
}
})
//每次成功将数据推送到缓存池时都会触发readable。
myReadable.on(可读,()={
Const chunk=myReadable.read()//消耗当前缓存池中的所有数据
console.log(chunk.toString())
})//hwm不会大于1GB。
const MAX _ HWM=0x40000000
函数computeNewHighWaterMark(n) {
if (n=MAX_HWM) {
//1GB限制
n=MAX _ HWM
}否则{
//取下2个最高功率,以防止hwm过度增加
n-;
n =n ^ 1;
n =n ^ 2;
n =n ^ 4;
n =n ^ 8;
n =n 16
n;
}
返回n;
}
流动模式
所有可读流在开始时都处于暂停模式,可以通过以下方法切换到流动模式:
添加“数据”事件句柄;调用“resume”方法;使用 pipe 方法将数据发送到可写stream流模式,缓冲池中的数据会自动输出给消费者进行消费。同时,每次数据输出后,会自动回调_read方法,将数据源的数据放入缓冲池。如果此时缓冲池中没有数据,数据将直接传递给数据事件,而不经过缓冲池。直到流模式切换到其他暂停模式,或者数据源的数据被读出(push(null));
可读流可以通过以下方式切换回暂停模式:
如果没有管道目标,则调用stream.pause()。如果有管道目标,请删除所有管道目标。调用stream.unpipe()移除多个管道目标。const { Readable }=require(“stream”)
让计数=1000
const myReadable=new Readable({
高水位线:300,
读取(大小){
让chunk=null
setTimeout(()={
如果(计数为0) {
let chunkLength=Math.min(计数,大小)
chunk=1 。重复(组块长度)
计数=组块长度
}
this.push(块)
}, 500)
}
})
myReadable.on(data ,data={
console.log(data.toString())
})
Writable
可写流比可读流简单。
生产者调用write(chunk)时,会根据一些状态(corked、writing等)在内部选择是在缓冲队列中缓存还是call _write。),并且每次写入数据后都会尝试清空缓冲队列中的数据。如果缓冲区队列中的数据大小超过了highWaterMark,消费者将在调用write(chunk)后返回false,此时生产者应停止写入。
那么我什么时候可以继续写作呢?当缓冲区中的所有数据都被成功写入时,在缓冲区队列被清空后,将触发drain事件,此时生产者可以继续写入数据。
当生产者需要写完数据时,需要调用stream.end方法通知可写流结束。
const {可写,双工}=require(stream )
让文件内容=
const myWritable=new Writable({
高水位线:10,
Write (chunk,encoding,callback){//将作为_write方法。
setTimeout(()={
文件内容=块
Callback()//写入完成后调用
}, 500)
}
})
myWritable.on(close ,()={
console.log(close ,fileContent)
})
my writable . write( 123123 )//true
my writable . write( 123123 )//false
my Writable . end()const { Writable }=require( stream )
让文件内容=
const myWritable=new Writable({
高水位线:10,
写入(区块、编码、回调){
setTimeout(()={
文件内容=块
Console.log (consumption ,chunk.toString())
Callback()//写入完成后调用
}, 100)
}
})
myWritable.on(close ,()={
console.log(close ,fileContent)
})
让计数=0
函数productionData(){
让flag=true
while (count=20 flag){
flag=my writable . write(count . tostring())
数数
}
如果(计数20){
myWritable.end()
}
}
生产数据()
Mywriteable.on (drain ,productionData)上面是一个可写的流,buoy值为10。现在数据源是从0——20到连续的数值串,生产数据用来写数据。
第一次调用myWritable.write(0 )时,由于缓存池中没有数据, 0 不进入缓存池,而是直接交给_wirte。myWritable.write的返回值(“0”)为true。
在执行mywriteable.write (1 )时,由于没有调用_wirte的回调,说明最后一个数据还没有写入,位置保证了数据写入的顺序,所以只能创建一个缓冲区将 1 加入缓存池。后面的2-9都是这样。
当执行myWritable.write(10 )时,缓冲区长度为9(1-9),还没有达到buoy值。 10 继续加入缓存池作为缓冲区,缓存池长度变为11,所以mywriteable.write (1 )返回false,表示缓冲区中的数据足够了,我们需要等待drain事件的通知。
100ms后调用_write的回调( 0 ,编码,回调),表示已经写入 0 。然后,它将检查缓存池中是否有数据。如果有,它会先调用_read消耗缓存池的头节点( 1 ),然后继续重复这个过程,直到缓存池为空,再触发drain事件再次执行productionData。
调用myWritable.write(11 )来触发从步骤1开始的进程,直到流结束。
Duplex
了解了可读流和可写流之后,双工流就好理解了。其实双工流是继承了可读流然后实现了可写流(源代码是这么写的,但是应该说最好是同时实现可读流和可写流)。
双工流需要同时实现以下两种方法
实现_read()方法以生成可读流的数据。
实现_write()方法来使用可写流的数据。
以上两种方法如何实现,在上面的可写流和可读流部分已经介绍过了。这里需要注意的是,双工流有两个独立的缓存池,分别提供给两个流,它们的数据源也是不同的。
以NodeJs的标准iostream为例:
当我们在控制台输入数据时,会触发它的data事件,证明它具有可读流的功能。每次用户类型进入,就相当于调用可读的push方法来推送产生的数据。当我们调用它的write方法时,也可以输出内容到控制台,但是不会触发data事件,说明它有可写流的功能,有独立的缓冲区。_write方法的实现是让控制台显示文字。//每当用户在控制台输入数据(_read)时,就会触发数据事件,这就是可读流的特点。
process.stdin.on(data ,data={
process.stdin.write(数据);
})
//每秒向标准输入流产生数据(这是可写流的特点,会直接输出到控制台),不触发数据。
setInterval(()={
Process.stdin.write(不是用户控制台输入的数据)
}, 1000)
Transform
双工流可以被视为具有可写流的可读流。两者都是独立的,各自有自己的内部缓冲区。读取和写入事件独立发生。
转换流是双向的,其中读取和写入以因果关系发生。双工流的端点通过某种转换进行链接。阅读需要写作。
对于创建转换流,最重要的是实现_transform方法,而不是_write或_read。在_transform中,处理(消费)可写流写入的数据,然后为可读流生成数据。
转换流还经常会实现一个` _同花顺方法,他会在流结束前被调用,一般用于对流的末尾追加一些东西,例如压缩文件时的一些压缩信息就是在这里加上的const { write }=require(fs )
const { Transform,pass through }=require("stream ")
常量资源= 131212321312434123421342342835481627351346189146818649126412
常数转换=新转换({
高水位线:10,
转换(块、编码、回调){//转换数据,调用推将转换结果加入缓存池
this.push(chunk.toString().替换( 1 , @ ))
回调()
},
齐平(回调){//结束触发前执行
this.push( )
回调()
}
})
//写不断写入数据
让计数=0
transform.write( )
函数productionData() {
让标志=真
while (count=20 flag) {
标志=转换。写(数。tostring())
数数
}
如果(计数20) {
transform.end()
}
}
生产数据()
transform.on(drain ,productionData)
让结果=
transform.on(data ,data={
result=data.toString()
})
transform.on(end ,()={
console.log(结果)
//0@23456789@0@1@2@3@4@5@6@7@8@920
})
Pipe
管道是将上一个程序的输出作为下一个程序的输入,这是管道在Linux操作系统操作系统中管道的作用开发中的管道其实也类似,它管道用于连接两个流,上游的流的输出会作为下游的流的输入。
管道源管道(目的地,选项)要求sourec是可读的,目的地是可写的。其返回值是目的地。
https://github 1s。com/nodejs/node/blob/v 17。0 .0/lib/内部/streams/legacy。js # L16-L33
溪流。原型。管道=功能(目标,选项){
常量源=this
函数ondata(块){
如果(目的地。可写目标。write(chunk)==假源。暂停){
来源。pause();
}
}
source.on(data ,on data);
函数ondrain() {
如果(来源。可读来源。简历){
source.resume()。
}
}
dest.on(drain ,on drain);
//.后面的代码省略
}管道的实现非常清晰,当上游的流发出数据事件时会调用下游流的写方法写入数据,然后立即调用源。暂停()使得上游变为暂停状态,这主要是为了防止背压。
当下游的流将数据消费完成后会调用source.resume()使上游再次变为流动状态。
我们实现一个将数据文件中所有一替换为@然后输出到结果文件到管道。
const { Transform }=require(" stream ")
const { createReadStream,createWriteStream }=require(fs )
//一个位于管道中的转换流
函数createTransformStream(){
返回新转换({
转换(块、编码、回调){
this.push(chunk.toString().替换(/1/g, @ ))
回调()
}
})
}
createReadStream(./data’)。管道(createTransformStream())。管道(createWriteStream( ./result ))更多结节相关知识,请访问:节点射流研究…教程!以上就是什么是流(流)?如何理解开发中的流的详细内容,更多请关注我们其它相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。