Eugene Lazutkin edited this page Jun 29, 2018 · 3 revisions

Emitter is a Writable stream operating in object mode. It consumes a token stream and emits tokens as events on itself. Essentially it is an object version of emit().

Introduction

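const {chain} = require('stream-chain');
const {parser} = require('stream-json/Parser');
const Emitter = require('stream-json/Emitter');

const fs = require('fs');

const emitter = new Emitter();

chain([
  fs.createReadStream('data.json'),
  parser(),
  emitter
]);

// Count objects by listening for the 'startObject' token event.
let counter = 0;
emitter.on('startObject', () => ++counter);
// Emitter is a Writable, so 'finish' fires once all tokens are consumed.
emitter.on('finish', () => console.log(counter, 'objects'));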

API

Emitter has no special API. For every token it receives, it emits an event named after the token, passing the token's value as the event argument. See the definition of the stream of tokens for details.

Emitter is based on Writable.

The whole implementation of Emitter is very simple:

const {Writable} = require('stream');

class Emitter extends Writable {
  constructor(options) {
    super(Object.assign({}, options, {objectMode: true}));
  }
  _write(chunk, encoding, callback) {
    this.emit(chunk.name, chunk.value);
    callback(null);
  }
}
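
To see the event mapping in isolation, the class above can be exercised without a parser by writing token objects to it by hand. The sketch below is only an illustration: it redefines the class locally as SketchEmitter (a name chosen here) so that it runs with Node.js built-ins alone, and the hand-written tokens merely imitate what a parser might produce for {"a": 1}.

```javascript
const {Writable} = require('stream');

// Local copy of the implementation shown above, so that the sketch
// does not depend on stream-json being installed.
class SketchEmitter extends Writable {
  constructor(options) {
    super(Object.assign({}, options, {objectMode: true}));
  }
  _write(chunk, encoding, callback) {
    this.emit(chunk.name, chunk.value);
    callback(null);
  }
}

const sketch = new SketchEmitter();
const seen = [];

// Each token name becomes an event name; the token value is the argument.
sketch.on('startObject', () => seen.push('startObject'));
sketch.on('keyValue', value => seen.push(`keyValue:${value}`));
sketch.on('numberValue', value => seen.push(`numberValue:${value}`));
sketch.on('endObject', () => seen.push('endObject'));

// Hand-written tokens imitating a parser's output for {"a": 1}.
// The number is given as a string here, which is an assumption of this sketch.
const tokens = [
  {name: 'startObject'},
  {name: 'keyValue', value: 'a'},
  {name: 'numberValue', value: '1'},
  {name: 'endObject'}
];
tokens.forEach(token => sketch.write(token));
sketch.end();
```

Tokens without a value property, such as startObject, reach their listeners with undefined as the argument, which is why those handlers ignore it.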

constructor(options)

options is an optional object described in detail in the Node.js Stream documentation. It is passed directly to the parent Writable constructor, except that objectMode is always forced to true. No custom options are used.
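
How the constructor treats options can be checked directly. The following sketch assumes the implementation shown earlier, copied here as OptionsSketchEmitter (a hypothetical local name), and uses highWaterMark purely as an example of a standard Writable option. It relies on the writableHighWaterMark and writableObjectMode properties, which need a reasonably recent Node.js.

```javascript
const {Writable} = require('stream');

// Local copy of the documented implementation (not imported from stream-json).
class OptionsSketchEmitter extends Writable {
  constructor(options) {
    // objectMode is applied last, so it overrides whatever the caller passed.
    super(Object.assign({}, options, {objectMode: true}));
  }
  _write(chunk, encoding, callback) {
    this.emit(chunk.name, chunk.value);
    callback(null);
  }
}

// The caller tries to turn object mode off and sets a standard option.
const userOptions = {highWaterMark: 4, objectMode: false};
const configured = new OptionsSketchEmitter(userOptions);

// Read back what the underlying Writable actually uses.
const effective = {
  highWaterMark: configured.writableHighWaterMark,
  objectMode: configured.writableObjectMode
};
console.log(effective);
```

Because the options are copied into a fresh object before objectMode is set, the caller's own options object is left unmodified.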

Static methods and properties

emitter(options) and make(options)

make() and emitter() are two aliases of the same factory function. It takes the options described above and returns a new instance of Emitter. emitter() helps to reduce boilerplate when creating data processing pipelines:

const {chain}  = require('stream-chain');
const {parser} = require('stream-json/Parser');
const {emitter} = require('stream-json/Emitter');

const fs = require('fs');

const pipeline = chain([
  fs.createReadStream('sample.json'),
  parser(),
  emitter()
]);

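let objectCounter = 0;
pipeline.output.on('startObject', () => ++objectCounter);
// The emitter is a Writable, so it signals completion with 'finish';
// the listener must be a function, not the result of calling console.log().
pipeline.output.on('finish', () => console.log(`Found ${objectCounter} objects.`));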

make.Constructor

The Constructor property of make() (and emitter()) is set to Emitter. It can be used to create emitters indirectly or for metaprogramming if needed.
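
How a factory and its Constructor property can fit together is sketched below. The wiring is an assumption based on the description above, not a copy of the library's source, and FactorySketchEmitter and createAll are hypothetical names introduced only for illustration.

```javascript
const {Writable} = require('stream');

class FactorySketchEmitter extends Writable {
  // Factory function: callers get an instance without writing `new`.
  static make(options) {
    return new FactorySketchEmitter(options);
  }
  constructor(options) {
    super(Object.assign({}, options, {objectMode: true}));
  }
  _write(chunk, encoding, callback) {
    this.emit(chunk.name, chunk.value);
    callback(null);
  }
}
// An alias for the factory, plus a back-reference from the factory to the class.
FactorySketchEmitter.emitter = FactorySketchEmitter.make;
FactorySketchEmitter.make.Constructor = FactorySketchEmitter;

// Hypothetical helper: given only factories, instantiate their classes indirectly.
const createAll = factories =>
  factories.map(factory => new factory.Constructor());

const {make, emitter} = FactorySketchEmitter;
const viaFactory = emitter();
const [viaConstructor] = createAll([make]);
```

With this arrangement, code that receives only the factory function can still reach the class, for example to subclass it or to run instanceof checks.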
