How to use Kafka in Node.js
Apache Kafka enables building scalable event-driven systems with reliable message streaming between services. As the creator of CoreUI with over 10 years of Node.js experience since 2014, I’ve built Kafka-based architectures for real-time data processing, event sourcing, and microservice communication. The standard approach uses the KafkaJS library to create producers and consumers that connect to Kafka clusters. This provides high-throughput, fault-tolerant messaging for distributed systems.
Install KafkaJS to connect Node.js applications to Kafka.
npm install kafkajs
KafkaJS is the most popular and actively maintained Kafka client for Node.js. It provides a clean API for producing and consuming messages with full support for modern Kafka features.
Creating a Kafka Producer
Send messages to Kafka topics using a producer.
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const producer = kafka.producer()

async function sendMessage(topic, message) {
  await producer.send({
    topic: topic,
    messages: [
      { value: JSON.stringify(message) }
    ]
  })
  console.log('Message sent:', message)
}

async function main() {
  // Connect once and reuse the producer for all sends
  await producer.connect()
  await sendMessage('user-events', {
    userId: '123',
    action: 'login',
    timestamp: Date.now()
  })
  await producer.disconnect()
}

main().catch(console.error)
The Kafka instance configures the connection to the brokers. The producer.connect() establishes the connection once, and producer.send() publishes messages to the specified topic. Message values must be strings or Buffers, which is why the payload is serialized with JSON.stringify(). The disconnect() closes the connection gracefully.
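Because values must be strings or Buffers, it can help to centralize serialization. The helper below is an illustrative convention, not part of KafkaJS; it builds a message object in the shape producer.send() expects, with headers serialized to strings as KafkaJS requires:

```javascript
// Hypothetical helper (not a KafkaJS API): builds a message object in the
// shape producer.send() expects. Header values are strings, since KafkaJS
// transmits headers as strings or Buffers.
function toKafkaMessage(payload, key = null) {
  return {
    key: key,
    value: JSON.stringify(payload),
    headers: {
      'content-type': 'application/json',
      'produced-at': String(Date.now())
    }
  }
}

const built = toKafkaMessage({ userId: '123', action: 'login' }, 'user-123')
console.log(built.value) // JSON-serialized payload, ready for producer.send()
```

The same helper can then be reused everywhere a message is produced, keeping serialization consistent across services.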
Creating a Kafka Consumer
Consume messages from Kafka topics.
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const consumer = kafka.consumer({ groupId: 'user-service' })

async function consumeMessages() {
  await consumer.connect()
  await consumer.subscribe({ topic: 'user-events', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value.toString())
      console.log('Received event:', event)

      // Process the event
      if (event.action === 'login') {
        console.log(`User ${event.userId} logged in`)
      }
    }
  })
}

consumeMessages().catch(console.error)
The consumer joins a consumer group identified by groupId. Multiple consumers with the same group ID share message processing. The subscribe() method registers interest in a topic. The fromBeginning flag reads all messages from the start. The eachMessage callback processes each message.
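Workload sharing works because Kafka assigns each partition of a topic to exactly one consumer in the group. The sketch below is a simplified illustration of that idea, not the actual rebalance protocol, which the Kafka group coordinator runs server-side:

```javascript
// Simplified illustration (not Kafka's real rebalance protocol): distribute
// partitions round-robin across the members of a consumer group. Each
// partition ends up owned by exactly one consumer.
function assignPartitions(partitions, consumers) {
  const assignment = Object.fromEntries(consumers.map(c => [c, []]))
  partitions.forEach((partition, i) => {
    assignment[consumers[i % consumers.length]].push(partition)
  })
  return assignment
}

console.log(assignPartitions([0, 1, 2, 3, 4, 5], ['consumer-a', 'consumer-b']))
// Adding a consumer to the group triggers a rebalance and redistributes partitions
```

This is why running more consumers than partitions gains nothing: the extra consumers sit idle with no partition assigned.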
Producing Messages with Keys
Use message keys for partition assignment and ordering.
async function sendMessageWithKey(topic, key, message) {
  await producer.send({
    topic: topic,
    messages: [
      {
        key: key,
        value: JSON.stringify(message)
      }
    ]
  })
}

async function sendOrders() {
  // Messages with the same key go to the same partition
  await sendMessageWithKey('orders', 'user-123', {
    orderId: 'order-1',
    amount: 99.99
  })
  await sendMessageWithKey('orders', 'user-123', {
    orderId: 'order-2',
    amount: 149.99
  })
}

sendOrders().catch(console.error)
Kafka uses message keys to determine which partition receives the message. Messages with the same key always go to the same partition, maintaining order. This is crucial for event sourcing where order matters. Keys are typically entity IDs like user IDs or order IDs.
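The partition choice comes from hashing the key. Kafka's default partitioner uses a murmur2 hash; the toy hash below is only an illustration of the principle, not the real algorithm:

```javascript
// Simplified illustration of key-based partitioning. Kafka's default
// partitioner uses a murmur2 hash of the key; this toy hash only shows the
// principle: the same key always maps to the same partition.
function toyPartition(key, numPartitions) {
  let hash = 0
  for (const ch of key) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0 // keep it an unsigned 32-bit int
  }
  return hash % numPartitions
}

console.log(toyPartition('user-123', 3))
console.log(toyPartition('user-123', 3)) // same key, same partition, every time
```

Because the mapping depends on the partition count, adding partitions to an existing topic changes where keys land, which is one reason to choose the partition count carefully up front.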
Handling Consumer Errors
Implement error handling and retry logic for consumers.
async function createResilientConsumer() {
  const consumer = kafka.consumer({
    groupId: 'user-service',
    retry: {
      initialRetryTime: 100,
      retries: 8
    }
  })

  // Register the crash handler before starting the consumer
  consumer.on(consumer.events.CRASH, ({ payload }) => {
    console.error('Consumer crashed:', payload.error)
    // Implement alerting or restart logic
  })

  await consumer.connect()
  await consumer.subscribe({ topic: 'user-events' })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      try {
        const event = JSON.parse(message.value.toString())
        await processEvent(event)
      } catch (error) {
        console.error('Error processing message:', error)
        // Rethrowing prevents the offset commit, so the message is retried
        // or can be routed to a dead letter queue
        throw error
      }
    }
  })

  return consumer
}

async function processEvent(event) {
  // Simulate processing
  if (Math.random() < 0.1) {
    throw new Error('Random processing error')
  }
  console.log('Processed:', event)
}
The retry configuration controls reconnection and retry behavior. The try-catch block handles processing errors. Rethrowing prevents the message offset from being committed, so the message is reprocessed instead of silently skipped. The consumer.crash instrumentation event fires on unrecoverable failures, with the error nested inside the event payload. Together these make consumers resilient to transient failures.
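The dead letter queue mentioned above can be sketched as a wrapper around the message handler. handleWithDlq and the maxAttempts default are illustrative assumptions, not KafkaJS APIs; sendToDlq would typically publish the failed event to a separate topic (for example `user-events.dlq`) with a regular producer:

```javascript
// Illustrative retry-then-dead-letter wrapper (names are assumptions, not
// KafkaJS APIs). handler is the business logic; sendToDlq would publish the
// failed event to a dead letter topic using a regular producer.
async function handleWithDlq(event, handler, sendToDlq, maxAttempts = 3) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await handler(event)
      return 'processed'
    } catch (error) {
      if (attempt === maxAttempts) {
        // Park the poison message instead of blocking the partition forever
        await sendToDlq(event, error)
        return 'dead-lettered'
      }
      // Otherwise fall through and retry
    }
  }
}
```

Inside eachMessage this would wrap the processEvent call, so a message that keeps failing is parked for later inspection rather than stalling the whole partition.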
Producing Multiple Messages in Batch
Send multiple messages efficiently using batches.
async function sendBatch(topic, messages) {
  await producer.send({
    topic: topic,
    messages: messages.map(msg => ({
      value: JSON.stringify(msg)
    }))
  })
}

const userEvents = [
  { userId: '1', action: 'login' },
  { userId: '2', action: 'logout' },
  { userId: '3', action: 'signup' }
]

sendBatch('user-events', userEvents).catch(console.error)
Batch sending reduces network overhead and increases throughput. The producer automatically handles partitioning and compression. This is more efficient than sending messages individually, especially for high-volume scenarios.
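For very large arrays it can help to cap the size of each send. chunkMessages is an illustrative helper, and the 500-message default is an arbitrary assumption to tune per workload:

```javascript
// Illustrative helper: split a large message array into fixed-size chunks so
// each producer.send() call stays bounded. The default of 500 is an arbitrary
// assumption; tune it against your message size and broker limits.
function chunkMessages(messages, size = 500) {
  const chunks = []
  for (let i = 0; i < messages.length; i += size) {
    chunks.push(messages.slice(i, i + size))
  }
  return chunks
}

// Each chunk would then be passed to sendBatch() in turn:
// for (const chunk of chunkMessages(allEvents)) await sendBatch('user-events', chunk)
```

Bounding each call keeps individual requests under the broker's maximum request size while still amortizing network overhead across many messages.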
Creating Topics Programmatically
Create and manage Kafka topics from Node.js.
async function createTopic(topicName, numPartitions = 3, replicationFactor = 1) {
  const admin = kafka.admin()

  try {
    await admin.connect()
    const existingTopics = await admin.listTopics()

    if (!existingTopics.includes(topicName)) {
      await admin.createTopics({
        topics: [{
          topic: topicName,
          numPartitions: numPartitions,
          replicationFactor: replicationFactor
        }]
      })
      console.log(`Topic ${topicName} created`)
    } else {
      console.log(`Topic ${topicName} already exists`)
    }
  } finally {
    await admin.disconnect()
  }
}

createTopic('user-events', 5, 2).catch(console.error)
The admin API manages Kafka infrastructure. The createTopics() method creates topics with specified partitions and replication. The listTopics() checks for existing topics. More partitions enable higher parallelism. Replication provides fault tolerance.
Implementing Transaction Support
Use transactions for exactly-once semantics.
async function sendTransactional() {
  const producer = kafka.producer({
    transactionalId: 'my-transactional-producer',
    maxInFlightRequests: 1,
    idempotent: true
  })

  await producer.connect()
  const transaction = await producer.transaction()

  try {
    await transaction.send({
      topic: 'orders',
      messages: [{ value: JSON.stringify({ orderId: '1' }) }]
    })
    await transaction.send({
      topic: 'payments',
      messages: [{ value: JSON.stringify({ paymentId: '1' }) }]
    })
    await transaction.commit()
    console.log('Transaction committed')
  } catch (error) {
    await transaction.abort()
    console.error('Transaction aborted:', error)
    throw error
  } finally {
    await producer.disconnect()
  }
}
Transactional producers ensure atomic writes across multiple topics. The transactionalId identifies the producer across restarts, which lets Kafka fence stale instances and enable exactly-once semantics. The transaction.commit() makes all sends visible atomically, while transaction.abort() discards them on error. Consumers only stay consistent if they ignore aborted messages, which KafkaJS consumers do by default via read-committed isolation.
Best Practice Note
This is the same Kafka integration pattern we use in CoreUI backend services for building event-driven microservice architectures. Always use consumer groups for scalability: multiple consumers in the same group share the workload automatically. Implement proper error handling and monitoring for production systems. Use message keys consistently to maintain event ordering when needed. For high-throughput scenarios, tune batch size and compression settings. Consider implementing a dead letter queue pattern for messages that fail processing repeatedly. Kafka provides significantly better throughput and durability than traditional message queues, making it ideal for event sourcing and stream processing applications.