Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: simplify .pipe() and .unpipe() in Readable #28583

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 16 additions & 46 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ function ReadableState(options, stream, isDuplex) {
// array.shift()
this.buffer = new BufferList();
this.length = 0;
this.pipes = null;
this.pipesCount = 0;
this.pipes = [];
this.flowing = null;
this.ended = false;
this.endEmitted = false;
Expand Down Expand Up @@ -148,6 +147,13 @@ function ReadableState(options, stream, isDuplex) {
}
}

// Legacy getter for `pipesCount`
Object.defineProperty(ReadableState.prototype, 'pipesCount', {
get() {
return this.pipes.length;
}
});

function Readable(options) {
if (!(this instanceof Readable))
return new Readable(options);
Expand Down Expand Up @@ -635,19 +641,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
const src = this;
const state = this._readableState;

switch (state.pipesCount) {
case 0:
state.pipes = dest;
break;
case 1:
state.pipes = [state.pipes, dest];
break;
default:
state.pipes.push(dest);
break;
}
state.pipesCount += 1;
debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
state.pipes.push(dest);
debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);

const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
Expand Down Expand Up @@ -717,9 +712,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (((state.pipesCount === 1 && state.pipes === dest) ||
(state.pipesCount > 1 && state.pipes.includes(dest))) &&
!cleanedUp) {
if (state.pipes.length > 0 && state.pipes.includes(dest) && !cleanedUp) {
debug('false write response, pause', state.awaitDrain);
state.awaitDrain++;
}
Expand Down Expand Up @@ -789,38 +782,16 @@ Readable.prototype.unpipe = function(dest) {
const unpipeInfo = { hasUnpiped: false };

// If we're not piping anywhere, then do nothing.
if (state.pipesCount === 0)
if (state.pipes.length === 0)
return this;

// Just one destination. most common case.
if (state.pipesCount === 1) {
// Passed in one, but it's not the right one.
if (dest && dest !== state.pipes)
return this;

if (!dest)
dest = state.pipes;

// got a match.
state.pipes = null;
state.pipesCount = 0;
state.flowing = false;
if (dest)
dest.emit('unpipe', this, unpipeInfo);
return this;
}

// Slow case with multiple pipe destinations.

if (!dest) {
// remove all.
var dests = state.pipes;
var len = state.pipesCount;
state.pipes = null;
state.pipesCount = 0;
state.pipes = [];
state.flowing = false;

for (var i = 0; i < len; i++)
for (var i = 0; i < dests.length; i++)
dests[i].emit('unpipe', this, { hasUnpiped: false });
return this;
}
Expand All @@ -831,9 +802,8 @@ Readable.prototype.unpipe = function(dest) {
return this;

state.pipes.splice(index, 1);
state.pipesCount -= 1;
if (state.pipesCount === 1)
state.pipes = state.pipes[0];
if (state.pipes.length === 0)
state.flowing = false;

dest.emit('unpipe', this, unpipeInfo);

Expand Down
12 changes: 6 additions & 6 deletions test/parallel/test-stream-pipe-same-destination-twice.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ const { PassThrough, Writable } = require('stream');
passThrough.pipe(dest);

assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes.length, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);

passThrough.unpipe(dest);

assert.strictEqual(passThrough._events.data.length, 1);
assert.strictEqual(passThrough._readableState.pipesCount, 1);
assert.strictEqual(passThrough._readableState.pipes, dest);
assert.strictEqual(passThrough._readableState.pipes.length, 1);
assert.deepStrictEqual(passThrough._readableState.pipes, [dest]);

passThrough.write('foobar');
passThrough.pipe(dest);
Expand All @@ -47,7 +47,7 @@ const { PassThrough, Writable } = require('stream');
passThrough.pipe(dest);

assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes.length, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);

Expand All @@ -64,15 +64,15 @@ const { PassThrough, Writable } = require('stream');
passThrough.pipe(dest);

assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes.length, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);

passThrough.unpipe(dest);
passThrough.unpipe(dest);

assert.strictEqual(passThrough._events.data, undefined);
assert.strictEqual(passThrough._readableState.pipesCount, 0);
assert.strictEqual(passThrough._readableState.pipes.length, 0);

passThrough.write('foobar');
}
7 changes: 3 additions & 4 deletions test/parallel/test-stream-pipe-unpipe-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ assert.strictEqual(source._readableState.pipes.length, 2);

source.unpipe(dest2);

assert.strictEqual(source._readableState.pipes, dest1);
assert.deepStrictEqual(source._readableState.pipes, [dest1]);
assert.notStrictEqual(source._readableState.pipes, dest2);

dest2.on('unpipe', common.mustNotCall());
source.unpipe(dest2);

source.unpipe(dest1);

assert.strictEqual(source._readableState.pipes, null);
assert.strictEqual(source._readableState.pipes.length, 0);

{
// Test `cleanup()` if we unpipe all streams.
Expand All @@ -43,8 +43,7 @@ assert.strictEqual(source._readableState.pipes, null);
const destCheckEventNames = ['close', 'finish', 'drain', 'error', 'unpipe'];

const checkSrcCleanup = common.mustCall(() => {
assert.strictEqual(source._readableState.pipes, null);
assert.strictEqual(source._readableState.pipesCount, 0);
assert.strictEqual(source._readableState.pipes.length, 0);
assert.strictEqual(source._readableState.flowing, false);

srcCheckEventNames.forEach((eventName) => {
Expand Down
12 changes: 6 additions & 6 deletions test/parallel/test-stream-unpipe-event.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class NeverEndReadable extends Readable {
dest.on('unpipe', common.mustCall());
src.pipe(dest);
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 0);
assert.strictEqual(src._readableState.pipes.length, 0);
});
}

Expand All @@ -34,7 +34,7 @@ class NeverEndReadable extends Readable {
dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted'));
src.pipe(dest);
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 1);
assert.strictEqual(src._readableState.pipes.length, 1);
});
}

Expand All @@ -46,7 +46,7 @@ class NeverEndReadable extends Readable {
src.pipe(dest);
src.unpipe(dest);
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 0);
assert.strictEqual(src._readableState.pipes.length, 0);
});
}

Expand All @@ -57,7 +57,7 @@ class NeverEndReadable extends Readable {
dest.on('unpipe', common.mustCall());
src.pipe(dest, { end: false });
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 0);
assert.strictEqual(src._readableState.pipes.length, 0);
});
}

Expand All @@ -68,7 +68,7 @@ class NeverEndReadable extends Readable {
dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted'));
src.pipe(dest, { end: false });
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 1);
assert.strictEqual(src._readableState.pipes.length, 1);
});
}

Expand All @@ -80,6 +80,6 @@ class NeverEndReadable extends Readable {
src.pipe(dest, { end: false });
src.unpipe(dest);
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 0);
assert.strictEqual(src._readableState.pipes.length, 0);
});
}
4 changes: 2 additions & 2 deletions test/parallel/test-stream2-basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ class TestWriter extends EE {
w[0].on('write', function() {
if (--writes === 0) {
r.unpipe();
assert.strictEqual(r._readableState.pipes, null);
assert.deepStrictEqual(r._readableState.pipes, []);
w[0].end();
r.pipe(w[1]);
assert.strictEqual(r._readableState.pipes, w[1]);
assert.deepStrictEqual(r._readableState.pipes, [w[1]]);
}
});

Expand Down