nodejs 高并发解决方案,
Node.js的4种流是什么?背压问题是什么?下面这篇文章就带你了解一下Node.js中的four stream,并介绍背压问题及解决方法,希望对你有所帮助!
node.js速度课程简介:进入学习
如何把东西从A移到B?
举起来,移动到目的地,放下就行了。
如果这东西有一吨重呢?
只是一部分一部分地移动。
其实IO就是搬东西,包括网络的IO和文件的IO。如果数据量小,直接传输所有内容就够了。但如果内容特别大,一次性加载到内存就会崩溃,速度慢。这时候就可以一部分一部分的处理了。这就是流的概念。
所有语言基本都实现了stream的api,Node.js stream api也是常用的,下面我们来探讨一下stream。
本文将回答以下问题:
Node.js的四个流是什么?
如何将生成器与可读流结合起来
河流的暂停和流动
什么是背压问题,如何解决?
[推荐研究:《nodejs 教程》]
Node.js 的 4种 stream
流的直观感受
从一个地方到另一个地方,很明显有流出方和流入方。流出方是可读的流,而流入方是可写的流。
当然,有些流是可以流入流出的,这就是所谓的双工流。
既然可以流入流出,是不是可以把流入的内容降频后再流出?这种流动叫做变换。
双工流的流入和流出内容不需要关联,而变换流的流入和流出是关联的,这是两者的区别。
流的 api
Node.js提供的流是上面介绍的四种:
const stream=require( stream );
//可读流
常量可读=流。可读;
//可写流
常量可写=流。可写;
//双工流
常量双工=流。双工;
//转换流
常量转换=流。转型;他们都有实现它的方法:
Readable需要实现_read方法来返回内容。
Writable需要实现_write方法来接受内容。
双工需要实现_read和_write方法来接受和返回内容。
Transform需要实现_transform方法,对接受的内容进行转换并返回。
我们分开来看一下:
Readable
Readable实现_read方法,通过push返回特定数据。
const Stream=require( Stream );
const readableStream=Stream。可读();
readableStream。_read=function() {
This.push(阿门前藤);
This.push(阿东阿东绿刚发芽,);
this . push(‘阿东背着那个沉重的壳,’);
this . push(‘一步一步往上爬。’)
this.push(空);
}
readableStream.on(data ,(data)={
console.log(data.toString())
});
readableStream.on(end ,()={
console . log( done ~ );
});当你按下一个null时,意味着结束流。
执行效果如下:
可读也可以通过继承来创建:
const Stream=require( Stream );
类ReadableDong扩展流。可读{
构造函数(){
super();
}
_read() {
This.push(阿门前藤);
This.push(阿东阿东绿刚发芽,);
this . push(‘阿东背着那个沉重的壳,’);
this . push(‘一步一步往上爬。’)
this.push(空);
}
}
const readableStream=new ReadableDong();
readableStream.on(data ,(data)={
console.log(data.toString())
});
readableStream.on(end ,()={
console . log( done ~ );
});可读流生成内容,因此自然可以与生成器结合使用:
const Stream=require( Stream );
类ReadableDong扩展流。可读{
构造函数(迭代器){
super();
this.iterator=iterator
}
_read() {
const next=this . iterator . next();
if(next.done) {
返回this . push(null);
}否则{
this.push(next.value)
}
}
}
函数*songGenerator() {
产量“阿门之前的葡萄树,”;
产量‘阿东阿东绿刚发芽’;
屈服‘阿东背着那个沉重的壳’;
屈服‘一步一步往上爬’;
}
const song iterator=song generator();
const readable stream=new readable dong(song iterator);
readableStream.on(data ,(data)={
console.log(data.toString())
});
readableStream.on(end ,()={
console . log( done ~ );
});这是可读的流,通过实现_read方法返回内容。
Writable
Writable实现_write方法来接收写入的内容。
const Stream=require( Stream );
const writableStream=Stream。writable();
可写流。_write=function (data,enc,next) {
console . log(data . tostring());
//每秒写一次
setTimeout(()={
next();
}, 1000);
}
writableStream.on(finish ,()=console . log( done ~ ));
WritableStream.write(阿门前藤);
WritableStream.write(阿东阿东绿刚发芽,);
writable stream . write(‘阿东背着那个沉重的壳,’);
WritableStream.write(一步一步往上爬。);
writable stream . end();将接收到的内容打印出来,并调用next来处理下一个写入的内容。这里调用next是异步的,频率可以控制。
运行之后,它确实可以正常处理所写的内容:
这是可写的流,通过实现_write方法处理写入的内容。
Duplex
双工是可读可写的,同时实现_read和_write就足够了。
const Stream=require( Stream );
var duplexStream=Stream。duplex();
双工流。_read=function () {
This.push(阿门前藤);
This.push(阿东阿东绿刚发芽,);
this . push(‘阿东背着那个沉重的壳,’);
this . push(‘一步一步往上爬。’)
this.push(空);
}
双工流。_write=function (data,enc,next) {
console . log(data . tostring());
next();
}
duplexStream.on(data ,data=console . log(data . tostring()));
duplexStream.on(end ,data=console . log( read done ~ );
DuplexStream.write(阿门前藤);
DuplexStream.write(阿东阿东绿色刚发芽,);
duplex stream . write(‘阿东背着那个沉重的壳,’);
DuplexStream.write(一步一步往上爬。);
duplex stream . end();
duplexStream.on(finish ,data=console . log( write done ~ ));结合了可读流和可写流的功能,这就是Duplex流Duplex。
Transform
双工流是可读和可写的,但它们之间没有关联。有时需要将传入的内容转换后再流出。这时,就需要进行流转换了。
转换流为了实现_transform的api,我们实现了下面的转换流来反转内容:
const Stream=require( Stream );
类TransformReverse扩展流。转换{
构造函数(){
超级()
}
_transform(buf,enc,next) {
const res=buf.toString()。拆分(“”)。反转()。联接(“”);
这个. push(res)
下一个()
}
}
var transform stream=new transform reverse();
transformStream.on(data ,data=console.log(data.toString()))
transformStream.on(end ,data=console . log( read done ~ );
TransformStream.write(阿门前藤);
TransformStream.write(阿东阿东绿刚发芽);
transform stream . write(‘阿东背着那个沉重的壳’);
TransformStream.write(逐步);
transformStream.end()
transformStream.on(finish ,data=console . log( write done ~ );运行后,效果如下:
流的暂停和流动
我们从可读流中获取内容,然后流入可写流。双方分别实现_read和_write,流程实现。
背压
但是读和写都是异步的。两个费率不一致怎么办?
如果通过可读方式读取数据的速率高于通过可写方式写入数据的速率,一些数据将会在缓冲区中累积。如果缓冲的数据太多,就会出现突发,丢失数据。
如果可读的读取数据的速率小于可写的写入数据的速率呢?没关系。最多也就是中间有一段空闲时间。
这种读速率高于写速率的现象称为“反压力”或“负压”。这也很好理解,写段压力比较大,写不进去,会撑破缓冲导致数据丢失。
这个缓冲区大小可以通过readableHighWaterMark和writableHightWaterMark来检查,是16k。
解决背压
如何解决读写速度不一致的问题?
当你还没有写完的时候,就停止阅读。这样,越来越多的数据将不会被读入并驻留在缓冲区中。
可读流具有readableFlowing属性,该属性表示数据是否自动读入。默认值为true,即自动读入数据,然后您可以通过监控数据事件来获取它。
readableFlowing设置为false时,不会自动读取,需要通过read手动读取。
readable stream . readable flow=false;
let数据;
while((data=readable stream . read())!=null) {
console . log(data . tostring());
}但是我们自己手动看比较麻烦。我们仍然可以使用自动流入方法并调用暂停和恢复来暂停和恢复。
当调用writetablestream的write方法时,将返回一个布尔值来指示目标是被写入还是被放入缓冲区:
True:数据已写入目标。
True:目标不可写,临时放在缓冲区中。
我们可以判断在返回false时暂停,然后在缓冲区为空时恢复:
const RS=fs . createreadstream(src);
const ws=fs . createwritestream(dst);
rs.on(数据,函数(块){
if (ws.write(chunk)===false) {
RS . pause();
}
});
rs.on(end ,function () {
ws . end();
});
ws.on(drain ,function () {
RS . resume();
});这样就可以实现根据写速率暂停和恢复读速率的功能,解决了背压的问题。
pipe 有背压问题么?
通常,我们使用管道直接将可读流连接到可写流,但似乎从未遇到过背压的问题。实际上,读取速率的动态调整已经在pipe内部完成。
const RS=fs . createreadstream(src);
const ws=fs . createwritestream(dst);
rs .管道(ws);
总结
流是传输数据时常见的一种思路,即传输内容的一部分,是文件读写和网络通信的基本概念。
Node.js还提供了流的api,包括可读的可读流、可写的可写流、Duplex双工流和Transform转换流。它们分别实现_read、_write、_read _write、_transform方法来返回和处理数据。
创建可读对象,可以直接调用可读api创建,然后重写_read方法,也可以继承可读实现一个子类,然后实例化。其他流程相同。(可读性可以很容易地与生成器结合)
当读取速率高于写入速率时,会出现“反压”现象,使缓冲区爆裂,造成数据丢失。解决方案是根据写入速率动态暂停和恢复可读流速率。Pipe没有这个问题,因为已经内部处理了。
是一个I/O绕不开的概念,背压问题也是一个很常见的流量问题。当数据丢失时,可以考虑是否发生了反压。希望这篇文章能帮你理清思路,真正掌握stream!
有关编程的更多信息,请访问:编程视频!以上是了解Node.js中四大流,解决“背压”问题的详细内容。请多关注我们的其他相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。