Get Started with RabbitMQ in Node.js

In this article, we are going to connect two node.js applications with a queue using RabbitMQ.


RabbitMQ is an open-source message broker software.

A message broker is an intermediary between a provider and its client(s) that makes sure messages are not lost along the way.
Some alternatives are Apache Kafka and Amazon Kinesis.

RabbitMQ implements AMQP (Advanced Message Queuing Protocol). AMQP is a protocol that helps in communication between services using messages.
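To make the broker idea concrete, here is a toy in-memory queue. It is only an illustration of the "message is not lost" guarantee and says nothing about how RabbitMQ is implemented internally; the class and method names (ToyQueue, deliver, requeueUnacked) are made up for this sketch.

```javascript
// Toy in-memory queue: NOT how RabbitMQ works internally, only an
// illustration of "a message stays with the broker until it is acknowledged".
class ToyQueue {
  constructor() {
    this.ready = [];          // messages waiting to be delivered
    this.unacked = new Map(); // delivered but not yet acknowledged
    this.nextTag = 1;         // simple counter used as a delivery tag
  }

  // Provider side: put a message on the queue.
  publish(body) {
    this.ready.push(body);
  }

  // Client side: take the next message, or null if the queue is empty.
  deliver() {
    if (this.ready.length === 0) return null;
    const body = this.ready.shift();
    const tag = this.nextTag++;
    this.unacked.set(tag, body);
    return { tag, body };
  }

  // Client confirms it has processed the message, so it can be dropped.
  ack(tag) {
    return this.unacked.delete(tag);
  }

  // Client went away without acknowledging: put its messages back.
  requeueUnacked() {
    for (const body of this.unacked.values()) this.ready.push(body);
    this.unacked.clear();
  }
}

const queue = new ToyQueue();
queue.publish("hello");
const first = queue.deliver();   // delivered, but never acknowledged
queue.requeueUnacked();          // simulate the client crashing
const second = queue.deliver();  // the same message is delivered again
queue.ack(second.tag);           // now it is safe to forget the message
console.log(first.body, second.body, queue.deliver());
```

The point to notice is that a message only disappears after ack(); until then the queue can hand it out again.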

RabbitMQ

Installation

If RabbitMQ is not installed on your system, install it from here based on your platform, or use this Docker image.

I will be using the RabbitMQ Docker image to run the RabbitMQ server locally. If you are familiar with Docker, you can start the server with the following command.

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

Our server is now listening on port 5672 and is ready to accept connections from our services. Port 15672 serves the management UI that ships with the -management image.

RabbitMQ running

This article guides you step-by-step with code snippets. For the full code, visit the GitHub repository link at the end of this article.

Creating Node app 1 (Provider)

Let's create a node.js app that will act as the provider. The role of the provider here is to send a message to the queue from where it reaches the client. (To get started with node.js, click here)

Steps:

  1. Create a node.js app using express.
  2. Connect to the queue using the amqplib module.
  3. Define an API to send a message to the queue (for testing).

1. Start with a simple Node app

Let's create a simple node application and make it listen to port 4001.

  • Create a new project folder and initialise npm in it.
$ mkdir provider-app
$ cd provider-app
$ npm init
  • Install the required modules:
$ npm i express amqplib

express -> to create a node.js web application.
amqplib -> to connect to the RabbitMQ message broker and work with its queues.

  • Create index.js file and add the following,
const express = require("express");
const app = express();
const PORT = process.env.PORT || 4001;

app.use(express.json());

app.get("/send-msg", (req, res) => {
  res.send("Hello world");
});

app.listen(PORT, () => console.log("Server running at port " + PORT));

Notice that we define a route /send-msg here, which we will be using later to send a message to the queue upon an API request.

2. Connection to the queue

  • Import amqplib module in the index.js file.
const amqp = require("amqplib");
  • Let’s wrap our connection code in an async function since we need to work with promises in the code.

(If you are unfamiliar with async/await and other js concepts, click here to get a quick overview of some of the must-know javascript concepts)

var channel, connection;  // global variables

async function connectQueue() {
  try {
    connection = await amqp.connect("amqp://localhost:5672");
    channel = await connection.createChannel();

    await channel.assertQueue("test-queue");
  } catch (error) {
    console.log(error);
  }
}

Let’s take a look at the code that connects to the RabbitMQ server.

connection = await amqp.connect("amqp://localhost:5672")
channel = await connection.createChannel()

  • Using the connect() method, we make a connection to the server which is running at port 5672.
  • We create a channel from the connection using which we can access the queues. (To understand more about connections and channels, visit this link.)
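The connection URL above is hard-coded for a local broker. As a minimal sketch, assuming you would rather read the host, port, and credentials from environment variables, a small helper could build the URL for connect(). The helper name buildAmqpUrl and the variable names RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USER, and RABBITMQ_PASS are assumptions made for this example, not something amqplib defines.

```javascript
// Build an AMQP connection URL from environment variables.
// The variable names (RABBITMQ_HOST, ...) are assumptions for this sketch.
function buildAmqpUrl(env = process.env) {
  const host = env.RABBITMQ_HOST || "localhost";
  const port = env.RABBITMQ_PORT || "5672";
  const user = env.RABBITMQ_USER;
  const pass = env.RABBITMQ_PASS;

  // Only add credentials when both are provided; special characters
  // are percent-encoded so they cannot break the URL.
  const auth = user && pass
    ? `${encodeURIComponent(user)}:${encodeURIComponent(pass)}@`
    : "";

  return `amqp://${auth}${host}:${port}`;
}

// With no variables set, this falls back to the local broker used in this article.
console.log(buildAmqpUrl({}));
```

Inside connectQueue() the call would then read amqp.connect(buildAmqpUrl()) instead of using the literal string.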

await channel.assertQueue("test-queue")

  • In the above line, we check for a queue named ‘test-queue’. If that queue does not exist, a new queue is created with the provided name (here, ‘test-queue’).

Then, call the connectQueue function to initiate a connection when our app starts.

connectQueue()
  • Let’s define another async function, which can be used to send a message to the queue.
async function sendData(data) {
  // send data to the queue; the message body must be a Buffer
  await channel.sendToQueue("test-queue", Buffer.from(JSON.stringify(data)));

  // Keep the channel and connection open so later requests can reuse them.
  // Closing them here would make every call after the first one fail.
}
  • We pass the message to sendData() through its data parameter.
  • The channel.sendToQueue() method sends the message to the specified queue. Here we pass it two arguments: the name of the queue and the message content as a Buffer.
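Since the message travels as a Buffer, the object has to be serialised before sending and parsed again on the receiving side. A minimal sketch of that round trip, using two hypothetical helper names (encodeMessage and decodeMessage) that are not part of amqplib:

```javascript
// Turn a plain object into the Buffer that sendToQueue() expects.
function encodeMessage(data) {
  return Buffer.from(JSON.stringify(data));
}

// Reverse step for the receiving side: Buffer -> object.
function decodeMessage(content) {
  return JSON.parse(content.toString("utf8"));
}

const original = { title: "Six of Crows" };
const roundTripped = decodeMessage(encodeMessage(original));
console.log(roundTripped.title);
```

This only works for data that survives JSON.stringify(), which is fine for the simple object used in this article.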

3. Define an API for sending a message to the queue (for testing).

  • Let’s modify the route ‘/send-msg’ to send a message to the queue when it is called. We use the function sendData() (which we defined earlier) to pass the message.
app.get("/send-msg", (req, res) => {
  // data to be sent
  const data = {
    title: "Six of Crows",
    author: "Leigh Bardugo"
  };

  sendData(data); // pass the data to the function we defined
  console.log("A message is sent to queue");
  res.send("Message Sent"); // response to the API request
});

Creating Node app 2 (Client)

Now, we create another node.js app that connects to the queue, reads the message from it, and acknowledges it.

Firstly, a simple express app

$ mkdir client-app
$ cd client-app
$ npm init
$ npm i express amqplib

Create index.js file and add the following,

const express = require("express");
const app = express();
const PORT = process.env.PORT || 4002;

app.use(express.json());

app.listen(PORT, () => console.log("Server running at port " + PORT));

The client-app runs on port 4002.

Connection to the ‘test-queue’

We make a connection to the test-queue in the same way as we did in the ‘provider-app’. Then, we consume the data from the queue and acknowledge it.

const amqp = require("amqplib");
var channel, connection;

connectQueue(); // call the connect function

async function connectQueue() {
  try {
    connection = await amqp.connect("amqp://localhost:5672");
    channel = await connection.createChannel();

    await channel.assertQueue("test-queue");

    channel.consume("test-queue", data => {
      console.log(`${Buffer.from(data.content)}`);
      channel.ack(data);
    });
  } catch (error) {
    console.log(error);
  }
}

The message from the queue is read using the consume() method. It takes the name of the queue (here, ‘test-queue’) and a callback function, and it calls that callback with each message delivered from the queue.

channel.consume("test-queue", data => {
  console.log(`${Buffer.from(data.content)}`);
  channel.ack(data);
})

The channel.ack() function is used to acknowledge that the particular message has been received by the ‘client-app’.
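The callback above acknowledges every message unconditionally. As a hedged sketch of one possible variation, the callback could acknowledge only after the message was processed successfully and reject it otherwise with channel.nack(). The factory name makeOnMessage and the handle function are hypothetical, and a stand-in channel object is used so the sketch runs without a broker.

```javascript
// Build a consume() callback that acks on success and rejects on failure.
// `handle` is a hypothetical function holding your own processing logic.
function makeOnMessage(channel, handle) {
  return (msg) => {
    if (msg === null) return; // amqplib passes null when the consumer is cancelled
    try {
      const payload = JSON.parse(msg.content.toString());
      handle(payload);
      channel.ack(msg);
    } catch (err) {
      console.error("Failed to process message:", err.message);
      // allUpTo = false, requeue = false: reject only this message and
      // do not put it back on the queue, so a bad message cannot loop forever.
      channel.nack(msg, false, false);
    }
  };
}

// Stand-in channel that records calls, so the sketch runs without a broker.
const calls = [];
const fakeChannel = {
  ack: () => calls.push("ack"),
  nack: () => calls.push("nack"),
};

const onMessage = makeOnMessage(fakeChannel, (payload) => console.log(payload.title));
onMessage({ content: Buffer.from('{"title":"Six of Crows"}') }); // valid JSON
onMessage({ content: Buffer.from("not json") });                 // parse error
console.log(calls);
```

With a real channel, the same function would be passed as the second argument: channel.consume("test-queue", makeOnMessage(channel, handle)).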

Testing

Let’s try to send a message from the provider service to the client service.

  • Go to the terminal, run the provider app using,
$ cd provider-app
$ node index.js
  • Run the client application in another terminal.
$ cd client-app
$ node index.js

Now, let’s try to send a message from the provider app. Our provider app is running at http://localhost:4001. When the route ‘/send-msg’ is called, the message will be sent to the queue.

Since it’s a GET request, I’m just going to enter the URL in the browser. You can also use tools like Postman to send the request.

Request to /send-msg

Let’s look at the console now.

  • provider-app
Provider sending message to the queue
  • client-app
Client displaying the message from the queue

The message has been sent from one service to another using a queue. We have now implemented a simple queueing service using RabbitMQ in Node.js.

GitHub link — https://github.com/SharmilaS22/medium-rabbitmq-nodejs

Hope this article helped to get started with RabbitMQ! Happy learning!

If you liked my article, feel free to buy me a coffee here.

https://www.buymeacoffee.com/sharmilas
