
Processing Massive Amounts of Data on Demand Without Crashing the Node.js Main Thread
Processing large datasets in Node.js can be challenging because heavy work risks blocking the main thread, which runs the event loop. Fortunately, Node.js Streams offer a powerful and efficient way to process data incrementally, keeping memory usage low and the event loop responsive.
In this tutorial, you'll build a Node.js application that:
- Downloads a large CSV file from Google Cloud Storage
- Converts it into JSON objects
- Extracts useful insights in real time
- Compresses the data
- Uploads the result back to Cloud Storage
- Visualizes progress in the terminal
Let's dive in step by step, using core Node.js features and best practices.
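As a warm-up, here is the incremental model in its simplest form, using nothing but core modules. This is a minimal sketch (input.txt and output.txt are placeholder paths), not part of the project code:

const { pipeline } = require('node:stream/promises');
const { Transform } = require('node:stream');
const fs = require('node:fs');

// Uppercase each chunk as it passes through; only one chunk is in memory at a time,
// so the file never has to fit in RAM and the event loop stays responsive.
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

(async () => {
  await pipeline(
    fs.createReadStream('input.txt'),
    upperCase,
    fs.createWriteStream('output.txt')
  );
})();

The project below follows exactly this shape: one readable source, a chain of transforms, and one writable sink, all wired together with pipeline.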
🧰 Prerequisites
- Node.js v16.16+
- A Google Cloud Platform account
- A service account key (gkeys.json) for authentication
Install dependencies:
npm install @google-cloud/storage
☁️ Cloud Storage File Service (src/cloudStorageFileService.js)
const { Storage } = require('@google-cloud/storage');
const path = require('path');
const serviceKey = path.join(__dirname, '../gkeys.json');
class CloudStorageFileService {
constructor() {
this.storage = new Storage({
projectId: 'my-project-id',
keyFilename: serviceKey,
});
}
async downloadFile(bucketName, fileName) {
return this.storage.bucket(bucketName).file(fileName).createReadStream();
}
async uploadFile(bucketName, destFileName) {
return this.storage.bucket(bucketName).file(destFileName).createWriteStream();
}
async getFileSize(bucketName, fileName) {
const [metadata] = await this.storage.bucket(bucketName).file(fileName).getMetadata();
return metadata.size;
}
}
module.exports = CloudStorageFileService;
📝 Notes:
- Handles both download and upload as streams
- Retrieves file size for progress tracking
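If you want to smoke-test the service on its own, a quick sketch like the following (bucket and object names are placeholders) streams an object straight to a local file:

const fs = require('node:fs');
const { pipeline } = require('node:stream/promises');
const CloudStorageFileService = require('./cloudStorageFileService');

(async () => {
  const service = new CloudStorageFileService();
  // Pipe the bucket object to disk chunk by chunk instead of buffering it in memory
  await pipeline(
    await service.downloadFile('my-bucket', 'some-large-file.csv'),
    fs.createWriteStream('./some-large-file.csv')
  );
})();

Remember to replace the hard-coded projectId in the constructor with your own GCP project ID.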
📊 Progress Bar Stream (src/progressPass.js)
const { PassThrough } = require('stream');
class ProgressPass extends PassThrough {
constructor(fileSize, options = {}) {
super({ ...options });
this.bytesRead = 0;
this.fileSize = fileSize;
this.progress = 0;
this.on('data', this.processData);
this.on('progress', this.showResult);
this.on('close', this.finishProgress);
this.createProgressBar();
}
processData(data) {
this.bytesRead += data.length;
this.progress = (this.bytesRead / this.fileSize) * 100;
this.emit('progress', Math.floor(this.progress));
}
createProgressBar() {
process.stdout.write("\x1B[?25l[");
for (let i = 1; i <= 101; i++) process.stdout.write('-');
process.stdout.write(']');
}
showResult(progress) {
process.stdout.cursorTo(progress + 1);
process.stdout.write('=');
process.stdout.cursorTo(105);
process.stdout.write(`${progress}%`);
}
finishProgress() {
process.stdout.write("\x1B[?25h\n");
}
}
module.exports = ProgressPass;
📝 Notes:
- Uses PassThrough to monitor progress without changing the data
- Custom progress bar drawn with stdout cursor control
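To see the bar on its own, you can drive ProgressPass with a local file and let fs.statSync supply the total size (a sketch; the paths are placeholders). Note that the cursor methods only work when stdout is a TTY, so the bar won't render if output is redirected:

const fs = require('node:fs');
const { pipeline } = require('node:stream/promises');
const ProgressPass = require('./progressPass');

(async () => {
  const filePath = './london_crime_by_lsoa2.csv';
  const { size } = fs.statSync(filePath); // total bytes, used to compute the percentage

  // ProgressPass forwards every chunk untouched while drawing the bar
  await pipeline(
    fs.createReadStream(filePath),
    new ProgressPass(size),
    fs.createWriteStream('./copy.csv')
  );
})();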
🔄 CSV to JS Object Transform (src/objectTransform.js)
const { Transform } = require('stream');
class ObjectTransform extends Transform {
constructor(options = {}) {
super({ ...options });
this.headerLine = true;
this.keys = [];
this.tailChunk = '';
}
_transform(chunk, encoding, callback) {
const stringChunks = chunk.toString("utf8");
const lines = stringChunks.split('\n');
for (const line of lines) {
const lineString = this.tailChunk + line;
const values = lineString.split(',');
if (this.headerLine) {
this.keys = values;
this.headerLine = false;
continue;
}
if (values.length !== this.keys.length || lineString.endsWith(',')) {
this.tailChunk = lineString; // keep the whole partial line (including any carried-over tail) for the next chunk
} else {
const obj = {};
this.keys.forEach((key, index) => {
obj[key] = values[index];
});
this.tailChunk = '';
this.push(JSON.stringify(obj));
}
}
callback();
}
_flush(callback) {
callback();
}
}
module.exports = ObjectTransform;
📝 Notes:
- Handles incomplete line splitting across chunks
- Converts rows to consistent JSON format
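A quick way to sanity-check the transform without touching Cloud Storage is to feed it an in-memory CSV through Readable.from (a sketch with a trimmed-down set of columns):

const { Readable } = require('node:stream');
const ObjectTransform = require('./objectTransform');

const csv = 'borough,year,month,value\nCamden,2016,5,3\nHackney,2016,6,1\n';

// Each chunk emitted by the transform is one serialized row object
Readable.from([csv])
  .pipe(new ObjectTransform())
  .on('data', (chunk) => console.log(chunk.toString()));
// {"borough":"Camden","year":"2016","month":"5","value":"3"}
// {"borough":"Hackney","year":"2016","month":"6","value":"1"}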
🕵️ Data Monitor Stream (src/monitorTransform.js)
const { PassThrough } = require('stream');
class MonitorTransform extends PassThrough {
constructor(options = {}) {
super({ ...options });
this.totalCrimes = 0;
this.boroughTotal = new Map();
this.monthTotal = new Map();
this.yearTotal = new Map();
this.on('data', this.processData);
this.on('close', this.showResult);
}
processData(data) {
const row = JSON.parse(data.toString());
const count = Number(row.value) || 0;
this.totalCrimes += count;
this.boroughTotal.set(row.borough, (this.boroughTotal.get(row.borough) || 0) + count);
this.monthTotal.set(row.month, (this.monthTotal.get(row.month) || 0) + count);
this.yearTotal.set(row.year, (this.yearTotal.get(row.year) || 0) + count);
}
showResult() {
console.log('Total Crimes:', this.totalCrimes);
console.log('By Borough:', this.boroughTotal);
console.log('By Month:', this.monthTotal);
console.log('By Year:', this.yearTotal);
}
}
module.exports = MonitorTransform;
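The monitor taps the stream of serialized rows to build running totals while passing every chunk through unchanged; the summary prints when the stream closes. A standalone sketch (with two illustrative rows) looks like this:

const { Readable } = require('node:stream');
const MonitorTransform = require('./monitorTransform');

// Two pre-serialized rows stand in for the output of ObjectTransform
const rows = [
  { borough: 'Camden', year: '2016', month: '5', value: '3' },
  { borough: 'Hackney', year: '2016', month: '6', value: '1' },
].map((row) => JSON.stringify(row));

// The totals are logged by showResult once the PassThrough closes
Readable.from(rows).pipe(new MonitorTransform());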
🧩 Convert to JSON Array (src/jsonTransform.js)
const { Transform } = require('stream');
class JsonTransform extends Transform {
constructor(options = {}) {
super({ ...options });
this.firstLine = true;
}
_transform(chunk, encoding, callback) {
const row = JSON.parse(chunk.toString());
// Open the array before the first row, then prefix every later row with a comma
const formatted = this.firstLine ? '[' + JSON.stringify(row) : ',' + JSON.stringify(row);
this.firstLine = false;
this.push(formatted);
callback();
}
_flush(callback) {
// Still emit a valid (empty) array if no rows ever arrived
if (this.firstLine) this.push('[');
this.push(']');
callback();
}
}
module.exports = JsonTransform;
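Piping a couple of serialized rows through it shows the shape of the output (a sketch; the rows are illustrative):

const { Readable } = require('node:stream');
const JsonTransform = require('./jsonTransform');

const rows = ['{"borough":"Camden","value":"3"}', '{"borough":"Hackney","value":"1"}'];

let output = '';
Readable.from(rows)
  .pipe(new JsonTransform())
  .on('data', (chunk) => (output += chunk.toString()))
  .on('end', () => console.log(output));
// [{"borough":"Camden","value":"3"},{"borough":"Hackney","value":"1"}]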
🔧 Stream Pipeline Builder (src/fileProcessor.js)
const { pipeline } = require('stream/promises');
class FileProcessor {
constructor() {
this.readableStream = null;
this.transforms = [];
this.writableStream = null;
}
setReadable(readableStream) {
this.readableStream = readableStream;
return this;
}
addTransforms(transformsStream) {
this.transforms = transformsStream;
return this;
}
setWritable(writableStream) {
this.writableStream = writableStream;
return this;
}
async execute() {
if (!this.readableStream) throw new Error('Missing readable stream');
if (!this.writableStream) throw new Error('Missing writable stream');
await pipeline(this.readableStream, ...this.transforms, this.writableStream);
}
}
module.exports = FileProcessor;
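Nothing in the builder is tied to Cloud Storage; any readable source and writable sink will do. For example, the same chain can run against local files (paths are placeholders):

const fs = require('node:fs');
const { createGzip } = require('node:zlib');
const FileProcessor = require('./fileProcessor');
const ObjectTransform = require('./objectTransform');
const JsonTransform = require('./jsonTransform');

(async () => {
  // CSV in, gzipped JSON array out, entirely on the local disk
  await new FileProcessor()
    .setReadable(fs.createReadStream('./london_crime_by_lsoa2.csv'))
    .addTransforms([new ObjectTransform(), new JsonTransform(), createGzip()])
    .setWritable(fs.createWriteStream('./london_crime_by_lsoa2.json.gz'))
    .execute();
})();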
🚀 Putting It All Together (src/index.js)
const FileProcessor = require('./fileProcessor');
const JsonTransform = require('./jsonTransform');
const MonitorTransform = require('./monitorTransform');
const ObjectTransform = require('./objectTransform');
const { createGzip } = require('node:zlib');
const CloudStorageFileService = require('./cloudStorageFileService');
const ProgressPass = require('./progressPass');
const processor = new FileProcessor();
const cloudFileService = new CloudStorageFileService();
const bucketName = 'myfileuploads';
const fileName = 'london_crime_by_lsoa2.csv';
const destFileName = 'london_crime_by_lsoa2.json.gz'; // the output is a gzipped JSON array, not a tar archive
const gzip = createGzip();
(async () => {
try {
const fileSize = await cloudFileService.getFileSize(bucketName, fileName);
await processor
.setReadable(await cloudFileService.downloadFile(bucketName, fileName))
.addTransforms([
new ProgressPass(fileSize),
new ObjectTransform(),
new MonitorTransform(),
new JsonTransform(),
gzip,
])
.setWritable(await cloudFileService.uploadFile(bucketName, destFileName))
.execute();
} catch (err) {
console.error('Pipeline failed:', err);
}
})();
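Assuming the layout above, with the modules under src/ and gkeys.json in the project root, you start the whole pipeline with:

node src/index.js

The progress bar fills as the CSV streams down from the bucket, the aggregated totals are printed when the monitor closes, and the gzipped JSON lands back in Cloud Storage.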
🏁 Final Thoughts
- Streams provide memory-efficient processing of large datasets
- Transform and PassThrough streams keep each processing step small and modular
- You can apply this architecture to ETL pipelines, file conversions, and reporting systems
Stay scalable, stay streamable! 🚀