Stream partitioned data from Firestore using Async Iterators in Node.js

Mathias Dewert

February 26, 2021

Software Engineer Mathias Dewert talks about dealing with the issues that come with rapid growth.

As we welcome more and more clients onto our platform, the amount of data we need to deal with is increasing steeply as well. It doesn’t matter how much thought you’ve put into it, how many books you’ve read on designing scalable systems, or even whether you use the latest technology that promises unlimited resources: rapid customer growth will more than likely bring about unexpected issues in unexpected places.

Software engineering is a war you’re waging against the unexpected, and there always comes a time when that tiny soldier process which has been so loyal to you, doing its job silently in the background, no fuss, just plain efficiency, suddenly doesn’t have the muscle to keep going and resist the ever-growing army of data coming at it. Now you have no choice left: you must throw yourself into battle and come to its rescue.

Now I ain’t no Steven Spielberg, so this isn’t gonna be as thrilling as Saving Private Ryan, but I’ll do my best to make this blog post as useful as it can be, by going over how we rescued one of our soldier processes last week at Freetrade, and how a somewhat less-known feature of Node.js helped us do it in a quite fun and easy manner.

The problem

As a regulated financial institution, there are a lot of different reports that we need to generate each day to satisfy both the regulator and ourselves that your cash and shares are well taken care of. Part of the data that populates those reports is pulled from our Firestore database, before being processed inside Google Cloud Functions. This brings two important restrictions:

  1. Firestore’s query capabilities are not as powerful as what you’d find in a traditional SQL database, for instance.
  2. Cloud Functions can only run for a limited amount of time, and with limited memory.

Last week, one such report that had been generating perfectly for months suddenly started failing, with the cloud function either timing out or running out of memory. It turned out we were pulling all the data from Firestore at once, then keeping that data in memory before dumping it into a file. It seems obvious now, but we’d never imagined the amount of data for that report would grow so large so fast that holding it all in memory would stop being an option. We had no choice: we needed a streaming solution...

Streaming in Node.js

Streaming in Node.js is out of the scope of this blog post, and I’d probably be making a fool of myself were I to try and explain it to you in detail. This is why we’ll just stick to the few general concepts that are useful to know for this article. As software engineers we’re taught not to reinvent the wheel, so I’ll also be, shamelessly, pulling definitions from the official Node.js documentation.

Stream

A stream is an abstract interface for working with streaming data in Node.js. The stream module provides an API for implementing the stream interface.

Streams can be readable, writable, or both. All streams are instances of EventEmitter.

For more information, or to simply refresh your memory on how streams work in Node.js, please refer to: https://nodejs.org/api/stream.html#stream_stream
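
To make the two statements above a little more concrete, here is a minimal sketch using PassThrough, a built-in stream that is both writable and readable. The variable names are only illustrative, and the example keeps everything in memory rather than touching files or the network.

```typescript
import { EventEmitter } from 'events'
import { PassThrough } from 'stream'

// A PassThrough stream is both writable and readable:
// whatever is written to it can be read back out unchanged.
const stream = new PassThrough()

// Streams are EventEmitters, so we can subscribe to their events.
const isEmitter: boolean = stream instanceof EventEmitter

const received: string[] = []
stream.on('data', (chunk: Buffer) => {
    received.push(chunk.toString())
})
stream.on('end', () => {
    console.log('is EventEmitter:', isEmitter)
    console.log('received:', received.join(''))
})

// Writable side: push some data in, then signal that we are done.
stream.write('hello, ')
stream.end('world')
```

The 'data' and 'end' events used here are the same ones every Readable stream emits, whichever source sits behind it.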

Pipeline

A convenient way to “pipe” multiple streams together (the output of one stream is fed as input to the next one) is to use the stream.pipeline method provided by Node.js.

Or as the official documentation describes it: “A module method to pipe between streams and generators forwarding errors and properly cleaning up and provide a callback when the pipeline is complete.”

Using streams and stream.pipeline in Node.js to compress an existing file will therefore look something like this:


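const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => console.log(err ? 'Pipeline failed.' : 'Pipeline succeeded.') // called once when the pipeline ends
);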
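
If you prefer promises over callbacks, stream.pipeline can be wrapped with util.promisify. The sketch below does that with purely in-memory streams so it can run anywhere; upperCaseAll and the streams inside it are made-up names for illustration, not part of any library.

```typescript
import { pipeline as callbackPipeline, Readable, Transform, Writable } from 'stream'
import { promisify } from 'util'

const pipeline = promisify(callbackPipeline)

async function upperCaseAll(words: string[]): Promise<string> {
    // Transform stream that upper-cases every chunk passing through it.
    const upperCase = new Transform({
        transform(chunk, _encoding, callback) {
            callback(null, chunk.toString().toUpperCase())
        },
    })

    // Writable stream that collects everything it receives in memory.
    const collected: string[] = []
    const sink = new Writable({
        write(chunk, _encoding, callback) {
            collected.push(chunk.toString())
            callback()
        },
    })

    // Resolves once every stream has finished,
    // and rejects if any stream in the chain emits an error.
    await pipeline(Readable.from(words), upperCase, sink)
    return collected.join('')
}

upperCaseAll(['stream', 'ing']).then((result) => console.log(result))
// STREAMING
```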

Async Generators

Unlike traditional generators (https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Generator), async generators allow you to create async iterables, which can be used to iterate over data from an API, or any resource whose result needs to be wrapped inside a promise.

One way you can iterate over async iterable objects is to use the for await...of syntax:


for await (variable of iterable) {
  statement
}

Example

This is a very simple example of how you can create and consume an async generator in Node.js:


// "async function*" is what makes this an async generator
async function* generator() {
  yield Promise.resolve(1);
  yield Promise.resolve(2);
  yield Promise.resolve(3);
}

(async function() {
  for await (let num of generator()) {
    console.log(num);
  }
})();
// 1
// 2
// 3



Now what makes it really interesting for us is that only one yield statement gets executed per iteration: in this case Promise.resolve(3) is only going to be executed on the third iteration. This in turn gives us more control over how often we fetch paginated resources, allowing us to create patterns such as this one:


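// numberOfPages, retrievePaginatedData and processPage are placeholders
async function* generator() {
    for (let pageNumber = 0; pageNumber < numberOfPages; pageNumber++) {
        // the next page is only requested once the consumer asks for it
        yield retrievePaginatedData(pageNumber)
    }
}

// the generator function has to be called to obtain the async iterable
for await (const page of generator()) {
    processPage(page)
}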
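
Here is a self-contained variation of the pattern above that can actually be run. It is only a sketch: retrievePaginatedData fakes a paginated API by computing two numbers per page, and the log array exists purely to record the order in which pages are fetched and processed.

```typescript
// Hypothetical stand-in for a paginated API: every page holds two numbers.
async function retrievePaginatedData(pageNumber: number, log: string[]): Promise<number[]> {
    log.push(`fetch page ${pageNumber}`)
    return [pageNumber * 2 + 1, pageNumber * 2 + 2]
}

async function* pageGenerator(numberOfPages: number, log: string[]): AsyncGenerator<number[]> {
    for (let pageNumber = 0; pageNumber < numberOfPages; pageNumber++) {
        // Execution pauses at each yield until the consumer asks for the next value.
        yield await retrievePaginatedData(pageNumber, log)
    }
}

async function processAllPages(numberOfPages: number): Promise<string[]> {
    const log: string[] = []
    for await (const page of pageGenerator(numberOfPages, log)) {
        log.push(`process ${page.join(',')}`)
    }
    return log
}

processAllPages(3).then((log) => console.log(log.join('\n')))
// fetch page 0
// process 1,2
// fetch page 1
// process 3,4
// fetch page 2
// process 5,6
```

The interleaved log is the point: no page is requested before the previous one has been handled, so at most one page sits in memory at a time.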

In which case we will only retrieve the content of page 2 once we’ve fully processed page 1. Effectively we are now streaming data. Better still, Node.js can turn any iterable or async iterable, including the one returned by an async generator, into a Readable stream through Readable.from. That stream can then be piped, transformed and written using the same stream.pipeline function we mentioned earlier.
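
As a rough illustration of the bridge between generators and streams, the sketch below wraps a toy async generator with Readable.from and reads the resulting stream back. countTo and collect are made-up helpers for this example only.

```typescript
import { Readable } from 'stream'

// Toy async generator standing in for a real data source.
async function* countTo(limit: number): AsyncGenerator<number> {
    for (let i = 1; i <= limit; i++) {
        yield i
    }
}

async function collect(limit: number): Promise<{ objectMode: boolean; values: number[] }> {
    // Readable.from wraps an iterable or async iterable in a Readable stream.
    // The stream is in object mode by default, so values are passed through
    // as they are instead of being converted to Buffers.
    const readStream = Readable.from(countTo(limit))

    const values: number[] = []
    // Readable streams are async iterable themselves, so for await works here too.
    for await (const value of readStream) {
        values.push(value)
    }
    return { objectMode: readStream.readableObjectMode, values }
}

collect(3).then((result) => console.log(result))
```

Because the stream is in object mode, anything placed after it in a pipeline needs to accept objects on its writable side.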

Bringing everything together

Now back to our initial problem of streaming data from Firestore, transforming it, and then saving the result as a file in a Google Cloud Storage (GCS) bucket. To achieve this we will create a new async generator which yields paginated query responses from Firestore. Pagination in Firestore is made possible by combining the startAfter and limit methods from the SDK.
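
Before looking at the Firestore version, it may help to see cursor-based pagination in isolation. The sketch below does not call Firestore at all: queryPage is a hypothetical in-memory stand-in that mimics ordering by id, starting after a cursor, and limiting the page size.

```typescript
interface Doc {
    id: string
    value: number
}

// In-memory stand-in for a collection that is already ordered by document id.
const docs: Doc[] = ['a', 'b', 'c', 'd', 'e'].map((id, index) => ({ id, value: index }))

// Mimics "order by id, start after the cursor, limit to pageSize" on the array above.
function queryPage(startAfter: string | null, pageSize: number): Doc[] {
    const start = startAfter === null ? 0 : docs.findIndex((doc) => doc.id > startAfter)
    return start === -1 ? [] : docs.slice(start, start + pageSize)
}

function readAllPages(pageSize: number): string[][] {
    const pages: string[][] = []
    let cursor: string | null = null
    while (true) {
        const page: Doc[] = queryPage(cursor, pageSize)
        if (page.length === 0) break
        pages.push(page.map((doc) => doc.id))
        // The id of the last document seen becomes the cursor for the next query.
        const last: Doc = page[page.length - 1]
        cursor = last.id
        // A short page means we have reached the end of the collection.
        if (page.length < pageSize) break
    }
    return pages
}

console.log(readAllPages(2))
// [ [ 'a', 'b' ], [ 'c', 'd' ], [ 'e' ] ]
```

Note that when the number of documents is an exact multiple of the page size, one extra query returning an empty page is needed to discover that there is nothing left.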

The final code is written in TypeScript, as this is our most used language here at Freetrade. Don’t worry if you don’t know it: it is JavaScript with extra annotations describing which types are expected, and it compiles down to plain JavaScript that runs on Node.js.


Our final code will therefore look something like this:


import { Storage } from '@google-cloud/storage'
import admin = require('firebase-admin')
import { pipeline as _pipeline, Readable } from 'stream'
import { FieldPath } from '@google-cloud/firestore'
import * as util from 'util'
const pipeline = util.promisify(_pipeline)

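// Shape of a single exported document; replace with your own type as needed
type Item = FirebaseFirestore.DocumentData

interface IPageResult {
    items: Item[]
    // null when there are no more pages to fetch
    lastId: string | null
}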

async function getNextPage(collectionName: string, pageSize: number, startAfter: string | null): Promise<IPageResult> {
    const collection = admin.firestore().collection(collectionName)

    let query = collection as FirebaseFirestore.Query

    // This orders the elements by their id, which is what allows us to paginate using startAfter
    query = query.orderBy(FieldPath.documentId())

    // startAfter might not exist on the first iteration
    if (startAfter) {
        query = query.startAfter(startAfter)
    }

    query = query.limit(pageSize)

    // the items to return
    const results = await query.get()
    // the last seen id that we will use to indicate where the next page should start
    const lastId = results.docs[pageSize - 1]?.id ?? null

    return {
        items: results.docs.map((doc) => doc.data()),
        lastId: lastId
    }
}

async function* collectionExportGenerator(collectionNameToExport: string, pageSize: number = 500) {
    // id of the last document seen so far; null until the first page has been fetched
    let lastId: string | null = null
    // keep retrieving pages of items until there's nothing more to retrieve
    do {
        const pageResult: IPageResult = await getNextPage(collectionNameToExport, pageSize, lastId)
        lastId = pageResult.lastId
        yield pageResult.items
    } while (lastId !== null)
}

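async function exportFirestoreCollectionToGcs(): Promise<void> {
    // create read stream from the async iterable returned by calling our generator
    // ('my-collection' is a placeholder collection name)
    const readStream = Readable.from(collectionExportGenerator('my-collection'))

    // create write stream to upload to GCS
    const writeStream = new Storage()
        .bucket('my-bucket')
        .file('export.csv')
        .createWriteStream({})

    // pipe all streams together; transformStream1 and transformStream2 stand for
    // whatever Transform streams turn each page of items into the file's contents
    await pipeline(readStream, transformStream1, transformStream2, writeStream)
}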
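
The transform streams are not shown above, so here is one possible sketch of such a step, under assumptions of my own: the Item shape with id and amount fields is invented for the example, and createCsvTransform is a hypothetical helper rather than the code we run in production. It receives pages (arrays of items) on its object-mode writable side and emits CSV text on its readable side.

```typescript
import { Readable, Transform } from 'stream'

// Hypothetical item shape, used only for this illustration.
interface Item {
    id: string
    amount: number
}

// Object mode on the writable side (receives arrays of items),
// plain text on the readable side (emits CSV lines).
function createCsvTransform(): Transform {
    return new Transform({
        writableObjectMode: true,
        transform(page: Item[], _encoding, callback) {
            const lines = page.map((item) => `${item.id},${item.amount}\n`)
            callback(null, lines.join(''))
        },
    })
}

async function toCsv(pages: Item[][]): Promise<string> {
    // Readable.from emits each inner array as one object, like our page generator does.
    // pipe() keeps the sketch short; stream.pipeline is the safer choice in real code
    // because it forwards errors and cleans up every stream.
    const csvStream = Readable.from(pages).pipe(createCsvTransform())
    let csv = ''
    for await (const chunk of csvStream) {
        csv += chunk.toString()
    }
    return csv
}

toCsv([[{ id: 'a', amount: 1 }, { id: 'b', amount: 2 }], [{ id: 'c', amount: 3 }]])
    .then((csv) => console.log(csv))
```

Since the transform outputs text rather than objects, it can be followed directly by a byte-oriented destination such as a file or GCS write stream.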

Summary

Upon learning to forgive ourselves for not being able to foresee every scaling issue we could run into, we’ve learned how to implement Readable streams using async generators in Node.js/TypeScript, by exporting a potentially large collection of items stored in Firestore into a file in a GCS bucket.

Async generators clearly are not one of the most common features of Node.js, but I hope this article has at the very least made you aware that they exist, and that if one day you do have to deal with streams in one of your projects, you remember that they can be an easy and quite elegant solution to that problem.
