Processing Massive Amounts of Data on Demand Without Crashing Node.js Main Thread

Processing large datasets in Node.js can be challenging: the main thread runs the event loop, and any long-running work on it blocks everything else. Fortunately, Node.js streams offer a powerful and memory-efficient way to process data incrementally, one chunk at a time.
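
To see what "incremental" means in practice, here is a minimal, self-contained sketch (the file path is hypothetical) that counts the bytes of a large file chunk by chunk, keeping memory usage flat no matter how big the file is:

const fs = require('node:fs');

// Hypothetical local file, used only to illustrate chunked reading.
const readStream = fs.createReadStream('./big-file.csv');

let bytes = 0;
readStream.on('data', (chunk) => {
  bytes += chunk.length; // each chunk is a small Buffer (64 KiB by default)
});
readStream.on('end', () => {
  console.log(`Read ${bytes} bytes without ever holding the whole file in memory`);
});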

In this tutorial, you'll build a Node.js application that:

  • Downloads a large CSV file from Google Cloud Storage
  • Converts it into JSON objects
  • Extracts useful insights in real time
  • Compresses the data
  • Uploads the result back to Cloud Storage
  • Visualizes progress in the terminal

Let's dive into it step-by-step using core Node.js features and best practices.


🧰 Prerequisites

  • Node.js v16.16+
  • A Google Cloud Platform account
  • A service account key (gkeys.json) for authentication, placed in the project root (the code loads it via path.join(__dirname, '../gkeys.json'))

Install dependencies:

npm install @google-cloud/storage

☁️ Cloud Storage File Service (src/cloudStorageFileService.js)

const { Storage } = require('@google-cloud/storage');
const path = require('path');
const serviceKey = path.join(__dirname, '../gkeys.json');

class CloudStorageFileService {
  constructor() {
    this.storage = new Storage({
      projectId: 'my-project-id', // replace with your GCP project ID
      keyFilename: serviceKey,
    });
  }

  async downloadFile(bucketName, fileName) {
    return this.storage.bucket(bucketName).file(fileName).createReadStream();
  }

  async uploadFile(bucketName, destFileName) {
    return this.storage.bucket(bucketName).file(destFileName).createWriteStream();
  }

  async getFileSize(bucketName, fileName) {
    const [metadata] = await this.storage.bucket(bucketName).file(fileName).getMetadata();
    return metadata.size;
  }
}

module.exports = CloudStorageFileService;

📌 Notes:

  • Handles both download and upload as streams
  • Retrieves file size for progress tracking
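
As a quick sanity check, here is a minimal usage sketch (assuming gkeys.json is in place; the bucket and file names are the ones used later in this post) that streams a file straight from the bucket to disk:

const { pipeline } = require('node:stream/promises');
const { createWriteStream } = require('node:fs');
const CloudStorageFileService = require('./cloudStorageFileService');

(async () => {
  const files = new CloudStorageFileService();
  // downloadFile resolves to a readable stream from the bucket.
  const source = await files.downloadFile('myfileuploads', 'london_crime_by_lsoa2.csv');
  await pipeline(source, createWriteStream('./local-copy.csv'));
  console.log('Download complete');
})();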

📊 Progress Bar Stream (src/progressPass.js)

const { PassThrough } = require('stream');

class ProgressPass extends PassThrough {
  constructor(fileSize, options = {}) {
    super({ ...options });
    this.bytesRead = 0;
    this.fileSize = fileSize;
    this.progress = 0;

    this.on('data', this.processData);
    this.on('progress', this.showResult);
    this.on('close', this.finishProgress);

    this.createProgressBar();
  }

  processData(data) {
    this.bytesRead += data.length;
    this.progress = (this.bytesRead / this.fileSize) * 100;
    this.emit('progress', Math.floor(this.progress));
  }

  createProgressBar() {
    process.stdout.write("\x1B[?25l[");
    for (let i = 1; i <= 101; i++) process.stdout.write('-');
    process.stdout.write(']');
  }

  showResult(progress) {
    process.stdout.cursorTo(progress + 1);
    process.stdout.write('=');
    process.stdout.cursorTo(105);
    process.stdout.write(`${progress}%`);
  }

  finishProgress() {
    process.stdout.write("\x1B[?25h\n");
  }
}

module.exports = ProgressPass;

📌 Notes:

  • Uses PassThrough to monitor progress without changing data
  • Custom progress bar using stdout cursor control
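
To try the bar in isolation, here is a small sketch (local paths are hypothetical) that copies a file through ProgressPass:

const fs = require('node:fs');
const { pipeline } = require('node:stream/promises');
const ProgressPass = require('./progressPass');

(async () => {
  const filePath = './big-file.csv'; // hypothetical local file
  const { size } = fs.statSync(filePath);

  await pipeline(
    fs.createReadStream(filePath),
    new ProgressPass(size),            // draws the bar as bytes flow through
    fs.createWriteStream('./copy.csv') // the data itself is not modified
  );
})();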

🔄 CSV to JS Object Transform (src/objectTransform.js)

const { Transform } = require('stream');

class ObjectTransform extends Transform {
  constructor(options = {}) {
    super({ ...options });
    this.headerLine = true;
    this.keys = [];
    this.tailChunk = '';
  }

  _transform(chunk, encoding, callback) {
    const stringChunks = chunk.toString("utf8");
    const lines = stringChunks.split('\n');

    for (const line of lines) {
      const lineString = this.tailChunk + line;
      const values = lineString.split(',');

      if (this.headerLine) {
        this.keys = values;
        this.headerLine = false;
        continue;
      }

      if (values.length !== this.keys.length || lineString.endsWith(',')) {
        // Incomplete row: keep the whole partial line (including any earlier
        // tail) so rows that span more than two chunks are not lost.
        this.tailChunk = lineString;
      } else {
        const obj = {};
        this.keys.forEach((key, index) => {
          obj[key] = values[index];
        });
        this.tailChunk = '';
        this.push(JSON.stringify(obj));
      }
    }
    callback();
  }

  _flush(callback) {
    // Emit a leftover partial row, if any, once the source has ended.
    if (this.tailChunk) {
      const values = this.tailChunk.split(',');
      if (values.length === this.keys.length) {
        const obj = {};
        this.keys.forEach((key, index) => (obj[key] = values[index]));
        this.push(JSON.stringify(obj));
      }
    }
    callback();
  }
}

module.exports = ObjectTransform;

📌 Notes:

  • Handles incomplete line splitting across chunks
  • Converts rows to consistent JSON format
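
A quick way to verify the chunk-boundary handling is to feed the transform a toy CSV whose first data row is cut in half (the header here is a simplified subset of the real dataset's columns):

const { Readable } = require('node:stream');
const ObjectTransform = require('./objectTransform');

// Two chunks that split the first data row mid-field.
const chunks = [
  'borough,year,value\nCamden,20',
  '16,3\nHackney,2016,5\n',
];

Readable.from(chunks)
  .pipe(new ObjectTransform())
  .on('data', (obj) => console.log(obj.toString()));
// {"borough":"Camden","year":"2016","value":"3"}
// {"borough":"Hackney","year":"2016","value":"5"}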

🕵️ Data Monitor Stream (src/monitorTransform.js)

const { PassThrough } = require('stream');

class MonitorTransform extends PassThrough {
  constructor(options = {}) {
    super({ ...options });
    this.totalCrimes = 0;
    this.boroughTotal = new Map();
    this.monthTotal = new Map();
    this.yearTotal = new Map();

    this.on('data', this.processData);
    this.on('close', this.showResult);
  }

  processData(data) {
    const row = JSON.parse(data.toString());
    const count = Number(row.value) || 0;

    this.totalCrimes += count;
    this.boroughTotal.set(row.borough, (this.boroughTotal.get(row.borough) || 0) + count);
    this.monthTotal.set(row.month, (this.monthTotal.get(row.month) || 0) + count);
    this.yearTotal.set(row.year, (this.yearTotal.get(row.year) || 0) + count);
  }

  showResult() {
    console.log('Total Crimes:', this.totalCrimes);
    console.log('By Borough:', this.boroughTotal);
    console.log('By Month:', this.monthTotal);
    console.log('By Year:', this.yearTotal);
  }
}

module.exports = MonitorTransform;
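
📌 Notes:

  • A PassThrough, so JSON rows continue downstream untouched while totals are accumulated on the side
  • Prints the aggregated summary when the stream closes

A minimal sketch with hand-written rows (the field names match the London crime dataset used later):

const { Readable } = require('node:stream');
const MonitorTransform = require('./monitorTransform');

const rows = [
  JSON.stringify({ borough: 'Camden', year: '2016', month: '1', value: '3' }),
  JSON.stringify({ borough: 'Hackney', year: '2016', month: '2', value: '5' }),
];

const monitor = new MonitorTransform();
Readable.from(rows).pipe(monitor);
// The monitor's own 'data' listener keeps the stream flowing;
// the aggregated totals are printed when it emits 'close'.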

🧩 Convert to JSON Array (src/jsonTransform.js)

const { Transform } = require('stream');

class JsonTransform extends Transform {
  constructor(options = {}) {
    super({ ...options });
    this.firstLine = true;
  }

  _transform(chunk, encoding, callback) {
    const row = JSON.parse(chunk.toString());
    // Open the array before the first row; prefix every later row with a comma.
    const formatted = this.firstLine ? '[' + JSON.stringify(row) : ',' + JSON.stringify(row);
    this.firstLine = false;
    this.push(formatted);
    callback();
  }

  _flush(callback) {
    // Close the array (or emit an empty one if no rows ever arrived).
    this.push(this.firstLine ? '[]' : ']');
    callback();
  }
}

module.exports = JsonTransform;
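
To confirm the result is a valid JSON array, a short sketch that collects the transformed chunks and parses them back:

const { Readable } = require('node:stream');
const JsonTransform = require('./jsonTransform');

// Pre-serialized rows, shaped like the output of ObjectTransform.
const rows = [
  JSON.stringify({ borough: 'Camden', value: '3' }),
  JSON.stringify({ borough: 'Hackney', value: '5' }),
];

let output = '';
Readable.from(rows)
  .pipe(new JsonTransform())
  .on('data', (chunk) => (output += chunk))
  .on('end', () => console.log(JSON.parse(output).length)); // 2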

🔧 Stream Pipeline Builder (src/fileProcessor.js)

const { pipeline } = require('stream/promises');

class FileProcessor {
  constructor() {
    this.readableStream = null;
    this.transforms = [];
    this.writableStream = null;
  }

  setReadable(readableStream) {
    this.readableStream = readableStream;
    return this;
  }

  addTransforms(transformStreams) {
    this.transforms = transformStreams;
    return this;
  }

  setWritable(writableStream) {
    this.writableStream = writableStream;
    return this;
  }

  async execute() {
    if (!this.readableStream) throw new Error('Missing readable stream');
    if (!this.writableStream) throw new Error('Missing writable stream');
    await pipeline(this.readableStream, ...this.transforms, this.writableStream);
  }
}

module.exports = FileProcessor;
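
The builder is not tied to Cloud Storage: any readable/writable pair works. For example, a sketch (with hypothetical local paths) that gzips a file on disk:

const fs = require('node:fs');
const { createGzip } = require('node:zlib');
const FileProcessor = require('./fileProcessor');

(async () => {
  await new FileProcessor()
    .setReadable(fs.createReadStream('./big-file.csv'))
    .addTransforms([createGzip()])
    .setWritable(fs.createWriteStream('./big-file.csv.gz'))
    .execute();
})();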

🚀 Putting It All Together (src/index.js)

const FileProcessor = require('./fileProcessor');
const JsonTransform = require('./jsonTransform');
const MonitorTransform = require('./monitorTransform');
const ObjectTransform = require('./objectTransform');
const { createGzip } = require('node:zlib');
const CloudStorageFileService = require('./cloudStorageFileService');
const ProgressPass = require('./progressPass');

const processor = new FileProcessor();
const cloudFileService = new CloudStorageFileService();

const bucketName = 'myfileuploads';
const fileName = 'london_crime_by_lsoa2.csv';
const destFileName = 'london_crime_by_lsoa2.json.gz'; // gzipped JSON (gzip alone does not produce a .tar archive)
const gzip = createGzip();

(async () => {
  try {
    const fileSize = await cloudFileService.getFileSize(bucketName, fileName);

    await processor
      .setReadable(await cloudFileService.downloadFile(bucketName, fileName))
      .addTransforms([
        new ProgressPass(fileSize),
        new ObjectTransform(),
        new MonitorTransform(),
        new JsonTransform(),
        gzip,
      ])
      .setWritable(await cloudFileService.uploadFile(bucketName, destFileName))
      .execute();
  } catch (err) {
    console.error('Pipeline failed:', err);
  }
})();
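
Assuming the files live under src/ as the headings suggest, run the whole pipeline with:

node src/index.js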

📌 Final Thoughts

  • Streams provide memory-efficient processing of large data
  • Transform and PassThrough streams keep each processing step small and composable
  • You can apply this architecture to ETL pipelines, file conversions, and reporting systems

Stay scalable, stay streamable! 🚀