Asynchronous Messaging Service Using NATS - Jetstream

Asynchronous Messaging Service Using NATS/Jetstream: What and How?

In modern software development, it is very common for services to communicate asynchronously, or to use a queue service to run long-running jobs in the background. There are many options, such as RabbitMQ, Apache Kafka, and NATS Streaming. NATS/Jetstream is one of them.

About NATS/Jetstream

NATS is a cloud-native, open-source, high-performance messaging system, and Jetstream is the persistence layer built into it. It was created to solve the problems identified with streaming technology today: complexity, fragility, and a lack of scalability. To that end, it

  • is easy to deploy and manage, built into the NATS server
  • simplifies and accelerates development
  • supports wildcard subjects
  • supports at-least-once delivery, and exactly-once delivery within a window
  • is horizontally scalable at runtime with no interruptions
  • persists data via streams and delivers or replays via consumers
  • supports multiple patterns to consume data on the same stream
  • supports push and pull modes when consuming messages
  • is account aware
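
To make the wildcard bullet above concrete: NATS subjects are dot-separated tokens, where `*` matches exactly one token and `>` matches one or more trailing tokens. The following is only a toy matcher written for illustration. The function name subjectMatches and the video.* subjects are hypothetical, and the real matching happens inside the NATS server.

```javascript
// Toy model of NATS subject matching, for illustration only.
// `*` matches exactly one token; `>` matches one or more trailing tokens.
function subjectMatches(pattern, subject) {
  const p = pattern.split('.');
  const s = subject.split('.');
  for (let i = 0; i < p.length; i++) {
    if (p[i] === '>') {
      // `>` must be the last pattern token and needs at least one token to match.
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) return false;
    if (p[i] !== '*' && p[i] !== s[i]) return false;
  }
  // Without `>`, pattern and subject must have the same number of tokens.
  return p.length === s.length;
}

console.log(subjectMatches('video.*', 'video.render'));    // true
console.log(subjectMatches('video.*', 'video.render.hd')); // false
console.log(subjectMatches('video.>', 'video.render.hd')); // true
console.log(subjectMatches('video.>', 'video'));           // false
```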

Example Use Case

Suppose we have a video rendering service that takes a request from the user, renders the video, and sends back the generated video. Rendering can take anywhere from half an hour to several hours, and we cannot know the duration in advance. The user should therefore not have to wait for rendering to finish. One possible solution is to receive the request, put it in a queue, and tell the user that we will send the video via email shortly.

Here we will use NATS/Jetstream as the message queue service.
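
The "accept now, render later" idea can be sketched in a few lines. This is only an illustration: the in-memory array stands in for the message queue, and the names acceptRenderRequest, email, and videoSpec are hypothetical, not part of any real service.

```javascript
// Sketch of the "accept now, render later" flow.
// The array is a stand-in for the real message queue.
const queue = [];

function acceptRenderRequest(request) {
  // Enqueue the job instead of rendering inside the request handler.
  queue.push({ email: request.email, videoSpec: request.videoSpec });
  // Reply immediately so the user is not kept waiting for the render.
  return { status: 'accepted', message: 'We will email the video shortly.' };
}

const response = acceptRenderRequest({
  email: 'user@example.com',
  videoSpec: { template: 'intro' },
});
console.log(response.status, queue.length);
```

A separate worker process would later take jobs off the queue, render the video, and send the email.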

Server

Now we will see how we can set up the NATS/Jetstream server and client for local development.

To set up the server on our local machine, we will use docker and docker-compose. You can use this docker-compose file:

version: "3.8"
services:
  nats:
    image: nats:2.8.4-alpine
    ports:
      - 4222:4222
      - 6222:6222
      - 8222:8222
    command: --js
Screenshot showing starting NATS server for NATS/Jetstream: Building an Asynchronous Messaging Service.

This starts the server. Next, we have to set up the client side, that is, the stream and the consumer used for communication. To do that, we first need the nats CLI. You can find the CLI for Windows and Linux users HERE. Let's set up the stream first.

Stream

We will add the stream first. Messages are passed to the queue through this stream. We will name the stream demo-stream. The CLI then asks for several options to complete the stream setup, of which two matter most: the subject and the retention policy. Messages are published under the subject name. The retention policy will be work queue, since we are using NATS/Jetstream as a queue system. The complete stream setup will look like the output below.

Screenshot of source code snippet displaying NATS stream setup for Building an Asynchronous Messaging Service.
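
If you prefer to create the stream from code rather than the CLI, the same two settings can be expressed as a config object. The sketch below is a rough equivalent, not the exact output of the CLI: the subject video.render is a hypothetical placeholder, and createStream assumes `nc` is a connection obtained from connect() in the nats package. As far as I know, the string 'workqueue' is the value behind that package's RetentionPolicy.Workqueue.

```javascript
// Stream settings from this section expressed as a config object.
function buildStreamConfig(subject) {
  return {
    name: 'demo-stream',
    subjects: [subject],
    // Work-queue retention: a message is removed once it has been acknowledged.
    retention: 'workqueue',
  };
}

// Assumes `nc` is a NATS connection returned by connect() from the nats package.
async function createStream(nc, subject) {
  const jsm = await nc.jetstreamManager();
  return jsm.streams.add(buildStreamConfig(subject));
}

// 'video.render' is a hypothetical subject; use your own.
console.log(buildStreamConfig('video.render'));
```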

Consumer

Now we will set up the consumer, which consumes messages from the queue. Three settings are important here: the consumer name, the subject name that we provided during stream creation, and the maximum number of deliveries. If we set the maximum deliveries to 3, the server will attempt to deliver a message at most 3 times in total when processing keeps failing. The complete consumer setup will look like the output below.

Screenshot of source code snippet displaying NATS consumer setup for Building an Asynchronous Messaging Service.
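
The consumer can also be created from code. This is a minimal sketch under a few assumptions: the names demo-consumer and video.render are hypothetical, explicit acknowledgment is assumed because the consume code later calls ack() and nak() itself, and `nc` is again a connection from connect() in the nats package.

```javascript
// Consumer settings from this section expressed as a config object.
function buildConsumerConfig(consumerName, subject) {
  return {
    durable_name: consumerName, // the consumer name
    filter_subject: subject,    // the subject provided during stream creation
    ack_policy: 'explicit',     // each message must be acknowledged by the client
    max_deliver: 3,             // at most 3 delivery attempts per message
  };
}

// Assumes `nc` is a NATS connection returned by connect() from the nats package.
async function createConsumer(nc, streamName, consumerName, subject) {
  const jsm = await nc.jetstreamManager();
  return jsm.consumers.add(streamName, buildConsumerConfig(consumerName, subject));
}

// Hypothetical names, for illustration only.
console.log(buildConsumerConfig('demo-consumer', 'video.render'));
```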

The NATS/Jetstream server and client setup is now complete. Next, we will look at sample source code that publishes a message to the queue and consumes a message from the queue.

Publish Message
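Create a file named publish.js:

const { connect, StringCodec } = require('nats');

const natsHost = process.env.NATS_HOST;
const subjectName = process.env.NATS_SUBJECT;

// Codec for converting the JSON string to bytes.
const sc = StringCodec();

async function main() {
  let nc;
  try {
    nc = await connect({ servers: natsHost });
    const js = nc.jetstream();

    const msgObj = { name: 'sohel', address: 'uttra' };
    const qm = JSON.stringify(msgObj);

    // Used for de-duplication; must be unique per message.
    const msgId = 'abc';

    await js.publish(subjectName, sc.encode(qm), { msgID: msgId });
  } catch (exception) {
    console.log(exception);
  } finally {
    // Flush pending data and close the connection.
    if (nc) await nc.drain();
  }
}

main();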

Make sure the msgID is unique for each message, so that the server can detect and discard duplicate messages.
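
To see why a unique msgID matters, here is a toy model of de-duplication within a time window. It is not how the server is implemented. The class name DedupWindow is hypothetical and the 2-minute window is just an assumed value for the example.

```javascript
// Toy model of de-duplication by message ID within a time window.
class DedupWindow {
  constructor(windowMs) {
    this.windowMs = windowMs;
    this.seen = new Map(); // msgID -> time (ms) at which it was accepted
  }

  // Returns true if the message is accepted, false if it is a duplicate.
  accept(msgId, nowMs) {
    const firstSeen = this.seen.get(msgId);
    if (firstSeen !== undefined && nowMs - firstSeen < this.windowMs) {
      return false;
    }
    this.seen.set(msgId, nowMs);
    return true;
  }
}

const dedup = new DedupWindow(120000);     // assumed 2-minute window
console.log(dedup.accept('abc', 0));       // true: first time this ID is seen
console.log(dedup.accept('abc', 60000));   // false: same ID inside the window
console.log(dedup.accept('abc', 180000));  // true: the window has passed
```

Note what this implies for the publish example: a hard-coded ID such as 'abc' would cause every later message sent within the window to be treated as a duplicate, so in practice the ID should be derived from the request (for example, a request or job identifier).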

Consume Message

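Create a file named consume.js:

const { connect, StringCodec } = require('nats');

const natsHost = process.env.NATS_HOST;
const streamName = process.env.NATS_STREAM;
const consumerName = process.env.NATS_CONSUMER;

// Codec for converting the message payload bytes back to a string.
const sc = StringCodec();

async function main() {
  const nc = await connect({ servers: natsHost });
  const js = nc.jetstream();

  try {
    // Pull one message; give up if none arrives within 5 seconds.
    const msgs = await js.fetch(streamName, consumerName, { batch: 1, expires: 5000 });

    for await (const message of msgs) {
      const inputData = JSON.parse(sc.decode(message.data));

      // renderVideo is the application's own rendering function (not shown here).
      const renderResult = await renderVideo(inputData.requestBody, message);

      if (renderResult) {
        console.log('sending acknowledgment');
        message.ack();
      } else {
        // Ask the server to redeliver the message after 1 second.
        message.nak(1000);
      }
    }
  } catch (exception) {
    console.log(exception);
  } finally {
    // Flush pending data and close the connection.
    await nc.drain();
  }
}

main().catch((err) => console.log(err));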
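
The acknowledgment decision can also be pulled out into a small function, which makes it easy to try without a running server. This is a sketch, not part of the original code: handleMessage and fakeMessage are hypothetical helpers, `render` stands in for the application's rendering function, and calling term() on a malformed payload is a design choice assumed here (retrying a message that cannot be parsed will not help).

```javascript
// Decide how to answer the server for one message.
// `message` needs data, ack(), nak(ms) and term(), like a Jetstream message.
async function handleMessage(message, render) {
  let inputData;
  try {
    inputData = JSON.parse(new TextDecoder().decode(message.data));
  } catch (err) {
    message.term(); // malformed payload: ask the server not to redeliver
    return 'term';
  }
  const ok = await render(inputData);
  if (ok) {
    message.ack(); // done: the message leaves the work queue
    return 'ack';
  }
  message.nak(1000); // failed: ask for redelivery after 1 second
  return 'nak';
}

// Fake message that records which method was called, for a dry run.
function fakeMessage(text) {
  const calls = [];
  return {
    data: new TextEncoder().encode(text),
    ack: () => calls.push('ack'),
    nak: (ms) => calls.push(`nak:${ms}`),
    term: () => calls.push('term'),
    calls,
  };
}

handleMessage(fakeMessage('{"name":"sohel"}'), async () => true)
  .then((result) => console.log(result));
```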

I hope this helps you implement a queue system. Please let me know in the comments section if you have any feedback or concerns.

If you need further assistance implementing an asynchronous messaging system or a queue system, you can reach us via email at info@perf.ixorasolution.com or feel free to contact us.

Thank you so much for your patience!
