There are a number of conventions associated with streams, which have pretty minimal requirements.
Suppose you have var stream = fs.createReadStream('README.md'); in this case,
stream.autoClose = true by default. Until you call stream.read(1000),
stream.readable = true, and stream.destroyed and stream.closed are both undefined.
Once you read to the end, i.e., stream.read(size) returns a Buffer shorter than
size, or null (I kinda wish it would return a 0-length Buffer, but no; not even if you open the stream with {autoClose: false}), stream.readable = false, and stream.destroyed = stream.closed = true.
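To make the read-to-the-end behavior above concrete, here is a minimal sketch. It assumes a recent Node.js release, where the values of properties such as destroyed and closed may differ from the ones listed above. It also writes a hypothetical scratch file to the temp directory rather than relying on README.md existing; the names file and seen are illustrative only.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical scratch file, so the sketch does not depend on README.md.
const file = path.join(os.tmpdir(), 'readstream-sketch-' + process.pid + '.txt');
fs.writeFileSync(file, 'hello streams\n');

const stream = fs.createReadStream(file); // autoClose defaults to true
const seen = []; // every non-null value returned by read()

stream.on('readable', function () {
  let chunk;
  // read(size) returns null when it cannot satisfy the request yet,
  // and a shorter Buffer once the end of the file has been reached.
  while ((chunk = stream.read(1000)) !== null) {
    seen.push(chunk);
  }
});

stream.on('end', function () {
  console.log('bytes read:', Buffer.concat(seen).length);
  console.log('readable after end:', stream.readable);
});

stream.on('close', function () {
  console.log('destroyed after close:', stream.destroyed);
  fs.unlinkSync(file); // clean up the scratch file
});
```

Printing the properties inside the 'end' and 'close' handlers, rather than right after the last read(), avoids guessing at exactly when a given Node.js version flips them.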
The fs.ReadStream instance returned by fs.createReadStream() also has three helper functions on top of its stream.Readable properties:
close(callback): This sets this.closed = true, calls fs.close() on the underlying file descriptor id, attaches callback as a listener for the ‘close’ event, and emits ‘close’.
destroy(): This sets this.destroyed = true, and calls this.close() if this.fd is still set.
open(): This calls fs.open() on the stream’s path with its flags and mode, sets this.fd, emits ‘open’ with the file descriptor id as the argument, and calls this.read() to start the data flowing.

A few insights from reading the node/lib/_stream_readable.js source code:
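var Readable = require('stream').Readable;
var stream = new Readable(); // avoid shadowing the stream module with the instance
var state = stream._readableState; // internal bookkeeping, not a public API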
stream.read(size):
Several checks must pass before it calls the underlying implementation (_read()); for example, state.reading (and state.ended) must be false.
When it calls _read(), it will set state.reading = true and state.sync = true.
After _read() returns, it sets state.sync = false, but state.reading is left for the underlying implementation to reset. If _read() is synchronous, and confident that it read all that it could / needed to before exiting, it should set _readableState.reading = false.
stream.push(chunk, encoding): When you call stream.push(chunk) and nothing triggers an error, setting state.reading = false is one of the first things that will happen. If state.flowing == true and the internal buffer is empty and state.sync == false (because stream.push() was called outside _read()), the chunk will be emitted immediately. Otherwise, it will be added to state.buffer.
state.flowing is changed by calling .pause(), .resume(), or listening for ‘data’ events (the Readable.prototype.on = function(...) handler is overloaded to handle special configuration for specific event types, like ‘data’).
state.flowing doesn’t indicate a legacy stream; state.flowing simply indicates whether we have an active reader, like a ‘data’ event listener, or a downstream pipe.
[…] state.flowing = true, like it will when we listen for ‘data’ events, but once data is read, and the stream emits a ‘readable’, it will set state.flowing = true.

Custom instance of stream.Transform({objectMode: true}):
{ _readableState:
{ highWaterMark: 16384,
buffer: [],
length: 0,
pipes: null,
pipesCount: 0,
flowing: false,
ended: false,
endEmitted: false,
reading: false,
calledRead: false,
sync: false,
needReadable: true,
emittedReadable: false,
readableListening: false,
objectMode: true,
defaultEncoding: 'utf8',
ranOut: false,
awaitDrain: 0,
readingMore: false,
decoder: null,
encoding: null },
readable: true,
domain: null,
_events:
{ end: { [Function: g] listener: [Function: onend] },
finish: { [Function: g] listener: [Function] } },
_maxListeners: 10,
_writableState:
{ highWaterMark: 16384,
objectMode: true,
needDrain: false,
ending: false,
ended: false,
finished: false,
decodeStrings: true,
defaultEncoding: 'utf8',
length: 0,
writing: false,
sync: true,
bufferProcessing: false,
onwrite: [Function],
writecb: null,
writelen: 0,
buffer: [],
errorEmitted: false },
writable: true,
allowHalfOpen: true,
_transformState:
{ afterTransform: [Function],
needTransform: false,
transforming: false,
writecb: null,
writechunk: null },
_transform: [Function] }
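A dump like the one above can be produced roughly as follows. This is only a sketch, assuming a recent Node.js release that accepts the transform function as a constructor option. The pass-through function is a hypothetical stand-in for whatever the custom instance actually did. The fields printed, and defaults such as highWaterMark, vary between Node.js versions.

```javascript
const stream = require('stream');
const util = require('util');

// Hypothetical pass-through transform in object mode.
const transform = new stream.Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, chunk); // forward each object unchanged
  }
});

// depth: 1 keeps the output to the top-level properties and the
// first level of the nested state objects.
console.log(util.inspect(transform, { depth: 1 }));

// Individual fields can also be read directly; these are internals,
// so treat them as debugging aids rather than a stable API.
console.log(transform._readableState.objectMode);
console.log(transform._writableState.objectMode);
```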
Custom instance of stream.Duplex({objectMode: true}):
{ _readableState:
{ highWaterMark: 16384,
buffer: [],
length: 0,
pipes: null,
pipesCount: 0,
flowing: false,
ended: false,
endEmitted: false,
reading: false,
calledRead: false,
sync: true,
needReadable: false,
emittedReadable: false,
readableListening: false,
objectMode: true,
defaultEncoding: 'utf8',
ranOut: false,
awaitDrain: 0,
readingMore: false,
decoder: null,
encoding: null },
readable: true,
domain: null,
_events: { end: { [Function: g] listener: [Function: onend] } },
_maxListeners: 10,
_writableState:
{ highWaterMark: 16384,
objectMode: true,
needDrain: false,
ending: false,
ended: false,
finished: false,
decodeStrings: true,
defaultEncoding: 'utf8',
length: 0,
writing: false,
sync: true,
bufferProcessing: false,
onwrite: [Function],
writecb: null,
writelen: 0,
buffer: [],
errorEmitted: false },
writable: true,
allowHalfOpen: true }
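The reading and sync flags that appear in the dumps above can be watched as they change around a read() / _read() / push() cycle. The following is a rough sketch, assuming a recent Node.js release in which _readableState still exposes reading and sync. The names source, log and item are illustrative only, and the internals are not a public API.

```javascript
const { Readable } = require('stream');

const log = []; // snapshots of the internal flags, in the order taken

const source = new Readable({
  objectMode: true,
  read() {
    const state = this._readableState;
    // read() sets the flags before it calls this function.
    log.push({ where: 'inside _read', reading: state.reading, sync: state.sync });
    this.push({ n: 1 });
    // push() resets reading; sync stays set until _read() returns.
    log.push({ where: 'after push', reading: state.reading, sync: state.sync });
    this.push(null); // signal that there is no more data
  }
});

const state = source._readableState;
const item = source.read(); // triggers _read() synchronously
log.push({ where: 'after read', reading: state.reading, sync: state.sync });

console.log(item);
console.log(log);
```

Because push() is called synchronously inside _read() here, the chunk is buffered rather than emitted immediately, and read() then returns it from the buffer.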