How to use Bull queues in Node.js
Bull is a powerful Node.js library for handling distributed job queues with Redis, enabling reliable background task processing. As the creator of CoreUI with over 10 years of Node.js experience since 2014, I’ve used Bull for email sending, image processing, report generation, and scheduled tasks in production systems. The standard approach creates queue instances, adds jobs with data, and processes them with worker functions that run asynchronously. This provides robust job handling with retries, priorities, and delayed execution.
Install Bull and create a basic job queue.
npm install bull
const Queue = require('bull')
const emailQueue = new Queue('email', {
redis: {
host: '127.0.0.1',
port: 6379
}
})
emailQueue.process(async (job) => {
console.log('Sending email to:', job.data.to)
await sendEmail(job.data)
return { sent: true }
})
emailQueue.add({
to: '[email protected]',
subject: 'Welcome',
body: 'Hello!'
})
async function sendEmail(data) {
await new Promise(resolve => setTimeout(resolve, 1000))
console.log('Email sent!')
}
The Queue constructor creates a queue connected to Redis. The process method registers a handler function. The add method enqueues jobs. Jobs process asynchronously and automatically.
Adding Job Options
Configure retries, delays, and priorities.
const Queue = require('bull')
const imageQueue = new Queue('image-processing')
imageQueue.add(
{ imageUrl: 'https://example.com/image.jpg', size: 'thumbnail' },
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
},
priority: 1,
delay: 5000,
removeOnComplete: true
}
)
imageQueue.process(async (job) => {
const { imageUrl, size } = job.data
console.log(`Processing ${imageUrl} to ${size}`)
if (Math.random() < 0.3) {
throw new Error('Processing failed')
}
return { processed: true, url: imageUrl }
})
The attempts option retries failed jobs. The backoff adds increasing delays between retries. The priority controls job order (lower numbers = higher priority). The delay postpones execution. The removeOnComplete cleans up finished jobs.
Implementing Multiple Workers
Process jobs concurrently with multiple workers.
const Queue = require('bull')
const videoQueue = new Queue('video-processing')
videoQueue.process(5, async (job) => {
const { videoId } = job.data
console.log(`Worker processing video ${videoId}`)
await processVideo(videoId)
await job.progress(100)
return { videoId, status: 'completed' }
})
async function processVideo(videoId) {
await new Promise(resolve => setTimeout(resolve, 5000))
}
for (let i = 0; i < 20; i++) {
videoQueue.add({ videoId: i })
}
The first argument to process specifies concurrency. Five workers process jobs simultaneously. The job.progress() updates job progress. This enables parallel processing for CPU-intensive tasks.
Handling Job Events
Listen to job lifecycle events for monitoring.
const Queue = require('bull')
const reportQueue = new Queue('reports')
reportQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result:`, result)
})
reportQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err.message)
})
reportQueue.on('progress', (job, progress) => {
console.log(`Job ${job.id} is ${progress}% complete`)
})
reportQueue.on('stalled', (job) => {
console.warn(`Job ${job.id} stalled`)
})
reportQueue.process(async (job) => {
for (let i = 0; i <= 100; i += 10) {
await job.progress(i)
await new Promise(resolve => setTimeout(resolve, 100))
}
return { report: 'generated' }
})
reportQueue.add({ reportType: 'monthly' })
Event listeners track job status. The completed event fires on success. The failed event handles errors. The progress event monitors ongoing jobs. The stalled event detects stuck jobs.
Scheduling Recurring Jobs
Create cron-like scheduled jobs.
const Queue = require('bull')
const cleanupQueue = new Queue('cleanup')
cleanupQueue.add(
{ task: 'delete-old-logs' },
{
repeat: {
cron: '0 2 * * *'
}
}
)
cleanupQueue.add(
{ task: 'backup-database' },
{
repeat: {
every: 3600000
}
}
)
cleanupQueue.process(async (job) => {
console.log(`Running ${job.data.task}`)
if (job.data.task === 'delete-old-logs') {
console.log('Deleting logs older than 30 days')
} else if (job.data.task === 'backup-database') {
console.log('Creating database backup')
}
})
The repeat.cron option uses cron syntax for scheduling. The repeat.every specifies intervals in milliseconds. Bull handles job scheduling automatically using Redis.
Implementing Job Priorities
Process high-priority jobs first.
const Queue = require('bull')
const taskQueue = new Queue('tasks')
taskQueue.add({ type: 'urgent' }, { priority: 1 })
taskQueue.add({ type: 'normal' }, { priority: 5 })
taskQueue.add({ type: 'low' }, { priority: 10 })
taskQueue.process(async (job) => {
console.log(`Processing ${job.data.type} job (priority: ${job.opts.priority})`)
await new Promise(resolve => setTimeout(resolve, 1000))
})
Lower priority numbers execute first. Jobs of same priority follow FIFO order. This ensures critical tasks process before less important ones.
Best Practice Note
This is the same Bull queue pattern we use in CoreUI backend services for reliable background job processing. Always handle errors in process functions and use retries for transient failures. Monitor queue metrics using Bull’s built-in methods or Bull Board dashboard. For high-availability systems, use Redis Sentinel or Cluster. Implement proper shutdown handling to finish in-progress jobs gracefully. Bull provides more features than simple queues - use rate limiting for API calls, job dependencies for workflows, and sandboxed processors for security. Always clean up completed jobs to prevent Redis memory bloat.



