stream
There are a number of conventions associated with streams, which have pretty minimal requirements.
Suppose you have a var stream = fs.createReadStream('README.md'); in this case, stream.autoClose is true by default. Before you have read anything (say, with stream.read(1000)), stream.readable is true, and stream.destroyed and stream.closed are both undefined.
Once you read to the end, i.e., stream.read(size) returns a Buffer shorter than size, or null (I kinda wish it would return a 0-length Buffer, but no; not even if you open the stream with {autoClose: false}), stream.readable is false, and stream.destroyed and stream.closed are both true.
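The flag transitions described above can be watched with a small script. This is only a sketch: it writes its own scratch file (demoPath is a hypothetical name standing in for README.md), the flags() helper and snapshots object are mine, and the exact values printed depend on the Node release. Newer versions, for instance, may report false rather than undefined before the stream has been consumed.

```javascript
var fs = require('fs');
var os = require('os');
var path = require('path');

// Hypothetical scratch file, so the sketch does not depend on README.md existing.
var demoPath = path.join(os.tmpdir(), 'read-stream-flags-demo.txt');
fs.writeFileSync(demoPath, 'hello streams\n');

// Collect the three flags discussed above into a plain object.
function flags(s) {
  return { readable: s.readable, destroyed: s.destroyed, closed: s.closed };
}

var snapshots = {};
var stream = fs.createReadStream(demoPath);
snapshots.initial = flags(stream);

// Attaching a 'data' listener consumes the file to the end.
stream.on('data', function () {});

// With autoClose left at its default, 'close' fires once the descriptor is released.
stream.on('close', function () {
  snapshots.afterClose = flags(stream);
  console.log(snapshots);
  fs.unlinkSync(demoPath);
});
```

Comparing snapshots.initial with snapshots.afterClose is the quickest way to check which of the values above your own Node version actually reports.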
The fs.ReadStream instance returned by fs.createReadStream() also has three helper functions on top of its stream.Readable properties:
- close(callback): This sets this.closed = true, calls fs.close() on the underlying file descriptor id, attaches callback as a listener for the 'close' event, and emits 'close'.
- destroy(): This sets this.destroyed = true, and calls this.close() if this.fd is still set.
- open(): This calls fs.open() on the stream's path with its flags and mode, sets this.fd, emits 'open' with the file descriptor id as the argument, and calls this.read() to start the data flowing.

A few insights from reading the node/lib/_stream_readable.js source code:
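// Readable comes from the core 'stream' module; a separate name avoids shadowing it.
var Readable = require('stream').Readable;
var stream = new Readable();
var state = stream._readableState;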
- stream.read(size):
  - Certain conditions must hold before it calls the underlying implementation (_read()); for example, state.reading (and state.ended) must be false.
  - When it calls _read(), it will set state.reading = true and state.sync = true.
  - After _read() returns, it sets state.sync = false, but state.reading is left for the underlying implementation to reset. If _read() is synchronous, and confident that it read all that it could / needed to before exiting, it should set _readableState.reading = false.
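To see the state.reading / state.sync bookkeeping described above, one can peek at the flags from inside a custom read implementation. The sketch below assumes a Node release where these fields are still readable on _readableState. The probe, calls and observed names are hypothetical, used only for illustration, and since _readableState is internal none of this is a supported API.

```javascript
var Readable = require('stream').Readable;

var observed = {};
var calls = 0;

// Hypothetical probe stream: its read() records the internal flags it sees.
var probe = new Readable({
  read: function () {
    var state = this._readableState;
    calls += 1;
    if (calls === 1) {
      // Snapshot taken while stream.read() is still inside its call to _read().
      observed.inside = { reading: state.reading, sync: state.sync };
      this.push('abc');   // pushing a chunk resets state.reading
    } else {
      this.push(null);    // signal end-of-stream on any later call
    }
  }
});

probe.read(0);            // triggers _read() without consuming any data

var after = probe._readableState;
observed.after = { reading: after.reading, sync: after.sync };

console.log(observed);
```

If the description above holds, observed.inside should report both flags as true, and observed.after should show sync back at false, with reading reset by the push().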
- stream.push(chunk, encoding): When you call stream.push(chunk) and nothing triggers an error, one of the first things it does is set state.reading = false. If state.flowing == true and the internal buffer is empty and state.sync == false (because stream.push() was called outside _read()), the chunk will be emitted immediately. Otherwise, it will be added to state.buffer.
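The two push() outcomes above (buffer the chunk, or emit it right away) can be contrasted in a few lines. This is a sketch under assumptions: both streams use a no-op read() so that every chunk comes from an explicit push(), the idle, flowing, seen and result names are hypothetical, and readableLength is used as the public view of the buffered length, which older Node releases do not have.

```javascript
var Readable = require('stream').Readable;

// Case 1: nobody is reading, so push() can only add to the internal buffer.
var idle = new Readable({ read: function () {} });
idle.push('a');
var bufferedLength = idle.readableLength;   // bytes currently buffered

// Case 2: a 'data' listener is attached, and push() is called later from a
// timer callback, i.e. outside _read().
var flowing = new Readable({ read: function () {} });
var seen = [];
flowing.on('data', function (chunk) {
  seen.push(String(chunk));
});

var result = {};
setImmediate(function () {
  result.before = seen.length;
  flowing.push('b');
  result.after = seen.length;   // measured synchronously, right after push() returns
  console.log({ bufferedLength: bufferedLength, result: result, seen: seen });
});
```

The point of measuring seen.length on both sides of push() is that an immediate emit shows up before push() even returns, whereas a buffered chunk would only arrive on a later tick.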
- .pause(), .resume(), or listening for 'data' events (the Readable.prototype.on = function(...) handler is overloaded to handle special configuration for specific event types, like 'data').
- state.flowing doesn't indicate a legacy stream; state.flowing simply indicates whether we have an active reader, like a 'data' event listener, or a downstream pipe.
- state.flowing = true, like it will when we listen for 'data' events, but once data is read, and the stream emits a 'readable', it will set state.flowing = true.
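The state.flowing notes above are easy to poke at directly. A minimal sketch, assuming _readableState.flowing is still exposed by your Node version (it is an internal field, not a supported API); source, history and record are hypothetical names. The initial value varies by release: the dumps on this page show false, while newer releases report null until a reader shows up.

```javascript
var Readable = require('stream').Readable;

// A source with a no-op read(); nothing is ever pushed in this sketch.
var source = new Readable({ read: function () {} });
var history = [];

// Record the internal flag after each step.
function record(step) {
  history.push({ step: step, flowing: source._readableState.flowing });
}

record('new stream');

source.on('data', function () {});   // an active reader appears
record('after on(data)');

source.pause();
record('after pause()');

source.resume();
record('after resume()');

console.log(history);
```

Attaching the 'data' listener flips the flag without any explicit resume() call, which is the overloaded on() handler at work.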
Custom instance of stream.Transform({objectMode: true}):
{ _readableState:
   { highWaterMark: 16384,
     buffer: [],
     length: 0,
     pipes: null,
     pipesCount: 0,
     flowing: false,
     ended: false,
     endEmitted: false,
     reading: false,
     calledRead: false,
     sync: false,
     needReadable: true,
     emittedReadable: false,
     readableListening: false,
     objectMode: true,
     defaultEncoding: 'utf8',
     ranOut: false,
     awaitDrain: 0,
     readingMore: false,
     decoder: null,
     encoding: null },
  readable: true,
  domain: null,
  _events:
   { end: { [Function: g] listener: [Function: onend] },
     finish: { [Function: g] listener: [Function] } },
  _maxListeners: 10,
  _writableState:
   { highWaterMark: 16384,
     objectMode: true,
     needDrain: false,
     ending: false,
     ended: false,
     finished: false,
     decodeStrings: true,
     defaultEncoding: 'utf8',
     length: 0,
     writing: false,
     sync: true,
     bufferProcessing: false,
     onwrite: [Function],
     writecb: null,
     writelen: 0,
     buffer: [],
     errorEmitted: false },
  writable: true,
  allowHalfOpen: true,
  _transformState:
   { afterTransform: [Function],
     needTransform: false,
     transforming: false,
     writecb: null,
     writechunk: null },
  _transform: [Function] }
Custom instance of stream.Duplex({objectMode: true}):
{ _readableState:
   { highWaterMark: 16384,
     buffer: [],
     length: 0,
     pipes: null,
     pipesCount: 0,
     flowing: false,
     ended: false,
     endEmitted: false,
     reading: false,
     calledRead: false,
     sync: true,
     needReadable: false,
     emittedReadable: false,
     readableListening: false,
     objectMode: true,
     defaultEncoding: 'utf8',
     ranOut: false,
     awaitDrain: 0,
     readingMore: false,
     decoder: null,
     encoding: null },
  readable: true,
  domain: null,
  _events: { end: { [Function: g] listener: [Function: onend] } },
  _maxListeners: 10,
  _writableState:
   { highWaterMark: 16384,
     objectMode: true,
     needDrain: false,
     ending: false,
     ended: false,
     finished: false,
     decodeStrings: true,
     defaultEncoding: 'utf8',
     length: 0,
     writing: false,
     sync: true,
     bufferProcessing: false,
     onwrite: [Function],
     writecb: null,
     writelen: 0,
     buffer: [],
     errorEmitted: false },
  writable: true,
  allowHalfOpen: true }
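For anyone wanting to produce dumps like the two above on their own machine, here is a rough sketch. It builds the streams with constructor options rather than subclasses, which is my assumption and not necessarily how the instances above were created; the pass-through transform, the no-op read/write functions and the summary object are all hypothetical. Field names and defaults in current Node releases may well differ from the dumps shown here.

```javascript
var stream = require('stream');
var util = require('util');

// Hypothetical pass-through Transform in object mode.
var transform = new stream.Transform({
  objectMode: true,
  transform: function (chunk, encoding, callback) {
    callback(null, chunk);
  }
});

// Hypothetical Duplex in object mode with no-op read and write sides.
var duplex = new stream.Duplex({
  objectMode: true,
  read: function () {},
  write: function (chunk, encoding, callback) {
    callback();
  }
});

// A few of the fields that appear in the dumps above.
var summary = {
  transformReadableObjectMode: transform._readableState.objectMode,
  transformWritableObjectMode: transform._writableState.objectMode,
  duplexReadableObjectMode: duplex._readableState.objectMode,
  duplexWritableObjectMode: duplex._writableState.objectMode,
  duplexAllowHalfOpen: duplex.allowHalfOpen
};

console.log(summary);
// Full internal state of one side, one level deep, for comparison with the dumps.
console.log(util.inspect(duplex._readableState, { depth: 0 }));
```

Passing the whole stream to util.inspect() instead gives output closer in shape to the dumps above, though considerably longer on recent versions.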