Eugene Lazutkin edited this page Sep 14, 2021 · 8 revisions

All provided streamers use StreamBase as their foundation. It is a base class meant to be extended, and it provides facilities common to all streamers.

StreamBase is based on Transform. It operates in object mode, consuming a token stream produced by a parser or filters and transforming it into a stream of partially assembled JavaScript objects.
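To make this shape concrete, here is a minimal sketch of the same idea built only on node.js' stream module, without StreamBase or Assembler. FlatArrayStreamer is a hypothetical, simplified streamer that handles only a top-level array of primitive values. The token names (startArray, stringValue, numberValue, nullValue, trueValue, falseValue) are assumed to match the packed tokens produced by stream-json's parser, where number values arrive as strings.

```javascript
const {Transform, Readable} = require('stream');

// Hypothetical, simplified streamer: top-level array of primitives only.
// Token names are assumed to match stream-json's packed tokens.
class FlatArrayStreamer extends Transform {
  constructor(options) {
    // object mode on both sides: tokens in, {key, value} objects out
    super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
    this._counter = 0;
  }

  _transform(token, _encoding, callback) {
    let value;
    switch (token.name) {
      case 'stringValue': value = token.value; break;
      case 'numberValue': value = parseFloat(token.value); break; // numbers arrive as strings
      case 'nullValue': value = null; break;
      case 'trueValue': value = true; break;
      case 'falseValue': value = false; break;
      default: return callback(); // startArray, endArray, etc.: nothing to emit
    }
    // Wrapping in {key, value} lets a JSON null through:
    // a bare push(null) would end the readable side of the stream.
    this.push({key: this._counter++, value});
    callback();
  }
}

// usage: feed a hand-written token stream and log each emitted object
const sampleTokens = [
  {name: 'startArray'},
  {name: 'numberValue', value: '42'},
  {name: 'stringValue', value: 'hello'},
  {name: 'endArray'}
];
Readable.from(sampleTokens)
  .pipe(new FlatArrayStreamer())
  .on('data', item => console.log(item));
```

Real streamers delegate the assembling of nested values to Assembler; this sketch only shows the object-mode Transform contract they share.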

API

This document describes the user-facing interface only. If you want to build your own streamer, feel free to inspect the code to gain more insight.

Internally, StreamBase uses Assembler to assemble objects and to keep track of the current state.

constructor(options)

options is an optional object described in detail in node.js' Stream documentation. The following optional custom properties are recognized:

  • objectFilter is an optional function.
    • If specified, it is used to inspect incomplete objects as they are being assembled.
    • It is called in the context of a streamer with one argument: an internal Assembler instance.
    • A filter function can return three values:
      • Truthy. This response means that we should include this object in an output stream. The object will be assembled from an input stream without further checks.
      • false. This response means that we should not output this object. A partially assembled object will be discarded, and the rest of the object will be ignored without further checks.
      • Anything else (usually undefined) signifies that the filter does not have enough information to make a decision yet: the next token should be processed, and the filter will be called again on the same object.
    • If not specified (the default), all objects are included in an output stream.
  • includeUndecided is a flag. It controls how to handle objects that were not decided one way or another.
    • If it is truthy, an undecided object (already fully assembled) will be included in the output.
    • Otherwise (the default), an undecided object will be discarded.
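The three-way decision rule and the role of includeUndecided can be replayed without any streams. The sketch below uses a hypothetical helper, replayObjectFilter (not part of stream-json), that feeds successive snapshots of one object to a filter. It models only the current property of the Assembler, and it assumes the filter is consulted once per snapshot, whereas the real filter receives the actual Assembler instance and is called after each token.

```javascript
// Hypothetical helper (not part of stream-json): replays the objectFilter
// rules described above over successive snapshots of a single object.
// Only `current` of the Assembler is modelled here.
function replayObjectFilter(snapshots, objectFilter, includeUndecided = false) {
  let calls = 0;
  for (const current of snapshots) {
    ++calls;
    const result = objectFilter({current});
    // truthy: include the object without further checks
    if (result) return {included: true, calls};
    // exactly false: discard the object without further checks
    if (result === false) return {included: false, calls};
    // anything else (usually undefined): undecided, try the next snapshot
  }
  // fully assembled but never decided: includeUndecided has the final word
  return {included: Boolean(includeUndecided), calls};
}

// successive states of one record while it is being assembled
const snapshots = [
  {name: 'Ann'},
  {name: 'Ann', department: 'accounting'},
  {name: 'Ann', department: 'accounting', salary: 100}
];

const byDepartment = asm => {
  const value = asm.current;
  if (value && typeof value.department == 'string') {
    return value.department === 'accounting';
  }
  // returning undefined: still undecided
};

const neverDecides = () => undefined;

console.log(replayObjectFilter(snapshots, byDepartment));
console.log(replayObjectFilter(snapshots, neverDecides));
console.log(replayObjectFilter(snapshots, neverDecides, true));
```

With byDepartment, the decision is made on the second snapshot, so later properties are never inspected. With neverDecides, the object is included only when includeUndecided is truthy.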

The same options object is passed to Assembler. See its documentation to understand what properties can be specified.

Important details

objectFilter and token stream filters serve different purposes: the latter edit a token stream, while the former governs the assembly of individual, already selected values. Ultimately, objectFilter is an optimization feature.

For example, suppose we want to read all employee records and select only the employees whose department is "accounting".

A simple way (preferred in some cases) is to use stream-chain facilities:

const {chain}  = require('stream-chain');
const {parser} = require('stream-json');
const {streamArray} = require('stream-json/streamers/StreamArray');

const fs = require('fs');
const zlib = require('zlib');

const pipeline = chain([
  fs.createReadStream('sample.json.gz'),
  zlib.createGunzip(),
  parser(),
  streamArray(),
  data => {
    if (data.value.department === 'accounting') return data;
    // returning undefined (the default) skips this item
  }
]);

A potentially more efficient way to do the same is to filter objects while they are being assembled:

const {chain}  = require('stream-chain');
const {parser} = require('stream-json');
const {streamArray} = require('stream-json/streamers/StreamArray');

const fs = require('fs');
const zlib = require('zlib');

const pipeline = chain([
  fs.createReadStream('sample.json.gz'),
  zlib.createGunzip(),
  parser(),
  streamArray({objectFilter: asm => {
    const value = asm.current; // the value we are working on
    // the value can be incomplete, check if we have necessary properties
    if (value && typeof value.department == 'string') {
      // we have 'department' value and can make the final decision now
      return value.department === 'accounting';
      // depending on the return value above we made a final decision:
      // we accepted or rejected an object,
      // now it will be speedily assembled or skipped.
    }
    // return undefined by default, meaning "we are still undecided"
  }})
]);

Now an object can be discarded before it is fully assembled, which can save some CPU and memory. On the other hand, in the first case our filter is called once per object, while in the second case it can be called many times per object: once per token until it makes a decision. So there is a trade-off, which should be decided on a case-by-case basis, possibly by running trials on representative data.
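For such trials, a small timing helper is usually enough. The sketch below is a hypothetical helper, medianTime (not part of stream-json or stream-chain): it runs an asynchronous job several times and returns the median elapsed time in milliseconds, which is less sensitive to a single slow run than the mean.

```javascript
const {performance} = require('perf_hooks');

// Hypothetical helper (not part of stream-json or stream-chain).
// `makeRun` must start a fresh run on every call and return a promise
// that settles when that run has finished.
async function medianTime(makeRun, repeats = 5) {
  const times = [];
  for (let i = 0; i < repeats; ++i) {
    const start = performance.now();
    await makeRun();
    times.push(performance.now() - start);
  }
  // numeric sort: the default sort would compare numbers as strings
  times.sort((a, b) => a - b);
  // the middle element (the upper middle one for an even count)
  return times[Math.floor(times.length / 2)];
}
```

To compare the two approaches, each one can be wrapped in a function that builds a fresh pipeline and returns a promise resolved on the pipeline's 'end' event (rejected on 'error'), calling resume() so the data flows without a consumer. Comparing the two medians on representative input shows which approach wins for that data.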

Examples

Combined streamer for arrays and objects

Stefan です needed to decide dynamically what kind of container to stream: an array or a dictionary object (see Issue #81). Arrays should be streamed element by element, while dictionary objects should be passed on as a whole. stream-json provides two separate streamers, StreamArray and StreamObject, but neither provides exactly this functionality, and the user has to choose between them statically. He ended up writing his own custom streamer (see the comment), which is included here for convenience (slightly reworked to add exports):

const StreamBase = require("stream-json/streamers/StreamBase")
/**
 * Streamer for stream-json that streams objects or arrays.
 * - If it's an object: only a single value (the fully assembled object) will be emitted.
 * - If it's an array: values of the array will be emitted.
 *
 * The code basically combines StreamArray and StreamObject, adjusted to our needs.
 */
class StreamArrayOrObject extends StreamBase {

  static make(options) {
    return new StreamArrayOrObject(options);
  }

  constructor(options) {
    super(options);
    this._level = 1;
  }

  _wait(chunk, _, callback) {
    if (chunk.name === "startObject") {
      this._lastKey = null
      // We're assembling the object in this._object
      this._object = {}
    } else if (chunk.name === "startArray") {
      this._counter = 0
    } else {
      return callback(new Error("Top-level object should be an array or object."))
    }
    this._transform = this._filter
    return this._transform(chunk, _, callback)
  }

  _push(discard) {
    if (this._object) {
      // Object in this._object
      if (this._lastKey === null) {
        this._lastKey = this._assembler.key
      } else {
        if (!discard) {
          // Assemble object from keys and values
          this._object[this._lastKey] = this._assembler.current[this._lastKey]
        }
        this._assembler.current = {}
        this._lastKey = null
      }
    } else {
      // Otherwise we're streaming an array
      if (this._assembler.current.length) {
        this._counter += 1
        const value = this._assembler.current.pop()
        // Push array values directly unless discarded. A null element is
        // skipped because push(null) would end the readable stream.
        if (!discard && value !== null) {
          this.push(value)
        }
      }
    }
  }

  _flush(callback) {
    // Push single object before end of stream
    this._object && this.push(this._object)
    callback()
  }
}
StreamArrayOrObject.streamArrayOrObject = StreamArrayOrObject.make;
StreamArrayOrObject.make.Constructor = StreamArrayOrObject;

module.exports = StreamArrayOrObject;