一、认识Stream

什么是流呢?

  • 第一反应应该是流水,源源不断的流动;
  • 程序中的流也是类似的含义,我们可以想象当我们从一个文件中读取数据时,文件的二进制(字节)数据会源源不断的被读取到我们程序中;
  • 而这个一连串的字节,就是我们程序中的流;

所以,我们可以这样理解流:

  • 是连续字节的一种表现形式和抽象概念;
  • 流应该是可读的,也是可写的;

在之前学习文件的读写时,我们可以直接通过 readFile或者 writeFile方式读写文件,为什么还需要呢?

  • 直接读写文件的方式,虽然简单,但是无法控制一些细节的操作;
  • 比如从什么位置开始读、读到什么位置、一次性读取多少个字节;
  • 读到某个位置后,暂停读取,某个时刻恢复读取等等;
  • 或者这个文件非常大,比如一个视频文件,一次性全部读取并不合适;

二、文件读写的Stream

事实上Node中很多对象是基于流实现的:

  • http模块的Request和Response对象;
  • process.stdout对象;

官方:另外所有的流都是EventEmitter的实例:

Node.js中有四种基本流类型:

  • Writable:可以向其写入数据的流(例如 fs.createWriteStream())。
  • Readable:可以从中读取数据的流(例如 fs.createReadStream())。
  • Duplex:同时为Readable和的流Writable(例如 net.Socket)。
  • Transform:Duplex可以在写入和读取数据时修改或转换数据的流(例如zlib.createDeflate())。

这里我们通过fs的操作,演练一下Writable、Readable

三、Readable

之前读取文件的方式:

1
2
3
4
//之前读取一个文件的方式
fs.readFile('./foo.txt', (err,data) => {
console.log(data);
});

这种方式是一次性将一个文件中所有的内容都读取到程序(内存)中,但是这种读取方式就会出现我们之前提到的很多问题:

  • 文件过大、读取的位置、结束的位置、一次读取的大小;

这个时候,我们可以使用 createReadStream,我们来看几个参数,更多参数可以参考官网:

  • start:文件读取开始的位置;
  • end:文件读取结束的位置;
  • highWaterMark:一次性读取字节的长度,默认是64kb;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
const fs = require('fs');

//之前读取一个文件的方式
// fs.readFile('./foo.txt', (err,data) => {
//<Buffer 61 73 68 64 67 68 71 77>
// console.log(data);
// });

//使用Stream
const reader = fs.createReadStream('./foo.txt', {
start: 3,
end: 5,
highWaterMark: 2,
});

reader.on('data', (data) => {
console.log(data);
//暂停读取
reader.pause();
//3s后恢复读取
setTimeout(() => [reader.resume()], 3000);
});
//监听文件打开
reader.on('open', () => {
console.log('文件被打开');
});
//监听文件读取完毕
reader.on('end', () => {
console.log('文件读取完毕');
});
//监听文件关闭
reader.on('close', () => {
console.log('文件被关闭');
});

//E:\project\vs project\Node学习\06-Stream流>node Readable.js
//文件被打开
//<Buffer 64 67>

//3s后
//<Buffer 68>
//文件读取完毕
//文件被关闭

四、writeable

之前写入数据的方式:

1
2
3
4
const fs = require('fs');
fs.writeFile('./foo.txt', 'Hello', (err) => {
console.log(err);
});

这种方式相当于一次性将所有的内容写入到文件中,但是这种方式也有很多问题:

  • 比如我们希望一点点写入内容,精确每次写入的位置等;

这个时候,我们可以使用 createWriteStream,我们来看几个参数,更多参数可以参考官网:

  • flags:默认是w,如果我们希望是追加写入,可以使用 a或者 a+;
  • start:写入的位置;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const writer = fs.createWriteStream('./foo.txt', {
flags: 'a+',
start: 3,
});
writer.write('你好你好你好你好', (err) => {
console.log('写入成功');
});
writer.on('open', () => {
console.log('文件被打开');
});
writer.on('close', () => {
console.log('文件被关闭');
});
//文件被打开
//写入成功
//监听不到close

注意:这里有个bug(windows)

  • 我们写了a+ 他不会去这个位置添加内容,而是会在末尾追加
  • 我们需要写r+,这样就会在我们设置的start位置添加内容

我们并不能监听到 close 事件:

  • 这是因为写入流在打开后是不会自动关闭的;

  • p我们必须手动关闭,来告诉Node已经写入结束了;

  • 并且会发出一个 finish 事件的;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
const writer = fs.createWriteStream('./foo.txt', {
flags: 'a+',
start: 3,
});
writer.write('你好你好你好你好', (err) => {
console.log('写入成功');
});
writer.on('open', () => {
console.log('文件被打开');
});
writer.on('finish', () => {
console.log('文件写入结束,但没有被关闭');
});
writer.close();
writer.on('close', () => {
console.log('文件被关闭');
});

文件被打开
写入成功
文件写入结束,但没有被关闭
文件被关闭
  • 另外一个非常常用的方法是 end:end方法相当于做了两步操作: write传入的数据和调用close方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const writer = fs.createWriteStream('./foo.txt', {
flags: 'a+',
start: 3,
});
writer.write('你好你好你好你好', (err) => {
console.log('写入成功');
});
writer.on('open', () => {
console.log('文件被打开');
});
writer.on('finish', () => {
console.log('文件写入结束,但没有被关闭');
});
writer.end('END')
writer.on('close', () => {
console.log('文件被关闭');
});

//此时foo.txt 原本无内容
//你好你好你好你好END

五、pipe

正常情况下,我们可以将读取到的 输入流,手动的放到 输出流中进行写入:

1
2
3
4
5
6
7
8
9
10
const fs = require('fs');
const reader = fs.createReadStream('./foo.txt');
const writer = fs.createWriteStream('./bar.txt');

reader.on('data', (data) => {
writer.write(data, (err) => {
console.log(err);
});
});

可以通过pipe来完成这样的操作:

1
reader.pipe(writer);

更多可参考官方文档。