How to use streams in Node.js

Node.js streams enable processing large amounts of data efficiently by handling data in chunks rather than loading everything into memory. As the creator of CoreUI with 12 years of Node.js backend experience, I’ve implemented streaming solutions that process terabytes of data daily while maintaining constant memory usage under 100MB for enterprise applications.

The most efficient approach uses built-in stream types with proper error handling and backpressure management.

Readable Stream

const fs = require('fs')

// Read a large file piece by piece instead of loading it into memory at once.
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf8', // chunks arrive as decoded strings rather than Buffers
  highWaterMark: 16 * 1024 // 16KB chunks
})

readStream.on('data', (chunk) => {
  // chunk is a string here, so chunk.length would count UTF-16 code units;
  // measure the encoded size to report real bytes.
  console.log('Received chunk:', Buffer.byteLength(chunk, 'utf8'), 'bytes')
})

readStream.on('end', () => {
  console.log('Finished reading file')
})

// Without an 'error' listener a failed read would be thrown as an uncaught exception.
readStream.on('error', (error) => {
  console.error('Error reading file:', error)
})

Piping Streams

const fs = require('fs')

// Copy source.txt to destination.txt without buffering the whole file.
const readStream = fs.createReadStream('source.txt')
const writeStream = fs.createWriteStream('destination.txt')

// pipe() does not forward errors between streams: each side needs its own
// handler, and the opposite side must be destroyed so its file descriptor
// is released instead of being left open.
readStream.on('error', (error) => {
  console.error('Error reading source file:', error)
  writeStream.destroy()
})

writeStream.on('error', (error) => {
  console.error('Error writing destination file:', error)
  readStream.destroy()
})

writeStream.on('finish', () => {
  console.log('File copied successfully')
})

readStream.pipe(writeStream)

Transform Stream

const { Transform } = require('stream')

/**
 * Transform stream that upper-cases the UTF-8 text flowing through it.
 *
 * A chunk may end in the middle of a multi-byte character, so Buffers are
 * decoded with a streaming TextDecoder that holds back an incomplete trailing
 * sequence until the next chunk arrives. Decoding each chunk on its own
 * would turn the split character into replacement characters.
 */
class UppercaseTransform extends Transform {
  /**
   * @param {object} [options] - Standard Transform options, forwarded unchanged.
   */
  constructor(options) {
    super(options)
    // ignoreBOM: true keeps a leading byte order mark in the output instead
    // of stripping it, matching what Buffer#toString() would produce.
    this.decoder = new TextDecoder('utf-8', { ignoreBOM: true })
  }

  /**
   * Upper-case one chunk and push the result downstream.
   *
   * @param {Buffer|string} chunk - Incoming data.
   * @param {string} encoding - Encoding reported by the stream (unused).
   * @param {Function} callback - Signals that the chunk has been consumed.
   */
  _transform(chunk, encoding, callback) {
    // Non-Buffer chunks (e.g. strings when decodeStrings is false) are
    // already whole values, so they only need toString().
    const text = Buffer.isBuffer(chunk)
      ? this.decoder.decode(chunk, { stream: true })
      : chunk.toString()

    if (text.length > 0) {
      this.push(text.toUpperCase())
    }
    callback()
  }

  /**
   * Emit whatever the decoder is still holding once the input has ended.
   *
   * @param {Function} callback - Signals that flushing is complete.
   */
  _flush(callback) {
    const rest = this.decoder.decode()

    if (rest.length > 0) {
      this.push(rest.toUpperCase())
    }
    callback()
  }
}

const fs = require('fs')

// Stream input.txt through the uppercase transform into output.txt.
const source = fs.createReadStream('input.txt')
const uppercase = new UppercaseTransform()
const destination = fs.createWriteStream('output.txt')
const stages = [source, uppercase, destination]

// pipe() does not forward errors, so every stage needs its own listener.
// When one stage fails, destroy them all so no file descriptor is left open.
for (const stage of stages) {
  stage.on('error', (error) => {
    console.error('Uppercase conversion failed:', error)
    stages.forEach((other) => other.destroy())
  })
}

source.pipe(uppercase).pipe(destination)

HTTP Response Streaming

const express = require('express')
const fs = require('fs')

const app = express()

// Stream a PDF to the client as a download without loading it into memory.
app.get('/download', (req, res) => {
  const filePath = './large-file.pdf'
  const readStream = fs.createReadStream(filePath)

  // Set the download headers only once the file has actually been opened, so
  // a failed open can still be answered with a clean 500 response.
  readStream.on('open', () => {
    res.setHeader('Content-Type', 'application/pdf')
    res.setHeader('Content-Disposition', 'attachment; filename="file.pdf"')
    readStream.pipe(res)
  })

  readStream.on('error', (error) => {
    console.error('Error reading file:', error)

    // Once part of the body has been sent the status can no longer change;
    // abort the response so the client sees a failed download.
    if (res.headersSent) {
      res.destroy()
      return
    }

    res.status(500).send('Error reading file')
  })

  // pipe() does not close the source when the destination goes away, so
  // release the file descriptor if the client disconnects early.
  res.on('close', () => {
    readStream.destroy()
  })
})

app.listen(3000)

Stream Pipeline

const { pipeline } = require('stream')
const fs = require('fs')
const zlib = require('zlib')

// Called once by pipeline(): with an error if any stage failed, or with no
// argument after the whole chain has finished.
const reportResult = (error) => {
  if (!error) {
    console.log('Pipeline succeeded')
    return
  }

  console.error('Pipeline failed:', error)
}

// Compress input.txt into input.txt.gz.
const source = fs.createReadStream('input.txt')
const gzip = zlib.createGzip()
const destination = fs.createWriteStream('input.txt.gz')

pipeline(source, gzip, destination, reportResult)

Backpressure Handling

const fs = require('fs')

// Copy a file while applying backpressure by hand: stop reading whenever the
// writer's buffer is full and continue once it has drained.
const readStream = fs.createReadStream('large-file.txt')
const writeStream = fs.createWriteStream('output.txt')

// Manual wiring forwards no errors. Handle both sides and destroy the
// opposite stream so its file descriptor is released.
readStream.on('error', (error) => {
  console.error('Error reading file:', error)
  writeStream.destroy()
})

writeStream.on('error', (error) => {
  console.error('Error writing file:', error)
  readStream.destroy()
})

readStream.on('data', (chunk) => {
  // write() returns false when the writer wants the producer to wait for 'drain'.
  const canWrite = writeStream.write(chunk)

  if (!canWrite) {
    console.log('Backpressure - pausing read stream')
    readStream.pause()
  }
})

writeStream.on('drain', () => {
  console.log('Drain - resuming read stream')
  readStream.resume()
})

// Close the destination once the source has been fully consumed.
readStream.on('end', () => {
  writeStream.end()
})

Custom Readable Stream

const { Readable } = require('stream')

/**
 * Readable stream that emits the integers 1..max, one per line.
 */
class NumberStream extends Readable {
  /**
   * @param {number} max - Last number to emit (inclusive).
   * @param {object} [options] - Readable options, passed through to the base class.
   */
  constructor(max, options) {
    super(options)
    this.max = max
    this.current = 1
  }

  /**
   * Supply the next number, or signal end-of-stream once max has been passed.
   */
  _read() {
    const hasMore = this.current <= this.max

    if (!hasMore) {
      // Pushing null tells consumers there is no more data.
      this.push(null)
      return
    }

    this.push(`${this.current}\n`)
    this.current += 1
  }
}

// Print every chunk produced by a NumberStream created with a max of 10.
const numberStream = new NumberStream(10)

// Each chunk ends with a newline, so trim it before logging.
const printNumber = (chunk) => {
  const text = chunk.toString().trim()
  console.log('Number:', text)
}

const announceEnd = () => {
  console.log('Stream ended')
}

numberStream.on('data', printNumber)
numberStream.on('end', announceEnd)

Stream from API Response

const https = require('https')
const fs = require('fs')

// Download a file straight to disk, logging progress as data arrives.
https
  .get('https://example.com/large-file.zip', (response) => {
    // Do not save an error page as if it were the requested file.
    if (response.statusCode !== 200) {
      console.error(`Download failed: HTTP ${response.statusCode}`)
      response.resume() // discard the body so the socket is released
      return
    }

    const writeStream = fs.createWriteStream('downloaded-file.zip')

    let downloaded = 0
    const totalSize = parseInt(response.headers['content-length'], 10)
    // content-length is optional (e.g. chunked responses); without it a
    // percentage cannot be computed.
    const hasTotalSize = Number.isFinite(totalSize) && totalSize > 0

    response.on('data', (chunk) => {
      downloaded += chunk.length

      if (hasTotalSize) {
        const progress = ((downloaded / totalSize) * 100).toFixed(2)
        console.log(`Downloaded: ${progress}%`)
      } else {
        console.log(`Downloaded: ${downloaded} bytes`)
      }
    })

    // pipe() does not forward errors; handle both sides and tear down the other.
    response.on('error', (error) => {
      console.error('Download failed:', error)
      writeStream.destroy()
    })

    writeStream.on('error', (error) => {
      console.error('Error writing file:', error)
      response.destroy()
    })

    writeStream.on('finish', () => {
      console.log('Download complete')
    })

    response.pipe(writeStream)
  })
  // Connection-level failures (DNS, TLS, refused connection) are emitted on
  // the request object, not on the response.
  .on('error', (error) => {
    console.error('Request failed:', error)
  })

CSV Processing with Streams

const fs = require('fs')
const { Transform } = require('stream')
const csv = require('csv-parser')

/**
 * Object-mode transform that turns each parsed row into one line of JSON.
 *
 * Every output object is a copy of the row with a combined `fullName` and a
 * `processed: true` marker added.
 */
class CSVTransform extends Transform {
  constructor() {
    // Rows arrive as objects rather than Buffers.
    super({ objectMode: true })
  }

  /**
   * @param {object} row - Row object; `firstName` and `lastName` are read from it.
   * @param {string} encoding - Unused for object chunks.
   * @param {Function} callback - Receives the serialized line.
   */
  _transform(row, encoding, callback) {
    const fullName = `${row.firstName} ${row.lastName}`
    const enriched = { ...row, fullName, processed: true }
    const line = `${JSON.stringify(enriched)}\n`

    // Handing the value to the callback pushes it downstream.
    callback(null, line)
  }
}

// Parse input.csv, enrich every row and write one JSON document per line.
const source = fs.createReadStream('input.csv')
const parser = csv()
const transformer = new CSVTransform()
const destination = fs.createWriteStream('output.json')
const stages = [source, parser, transformer, destination]

// pipe() does not forward errors, so every stage needs its own listener.
// When one stage fails, destroy them all so no file descriptor is left open.
for (const stage of stages) {
  stage.on('error', (error) => {
    console.error('CSV processing failed:', error)
    stages.forEach((other) => other.destroy())
  })
}

source
  .pipe(parser)
  .pipe(transformer)
  .pipe(destination)
  .on('finish', () => {
    console.log('CSV processing complete')
  })

Best Practice Note

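This is the same streaming architecture we use in CoreUI enterprise applications for processing large datasets and file uploads. Because a stream holds only a bounded number of chunks in memory at any moment, memory usage stays roughly constant regardless of data size, which makes streams essential for scalable Node.js applications. Always handle errors on every stream, respect backpressure, and prefer pipeline() over chained pipe() calls: pipe() does not forward errors from one stream to the next, whereas pipeline() propagates them and cleans up every stream in the chain.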

For related data processing patterns, check out how to handle file uploads in Node.js and how to process large datasets in Node.js.


Speed up your responsive apps and websites with fully-featured, ready-to-use open-source admin panel templates—free to use and built for efficiency.


About the Author

Subscribe to our newsletter
Get early information about new products, product updates and blog posts.

Answers by CoreUI Core Team