简单理解 backpressure(背压)机制

前言

本文仅是做者本身的看法,若有错误还请即便指正node

为何存在背压机制?

咱们首先来看一段代码, 这段代码存在什么问题?缓存

乍一看,感受没啥大毛病,可是若是writable.write()写入数据比较慢,可是可读流又在不断的传输数据,就会形成内存溢出,造成阻塞。async

const fs = require('fs')

const readable = fs.createReadStream('./小妇人.mp4')
const writable = fs.createWriteStream('./小妇人(1).mp4')

readable.on('data', (chunk) => {
    // 这里存在问题↓↓↓↓↓↓↓
    writable.write(chunk);
})

readable.on('end', () => {
    writable.end()
})

流的错误处理

若是可写流,没法正确的处理大量由可读流传输的数据,可读流并不会被销毁,这会致使咱们写入的文件被损坏。咱们必须添加适当的错误处理程序,在当流发生故障的时候,销毁管道中的全部流。ide

const gzip = require('zlib').createGzip();
const fs = require('fs');

const readable = fs.createReadStream('好莱坞往事.1080p.mkv');
const writable = fs.createWriteStream('好莱坞往事.1080p.mkv.gz');

// 若是可写流发生故障,压缩文件会压缩失败
readable.pipe(gzip).pipe(writable);

在 Node 8.x 版本以前咱们使用 pump。对于更高版本的 Node, 可使用pipelineui

const gzip = require('zlib').createGzip();
const { pipeline } = require('stream')
const fs = require('fs');

const readable = fs.createReadStream('好莱坞往事.1080p.mkv');
const writable = fs.createWriteStream('好莱坞往事.1080p.mkv.gz');

pipeline(
    readable,
    gzip,
    writable,
    error => {
        if (error) {
            console.log('电影压缩失败')
        } else {
            console.log('电影压缩成功')
        }
    }
)

咱们也可使用 promisify 将其改形成 async/await 的形式。code

const gzip = require('zlib').createGzip()
const { pipeline } = require('stream')
const { promisify } = require('util')
const fs = require('fs')
const readable = fs.createReadStream('好莱坞往事.1080p.mkv')
const writable = fs.createWriteStream('好莱坞往事.1080p.mkv.gz')
const asyncPipeline = promisify(pipeline)

async function start () {
    try {
        await asyncPipeline(
            readable,
            gzip,
            writable
        )
        console.log('电影压缩成功')
    } catch (error) {
        console.log('电影压缩失败')
    }
}

start()

可读流太快了

硬盘的写入速度,远远小于硬盘的读取速度。若是可读流太快,而可写流的没法迅速的消费可读流传输的数据,写入流将会把 chunk,push 到写队列中方便以后使用,这样就会形成数据在内存中的累积。这个时候将会触发 backpressur(背压) 机制。若是没有 backpressur(背压) 机制,系统将会出现以下的问题:队列

  1. 内存溢出
  2. 其余进程变得缓慢
  3. 垃圾收集器将超负荷运做

背压机制是如何解决这些问题的?

在代码中调用pipe时,它会向可写流发出信号,表示有数据准备传输。当咱们的可写流使用 write() 写入数据时,若是写队列繁忙,或者内部缓存区已经溢出了,write() 将会返回false。进程

这个时候,背压机制就会启动,它会暂停任何数据传入到可写流中,并等待可写流准备好,清空内部缓存区后。将会发出 drain 事件,并恢复可读流的传输。事件

这就意味着 pipe 只会使用固定大小的内存,不会存在内存泄漏的问题。ip

为何咱们平时不多关注背压的问题呢?那是由于在你调用 pipe 时,Node.js已经自动处理了这些问题。可是若是咱们须要实现自定义流,则须要考虑到这些问题。

参考