How to add GraphQL subscriptions in Node.js

GraphQL subscriptions enable real-time data updates by maintaining persistent WebSocket connections between clients and servers for push-based event notifications. As the creator of CoreUI, a widely used open-source UI library, I’ve implemented real-time features in collaborative dashboards throughout my 12 years of development experience since 2014. The most effective approach is using graphql-ws library with PubSub pattern for managing subscriptions and broadcasting events to connected clients. This method provides standards-compliant WebSocket protocol, automatic connection management, and scalable event distribution for real-time applications.

Install required packages for GraphQL subscriptions with WebSocket support.

npm install graphql graphql-ws ws @graphql-tools/schema

Create GraphQL schema with subscription types:

// schema.js
import { makeExecutableSchema } from '@graphql-tools/schema'

const typeDefs = `
  type Message {
    id: ID!
    content: String!
    author: String!
    timestamp: String!
  }

  type User {
    id: ID!
    name: String!
    status: String!
  }

  type Notification {
    id: ID!
    type: String!
    message: String!
    timestamp: String!
  }

  type Query {
    messages: [Message!]!
    users: [User!]!
  }

  type Mutation {
    sendMessage(content: String!, author: String!): Message!
    updateUserStatus(userId: ID!, status: String!): User!
    createNotification(type: String!, message: String!): Notification!
  }

  type Subscription {
    messageAdded: Message!
    userStatusChanged: User!
    notificationReceived: Notification!
  }
`

export default typeDefs

Implement PubSub for event management:

// pubsub.js
export class PubSub {
  constructor() {
    this.topics = new Map()
  }

  subscribe(topic, callback) {
    if (!this.topics.has(topic)) {
      this.topics.set(topic, new Set())
    }

    const subscribers = this.topics.get(topic)
    subscribers.add(callback)

    return () => {
      subscribers.delete(callback)
      if (subscribers.size === 0) {
        this.topics.delete(topic)
      }
    }
  }

  publish(topic, payload) {
    const subscribers = this.topics.get(topic)
    if (!subscribers) {
      return
    }

    subscribers.forEach(callback => {
      try {
        callback(payload)
      } catch (error) {
        console.error('Error in subscription callback:', error)
      }
    })
  }

  asyncIterator(topics) {
    const pullQueue = []
    const pushQueue = []
    let listening = true

    const topicsArray = Array.isArray(topics) ? topics : [topics]
    const unsubscribers = []

    const pushValue = (event) => {
      if (pullQueue.length !== 0) {
        const resolver = pullQueue.shift()
        resolver({ value: event, done: false })
      } else {
        pushQueue.push(event)
      }
    }

    const pullValue = () => {
      return new Promise(resolve => {
        if (pushQueue.length !== 0) {
          resolve({ value: pushQueue.shift(), done: false })
        } else {
          pullQueue.push(resolve)
        }
      })
    }

    const unsubscribeAll = () => {
      unsubscribers.forEach(unsubscribe => unsubscribe())
      listening = false
      pullQueue.forEach(resolve => resolve({ value: undefined, done: true }))
      pullQueue.length = 0
      pushQueue.length = 0
    }

    topicsArray.forEach(topic => {
      const unsubscribe = this.subscribe(topic, pushValue)
      unsubscribers.push(unsubscribe)
    })

    return {
      next: () => (listening ? pullValue() : Promise.resolve({ value: undefined, done: true })),
      return: () => {
        unsubscribeAll()
        return Promise.resolve({ value: undefined, done: true })
      },
      throw: (error) => {
        unsubscribeAll()
        return Promise.reject(error)
      },
      [Symbol.asyncIterator]: () => this
    }
  }
}

export const pubsub = new PubSub()

Create resolvers with subscription logic:

// resolvers.js
import { pubsub } from './pubsub.js'

const messages = []
const users = [
  { id: '1', name: 'John', status: 'online' },
  { id: '2', name: 'Jane', status: 'offline' }
]

export const resolvers = {
  Query: {
    messages: () => messages,
    users: () => users
  },

  Mutation: {
    sendMessage: (_, { content, author }) => {
      const message = {
        id: String(messages.length + 1),
        content,
        author,
        timestamp: new Date().toISOString()
      }

      messages.push(message)
      pubsub.publish('MESSAGE_ADDED', { messageAdded: message })

      return message
    },

    updateUserStatus: (_, { userId, status }) => {
      const user = users.find(u => u.id === userId)
      if (!user) {
        throw new Error('User not found')
      }

      user.status = status
      pubsub.publish('USER_STATUS_CHANGED', { userStatusChanged: user })

      return user
    },

    createNotification: (_, { type, message }) => {
      const notification = {
        id: String(Date.now()),
        type,
        message,
        timestamp: new Date().toISOString()
      }

      pubsub.publish('NOTIFICATION_RECEIVED', { notificationReceived: notification })

      return notification
    }
  },

  Subscription: {
    messageAdded: {
      subscribe: () => pubsub.asyncIterator(['MESSAGE_ADDED'])
    },

    userStatusChanged: {
      subscribe: () => pubsub.asyncIterator(['USER_STATUS_CHANGED'])
    },

    notificationReceived: {
      subscribe: () => pubsub.asyncIterator(['NOTIFICATION_RECEIVED'])
    }
  }
}

Set up WebSocket server with GraphQL subscriptions:

// server.js
import { createServer } from 'http'
import { WebSocketServer } from 'ws'
import { useServer } from 'graphql-ws/lib/use/ws'
import { makeExecutableSchema } from '@graphql-tools/schema'
import express from 'express'
import { createHandler } from 'graphql-http/lib/use/express'
import typeDefs from './schema.js'
import { resolvers } from './resolvers.js'

const schema = makeExecutableSchema({ typeDefs, resolvers })

const app = express()

app.use('/graphql', createHandler({ schema }))

app.get('/', (req, res) => {
  res.send('GraphQL server with subscriptions')
})

const server = createServer(app)

const wsServer = new WebSocketServer({
  server,
  path: '/graphql'
})

useServer({ schema }, wsServer)

const PORT = process.env.PORT || 4000

server.listen(PORT, () => {
  console.log(`Server running on http://localhost:${PORT}/graphql`)
  console.log(`WebSocket server running on ws://localhost:${PORT}/graphql`)
})

Client-side subscription example using graphql-ws:

// client.js
import { createClient } from 'graphql-ws'

const client = createClient({
  url: 'ws://localhost:4000/graphql'
})

const subscribeToMessages = () => {
  return client.subscribe(
    {
      query: `
        subscription {
          messageAdded {
            id
            content
            author
            timestamp
          }
        }
      `
    },
    {
      next: (data) => {
        console.log('New message:', data.data.messageAdded)
      },
      error: (error) => {
        console.error('Subscription error:', error)
      },
      complete: () => {
        console.log('Subscription completed')
      }
    }
  )
}

const unsubscribe = subscribeToMessages()

setTimeout(() => {
  unsubscribe()
}, 60000)

React component with subscription:

import { useEffect, useState } from 'react'
import { createClient } from 'graphql-ws'

const client = createClient({
  url: 'ws://localhost:4000/graphql'
})

const MessageFeed = () => {
  const [messages, setMessages] = useState([])

  useEffect(() => {
    const unsubscribe = client.subscribe(
      {
        query: `
          subscription {
            messageAdded {
              id
              content
              author
              timestamp
            }
          }
        `
      },
      {
        next: (data) => {
          setMessages(prev => [...prev, data.data.messageAdded])
        },
        error: (error) => {
          console.error('Subscription error:', error)
        }
      }
    )

    return () => unsubscribe()
  }, [])

  return (
    <div>
      <h2>Live Messages</h2>
      {messages.map(msg => (
        <div key={msg.id}>
          <strong>{msg.author}</strong>: {msg.content}
          <small>{new Date(msg.timestamp).toLocaleTimeString()}</small>
        </div>
      ))}
    </div>
  )
}

Here the graphql-ws library implements WebSocket protocol for GraphQL subscriptions following official GraphQL over WebSocket protocol. The PubSub class manages topic-based event distribution with subscribe and publish methods for event handling. The asyncIterator method creates async iterators required by GraphQL subscription resolvers. The subscription resolvers return async iterators that yield values when pubsub publishes events to subscribed topics. The useServer function attaches GraphQL subscription handler to existing WebSocket server for unified HTTP and WebSocket endpoint. The createClient function establishes WebSocket connection and manages subscription lifecycle including reconnection logic. The React component subscribes on mount and unsubscribes on unmount to prevent memory leaks.

Best Practice Note:

This is the GraphQL subscription implementation we use in CoreUI dashboards for real-time notifications, live data updates, and collaborative features. Implement authentication and authorization for subscriptions by validating tokens in connection parameters, add subscription filtering to send events only to relevant clients based on user permissions or data ownership, and consider using Redis PubSub for horizontal scaling across multiple server instances in production environments with high concurrent connection requirements.


Speed up your responsive apps and websites with fully-featured, ready-to-use open-source admin panel templates—free to use and built for efficiency.


About the Author