brunoyang / blog

134 stars 13 forks source link

抛弃难用的stream #25

Open brunoyang opened 3 years ago

brunoyang commented 3 years ago

什么是 stream

简单来说,将一个资源分成一个个小块(chunk)传输,而不是一次性传输所有数据,这种技术就可以被称为 stream。 数据源被称为 readable stream,接受数据方被称为 writeable stream,而在两者之间可以对数据做一些处理的中间步骤被称为 transform stream。 Stream 的优点就是不会对内存产生压力,并且可以让数据尽可能快地到达,而不必等待所有数据加载到内存。

难用的 stream

对我来说 stream 可能是 nodejs 中最难以掌握的部分,每次使用到 stream 时,都需要翻开文档查 api。另外,也很容易忘记处理错误,如当要从 readable stream pipe 到 writeable stream 时,要同时处理两个 stream 的错误,但往往会遗漏。

让我们看个例子,从一个 url 下载图片到本地

function download(url: string, filePath: string): Promise<void> {
  return new Promise((resolve, reject) => {
    const file = createWriteStream(filePath);
    file.on('finish', resolve);

    http.get(url, response => {
      response.pipe(file);
    }); 
  });
}

Emm,好像忘了处理错误,让我们来加上:

function download(url: string, filePath: string): Promise<void> {
  return new Promise((resolve, reject) => {
    const file = createWriteStream(filePath);

    const request = network.get(url, response => {
      if (response.statusCode === 200) {
        response.pipe(file);
      } else {
        file.close();
        unlinkSync(filePath);
        reject(`Server responded with ${response.statusCode}: ${response.statusMessage}`);
      }
    });

    request.on('error', err => {
      file.close();
      unlinkSync(filePath);
      reject(err.message);
    });

    file.on('finish', resolve);

    file.on('error', (err: FileErr) => {
      file.close();

      if (err.code === 'EEXIST') {
        reject('File already exists');
      } else {
        unlinkSync(filePath);
        reject(err.message);
      }
    });
  });
}

这下看起来不错了,已经补上了所有错误处理的逻辑。但是一个如此简单的需求我们却写了这么多代码,说明 stream 太难用了,需要我们手动处理太多逻辑。

那么,「难用」体现在哪呢? 第一,pipe 方法倾向于让用户使用链式调用,非常容易忘记写错误处理逻辑; 第二,http.get 回调参数 和 createWriteStream 都基于 stream,需要事无巨细地处理各种事件; 第三,逻辑被分割在各个回调里,无法清晰地阅读及调试。

新的工具

node 基础库里的 http 模块非常底层,所以 node 社区为 http client 出了一个又一个轮子,如axios、request、urllib等等。官方显然也是看到了这个问题,于是写了一个轮子Undici,同样基于 stream,但封装了很多细节,支持 promise,未来有望合并到 Node 基础库内。贴一段示例代码

import { request } from 'undici'

const {
  statusCode,
  headers,
  body
} = await request('http://localhost:3000/foo')

console.log('response received', statusCode)
console.log('headers', headers)

for await (const data of body) {
  console.log('data', data)
}

可以看到 api 设计已经和流行的 http client 比较接近了。

Pipe 方法也有了一个替代方案,在 Node 10 内新增了一个 pipeline 方法,注意是pipeline不是pipe。再贴一段官方示例:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);

抛弃了链式写法,更加清爽,强烈建议换用。并且还有 promise 版本:

const { pipeline } = require('stream/promises');

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

重构

让我们使用上面的工具重构最初的需求

async function download(url: string, filePath: string): Promise<void> {
  try {
    const { body } = await request(url);
    await pipeline(
      body,
      createWriteStream(filePath),
    );
  } catch (e) {
    console.log('下载失败', url);
    unlinkSync(filePath);
  }
}

这个版本的代码,清晰易调试,不会忘记处理错误,你有什么理由不用呢~

总结

又做了一回标题党,我们其实并不需要抛弃 stream,反而应该大力拥抱,WHATWG已经制定了web streams 标准,node 也实现了这套标准,stream 也早已脱离 node,来到了浏览器。通过高层级封装的 api,我们可以更加方便地使用,或许2022年就将是 the year of web streams :)