Building Distributed Event Streaming Systems In Go With NATS JetStream
In this post, I will give a brief introduction to NATS JetStream as a distributed event streaming system, which comes from the NATS ecosystem. In a nutshell, NATS JetStream is a distributed streaming system with publish/subscribe as the messaging pattern. When I introduce NATS to my clients in my consulting job, there is a confusion about different offerings with NATS, NATS Streaming and the new offering NATS JetStream. I will give a quick overview on these three offerings and then we will dive into NATS JetStream with a very simple example demo. A full-fledged demo will be given at later posts.
Understanding the NATS ecosystem: From basic NATS server to NATS JetStream
The NATS cloud-native messaging ecosystem first came with a simple NATS server based on “At-most once” delivery model that forwards the published messages to consumers without any persistence for the published messages, with an industry leading performance. For some kind of applications, the basic NATS platform is adequate where you don’t need an event streaming platform and can gain the performance benefits as well. With “At-most once” delivery model, if any of the subscriber systems, is down while forwarding the messages to subscriber systems by messaging server, then those messages will be missed in subscriber systems and thus there is no guarantee of delivery for the published messages.
The second messaging option was added later, named NATS Streaming to the NATS ecosystem, which is an extremely performant, lightweight reliable streaming platform built on the basic NATS. When you publish messages with NATS Streaming, the published messages persist into a customisable storage so that we can replay the messages into consumers that provides “At-least-once-delivery” model with the capability of ACK messages for publishers and subscribers, publisher rate limiting and rate matching/limiting per subscriber. Both basic NATS and NATS Streaming are written in Go. Although NATS Streaming is extremely performant and lightweight, it is not much powerful as distributed streaming systems like Kafka in-terms of some of the capabilities and maturity. And at the same time, NATS ecosystem evolved a lot with the NATS 2.0 that provides distributed security, decentralized management, multi-tenancy, larger networks, global scaling with super clusters, and secure sharing of data. But the NATS Streaming is having a lot of limitations to fit into the era of NATS 2.0, and also streaming systems are not evolved to tackle next-generation challenges with IoT and Edge.
The next-generation of NATS Streaming for the era of NATS 2.0, is known as NATS JetStream. So if you would like to use a streaming system for building distributed systems, IoT and Edge, then better to use NATS JetStream over the NATS Streaming. The official support for NATS Streaming will be end by June 2023. So choose the basic NATS if you don’t want to persist the messages published by publisher systems, and use NATS JetStream if you would like to persist the messages into a persistent store and replay the messages from the persistent store, with ACK, rate matching/limiting, etc. NATS JetStream is also written in Go.
NATS JetStream
NATS JetStream is the next-generation of streaming system from the NATS ecosystem, built for the era of NATS 2.0 with the capabilities of distributed security, multi-tenancy and horizontally scaling.
What is distributed event streaming systems?
The streaming systems let you capture the streams of events from distributed systems and microservices, IoT sensors, Edge devices and software applications, and persist these streams into persistent stores. Because you persist these streams of events into persistent stores, you can replay it for retrieval, processing, and reactive to those event streams by using an event-driven architecture. With NATS JetStream, streams will be published by producer (publisher) systems and replay of the streams from persistent store will be delivered via consumer systems. In a nutshell, NATS JetStream is a distributed streaming system with publish/subscribe as the messaging pattern.
Although there are lot of existing streaming technologies are available, and some of them are good at somethings, most of these technologies are not much evolved to handle the future computing scenarios especially with IoT and Edge. For example, you may need a better security model when you publish and subscribe messages with IoT and Edge. And you may need a multi-tenancy and the support for multiple deployment models. NATS JetStream was designed for tackling many of these problems, which we have seen in today’s streaming technology stack.
Design goals
The official NATS documentation listed out the following as the design goals of NATS JetStream:
- The system must be easy to configure and operate and be observable.
- The system must be secure and operate well with NATS 2.0 security models.
- The system must scale horizontally and be applicable to a high ingestion rate.
- The system must support multiple use cases.
- The system must self heal and always be available.
- The system must have an API that is closer to core NATS.
- The system must allow NATS messages to be part of a stream as desired.
- The system must display payload agnostic behavior.
- The system must not have third party dependencies.
Single executable for running both NATS Server and NATS JetStream Server
With NATS Server v2.2.2, you can run basic NATS server (without persistence) and NATS JetStream server by using the same executable, named “nats-server”. When you simply run with nats-server
executable, it will run as the basic NATS server, and, when you run the nats-server
executable with-js
flag or with a configuration that enables JetStream, it will run the NATS server with the JetStream subsystem enabled.
Here’s the configuration file for JetStream:
Listing 1. js.conf file for enabling the JetStream subsystem
// enables jetstream, an empty block will enable and use defaults
jetstream {
// jetstream data will be in /data/nats-server/jetstream
store_dir: "/data/nats-server"// 1GB
max_memory_store: 1073741824// 10GB
max_file_store: 10737418240
}
The above configuration enables the JetStream subsystem for NATS server and configures /data/nats-server
directory as the store directory for persisting streams.
Here we run the nats-server
with JetStream subsystem by using js.conf
file:
Listing 2. Running the NATS server with js.conf
file
nats-server -c js.conf
Figure 1. NATS JetStream server is running
Single NATS client SDK for working with NATS and NATS JetStream
We have used separate client SDKs for working with both NATS server and NATS Streaming server. But with the Go SDK nats.go v1.11.0, you can use same library for working with basic NATS and NATS JetStream.
Here’s the code block that creates JetStream Context from the NATS conection:
Listing 3. Creates JetStream Context
import "github.com/nats-io/nats.go"nc, _ := nats.Connect(nats.DefaultURL)
js, err := nc.JetStream() // Returns JetStreamContext
The JetStreamContext
allows JetStream messaging and stream management. Once you create a JetStreamContext, you can easily work with the JetStream by using its Publish and Subscribe APIs.
Streams in JetStream
In JetStream, stream of events, are stored as Streams
, where many related subjects are stored in a single Stream
. For example, when you build an Order processing system, you can consider “ORDERS” as one Stream
, where you produce and consume streams of events to and from this Stream
with related subjects such as “ORDERS.created”, “ORDERS.approved”, “ORDERS.rejected”, “ORDERS.payment.debited” and “ORDERS.shipped”. So all of these related subjects ( “ORDERS.*”) belong to “ORDERS” Stream
.
The figure below shows the storage representation of “ORDERS” Stream with file storage at the server.
Figure 2. ORDERS Stream storage
Pull based consumer and Push based consumer
JetStream provides two kinds of consumer (subscriber) systems: Pull based consumer and Push based consumer. The Pull based consumers let JetStream pull the messages from consumer systems. Pull based consumer systems are like work queues. Because the JetStream provides a ACK (acknowledgment) mechanism, you can easily scale Pull based consumer systems horizontally without the problem of duplication of messages. Pull based subscription is new to the NATS ecosystem. The Push based consumers let JetStream pushing the messages to consumer systems, which can be a good choice for monitoring systems.
Here’s the JetStream mixed push/pull Order processing architecture from the NATS documentation:
Figure 3. JetStream mixed push/pull Order processing architecture
Image source: https://docs.nats.io/jetstream/concepts
Wildcard Subscriptions
NATS Streaming was lacking wildcard subscriptions for consumer systems. It’s really good to see that JetStream supports wildcard subscriptions.
The code block below shows wildcard subscription:
Listing 4. Wildcard subscription in JetStream
nc, _ := nats.Connect(nats.DefaultURL)
js, _ := nc.JetStream()
js.Subscribe("ORDERS.*", func(msg *nats.Msg) {
}
NATS CLI
The NATS ecosystem provides a new NATS CLI tool for administration needs. You can install this from source or using Homebrew tap.
Listing 5. Install NATS CLI using Homebrew tap
$ brew tap nats-io/nats-tools
$ brew install nats-io/nats-tools/nats
Here’s a introductory video tutorial on NATS CLI by JetStream team member R.I. Pienaar:
An example demo with NATS JetStream
Let’s write a simple example demo to understand how to working with NATS JetStream by using the nats.go Go client SDK.
Let’s set up the latest version of NATS Server and nats.go Go client SDK:
Listing 6. Download and install NATS Server and nats.go Go client SDK
# Go client latest
go get github.com/nats-io/nats.go/@latest
# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2
The example demo
In this simple example, we use one Stream “ORDERS”
to be used for subjects “ORDERS.*”
. And just for the sake of demo, we publish messages over the subjects “ORDERS.created”
and “ORDERS.approved”
. One system will publish the messages with “ORDERS.created"
subject. A Pull based consumer system subscribes the messages from “ORDERS.created"
subject, and being reactive to the events it does something and then publish another event with subject “ORDERS.approved"
.There is another Push based consumer system that subscribes the messages with a wildcard subscription “ORDERS.*”
so that all events published over the Stream “ORDERS”
can be received.
Creating Streams with JetStreamContext
Let’s first create a Stream “ORDERS”
to be used for subjects “ORDERS.*”
. You can create Stream by using administrative tools like NATS CLI or by using the NATS Client SDKs. Here we create the Stream by using nats.go
Go package.
Listing 7. Creates a Stream by using JetStreamContext
const (
streamName = "ORDERS"
streamSubjects = "ORDERS.*"
)// createStream creates a stream by using JetStreamContext
func createStream(js nats.JetStreamContext) error {
// Check if the ORDERS stream already exists; if not, create it.
stream, err := js.StreamInfo(streamName)
if err != nil {
log.Println(err)
}
if stream == nil {
log.Printf("creating stream %q and subjects %q", streamName, streamSubjects)
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamSubjects},
})
if err != nil {
return err
}
}
return nil
}
In the above code block, we create a new Stream by using the AddStream
method of JetStreamContext
if the Stream doesn’t exist. The JetStreamContext
can be created by calling the JetStream
method of the NATS connection object.
Listing 8. Creates JetStreamContext from NATS connection, and then calling createStream method to create a Stream.
// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)
// Creates JetStreamContext
js, err := nc.JetStream()
err = createStream(js)
Publishing stream of events
The code block below publishes messages (stream of events) with subject “ORDERS.created” to let consumer systems to know that new orders are placed into our distributed systems environment, so that consumer systems can be reactive to those events by performing its own actions and publishing other set of events.
Listing 9. Creates publishes messages with subject “ORDERS.created”
const (
subjectName ="ORDERS.created"
)
// createOrder publishes stream of events
// with subject "ORDERS.created"
func createOrder(js nats.JetStreamContext) error{
var order model.Order
for i := 1; i <= 10; i++ {
order = model.Order{
OrderID: i,
CustomerID: "Cust-" + strconv.Itoa(i),
Status: "created",
}
orderJSON, _ := json.Marshal(order)
_, err := js.Publish(subjectName, orderJSON)
if err!=nil {
return err
}
log.Printf("Order with OrderID:%d has been published\n",i)
}
return nil
}
Here’s the Order entity being used in our example:
Listing 10. Order struct
type Order struct {
OrderID int
CustomerID string
Status string
}
Here’s the full code block of one example system that creates Stream and publish messages over the subject “ORDERS.created”
:
Listing 11. Creates Stream and publish messages
package mainimport (
"encoding/json"
"log"
"strconv"
"github.com/nats-io/nats.go"
"github.com/shijuvar/go-distsys/jsdemo/model"
)
const (
streamName = "ORDERS"
streamSubjects = "ORDERS.*"
subjectName ="ORDERS.created"
)
func main() {
// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)
// Creates JetStreamContext
js, err := nc.JetStream()
checkErr(err)
// Creates stream
err = createStream(js)
checkErr(err)
// Create orders by publishing messages
err= createOrder(js)
checkErr(err)
}
// createOrder publishes stream of events
// with subject "ORDERS.created"
func createOrder(js nats.JetStreamContext) error{
var order model.Order
for i := 1; i <= 10; i++ {
order = model.Order{
OrderID: i,
CustomerID: "Cust-" + strconv.Itoa(i),
Status: "created",
}
orderJSON, _ := json.Marshal(order)
_, err := js.Publish(subjectName, orderJSON)
if err!=nil {
return err
}
log.Printf("Order with OrderID:%d has been published\n",i)
}
return nil
}
// createStream creates a stream by using JetStreamContext
func createStream(js nats.JetStreamContext) error {
// Check if the ORDERS stream already exists; if not, create it.
stream, err := js.StreamInfo(streamName)
if err != nil {
log.Println(err)
}
if stream == nil {
log.Printf("creating stream %q and subjects %q", streamName, streamSubjects)
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamSubjects},
})
if err != nil {
return err
}
}
return nil
}
func checkErr(err error) {
if err != nil {
log.Fatal(err)
}
}
Pull based consumer being reactive to events
An example Pull based consumer subscribes the messages from subject “ORDERS.created”
and finally publish messages over subject “ORDERS.approved”
.
Listing 12. Pull based subscriber subscribes messages and publish another event
package main
import (
"context"
"encoding/json"
"log"
"time"
"github.com/nats-io/nats.go"
"github.com/shijuvar/go-distsys/jsdemo/model"
)
const (
subSubjectName ="ORDERS.created"
pubSubjectName ="ORDERS.approved"
)
func main() {
// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
// Create Pull based consumer with maximum 128 inflight.
// PullMaxWaiting defines the max inflight pull requests.
sub, _ := js.PullSubscribe(subSubjectName, "order-review", nats.PullMaxWaiting(128))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return
default:
}
msgs, _ := sub.Fetch(10, nats.Context(ctx))
for _, msg := range msgs {
msg.Ack()
var order model.Order
err := json.Unmarshal(msg.Data, &order)
if err != nil {
log.Fatal(err)
}
log.Println("order-review service")
log.Printf("OrderID:%d, CustomerID: %s, Status:%s\n", order.OrderID, order.CustomerID, order.Status)
reviewOrder(js,order)
}
}
}
// reviewOrder reviews the order and publishes ORDERS.approved event
func reviewOrder(js nats.JetStreamContext, order model.Order) {
// Changing the Order status
order.Status ="approved"
orderJSON, _ := json.Marshal(order)
_, err := js.Publish(pubSubjectName, orderJSON)
if err != nil {
log.Fatal(err)
}
log.Printf("Order with OrderID:%d has been %s\n",order.OrderID, order.Status)
}
The PullSubscribe
method of JetStreamContext
creates a Subscription that can fetch messages. It will return a Subscription
object. The Fetch
method of Subscription
, pulls a batch of messages from a stream for a Pull consumer. The msg.Ack()
makes a manual acknowledgement to the server.
Push based consumer with wildcard subscription
Another consumer in our example demo, is a Push based subscriber that subscribes messages with wildcard subscription.
Listing 12. Push based subscriber with wildcard subscription
package main
import (
"encoding/json"
"log"
"runtime"
"github.com/nats-io/nats.go"
"github.com/shijuvar/go-distsys/jsdemo/model"
)
func main() {
// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
// Create durable consumer monitor
js.Subscribe("ORDERS.*", func(msg *nats.Msg) {
msg.Ack()
var order model.Order
err := json.Unmarshal(msg.Data, &order)
if err != nil {
log.Fatal(err)
}
log.Printf("monitor service subscribes from subject:%s\n", msg.Subject)
log.Printf("OrderID:%d, CustomerID: %s, Status:%s\n", order.OrderID, order.CustomerID, order.Status)
}, nats.Durable("monitor"),nats.ManualAck())
runtime.Goexit()
}
The Subscribe
API is a very familiar API for existing NATS developers who has already worked on NATS and NATS Streaming server. The Subscribe
will express interest in the given subject. The subject can have wildcards (partial:*, full:>). Messages will be delivered to the associated MsgHandler
. Here we have subscribed with wildcard “ORDERS.*”
so that all the messages published over the subjects like “ORDERS.created”
and “ORDERS.approved”
can be subscribed from consumer system. The nats.ManualAck()
configures manual acknowledgement for the messages so that we call msg.Ack()
to make manual acknowledgement. You can also create queue subscribers using the QueueSubscribe
API. With QueueSubscribe
, all subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message asynchronously.
Source Code
The source code of the example demo is available at: https://github.com/shijuvar/go-distsys/tree/master/jsdemo
Summary
When we think about using a distributed streaming system as the nervous system for building distributed systems, microservices based distributed systems, IoT based systems, next-generation Edge systems, you can consider to use NATS JetStream. In future computing, dealing with massive volume of stream of events and messages will be a big challenge especially with Edge computing. NATS JetStream provides the capabilities of distributed security, multi-tenancy and horizontally scaling. At this moment with the early release of NATS JetStream, some of the design goals not not fulfilled with clustering options. But with later releases and improvements, I hope that NATS JetStream fulfils all of its design goals and will be a highly competitive streaming technology. When considering the simplicity and the Adaptive Edge Architecture, I will definitely choose NATS JetStream over Kafka, and waiting for getting this technology more mature.