How to use streams in Node.js
Node.js streams let you process large amounts of data efficiently by handling it in chunks rather than loading everything into memory at once. As the creator of CoreUI with 12 years of Node.js backend experience, I’ve implemented streaming solutions that process terabytes of data daily while keeping memory usage constant, under 100MB, for enterprise applications.
The most efficient approach uses built-in stream types with proper error handling and backpressure management.
Readable Stream
const fs = require('fs')
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 16 * 1024 // 16KB chunks
})
readStream.on('data', (chunk) => {
  console.log('Received chunk:', chunk.length, 'bytes')
})
readStream.on('end', () => {
  console.log('Finished reading file')
})
readStream.on('error', (error) => {
  console.error('Error reading file:', error)
})
Piping Streams
const fs = require('fs')
const readStream = fs.createReadStream('source.txt')
const writeStream = fs.createWriteStream('destination.txt')
readStream.pipe(writeStream)
writeStream.on('finish', () => {
  console.log('File copied successfully')
})
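One caveat: pipe() does not forward errors from the source stream to the destination, so each stream still needs its own error handler. A minimal sketch using the two streams above:
readStream.on('error', (error) => {
  console.error('Error reading source.txt:', error)
})
writeStream.on('error', (error) => {
  console.error('Error writing destination.txt:', error)
})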
Transform Stream
const { Transform } = require('stream')
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const uppercase = chunk.toString().toUpperCase()
    this.push(uppercase)
    callback()
  }
}
const fs = require('fs')
fs.createReadStream('input.txt')
  .pipe(new UppercaseTransform())
  .pipe(fs.createWriteStream('output.txt'))
HTTP Response Streaming
const express = require('express')
const fs = require('fs')
const app = express()
app.get('/download', (req, res) => {
  const filePath = './large-file.pdf'
  res.setHeader('Content-Type', 'application/pdf')
  res.setHeader('Content-Disposition', 'attachment; filename="file.pdf"')
  const readStream = fs.createReadStream(filePath)
  readStream.on('error', (error) => {
    // Headers may already be sent if the stream fails mid-transfer
    if (!res.headersSent) {
      res.status(500).send('Error reading file')
    } else {
      res.end()
    }
  })
  readStream.pipe(res)
})
app.listen(3000)
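Alternatively, the same route can be written with pipeline() (covered in the next section), which reports errors from either the file stream or the response, for example when the client disconnects, in a single callback. A minimal sketch; the /download-pipeline path is just an illustrative name:
const { pipeline } = require('stream')
app.get('/download-pipeline', (req, res) => {
  res.setHeader('Content-Type', 'application/pdf')
  res.setHeader('Content-Disposition', 'attachment; filename="file.pdf"')
  pipeline(fs.createReadStream('./large-file.pdf'), res, (error) => {
    if (error) {
      console.error('Download failed:', error)
    }
  })
})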
Stream Pipeline
const { pipeline } = require('stream')
const fs = require('fs')
const zlib = require('zlib')
pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('input.txt.gz'),
  (error) => {
    if (error) {
      console.error('Pipeline failed:', error)
    } else {
      console.log('Pipeline succeeded')
    }
  }
)
Backpressure Handling
const fs = require('fs')
const readStream = fs.createReadStream('large-file.txt')
const writeStream = fs.createWriteStream('output.txt')
readStream.on('data', (chunk) => {
  const canWrite = writeStream.write(chunk)
  if (!canWrite) {
    console.log('Backpressure - pausing read stream')
    readStream.pause()
  }
})
writeStream.on('drain', () => {
  console.log('Drain - resuming read stream')
  readStream.resume()
})
readStream.on('end', () => {
  writeStream.end()
})
Custom Readable Stream
const { Readable } = require('stream')
class NumberStream extends Readable {
  constructor(max, options) {
    super(options)
    this.current = 1
    this.max = max
  }
  _read() {
    if (this.current <= this.max) {
      this.push(`${this.current}\n`)
      this.current++
    } else {
      this.push(null) // End stream
    }
  }
}
const numberStream = new NumberStream(10)
numberStream.on('data', (chunk) => {
  console.log('Number:', chunk.toString().trim())
})
numberStream.on('end', () => {
  console.log('Stream ended')
})
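Readable streams are also async iterables, so the same NumberStream can be consumed with for await...of instead of event listeners. A minimal sketch, assuming Node.js 10 or newer:
async function printNumbers() {
  // Each iteration yields one chunk pushed by _read()
  for await (const chunk of new NumberStream(10)) {
    console.log('Number:', chunk.toString().trim())
  }
  console.log('Stream ended')
}
printNumbers().catch((error) => console.error('Stream failed:', error))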
Stream from API Response
const https = require('https')
const fs = require('fs')
https.get('https://example.com/large-file.zip', (response) => {
  const writeStream = fs.createWriteStream('downloaded-file.zip')
  let downloaded = 0
  const totalSize = parseInt(response.headers['content-length'], 10)
  response.on('data', (chunk) => {
    downloaded += chunk.length
    const progress = ((downloaded / totalSize) * 100).toFixed(2)
    console.log(`Downloaded: ${progress}%`)
  })
  response.pipe(writeStream)
  writeStream.on('finish', () => {
    console.log('Download complete')
  })
}).on('error', (error) => {
  console.error('Download failed:', error)
})
CSV Processing with Streams
const fs = require('fs')
const { Transform } = require('stream')
const csv = require('csv-parser') // third-party package: npm install csv-parser
class CSVTransform extends Transform {
  constructor() {
    super({ objectMode: true })
  }
  _transform(row, encoding, callback) {
    const processed = {
      ...row,
      fullName: `${row.firstName} ${row.lastName}`,
      processed: true
    }
    this.push(JSON.stringify(processed) + '\n')
    callback()
  }
}
fs.createReadStream('input.csv')
  .pipe(csv())
  .pipe(new CSVTransform())
  .pipe(fs.createWriteStream('output.json'))
  .on('finish', () => {
    console.log('CSV processing complete')
  })
Best Practice Note
This is the same streaming architecture we use in CoreUI enterprise applications for processing large datasets and file uploads. Streams provide constant memory usage regardless of data size, making them essential for scalable Node.js applications. Always handle errors properly, manage backpressure, and use pipeline() for automatic error propagation across multiple streams.
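Node.js 15 and newer also ship a promise-based pipeline() in the stream/promises module, which fits naturally into async/await code. A minimal sketch of the gzip example above rewritten with it:
const { pipeline } = require('stream/promises')
const fs = require('fs')
const zlib = require('zlib')
async function compressFile() {
  // Resolves when every stream has finished, rejects on the first error
  await pipeline(
    fs.createReadStream('input.txt'),
    zlib.createGzip(),
    fs.createWriteStream('input.txt.gz')
  )
  console.log('Pipeline succeeded')
}
compressFile().catch((error) => console.error('Pipeline failed:', error))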
Related Articles
For related data processing patterns, check out how to handle file uploads in Node.js and how to process large datasets in Node.js.