Next.js starter your AI actually understands. Ship internal tools in days not weeks. Pre-order $199 $499 → [Get it now]

How to use RabbitMQ in Node.js

RabbitMQ is a powerful message broker that enables asynchronous communication between services through reliable message queuing. As the creator of CoreUI with over 10 years of Node.js experience since 2014, I’ve built RabbitMQ-based systems for background job processing, microservice communication, and event-driven architectures. The standard approach uses the amqplib library to connect to RabbitMQ, publish messages to queues, and consume messages with workers. This provides reliable, scalable message delivery for distributed systems.

Install amqplib to connect Node.js applications to RabbitMQ.

npm install amqplib

The amqplib library provides the AMQP protocol implementation for Node.js. It supports both callback and promise-based APIs. The library handles connection management, channel creation, and message delivery.

Creating a Message Producer

Send messages to a RabbitMQ queue.

const amqp = require('amqplib')

async function sendMessage(queue, message) {
  try {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()

    await channel.assertQueue(queue, { durable: true })

    channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
      persistent: true
    })

    console.log('Message sent:', message)

    await channel.close()
    await connection.close()
  } catch (error) {
    console.error('Error sending message:', error)
  }
}

sendMessage('tasks', {
  type: 'email',
  to: '[email protected]',
  subject: 'Hello'
})

The amqp.connect() establishes a connection to RabbitMQ. The createChannel() creates a channel for communication. The assertQueue() ensures the queue exists. The sendToQueue() publishes the message. The durable flag makes the queue survive broker restarts. The persistent flag ensures messages are saved to disk.

Creating a Message Consumer

Consume messages from a RabbitMQ queue.

const amqp = require('amqplib')

async function consumeMessages(queue) {
  try {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()

    await channel.assertQueue(queue, { durable: true })
    channel.prefetch(1)

    console.log(`Waiting for messages in ${queue}...`)

    channel.consume(queue, async (msg) => {
      if (msg) {
        const content = JSON.parse(msg.content.toString())
        console.log('Received:', content)

        // Process the message
        await processMessage(content)

        // Acknowledge the message
        channel.ack(msg)
      }
    }, { noAck: false })
  } catch (error) {
    console.error('Error consuming messages:', error)
  }
}

async function processMessage(message) {
  // Simulate processing
  console.log('Processing:', message)
  await new Promise(resolve => setTimeout(resolve, 1000))
}

consumeMessages('tasks')

The prefetch(1) limits the consumer to process one message at a time. The consume() method registers a callback for incoming messages. The message content is parsed from Buffer. The ack() acknowledges successful processing, removing the message from the queue. The noAck: false enables manual acknowledgment for reliability.

Implementing Work Queues

Distribute tasks among multiple workers.

// producer.js
const amqp = require('amqplib')

async function sendTask(task) {
  const connection = await amqp.connect('amqp://localhost')
  const channel = await connection.createChannel()
  const queue = 'work_queue'

  await channel.assertQueue(queue, { durable: true })

  const message = JSON.stringify(task)
  channel.sendToQueue(queue, Buffer.from(message), {
    persistent: true
  })

  console.log('Task sent:', task)

  await channel.close()
  await connection.close()
}

// Send multiple tasks
for (let i = 1; i <= 10; i++) {
  sendTask({ id: i, work: 'Process item ' + i })
}
// worker.js
const amqp = require('amqplib')

async function startWorker(workerId) {
  const connection = await amqp.connect('amqp://localhost')
  const channel = await connection.createChannel()
  const queue = 'work_queue'

  await channel.assertQueue(queue, { durable: true })
  channel.prefetch(1)

  console.log(`Worker ${workerId} waiting for tasks...`)

  channel.consume(queue, async (msg) => {
    const task = JSON.parse(msg.content.toString())
    console.log(`Worker ${workerId} processing:`, task)

    // Simulate work
    await new Promise(resolve => setTimeout(resolve, 2000))

    console.log(`Worker ${workerId} completed:`, task)
    channel.ack(msg)
  })
}

// Start worker
startWorker(process.argv[2] || '1')

Multiple workers consume from the same queue. RabbitMQ distributes messages round-robin among available workers. The prefetch(1) ensures fair distribution based on worker capacity. This pattern scales horizontally by adding more workers.

Using Publish/Subscribe Pattern

Broadcast messages to multiple consumers using exchanges.

// publisher.js
const amqp = require('amqplib')

async function publishEvent(exchange, event) {
  const connection = await amqp.connect('amqp://localhost')
  const channel = await connection.createChannel()

  await channel.assertExchange(exchange, 'fanout', { durable: false })

  const message = JSON.stringify(event)
  channel.publish(exchange, '', Buffer.from(message))

  console.log('Event published:', event)

  await channel.close()
  await connection.close()
}

publishEvent('notifications', {
  type: 'user_signup',
  userId: '123',
  timestamp: Date.now()
})
// subscriber.js
const amqp = require('amqplib')

async function subscribeToEvents(exchange, subscriberId) {
  const connection = await amqp.connect('amqp://localhost')
  const channel = await connection.createChannel()

  await channel.assertExchange(exchange, 'fanout', { durable: false })

  const q = await channel.assertQueue('', { exclusive: true })
  channel.bindQueue(q.queue, exchange, '')

  console.log(`Subscriber ${subscriberId} waiting for events...`)

  channel.consume(q.queue, (msg) => {
    const event = JSON.parse(msg.content.toString())
    console.log(`Subscriber ${subscriberId} received:`, event)
  }, { noAck: true })
}

subscribeToEvents('notifications', process.argv[2] || '1')

The fanout exchange broadcasts messages to all bound queues. Each subscriber creates an exclusive, temporary queue. The bindQueue() connects the queue to the exchange. All subscribers receive every message. This pattern is ideal for event notifications.

Implementing Topic-Based Routing

Route messages based on patterns using topic exchanges.

// publisher.js
const amqp = require('amqplib')

async function publishLog(severity, message) {
  const connection = await amqp.connect('amqp://localhost')
  const channel = await connection.createChannel()
  const exchange = 'logs'

  await channel.assertExchange(exchange, 'topic', { durable: false })

  const routingKey = `log.${severity}`
  channel.publish(exchange, routingKey, Buffer.from(message))

  console.log(`Published [${routingKey}]:`, message)

  await channel.close()
  await connection.close()
}

publishLog('error', 'Database connection failed')
publishLog('info', 'User logged in')
publishLog('warning', 'High memory usage')
// consumer.js
const amqp = require('amqplib')

async function subscribeToLogs(patterns) {
  const connection = await amqp.connect('amqp://localhost')
  const channel = await connection.createChannel()
  const exchange = 'logs'

  await channel.assertExchange(exchange, 'topic', { durable: false })

  const q = await channel.assertQueue('', { exclusive: true })

  patterns.forEach(pattern => {
    channel.bindQueue(q.queue, exchange, pattern)
  })

  console.log('Waiting for logs matching:', patterns)

  channel.consume(q.queue, (msg) => {
    console.log(`[${msg.fields.routingKey}]:`, msg.content.toString())
  }, { noAck: true })
}

// Subscribe to errors and warnings only
subscribeToLogs(['log.error', 'log.warning'])

// Or use wildcards: 'log.*' for all, 'log.error.*' for error subtypes

Topic exchanges route messages based on routing key patterns. The * wildcard matches one word. The # wildcard matches zero or more words. Consumers bind to patterns matching their interests. This provides flexible message routing.

Handling Connection Failures

Implement reconnection logic for resilient applications.

const amqp = require('amqplib')

class RabbitMQConnection {
  constructor(url) {
    this.url = url
    this.connection = null
    this.channel = null
  }

  async connect() {
    try {
      this.connection = await amqp.connect(this.url)
      this.channel = await this.connection.createChannel()

      this.connection.on('error', (err) => {
        console.error('Connection error:', err)
        this.reconnect()
      })

      this.connection.on('close', () => {
        console.log('Connection closed, reconnecting...')
        this.reconnect()
      })

      console.log('Connected to RabbitMQ')
      return this.channel
    } catch (error) {
      console.error('Failed to connect:', error)
      this.reconnect()
    }
  }

  async reconnect() {
    setTimeout(() => this.connect(), 5000)
  }

  async sendMessage(queue, message) {
    if (!this.channel) {
      throw new Error('Not connected')
    }

    await this.channel.assertQueue(queue, { durable: true })
    this.channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
      persistent: true
    })
  }
}

const rabbitmq = new RabbitMQConnection('amqp://localhost')
rabbitmq.connect()

The connection class handles reconnection automatically. Error and close events trigger reconnection. The setTimeout adds delay before retry attempts. This makes applications resilient to temporary RabbitMQ outages.

Best Practice Note

This is the same RabbitMQ integration pattern we use in CoreUI backend services for asynchronous task processing and microservice communication. Always use durable queues and persistent messages for important data to survive broker restarts. Implement proper error handling and retry logic for failed message processing. Use prefetch to control worker concurrency and prevent overwhelming slow consumers. For high-availability systems, configure RabbitMQ clustering and message mirroring. Consider using dead letter exchanges for handling failed messages. RabbitMQ provides more flexible routing and guaranteed delivery compared to simpler message queues, making it ideal for complex distributed systems requiring reliable message patterns like work queues, pub/sub, and RPC.


Speed up your responsive apps and websites with fully-featured, ready-to-use open-source admin panel templates—free to use and built for efficiency.


About the Author