Publish/Subscribe Model with RabbitMQ in node.js

Sharmila S
Published in DevOps.dev
8 min read · Feb 18, 2023


In this article, we will be learning how to create a publish/subscribe queue model with the message broker RabbitMQ in node.js.

RabbitMQ is an open-source message broker software.

A message broker is the intermediary between a provider and its client(s). It stores and routes messages so that they are not lost on the way.
Some alternatives to RabbitMQ: Apache Kafka, Amazon Kinesis etc.

RabbitMQ implements AMQP (Advanced Message Queuing Protocol). AMQP is a protocol that helps in communication between services using messages.

In this article, we will be implementing a publish/subscribe model. This model handles the scenario of sending a common message to all the servers.

We are going to implement this model with a provider app and two client apps.

The provider will send a message to an exchange. The exchange copies the message to every queue bound to it, and each client receives the message from its own queue.

To achieve this model, we will be using an exchange type called ‘fanout’.

Let’s understand what an exchange is.

An exchange receives messages from the Provider and sends them to the desired queues.

The exchange is responsible for making sure the messages are sent to the right queue.

Fanout is one of the exchange types in RabbitMQ; the others are direct, topic and headers. To learn about the other exchange types, see the RabbitMQ documentation.

Fanout exchange:

A fanout exchange delivers a copy of every message to all the queues bound to it, regardless of the routing key.

Fanout exchange example
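To make the fanout behaviour concrete, here is a minimal in-memory sketch. It is not RabbitMQ code: the FanoutExchange class and the queue name ‘test-queue2’ are assumptions made only for this illustration. It shows that one published message ends up in every bound queue and that the routing key plays no role.

```javascript
// In-memory model of a fanout exchange (illustration only, not RabbitMQ code).
class FanoutExchange {
  constructor() {
    this.queues = new Map(); // queue name -> array of pending messages
  }

  // Declare a queue and bind it to this exchange.
  bindQueue(name) {
    if (!this.queues.has(name)) this.queues.set(name, []);
    return this.queues.get(name);
  }

  // Copy the message to every bound queue; the routing key is ignored.
  publish(routingKey, message) {
    for (const pending of this.queues.values()) pending.push(message);
    return this.queues.size; // number of queues that received a copy
  }
}

const exchange = new FanoutExchange();
const queue1 = exchange.bindQueue("test-queue1");
const queue2 = exchange.bindQueue("test-queue2"); // hypothetical second queue
const delivered = exchange.publish("", "Hello World!");

console.log(delivered, queue1, queue2);
```

In the real setup, RabbitMQ plays the role of this class, and each client app owns one of the queues.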

In this article, we will be using RabbitMQ from a Node.js application. If you want to get started with Node.js or need a quick refresher, check out the below article.

If you are new to RabbitMQ, I recommend the article below to get started with a simple application.

What are we going to create?

  • A publisher app in node.js.
  • An exchange and two queues.
  • Two client apps (client1 and client2). Both the client apps receive messages from their respective queue.

Tools needed:

  1. Node.js and NPM
  2. RabbitMQ

If you don't have Node.js installed on your system already, install it from the official Node.js website, based on your OS.

If RabbitMQ is not installed on your system, install it from the official RabbitMQ website based on your platform, or use the official Docker image.

I will use the RabbitMQ docker image to run the RabbitMQ server locally. If you are familiar with using Docker, you can run the following command to run the RabbitMQ server in your system.

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

The server listens on port 5672 and is ready to accept connections. The management UI that ships with the -management image is exposed on port 15672.

Server running

Our folder structure will look like this:

\publish-subscribe-demo
|-- \publisher
|-- \client1
|-- \client2

Publisher

Under the project folder, create a new folder for the publisher server and set up the project.

mkdir publisher
cd publisher
npm init
npm i express amqplib

express -> to create the web server of our Node.js application.
amqplib -> the client library used to talk to the RabbitMQ message broker.

Create a file called index.js and add the following.

Code explanation:

Line 1–6, 54: We are creating a basic express app. Our publisher app is running on port 4000.

(This article on Node.js will help you get started if you are not familiar with node.)

const express = require("express")

const app = express()
const PORT = process.env.PORT || 4000;

app.use(express.json())

//...
//...
//...

app.listen(PORT, () => console.log("Server listening at port " + PORT))

line 8: Importing the amqplib library. Using this, we will communicate with the RabbitMQ server.

const amqp = require("amqplib");

Connection code:

line 9–30: We are wrapping the connection code in a function called ‘connectQueue()’. Inside the function, we will create a connection, channel and exchange.

amqp.connect() is used to connect to the RabbitMQ server. The URL of the server is passed (‘amqp://localhost:5672’) as a parameter.

After connecting to the RabbitMQ server, we are creating a channel using createChannel().

channel.assertExchange() checks for an exchange with the given properties. If the exchange doesn’t exist, a new exchange will be created with the values passed.

We are naming our exchange ‘test-exchange’. As we discussed earlier the exchange type would be ‘fanout’. You can provide any name for the exchange and queues.

let channel, connection;

const exchange_name = 'test-exchange';
const exchange_type = 'fanout';
// const message = "Hello World!";

connectQueue() // call connectQueue function
async function connectQueue() {
  try {
    connection = await amqp.connect("amqp://localhost:5672");
    channel = await connection.createChannel()

    // https://amqp-node.github.io/amqplib/channel_api.html#channel_assertExchange
    await channel.assertExchange(exchange_name, exchange_type, {
      durable: false
    })
  } catch (error) {
    console.log(error)
  }
}
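If the RabbitMQ container is still starting, amqp.connect() rejects and connectQueue() only logs the error, leaving channel undefined. One possible way to handle this is a small retry loop. The sketch below is an assumption and not part of the original app: connectWithRetry and flakyConnect are hypothetical names, and flakyConnect stands in for amqp.connect() so that the snippet runs without a broker.

```javascript
// Hypothetical helper: call an async `connect` function until it succeeds
// or the attempts run out.
async function connectWithRetry(connect, maxAttempts = 5, delayMs = 1000) {
  let lastError;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await connect();
    } catch (error) {
      lastError = error;
      console.log(`Connection attempt ${attempt} failed: ${error.message}`);
      if (attempt < maxAttempts) {
        // Wait before the next attempt.
        await new Promise((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}

// Stand-in for amqp.connect(): fails twice, then succeeds.
let calls = 0;
const flakyConnect = async () => {
  calls += 1;
  if (calls < 3) throw new Error("ECONNREFUSED");
  return { connected: true };
};

connectWithRetry(flakyConnect, 5, 10).then((conn) =>
  console.log("Connected:", conn)
);
```

In the publisher, the same helper could be given () => amqp.connect("amqp://localhost:5672") as its connect argument.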

(If you are unfamiliar with async/await and other js concepts, click here to get a quick overview of some of the must-know javascript concepts)

Line 32–38: Let’s write a function that publishes messages to the exchange.

We are defining a function called sendMessageToQueue which takes a message as a parameter. In this function, we send the message using channel.publish(). The publish() method takes the name of the exchange, a routing key and the message content as a Buffer. A fanout exchange ignores the routing key, so we pass an empty string (the variable queue_name in the code).

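const sendMessageToQueue = async (message) => {
  // Despite its name, this value is the routing key; a fanout exchange ignores it.
  const queue_name = '';
  // publish() returns a boolean rather than a promise, so no await is needed.
  channel.publish(
    exchange_name,
    queue_name,
    Buffer.from(message)
  );
}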
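Since publish() expects the content as a Buffer, structured data has to be serialized first. A minimal sketch, assuming JSON as the wire format: encodeMessage and decodeMessage are hypothetical helper names that do not appear in the original app.

```javascript
// Hypothetical helpers: turn an object into a Buffer for publish(),
// and turn msg.content back into an object on the consumer side.
function encodeMessage(payload) {
  return Buffer.from(JSON.stringify(payload), "utf8");
}

function decodeMessage(content) {
  return JSON.parse(content.toString("utf8"));
}

const content = encodeMessage({ id: 1, text: "Hello World!" });
const decoded = decodeMessage(content);

console.log(Buffer.isBuffer(content), decoded.text);
```

The publisher would pass encodeMessage(...) as the third argument of channel.publish(), and a client would call decodeMessage(msg.content) inside its consume callback.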

Line 40–53: Finally, let’s create a simple API with the GET method. When this API is invoked, we will use the function ‘sendMessageToQueue’ to send the message (here, ‘Hello World!’) to the exchange.

app.get("/send-msg", (req, res) => {
  const message = "Hello World!";

  sendMessageToQueue(message)

  console.log("Message sent to the exchange");

  // await channel.close();
  // await connection.close();

  res.send("Message Sent");
})
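The route assumes that the channel already exists when the request arrives. A hedged sketch of a guard around the publish call follows; publishToExchange and fakeChannel are hypothetical names, and fakeChannel only mimics the publish(exchange, routingKey, content) signature of an amqplib channel so that the snippet runs without a broker.

```javascript
// Hypothetical helper: publish only if the channel is ready and report the outcome.
function publishToExchange(channel, exchangeName, message) {
  if (!channel) {
    return { ok: false, reason: "channel not ready" };
  }
  // Fanout exchanges ignore the routing key, so an empty string is passed.
  const canSendMore = channel.publish(exchangeName, "", Buffer.from(message));
  // A false return value signals that the channel's write buffer is full.
  return { ok: true, canSendMore };
}

// Stand-in channel that records calls instead of talking to RabbitMQ.
const published = [];
const fakeChannel = {
  publish(exchange, routingKey, content) {
    published.push({ exchange, routingKey, text: content.toString() });
    return true;
  },
};

const notReady = publishToExchange(undefined, "test-exchange", "Hello World!");
const sent = publishToExchange(fakeChannel, "test-exchange", "Hello World!");

console.log(notReady, sent, published);
```

Inside the route handler, an { ok: false } result could be turned into an error response instead of "Message Sent".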

Let’s create two client servers to receive messages.

Client 1

For client1, we’ll create a simple node application that will connect to and receive messages from the queue ‘test-queue1’.

Create another folder called ‘client1’ at the root level. Initialise npm and install necessary modules.

mkdir client1
cd client1
npm init
npm i express amqplib

Create a file called ‘index.js’ and add the following.

Code explanation

Line 1–5 and 51: First, we’ll set up the express app and make it listen to port 5001.

const express = require("express");
const app = express();
const PORT = process.env.PORT || 5001;
app.use(express.json());

// RabbitMQ code

app.listen(PORT, () => console.log("Server running at port " + PORT));

Line 7–24: Then, we will connect to the RabbitMQ server, and create a new channel. (as we did in the publisher app).

const amqp = require("amqplib");
let channel, connection;

const exchange_name = "test-exchange";
const exchange_type = "fanout";
const queue_name = 'test-queue1';

connectToRabbitMQ();
async function connectToRabbitMQ() {
  try {
    connection = await amqp.connect("amqp://localhost:5672");
    channel = await connection.createChannel();

    // await, so that errors from connectToQueue() reach the catch block below
    await connectToQueue()
  } catch (error) {
    console.log(error);
  }
}

line 26–49: Now the part left to do is connect to the queue and consume the messages from the queue. We will create a function called ‘connectToQueue()’ to handle this.

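async function connectToQueue() {
  // declare the exchange; name, type and options must match the publisher's
  await channel.assertExchange(exchange_name, exchange_type, {
    durable: false,
  });

  // declare a queue that only this connection can use
  const q = await channel.assertQueue(queue_name, { exclusive: true });

  console.log("Waiting for messages....");

  // binding the queue (a fanout exchange ignores the binding key)
  const binding_key = "";
  await channel.bindQueue(q.queue, exchange_name, binding_key);

  console.log("consuming messages from queue: ", q.queue);
  channel.consume(
    q.queue,
    (msg) => {
      // msg is null if the server cancels the consumer
      if (msg && msg.content)
        console.log("Received message: ", msg.content.toString());
    },
    { noAck: true } // messages count as acknowledged on delivery
  );
}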

Let’s declare the exchange (name: ‘test-exchange’, type: ‘fanout’). channel.assertExchange() creates the exchange if it does not exist yet; the name, type and options must match the ones used by the publisher.

await channel.assertExchange(exchange_name, exchange_type, {
  durable: false,
});

Now, let's declare the queue from which we will receive the messages, using channel.assertQueue(). With exclusive: true, the queue can only be used by this connection and is deleted when the connection closes.

const q = await channel.assertQueue(queue_name, { exclusive: true });

We have to bind our new queue to the exchange. bindQueue() takes a binding key as its third argument; a fanout exchange ignores it, so we pass an empty string and the queue receives every message published to the exchange.

// binding the queue
const binding_key = "";
channel.bindQueue(q.queue, exchange_name, binding_key);

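Let’s consume the messages from the queue using channel.consume().

The consume() method takes the queue's name and a callback that is invoked with each message delivered from the queue. The channel.ack() function acknowledges that the particular message has been received by this app. Manual acknowledgement applies only when consume() is called without { noAck: true }. With noAck: true, as in the full listing above, RabbitMQ treats every message as acknowledged on delivery and channel.ack() must not be called.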

channel.consume(
  q.queue,
  (msg) => {
    if (msg.content)
      console.log("Received message: ", msg.content.toString());
    channel.ack(msg);
  }
);
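The callback can be made a little more defensive. The following is a sketch under assumptions rather than the article's own code: makeConsumer and fakeChannel are hypothetical names, and fakeChannel only imitates the ack(msg) method of an amqplib channel so that the snippet runs without a broker. It skips the null message that amqplib passes when the server cancels the consumer, and acknowledges only after the handler has run.

```javascript
// Hypothetical factory: builds a consume() callback that acks after handling.
function makeConsumer(channel, handle) {
  return (msg) => {
    if (msg === null) {
      // amqplib invokes the callback with null when the consumer is cancelled.
      console.log("Consumer cancelled by the server");
      return;
    }
    handle(msg.content.toString());
    // Only valid when consume() was called without { noAck: true }.
    channel.ack(msg);
  };
}

// Stand-in channel that records acknowledgements.
const acked = [];
const fakeChannel = { ack: (msg) => acked.push(msg) };

const received = [];
const onMessage = makeConsumer(fakeChannel, (text) => received.push(text));

onMessage({ content: Buffer.from("Hello World!") });
onMessage(null);

console.log(received, acked.length);
```

With a real channel, the callback would be passed as channel.consume(q.queue, makeConsumer(channel, handler)).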

Client 2

Now let’s create another app which will also receive the messages from the exchange.

We can just make a copy of client1 and make a few changes.

Steps:

  • Copy the client1 folder and rename the folder to ‘client2’.
  • Open the ‘index.js’ file of the client2 app in an editor.
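  • Change the value of PORT on line 3 from 5001 to 5002. (This app will run on port 5002 and connect to the same RabbitMQ server.)
  • Change the value of queue_name to a different name, for example ‘test-queue2’, so that client2 gets its own queue. Because the queue is declared with exclusive: true, a second connection cannot share ‘test-queue1’.

From:

const PORT = process.env.PORT || 5001;

To:

const PORT = process.env.PORT || 5002;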
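Instead of editing a copy by hand, the per-client values could also come from environment variables. This is only a sketch: buildClientConfig is a hypothetical helper, QUEUE_NAME is an assumed variable name, and ‘test-queue2’ is an example value; only PORT is already read from the environment in the code above.

```javascript
// Hypothetical helper: derive per-client settings from environment variables.
function buildClientConfig(env, defaults) {
  return {
    port: Number(env.PORT) || defaults.port,
    queueName: env.QUEUE_NAME || defaults.queueName,
  };
}

const defaults = { port: 5001, queueName: "test-queue1" };

// client1: nothing set, so the defaults apply.
const client1 = buildClientConfig({}, defaults);

// client2: values as they would arrive from process.env (always strings).
const client2 = buildClientConfig(
  { PORT: "5002", QUEUE_NAME: "test-queue2" },
  defaults
);

console.log(client1, client2);
```

With buildClientConfig(process.env, defaults) in index.js, the second client could be started as PORT=5002 QUEUE_NAME=test-queue2 node index.js from the same folder.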

Testing

It’s time to test our code.

Make sure the RabbitMQ server is running.

In the terminal, open the publisher, client1 and client2 folders in different tabs.

Run the apps using the command node index.js in publisher, client1 and client2.

node index.js

Once the apps are connected to the RabbitMQ server without errors, go to your browser and open http://localhost:4000/send-msg. The publisher app will send a message to the exchange when it receives this request. The exchange broadcasts the message to both queues, and each client app receives it from its own queue.

We can notice that the message has been sent from the publisher by looking at the app logs.

Publisher app logs

We can confirm the message has been received by both the client apps from their logs.

Client1 (left) and Client2 app logs

Happy learning!

Github repo: https://github.com/SharmilaS22/medium-rabbitmq-publish
