Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

'continue' event for Readable streams #111

Closed
mscdex opened this issue Feb 1, 2015 · 31 comments
Closed

'continue' event for Readable streams #111

mscdex opened this issue Feb 1, 2015 · 31 comments

Comments

@mscdex
Copy link
Contributor

mscdex commented Feb 1, 2015

Currently Writables have a 'drain' event that fires when more writes can be done. It would be nice to have something like this for Readables when push() returns false. It's not just a matter of emitting on every call to _read() since _read() can be called when the highWaterMark hasn't been reached yet. It's especially more tricky for Transforms because you'd have to manually override _read() in your subclass, which is not fun.

I propose a 'continue' event for Readables.

@mscdex
Copy link
Contributor Author

mscdex commented Feb 1, 2015

FWIW here's what I'm currently having to do (to avoid having to touch everywhere I do this.push()) to get this functionality:

function MyStream() {
   // ...
  this._needContinue = false;
  // ...
}
// ...
MyStream.prototype.__read = Transform.prototype._read;
MyStream.prototype._read = function(n) {
  if (this._needContinue) {
    this._needContinue = false;
    this.emit('continue');
  }
  return this.__read(n);
};
MyStream.prototype.__push = Transform.prototype.push;
MyStream.prototype.push = function(chunk, encoding) {
  var ret = this.__push(chunk, encoding);
  this._needContinue = (ret === false);
  return ret;
};

@sonewman
Copy link
Contributor

sonewman commented Feb 2, 2015

Hmm, so the problem is that you want to know when a stream has started to read, whether this is the initial read or once it starts reading again after some of back pressure?

@sonewman
Copy link
Contributor

sonewman commented Feb 2, 2015

This event could be internalised without it needing to have an addition to transform streams.

What is the exact use-case that requires you to need to know these internal details?

Normally if a stream is being consumed the consumer doesn't really concern itself with anything else other than receiving a chunk of data or knowing that the source has ended.

@mscdex
Copy link
Contributor Author

mscdex commented Feb 2, 2015

My use-case is this:

ssh2-streams provides protocol streams where the input (responses) from the remote end comes in through the writable side and the output (requests) to the remote end goes to the readable side. The output gets pushed out via this.push(...);. The return value of push() is returned back to the user at the higher level so they know if they should continue issuing more commands or not. However the user has no way to know when they can continue issuing commands again once the readable side has dipped below the highWaterMark.

This is solved for Writable streams via the drain event when write() returns false, but no such mechanism exists when push() returns false.

@sonewman
Copy link
Contributor

sonewman commented Feb 2, 2015

I'm just trying to think if there any other use-cases where this would useful, the actually addition would be simple but i'm not sure how keen we are to add new events like this into streams. @iojs/streams any opinions?

@mscdex
Copy link
Contributor Author

mscdex commented Feb 2, 2015

Well, the workaround I posted does work. But I'm not sure what (if any measurable) performance overhead it may have. Also I'd rather use a standard event name if possible. continue kind of made sense to me, but it's tricky because it's really the same function as drain, but for the Readable side. readableDrain perhaps? I dunno ...

@Raynos
Copy link
Contributor

Raynos commented Mar 1, 2015

Nice 👍 we were talking about drain for readables.

We ended up with something like

function drainReadable(readable, cb) {
  readable.on('readable', onReadable);


  function onReadable() {
    if (this._readableState.length < this._readableState.hwm) {
      readable.removeListener('readable', onReadable);
      cb();

    }
  }
}

@Raynos
Copy link
Contributor

Raynos commented Mar 1, 2015

@mscdex because you dont check the hwm your logic is incorrect. you could push(2mb) then read(2) and you would emit continue too early.

@mscdex
Copy link
Contributor Author

mscdex commented Mar 1, 2015

@Raynos You mean that _read() can be called when the amount of buffered data is still >= highWaterMark? That doesn't sound right...

@Raynos
Copy link
Contributor

Raynos commented Mar 1, 2015

@mscdex Oh interesting, You make a good point!

I was only using the readable event which can get emitted when buffered data >= highWaterMark

I think your trick is correct.

@calvinmetcalf
Copy link
Contributor

are there modules that already use the continue event? and will there be performance issues (we'd need a benchmark)

@calvinmetcalf
Copy link
Contributor

probably can't call it continue

@mscdex
Copy link
Contributor Author

mscdex commented May 18, 2016

@calvinmetcalf That's what I've been using for ssh2/ssh2-streams ever since I created this issue.

@calvinmetcalf
Copy link
Contributor

http might use that event

On Wed, May 18, 2016 at 10:34 AM Brian White notifications@github.com
wrote:

@calvinmetcalf https://github.com/calvinmetcalf That's what I've been
using for ssh2/ssh2-streams.


You are receiving this because you were mentioned.
Reply to this email directly or view it on GitHub
#111 (comment)

@mscdex
Copy link
Contributor Author

mscdex commented May 18, 2016

@calvinmetcalf Well it was only a suggestion and I had to pick something at the time. I can change my module(s).

This is one reason why I wanted the streams WG to discuss it :-)

@mcollina
Copy link
Member

mcollina commented May 19, 2016

👍 for having this in.

Probably not with the 'continue' event name, let's start bikeshedding 😁 . How about awaitingPush?

@mscdex can you please define a little bit better the state machine of this event? As far as I understand the use case, this is when you are wrapping something else, and you need to know when to call push(buf), right? can you make an example from an user point of view?

@mscdex
Copy link
Contributor Author

mscdex commented May 19, 2016

@mcollina I'm not really wrapping anything. I have a custom Readable stream implementation. When I call this.push(...) to make data available on the stream, and it returns false, I need to have some way to know when I can start pushing more data. For writable streams, there is already drain for write(), but there is no similar event for push() for readable streams.

As I previously described, my use case is for so-called protocol streams where I have one stream for parsing and writing for a particular protocol. Writing to the writable side is for incoming protocol data to be parsed and the writing to the readable side is for outgoing protocol data (e.g. a user sends a request -- it gets encoded as per the protocol and the raw bytes are push()ed out to the readable side). These protocol streams have extra functions that provide a convenient way for a user to send outgoing requests for example, doing so generates raw bytes that are push()ed to the readable side. This is where the problem lies. If a user calls stream.foo() that sends a request out the readable side and the size of the resulting bytes is large enough for push() to return false, there is no (standard) way to notify the user when they can continue to (safely) use more API methods (e.g. another .foo() call).

@mcollina
Copy link
Member

I usually achieved the same by wrapping a Writable stream, see https://github.com/mqttjs/mqtt-packet/blob/master/writeToStream.js. But we should definitely support your approach as well, as it enables more api.

Any other name for the event?

BTW, I'm 👍 in adding it.

@mscdex
Copy link
Contributor Author

mscdex commented May 19, 2016

@mcollina I used to do things like that in the past, but the reason I am more inclined to write protocol streams these days is that they bring simplicity and flexibility (you can do stuff like socket.pipe(parser).pipe(socket)).

I don't have any (other) event name suggestions at the moment.

@mcollina
Copy link
Member

@mscdex because of the missing event, i moved from your style to the "wrap" style, and not expose a Readable, but relying only on the Writable.

I think we can just do a PR and test it with the new magic flag @calvinmetcalf put in.

@mcollina
Copy link
Member

mcollina commented May 19, 2017

@mscdex this has been floating around here for some time, should we get to a PR to core? I'm happy to add it.

@mscdex
Copy link
Contributor Author

mscdex commented May 19, 2017

@mcollina I'm still all for it, but we need to come up with a suitable name I guess.

@mcollina
Copy link
Member

mcollina commented May 19, 2017

@mscdex how about 'readableContinue'? It's long and bad enough to not have any ecosystem usage.

@mscdex
Copy link
Contributor Author

mscdex commented May 19, 2017

I'm not particularly picky about the name, just as long as it's clear and unique enough, so 'readableContinue' would be fine I think.

@alextaujenis
Copy link

alextaujenis commented Feb 22, 2019

Hi @mscdex and @mcollina, any progress on this?

I have a custom Transform stream and I'd also like to know the proper way to handle when this.push(chunk) returns false.

Context: this Transform stream is parsing a text file formatted like{ json }{ json }{ json } and pushes each { json } chunk downstream. It appends the incoming data to an internal buffer, then parses json chunks off the internal buffer until it can't find anymore } characters.

const { Transform } = require('stream')

class MyStream extends Transform { 
  constructor (opts) {
    super(opts)
    // initialize an empty buffer
    this._buffer = Buffer.alloc(0) 
  }

  _transform (data, encoding, callback) {
    // append all incoming data to the internal buffer
    this._buffer = Buffer.concat([this._buffer, data]) 
    let finished = false
    
    while (!finished) {
      // upstream data is ndjson with a flat structure
      // here's an easy way to parse each { json } chunk
      const start = this._buffer.indexOf(123) // left bracket {
      const end = this._buffer.indexOf(125) // right bracket }
      
      // check if there are any complete { json } chunks in the internal buffer
      if (end < 0) {
        // no more complete { json } chunks in the internal buffer
        finished = true

      } else {
        // isolate the { json } chunk in the buffer
        const chunk = this._buffer.slice(start, end + 1)
        // push the chunk downstream while capturing the backpressure flag
        const backpressure = !this.push(chunk, encoding)
        // remove the isolated { json } chunk from the internal buffer
        this._buffer = this._buffer.slice(end + 1)
        
        // handle downstream backpressure
        if (backpressure) {
          // now what??? <----------------------------------------------------
        }
      }
    }
    // let upstream know we are ready for more data
    callback()
  }
}

How should I handle downstream backpressure? There still might be { json } chunks in the internal buffer, and the while loop could still need some iterations to push the data out.

  1. Is there really some (missing) hypothetical event such as readableContinue that I need in order to properly control the backpressure flow?
  2. Has something been implemented since this issue was opened that is the correct way of handling this situation?
  3. Does the Transform stream handle the backpressure for me automatically here? (I can't see where)
  4. What if my downstream Writable is really slow, or this.push(chunk) was actually sending expanded data?

Basically, any advice on this situation would be helpful. Thank you for providing your time and expertise to this project.

@mcollina
Copy link
Member

Transform should handle all of this for you automatically, and you should not need to worry at all. Why isn't it doing that? Can you open an new issue with a clear example of why Transform is not handling backpressure correctly?

@alextaujenis
Copy link

alextaujenis commented Feb 22, 2019

Thank you @mcollina for the quick reply, and also thank you for making me dig deeper. Here are the questions I was originally asking.

Questions (with answers)

1. With a custom Transform stream; what happens to the chunk of data if this.push(chunk) returns false?

Answer: The chunk is pushed onto the Transform stream read buffer and is not lost. It will eventually be sent downstream and obey all backpressure rules.

2. With a custom Transform stream; what happens to the next chunks of data if this.push(chunk) returns false and I just keep calling this.push(chunk) with more data?

Answer: All chunks are pushed onto the Transform stream read buffer and are not lost. The Transform stream this.readableHighWaterMark is just a warning level of how much memory should be allocated and it will not change, even if the this.readableLength increases past the this.readableHighWaterMark. All chunks will eventually be sent downstream and obey all backpressure rules.

Simple Code Example

This is an example just trying to tease-out how the stream interface behaves. It's probably unreasonable that a transform stream would blow-out data 1000x, but who knows?

const { Transform } = require('stream')

class TransformStream extends Transform {
  _transform (chunk, encoding, callback) {
    // 1000x data expansion
    let i = 1000
    while (i--) { 
      // all of these chunks will buffer correctly and obey backpressure
      this.push(chunk, encoding) // just ignore when this returns 'false' and keep pushing
    }
    callback()
  }
}

How I got here #111

I'm learning the API for Stream Implementers and following along with the Backpressuring in Streams guide. I'm building a custom Transform stream and it inherits from the Readable and Writable, so I thought this open issue was discussing the same problem with the Readable side of the Transform stream.

The things that misdirected me on this behavior were:

  1. This quote. It leaves me with the question "How quickly should I stop reading from the source when .push() returns false?".

So, as well with respecting the .write() return, we must also respect the return value of .push() used in the ._read() method. If .push() returns a false value, the stream will stop reading from the source. Otherwise, it will continue without pause.

  1. This diagram, which is confusing when you get to the Is this chunk too big? => Yes path.

screen shot 2019-02-22 at 12 09 01 pm

  1. The docs around the topic of .push() in the transform stream are silent about that function returning false and how to handle it.

Advanced Code Example

To see what's happening I created a Node script that prints benchmark info about each step in the pipeline as data is expanded through the transform stream, while the downstream has a delay and takes time to process.

const { Readable, Writable, Transform } = require('stream')

// random buffer with utf8: a-z dec: 97-122
// NOT A CSPRNG!!! DO NOT USE FOR CRYPTO!!!
const randBuff = (size) => {
  let arr = []
  while (size--) { arr.push(97 + parseInt(Math.random() * 26)) }
  return Buffer.from(arr)
}

class ReadStream extends Readable {
  constructor (opts) {
    super(opts)
    this.total = 10
  }
  _read () {
    if (this.total--) {
      console.log(`read backpressure ${!this.push(randBuff(1000))}`, `readbuffer ${this._readBuffer()}`, this.readableHighWaterMark)
    } else {
      this.push(null)
    }
  }

  _readBuffer () {
    return `${parseFloat(this.readableLength / this.readableHighWaterMark * 100).toFixed(2)}%`
  }
}

class TransformStream extends Transform {
  _transform (chunk, encoding, callback) {
    // 1000x data expansion
    let i = 1000
    while (i--) {
      console.log(`transform backpressure ${!this.push(chunk, encoding)}`, `readbuffer ${this._readBuffer()}`, this.readableHighWaterMark)
    }
    callback()
  }

  _readBuffer () {
    return `${parseFloat(this.readableLength / this.readableHighWaterMark * 100).toFixed(2)}%`
  }
}

class WriteStream extends Writable {
  constructor (opts) {
    super(opts)
    this.total = 0
  }
  _write (chunk, encoding, next) {
    setTimeout(() => {
      console.log(`write #${this.total++}`, `writebuffer ${this._writeBuffer()}`, this.writableHighWaterMark)
      next()
    }, 1)
  }

  _writeBuffer () {
    return `${parseFloat(this.writableLength / this.writableHighWaterMark * 100).toFixed(2)}%`
  }
}

const read = new ReadStream()
const transform = new TransformStream()
const write = new WriteStream()

read.pipe(transform).pipe(write)
write.on('finish', () => { console.log('ta da!') })

Code Output

Here we can see the Transform stream calling this.push(chunk) too many times, this.readableLength grows way past the this.readableHighWaterMark, but the high water mark holds steady at 16384 (16k buffer).
screen shot 2019-02-22 at 1 07 47 pm

Then we can watch after the Transform readable buffer is blown-out with the 1000x increase... the downstream Writable takes over and starts draining it:
screen shot 2019-02-22 at 1 10 35 pm

After the downstream Writable is finished draining the Transform stream, it asks for more data, then the Transform stream starts the 1000x process again. This time; the Transform buffer begins entirely full (maybe I'm missing and event handler in the code example, or maybe this is normal).
screen shot 2019-02-22 at 1 30 13 pm

Conclusion

TLDR: to implement a Transform stream correctly just keep calling this.push(chunk) until the current _transform(data, encoding, callback) data has been consumed, then call the callback(). Ignore any false return values from this.push(chunk).

The docs around this behavior could probably use some love, along with a few deeper examples on how to properly implement streams. Thanks again, and let me know if you think any of this should be re-posted somewhere else.

@mcollina
Copy link
Member

I think this is great feedback to improve our doc. Would you like to integrate https://github.com/nodejs/nodejs.org/blob/master/locale/en/docs/guides/backpressuring-in-streams.md with your finding, so it is clearer? Tag me in in the PR.

@zeodtr
Copy link

zeodtr commented Aug 21, 2020

@alextaujenis Sorry to reply too late, but your TL;DR description of the 'correct implementation' of the Transform stream does not seem to be a 'well-behaving' one, since that kind of Transform stream will hold 1000x chunks in memory, thus make big overhead for a Node.js process, while Node.js' stream is made just to avoid such a situation. It effectively nullifies the stream's purpose.
I think a Transform stream implementation must handle false return values from this.push(chunk) just like the other readable stream implementations (and I haven't found the documentation about it yet).

@dmurvihill
Copy link

@mcollina what was the final resolution was for this issue? I'm still trying to understand how I should handle when push returns false in Transform._flush().

@dmurvihill
Copy link

Till there is an officially-supported solution, I've published a small package that extends the Transform class with a continue event. It also adds a Promise-based push method called safePush. It's based on the workaround first proposed by @mscdex eight years ago.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

9 participants