How to pipe streams in Node.js
Piping streams in Node.js connects readable and writable streams into efficient data-processing pipelines with automatic flow control (backpressure) and, when you use pipeline(), built-in error handling.
With over 25 years of experience in software development and as the creator of CoreUI, I’ve implemented stream pipelines extensively in data processing systems, file operations, and real-time applications.
In my experience, the most effective approach is to use the pipe() method to connect streams and pipeline() when you need more robust error handling and cleanup.
This pattern enables building complex data processing workflows while maintaining memory efficiency and proper resource management.
Use the pipe() method to connect readable and writable streams for automatic data flow and backpressure handling.
const fs = require('fs')
const zlib = require('zlib')
const { pipeline } = require('stream')
// Simple piping
const readStream = fs.createReadStream('input.txt')
const writeStream = fs.createWriteStream('output.txt')
readStream.pipe(writeStream)
// Complex pipeline with error handling
const source = fs.createReadStream('large-file.txt')
const gzip = zlib.createGzip()
const destination = fs.createWriteStream('compressed.gz')
pipeline(
  source,
  gzip,
  destination,
  (error) => {
    if (error) {
      console.error('Pipeline failed:', error)
    } else {
      console.log('Pipeline succeeded')
    }
  }
)
Here readStream.pipe(writeStream) connects a readable stream to a writable stream, automatically managing data flow and backpressure. Note, however, that pipe() does not forward errors between streams, so each stream in a chain needs its own 'error' listener. The pipeline() function provides better error handling and automatic cleanup when streams finish or encounter errors: it properly destroys all streams if any of them fail, preventing memory leaks. The pipeline pattern also enables chaining multiple transform streams for complex data-processing workflows, as shown in the sketch below.
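For instance, a custom Transform stream can sit between a source and a destination in the same pipeline() call. Below is a minimal sketch that upper-cases a text file while streaming it; the output file name output-upper.txt is just an illustrative choice:
const fs = require('fs')
const { Transform, pipeline } = require('stream')
// A transform stream that upper-cases each chunk as it passes through
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  }
})
pipeline(
  fs.createReadStream('input.txt'),
  upperCase,
  fs.createWriteStream('output-upper.txt'), // illustrative file name
  (error) => {
    if (error) {
      console.error('Transform pipeline failed:', error)
    } else {
      console.log('Transform pipeline succeeded')
    }
  }
)
Because pipeline() wires the streams together, a failure in any stage (for example a read error on input.txt) destroys the whole chain, including the transform.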
Best Practice Note:
This is the same approach we use in CoreUI backend systems for file processing pipelines, data transformation workflows, and real-time data streaming. Use pipeline() instead of pipe() in production applications for better error handling and automatic resource cleanup when dealing with multiple streams.
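If you are on Node.js 15 or later, the promise-based pipeline() from stream/promises integrates cleanly with async/await. Here is a minimal sketch reusing the compression example from above (the compressFile helper is an illustrative name, not a built-in):
const fs = require('fs')
const zlib = require('zlib')
const { pipeline } = require('stream/promises')
// pipeline() resolves once every stream has finished and rejects on the
// first error, destroying all streams in the chain either way
async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  )
}
compressFile('large-file.txt', 'compressed.gz')
  .then(() => console.log('Compression finished'))
  .catch((error) => console.error('Compression failed:', error))
This keeps error handling in one place (the catch) instead of a callback, which is usually easier to compose with the rest of an async codebase.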