The hidden power of node.js streams: reactive programming

This post is about a little coding exercise in which we will create basic components that implement the reactive programming paradigm using only node.js streams. There are lots of libraries out there for reactive programming, like RxJS, Bacon and Callbags. These libraries don't use node.js streams, although it's possible to wrap regular streams as a source. Another good library is Highland, which builds node.js-like streams. Rather than explaining how to use these libraries, in this article we want to write a simple one ourselves, understand which problems it solves and which advantages it gives us.

Starting from a real-world problem, we'll first build some basic tools that will remind us of the basic building blocks of functional programming, such as the map, filter and reduce functions. Then we'll see how to combine these bricks to write neat, clear and effective code.

You can find all the code from this article on repl.it.

The case

Let's define the problem we want to solve. We want to fetch a list of financial transactions from a database. Then we want to convert each transaction amount from US dollars to euros, keep only the ones with an amount higher than 100€ and compute their sum.
Never mind that most databases out there could probably perform these operations themselves: for the sake of the experiment, let's say we can only retrieve the list as an array or as a stream.

Here is a simple implementation:

const USD2EUR_CONVERSION_FACTOR = 0.85;

const sumRichTransactions = async () => {
  // DB stands for our database client
  const transactions = await DB.getAsArray({ collection: 'Transaction' });

  const sum = transactions
    .map(transaction => transaction.amount * USD2EUR_CONVERSION_FACTOR)
    .filter(amount => amount > 100)
    .reduce((sum, amount) => sum + amount, 0);

  return sum;
};

sumRichTransactions().then(sum => console.log(sum));

The code should be clear enough. First we retrieve the data as an array, then we map each value to convert its amount from dollars to euros, then we keep only the amounts greater than 100€ with the filter function and, lastly, we reduce the array to get the sum of those amounts. We return this sum.

What's wrong with it

Let's look at some possible problems with this approach. First, everything stays in memory: the array returned by the DB is stored in your application's memory. If there are millions of transactions, the memory footprint can be huge.
Second, map, filter and reduce are handy functions, but each of them iterates over its input once. This means the data is cycled three times in this example (reduce actually cycles only the filtered subset), and map and filter each allocate a new intermediate array along the way. Cycling without a bike is just losing time. A nice, different approach to this problem is transducers. Transducers are really powerful and worth reading about; this article by @roman01 gives a great introduction.
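To make the extra passes concrete, here is a small illustrative sketch (not part of the original code) that counts how often each callback runs on the three sample transactions used later in this article, and compares the chained version with a single hand-written loop. The counters and the loop exist only for this demonstration.

```javascript
// Sample data mirroring the transactions shown later in the article
const USD2EUR_CONVERSION_FACTOR = 0.85;
const transactions = [
  { id: 1, amount: 12, currency: 'USD' },
  { id: 2, amount: 456, currency: 'USD' },
  { id: 3, amount: 120, currency: 'USD' },
];

// Chained version: count how many times each pass invokes its callback
const calls = { map: 0, filter: 0, reduce: 0 };
const chainedSum = transactions
  .map(t => { calls.map++; return t.amount * USD2EUR_CONVERSION_FACTOR; })
  .filter(amount => { calls.filter++; return amount > 100; })
  .reduce((sum, amount) => { calls.reduce++; return sum + amount; }, 0);

// Single pass: convert, test and accumulate each transaction in one loop
let loopIterations = 0;
let singlePassSum = 0;
for (const t of transactions) {
  loopIterations++;
  const amount = t.amount * USD2EUR_CONVERSION_FACTOR;
  if (amount > 100) singlePassSum += amount;
}

console.log(calls);                        // { map: 3, filter: 3, reduce: 2 }
console.log(loopIterations);               // 3
console.log(chainedSum === singlePassSum); // true
```

Both versions compute the same sum, but the chained one walks the data in three separate passes and builds intermediate arrays in between, while the loop touches each transaction exactly once.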

We want to change this implementation to use streams and show how they solve these problems.

Streams

In this article I assume you already know node.js streams. Still, streams are one of the most underrated parts of node.js, so here is a little recap:

Streams can be of four types:

  • Readable
  • Writable
  • Duplex
  • Transform

A Transform stream is a Duplex stream that can modify or transform the data as it is written and read.

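null has a particular meaning among streams: it marks the end of the data. When a Readable stream pushes null (readable.push(null)), the stream ends. It's like saying that null means "end of communications". For the same reason, null can't be written to a stream as a regular value, even in object mode.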
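A minimal sketch of this end-of-stream signal, using a custom object-mode Readable invented for the example: read() pushes three values and then null, after which the stream emits "end".

```javascript
const { Readable } = require('stream');

// Illustrative Readable that emits 1, 2, 3 and then signals the end with null
let next = 1;
const counter = new Readable({
  objectMode: true,
  read() {
    if (next <= 3) {
      this.push(next++);
    } else {
      // null means "end of communications": no more data will follow
      this.push(null);
    }
  },
});

const values = [];
counter.on('data', value => values.push(value));
counter.on('end', () => console.log(values)); // [ 1, 2, 3 ]
```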

By default streams work with Buffers (or strings), but they can be configured to work in object mode. In practice, this means that every chunk read or written by the stream can be any JavaScript value except null, typically an object. For simplicity we'll work in object mode.

Streams can be piped to produce chains that look like Readable → Transform → Transform → Writable. It's written like this:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWritableDest)
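Here is a runnable sketch of such a chain, assuming a Node.js version that provides Readable.from() (v12.3 or later). The two transforms and the collecting Writable are placeholders invented for this example; only the shape of the chain matters.

```javascript
const { Readable, Transform, Writable } = require('stream');

// Readable source: Readable.from() turns an iterable into an object-mode stream
const readableSrc = Readable.from([1, 2, 3]);

// First transform: doubles every number
const transformStream1 = new Transform({
  objectMode: true,
  transform(n, encoding, callback) {
    callback(null, n * 2);
  },
});

// Second transform: turns every number into a label
const transformStream2 = new Transform({
  objectMode: true,
  transform(n, encoding, callback) {
    callback(null, `value: ${n}`);
  },
});

// Writable destination: collects everything it receives
const received = [];
const finalWritableDest = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    received.push(chunk);
    callback();
  },
});

// pipe() returns its destination, so 'finish' is listened for on finalWritableDest
readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWritableDest)
  .on('finish', () => console.log(received)); // [ 'value: 2', 'value: 4', 'value: 6' ]
```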

For anything else you need to know about streams, read the official documentation or the article "Node.js Streams: Everything you need to know".

Our first Transform stream

Let's imagine we have a function called DB.getAsStream which gives the same results as DB.getAsArray, but in the form of an object stream.

const stream = DB.getAsStream({ collection: 'Transaction' });

stream.on('data', transaction => console.log(transaction));

// Every time the 'data' callback is called, something like this is printed
// {id:1, amount: 12, currency: "USD"}
// {id:2, amount: 456, currency: "USD"}
// {id:3, amount: 120, currency: "USD"}

Let's write a transform stream that turns each transaction into its amount converted to euros. We will use the simplified constructor, but the same can be achieved with the class-style approach (extending Transform).

const USD2EUR_CONVERSION_FACTOR = 0.85;

const usd2eur = new Transform({
  // Each chunk is an object. Here we tell node to not consider the chunks as buffers
  objectMode: true,
  
  // The signature is (chunk, encoding, callback) but we can safely rename chunk to transaction
  transform(transaction, encoding, callback) {
    callback(null, transaction.amount * USD2EUR_CONVERSION_FACTOR)
  }
});
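For comparison, here is a sketch of the class-style equivalent of usd2eur. The class name Usd2Eur is our own choice, and an in-memory Readable.from() source stands in for the hypothetical DB.getAsStream.

```javascript
const { Readable, Transform } = require('stream');

const USD2EUR_CONVERSION_FACTOR = 0.85;

// Class-style equivalent of the usd2eur transform (hypothetical class name)
class Usd2Eur extends Transform {
  constructor(options = {}) {
    // Object mode by default, overridable by the caller as in the simplified version
    super({ objectMode: true, ...options });
  }

  // _transform receives the same (chunk, encoding, callback) arguments as transform()
  _transform(transaction, encoding, callback) {
    callback(null, transaction.amount * USD2EUR_CONVERSION_FACTOR);
  }
}

// An in-memory source stands in for the hypothetical DB.getAsStream
const amounts = [];
const converted = Readable.from([
  { id: 1, amount: 12, currency: 'USD' },
  { id: 3, amount: 120, currency: 'USD' },
]).pipe(new Usd2Eur());

converted.on('data', amount => amounts.push(amount));
converted.on('end', () => console.log(amounts)); // [ 10.2, 102 ]
```

The two styles are interchangeable; the class form is handy when the transform needs its own state or helper methods.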

Here is the entire code:

const { Transform } = require('stream');

const USD2EUR_CONVERSION_FACTOR = 0.85;

const usd2eur = new Transform({
  // ... same as before
});
const stream = DB.getAsStream({ collection: 'Transaction' });
const transformedStream = stream.pipe(usd2eur);

transformedStream.on('data', amount => console.log(amount));
// This will output
// 10.2
// 387.59999999999997 (floating-point rounding of 387.6)
// 102

As you can imagine, it's also possible to write the filter and reduce functions as transforms, but do we need to? Must we really write every business function as a Transform implementation? What if we could focus on our business code and hide the Transform instantiation somewhere else? Well, we can...

Providing the building blocks

In our first implementation we focused on our business logic without caring about how the map, filter and reduce functions work. They are part of the standard JavaScript library and we simply use them. We focused on writing the mapping function, the filter function and the reduce function.
Let's isolate them:


const amountToEur = t => t.amount * USD2EUR_CONVERSION_FACTOR;
const moreThan100 = amount => amount > 100;
const sum = (sum, amount) => amount + sum;

// Later we use them

transactions
    .map(amountToEur)
    .filter(moreThan100)
    .reduce(sum, 0)

Now we're going to implement map, filter and reduce for streams, so that we can reuse our business logic. In the end, we'll write our code like this:

stream
    .pipe(map(amountToEur))
    .pipe(filter(moreThan100))
    .pipe(reduce(sum, 0))

Map

Here is an implementation of map using a Transform stream. We let the user provide a mapping function and apply it to each chunk.

const map = (fn, options = {}) => new Transform({
  // By default we are in object mode but this can be overwritten by the user
  objectMode: true,
  ...options,
  
  transform(chunk, encoding, callback) {
    let res;
    try {
      res = fn(chunk);
    } catch(e) {
      return callback(e);
    }
    callback(null, res);
  }
})

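This should do the trick. Every chunk passing through the stream is mapped to a new value by the user-supplied function fn, exactly like map for arrays. If fn throws, the error is passed to the callback and the stream emits an 'error' event. The second parameter, options, lets the caller override the Transform settings (for instance, to disable object mode); it's included for completeness, but we won't need it here.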
Great. Now we can write stream.pipe(map(amountToEur)) instead of building a Transform by hand.
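A quick way to try map in isolation is to feed it an in-memory stream. The sketch below assumes Readable.from() (Node.js 12.3+) as a stand-in for the hypothetical DB.getAsStream, and reuses amountToEur from the previous section.

```javascript
const { Readable, Transform } = require('stream');

// map() exactly as defined above
const map = (fn, options = {}) => new Transform({
  objectMode: true,
  ...options,

  transform(chunk, encoding, callback) {
    let res;
    try {
      res = fn(chunk);
    } catch (e) {
      return callback(e);
    }
    callback(null, res);
  },
});

// Business logic, unchanged from the array version
const USD2EUR_CONVERSION_FACTOR = 0.85;
const amountToEur = t => t.amount * USD2EUR_CONVERSION_FACTOR;

// Readable.from() stands in for the hypothetical DB.getAsStream
const eurAmounts = [];
const converted = Readable.from([{ amount: 12 }, { amount: 120 }]).pipe(map(amountToEur));

converted.on('data', amount => eurAmounts.push(amount));
converted.on('end', () => console.log(eurAmounts)); // [ 10.2, 102 ]
```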

Filter

Filter exploits a peculiarity of Transform streams: they're not forced to emit one output value for each input value. Input and output are decoupled: a transform can emit as many values as it receives, fewer, or more. Here is filter for streams:

const filter = (fn, options = {}) => new Transform({
  objectMode: true,
  ...options,
  
  transform(chunk, encoding, callback) {
    let take;
    try {
      take = fn(chunk);
    } catch (e) {
      return callback(e);
    }
    return callback(null, take ? chunk : undefined);
  }
})

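Notice that if the filter function returns a falsy value, we call the callback with undefined, so nothing is pushed downstream for that chunk. Passing null here would behave the same way: the transform callback skips both null and undefined. What ends a stream is an explicit push(null), which for our transforms happens automatically once their source ends.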
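The sketch below exercises filter on a few literal amounts and, as a side check, shows that a transform answering every chunk with callback(null, null) emits nothing but still ends normally. The dropAll transform is invented for this demonstration.

```javascript
const { Readable, Transform } = require('stream');

// filter() as defined above
const filter = (fn, options = {}) => new Transform({
  objectMode: true,
  ...options,

  transform(chunk, encoding, callback) {
    let take;
    try {
      take = fn(chunk);
    } catch (e) {
      return callback(e);
    }
    return callback(null, take ? chunk : undefined);
  },
});

const moreThan100 = amount => amount > 100;

const kept = [];
const filtered = Readable.from([10.2, 387.6, 102]).pipe(filter(moreThan100));
filtered.on('data', amount => kept.push(amount));
filtered.on('end', () => console.log(kept)); // [ 387.6, 102 ]

// Illustrative transform that answers every chunk with callback(null, null):
// nothing is emitted, yet the stream still ends normally once its source ends
const dropAll = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, null);
  },
});

let seen = 0;
const dropped = Readable.from([1, 2, 3]).pipe(dropAll);
dropped.on('data', () => seen++);
dropped.on('end', () => console.log('ended, values seen:', seen)); // ended, values seen: 0
```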

Reduce

Reduce is probably the most complex of the three, but not by much.

const reduce = (fn, acc, options = {}) => new Transform({
    objectMode: true,
    ...options,
    
    transform(chunk, encoding, callback) {
        try {
          acc = fn(acc, chunk);
        } catch (e) {
          return callback(e);
        }
        return callback();
    },
    
    flush(callback) {
      callback(null, acc);
    }
});

As you can see, reduce must also implement the flush method. Node calls flush once there is no more written data to transform, just before the stream emits its "end" event. At that point we push the accumulated value.

We should note a key difference between reduce and the other functions. While map and filter emit values as the stream flows, reduce emits only once, at the end. This is because, with streams, we can't know beforehand how many chunks there will be, so we must wait for the stream to end before emitting the reduced value.
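A small sketch that makes this visible: four chunks go in, but the reduce transform emits a single value, and only when its input ends. The input numbers are arbitrary.

```javascript
const { Readable, Transform } = require('stream');

// reduce() as defined above
const reduce = (fn, acc, options = {}) => new Transform({
  objectMode: true,
  ...options,

  transform(chunk, encoding, callback) {
    try {
      acc = fn(acc, chunk);
    } catch (e) {
      return callback(e);
    }
    return callback();
  },

  // Called once all chunks have been transformed: emit the accumulated value
  flush(callback) {
    callback(null, acc);
  },
});

const sum = (sum, amount) => amount + sum;

const emitted = [];
const total = Readable.from([1, 2, 3, 4]).pipe(reduce(sum, 0));
total.on('data', value => emitted.push(value));
total.on('end', () => console.log(emitted)); // [ 10 ]
```

Since flush hands acc to the callback, an accumulator that ends up null or undefined is silently dropped, which is one more reason to always pass an initial value.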

Using the building blocks

As you can see, map, filter and reduce take the same arguments as their array counterparts, except for the extra options parameter, which we don't use here anyway. Let's see what our code looks like with the new functions:

// Isolate our new shiny functions in their own module
const { map, filter, reduce } = require('./streamUtils');

const USD2EUR_CONVERSION_FACTOR = 0.85;
const amountToEur = t => t.amount * USD2EUR_CONVERSION_FACTOR;
const moreThan100 = amount => amount > 100;
const sum = (sum, amount) => amount + sum;

const sumRichTransactions = () => {
  const stream = DB.getAsStream({ collection: 'Transaction' });
  // The returned stream emits a single value: the sum
  return stream
    .pipe(map(amountToEur))
    .pipe(filter(moreThan100))
    .pipe(reduce(sum, 0));
};
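Here is an end-to-end sketch that runs as a single file. The streamUtils module is inlined, and DB is a hypothetical in-memory stub built on Readable.from() (it ignores its query argument); the sample amounts are chosen so that the floating-point results are exact.

```javascript
const { Readable, Transform } = require('stream');

// --- streamUtils: the map, filter and reduce transforms from this article ---
const map = (fn, options = {}) => new Transform({
  objectMode: true,
  ...options,
  transform(chunk, encoding, callback) {
    let res;
    try {
      res = fn(chunk);
    } catch (e) {
      return callback(e);
    }
    callback(null, res);
  },
});

const filter = (fn, options = {}) => new Transform({
  objectMode: true,
  ...options,
  transform(chunk, encoding, callback) {
    let take;
    try {
      take = fn(chunk);
    } catch (e) {
      return callback(e);
    }
    callback(null, take ? chunk : undefined);
  },
});

const reduce = (fn, acc, options = {}) => new Transform({
  objectMode: true,
  ...options,
  transform(chunk, encoding, callback) {
    try {
      acc = fn(acc, chunk);
    } catch (e) {
      return callback(e);
    }
    callback();
  },
  flush(callback) {
    callback(null, acc);
  },
});

// --- Hypothetical in-memory stand-in for the article's DB client ---
const DB = {
  getAsStream: () => Readable.from([
    { id: 1, amount: 12, currency: 'USD' },
    { id: 2, amount: 120, currency: 'USD' },
    { id: 3, amount: 200, currency: 'USD' },
  ]),
};

// --- Business code ---
const USD2EUR_CONVERSION_FACTOR = 0.85;
const amountToEur = t => t.amount * USD2EUR_CONVERSION_FACTOR;
const moreThan100 = amount => amount > 100;
const sum = (sum, amount) => amount + sum;

const sumRichTransactions = () => DB.getAsStream({ collection: 'Transaction' })
  .pipe(map(amountToEur))
  .pipe(filter(moreThan100))
  .pipe(reduce(sum, 0));

// The resulting stream emits exactly one value: the sum (102 + 170)
let total;
const result = sumRichTransactions();
result.on('data', value => { total = value; });
result.on('end', () => console.log(total)); // 272
```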

So, we focused on our business code. Cool. We also solved both of our initial problems. Memory stays under control, because the streams never hold the whole dataset: each one buffers only a handful of chunks (in object mode the default highWaterMark is 16 objects), assuming DB.getAsStream really streams from the database. Also, we never cycle through the dataset more than once: each chunk passes through map and filter once and then reaches reduce, which also processes the chunks one by one, without building intermediate arrays. It's very similar to what transducers do.

This is a great achievement because, without changing much the way we write code, we gain a lot and are protected against problems, such as running out of memory on large datasets, that would only show up at runtime.

Bonus 1: compose

The code is neat, fast and has a low memory footprint, but its style could be better. Wrapping every map/filter/reduce call in a .pipe() call makes the code a bit ugly. We have two choices: make the code chainable (.map().filter().reduce()), as it is for arrays, or use a point-free code style.

To make the code chainable we could wrap the stream object, which is what a library like Highland does. Since Highland is already a good example of that, let's go for the point-free version.

Point-free style is a popular way of writing code, especially in the functional programming world. What we want is to group all the transforms passed to .pipe() and get rid of all those .pipe() calls: we want to compose them. The desired result is:

...
...
return compose(
    map(amountToEur),
    filter(moreThan100),
    reduce(sum, 0)
)(stream);

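As you can see, this is very similar to compose for functions; the only difference is that the composed result takes a stream and returns a stream. One detail: the classic mathematical compose applies its functions from right to left, while ours pipes the transforms from left to right, in the order they are written (what some libraries call pipe). Here is the implementation of compose: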

const compose = (...fns) => stream => fns.reduce((transformed, fn) => transformed.pipe(fn), stream);

It's a simple reduce that starts with a stream and, at each step, returns the stream piped into the next transform.
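A runnable sketch of compose in action, using a stripped-down map (no error handling) and a hypothetical collect helper that gathers a stream's values into a promise. It also shows one practical point: Transform instances are single-use, so the composed pipeline is wrapped in a function that builds fresh transforms for every source stream.

```javascript
const { Readable, Transform } = require('stream');

// compose as defined above: pipes the stream through each transform, left to right
const compose = (...fns) => stream => fns.reduce((transformed, fn) => transformed.pipe(fn), stream);

// Minimal map transform for the demo (same shape as the article's map, without error handling)
const map = fn => new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, fn(chunk));
  },
});

// Fresh transforms for every source: an ended Transform can't be reused
const incrementThenDouble = source => compose(
  map(x => x + 1),
  map(x => x * 2)
)(source);

// Hypothetical helper: resolves with all the values a stream emits
const collect = stream => new Promise((resolve, reject) => {
  const values = [];
  stream.on('data', value => values.push(value));
  stream.on('end', () => resolve(values));
  stream.on('error', reject);
});

collect(incrementThenDouble(Readable.from([1, 2, 3]))).then(console.log); // [ 4, 6, 8 ]
collect(incrementThenDouble(Readable.from([10]))).then(console.log);      // [ 22 ]
```

The results confirm the left-to-right order: each value is incremented first and doubled second.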

Bonus 2: async

Our map, filter and reduce accept only synchronous functions. Yet stream transforms are inherently asynchronous: like many node.js functions, they accept a callback as their last parameter. Here is a version of map whose mapping function may return a promise; the transform waits until the promise settles before emitting its result (or an error).

const asyncMap = (fn, options = {}) => new Transform({
  objectMode: true,
  ...options,
  
  transform(chunk, encoding, callback) {
    let result;
    try {
      result = fn(chunk);
    } catch (e) {
      // Synchronous errors thrown by fn
      return callback(e);
    }
    // Promise.resolve() wraps plain values (and thenables) into a promise
    Promise.resolve(result).then(
      res => callback(null, res),
      // Handling rejections in the same then() guarantees that callback
      // is not called a second time if the success handler throws
      err => callback(err)
    );
  }
});

This implementation also accepts functions that don't return promises: Promise.resolve wraps their return value in an already-resolved promise.
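A usage sketch for asyncMap. The fetchRate function is hypothetical: it simulates an asynchronous exchange-rate lookup with a timer, so each conversion has to wait for a promise.

```javascript
const { Readable, Transform } = require('stream');

// asyncMap as above: waits for the value (or promise) returned by fn before emitting
const asyncMap = (fn, options = {}) => new Transform({
  objectMode: true,
  ...options,
  transform(chunk, encoding, callback) {
    let result;
    try {
      result = fn(chunk);
    } catch (e) {
      return callback(e);
    }
    Promise.resolve(result).then(res => callback(null, res), err => callback(err));
  },
});

// Hypothetical async rate lookup, simulated with a 10 ms timer
const fetchRate = currency =>
  new Promise(resolve => setTimeout(() => resolve(currency === 'USD' ? 0.85 : 1), 10));

const toEur = async t => t.amount * (await fetchRate(t.currency));

const out = [];
const converted = Readable.from([
  { amount: 120, currency: 'USD' },
  { amount: 50, currency: 'EUR' },
]).pipe(asyncMap(toEur));

converted.on('data', value => out.push(value));
converted.on('end', () => console.log(out)); // [ 102, 50 ]
```

Because a Transform hands it the next chunk only after the callback for the current one has been called, chunks are processed one at a time and their order is preserved; this version trades concurrency for simplicity.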

Bonus: some more

There are other little advantages, e.g.

  • easy step-by-step inspection
  • incomplete map
  • error handling
  • multiple flows (stream fork)
  • streams as promises

but maybe these will find place in another article :)

Conclusion

We saw how to solve some common problems with a clever use of node.js streams. Together we wrote some basic building blocks, the map, filter and reduce functions, from which many other functions can be built. Libraries that offer similar functionality usually provide useful functions such as throttle, debounce, group, etc. We defined only three basic functions because they can be used to build many other fancy features. To be honest, we could have defined only reduce, because it's the most fundamental of the three: map and filter can actually be expressed in terms of reduce. If you like the approach, I suggest you have a look at the libraries I mentioned at the beginning of this article, because they usually offer many interesting features (e.g. laziness and asynchronicity).
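To back the claim about reduce, here is a sketch that rebuilds map and filter on top of Array.prototype.reduce. It sticks to arrays on purpose: our stream reduce emits only at the end, so the same trick for streams would need a reducer that can emit values along the way. The function names are illustrative.

```javascript
// map rebuilt on reduce: accumulate fn(item) for every item
const mapWithReduce = (array, fn) =>
  array.reduce((acc, item) => {
    acc.push(fn(item));
    return acc;
  }, []);

// filter rebuilt on reduce: accumulate only the items that pass the predicate
const filterWithReduce = (array, predicate) =>
  array.reduce((acc, item) => {
    if (predicate(item)) acc.push(item);
    return acc;
  }, []);

console.log(mapWithReduce([1, 2, 3], x => x * 10));            // [ 10, 20, 30 ]
console.log(filterWithReduce([1, 2, 3, 4], x => x % 2 === 0)); // [ 2, 4 ]
```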