Using NATS JetStream Key/Value Store in Go

Shiju Varghese
6 min read · Nov 18, 2022


In this post, I give a brief introduction to the NATS JetStream Key/Value Store, using an example written in Go. If you already use the NATS ecosystem in your system architecture, you can also use the Key/Value Store as a consistent key/value store for use cases such as dynamic configuration for distributed systems and microservices.

Introduction to JetStream Key/Value Store

NATS JetStream, the persistence layer of NATS, provides a distributed streaming platform that lets you capture streams of messages (streams of events) from distributed systems, IoT sensors, and edge devices, and persist them in persistent stores. Because the streams are persisted, you can replay them for retrieval and processing, and react to those events by using an event-driven architecture.

JetStream now includes a Key/Value Store, which allows applications to create buckets and use them as an immediately consistent key/value store by leveraging the existing capabilities of JetStream.

Persistent Store of Key/Value Store

The Key/Value Store uses JetStream streams for persistence: each bucket is persisted as a stream. For example, when you create a bucket named discovery, JetStream creates a stream named KV_discovery to hold the key/value data associated with that bucket.
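
The naming convention described above can be sketched in a few lines of Go. This is only an illustration, not code from the client library: streamName and keySubject are hypothetical helpers, and the $KV.<bucket>.<key> subject layout is an assumption that goes beyond what this post states, so verify it against your own server before relying on it.

```go
package main

import "fmt"

// streamName returns the name of the JetStream stream that backs a bucket.
// Hypothetical helper: it only mirrors the "KV_" prefix convention.
func streamName(bucket string) string {
	return "KV_" + bucket
}

// keySubject returns the subject under which a key's values are stored.
// Assumption: keys live under "$KV.<bucket>.<key>".
func keySubject(bucket, key string) string {
	return "$KV." + bucket + "." + key
}

func main() {
	fmt.Println(streamName("discovery"))
	fmt.Println(keySubject("discovery", "services.orders"))
}
```

Knowing the backing stream name is useful when you inspect a bucket with stream-level tooling, as the example later in this post does with StreamInfo.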

The figure below shows the storage representation of a bucket “discovery” as a Stream with file storage at the server.

Figure 1. Stream for a Key/Value store bucket “discovery”

You can write to and read from the Key/Value Store using the NATS CLI tool, or programmatically using a client SDK.

JetStream Key/Value Store example in Go

Let’s write a simple example to understand how to work with the NATS JetStream Key/Value Store using nats.go, the Go client SDK.

Let’s set up the latest versions of NATS Server and the nats.go client SDK:

Listing 1. Install the NATS Server

# go install the latest NATS server
go install github.com/nats-io/nats-server/v2@latest

Listing 2. Install the latest nats.go Go client SDK in the project

# go get nats.go Go client SDK package
go get github.com/nats-io/nats.go@latest

To use the Key/Value Store, you must run the NATS server with JetStream enabled. You can start nats-server with the -js flag (nats-server -js) or with a configuration file that enables JetStream (jetstream {}).

Figure 2. NATS JetStream Server is running

Example demo in Go

In this example, we create a bucket named discovery so that a stream with name KV_discovery will be created for persisting the Key/Value store values associated with that bucket.

The code block below creates a bucket named discovery if it doesn’t exist; otherwise, it binds to the existing bucket by name.

Listing 3. Create a Key/Value Store with a bucket

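// Fragment: needs the "log" and "os" packages and the nats.go client.
url := os.Getenv("NATS_URL")
if url == "" {
	url = nats.DefaultURL
}

nc, err := nats.Connect(url)
if err != nil {
	log.Fatal(err)
}
defer nc.Drain()

js, err := nc.JetStream()
if err != nil {
	log.Fatal(err)
}

var kv nats.KeyValue
if stream, _ := js.StreamInfo("KV_discovery"); stream == nil {
	// The backing stream does not exist yet, so create the bucket.
	kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket: "discovery",
	})
} else {
	// The bucket already exists, so bind to it by name.
	kv, err = js.KeyValue("discovery")
}
if err != nil {
	log.Fatal(err)
}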

Putting and getting Key/Value to and from the Store

The KeyValue interface provides the standard Put and Get methods, and each entry carries a revision number. The code block below puts a value under a key, then gets the value and its associated information.

Listing 4. Using Put and Get methods for putting and getting data

kv.Put("services.orders", []byte("https://localhost:8080/orders"))
entry, _ := kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

kv.Put("services.orders", []byte("https://localhost:8080/v1/orders"))
entry, _ = kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

You can also use the Put method to update the value of an existing key. There is also an Update method, which updates the value only if the revision you pass matches the latest revision of the key. If the given revision number is wrong, Update returns an error.

The code block below updates a value by providing the matching latest revision, then gets the value and its associated information.

Listing 5. Updating the value with matching revision number

kv.Update("services.orders", []byte("https://localhost:8080/v2/orders"), 2)
entry, _ = kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

The code block below provides the full source code of the simple example that uses Put, Get, Update and Delete methods.

Listing 6. Example code on Put, Get, Update and Delete methods

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/nats-io/nats.go"
)

func main() {
	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}

	nc, err := nats.Connect(url)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	var kv nats.KeyValue
	// Create the bucket named discovery if it doesn't exist.
	// Otherwise, bind to the existing bucket by name.
	if stream, _ := js.StreamInfo("KV_discovery"); stream == nil {
		kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
			Bucket: "discovery",
		})
	} else {
		kv, err = js.KeyValue("discovery")
	}
	if err != nil {
		log.Fatal(err)
	}

	// The `KeyValue` interface provides the standard `Put` and `Get`
	// methods; every entry carries a revision number.
	// The hard-coded revisions 1 and 2 below assume a freshly created bucket.
	kv.Put("services.orders", []byte("https://localhost:8080/orders"))
	entry, _ := kv.Get("services.orders")
	fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

	kv.Put("services.orders", []byte("https://localhost:8080/v1/orders"))
	entry, _ = kv.Get("services.orders")
	fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

	// Update with a stale revision (1) is expected to fail.
	_, err = kv.Update("services.orders", []byte("https://localhost:8080/v1/orders"), 1)
	fmt.Printf("expected error because of wrong revision: %s\n", err)

	// Update with the latest revision (2) succeeds.
	kv.Update("services.orders", []byte("https://localhost:8080/v2/orders"), 2)
	entry, _ = kv.Get("services.orders")
	fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

	// Print the first stream name reported by the server.
	name := <-js.StreamNames()
	fmt.Printf("KV stream name: %s\n", name)

	kv.Put("services.customers", []byte("https://localhost:8080/v2/customers"))
	entry, _ = kv.Get("services.customers")
	fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

	// After Delete, Get returns an error, so nothing is printed here.
	kv.Delete("services.customers")
	entry, err = kv.Get("services.customers")
	if err == nil {
		fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
	}
}

The code block below removes the bucket named discovery.

Listing 7. Remove a bucket

if err := js.DeleteKeyValue("discovery"); err != nil {
	fmt.Println(err)
}

Creating a Key Watcher for Key/Value store changes

The KeyValue interface lets you create a watcher that subscribes to changes made to the Key/Value Store. The Watch method watches for updates to keys that match the keys argument, which can include wildcards. Watch sends a nil entry once it has delivered all initial values.
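
Because keys are dot-separated, a wildcard pattern selects keys token by token. The sketch below illustrates that matching rule with a hypothetical matches helper; it is not part of nats.go, and it assumes the usual NATS subject semantics in which "*" stands for exactly one token and ">" for one or more trailing tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether key matches pattern, token by token.
// "*" matches exactly one token; ">" matches one or more trailing tokens.
func matches(pattern, key string) bool {
	p := strings.Split(pattern, ".")
	k := strings.Split(key, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one key token left.
			return i == len(p)-1 && i < len(k)
		}
		if i >= len(k) {
			return false
		}
		if tok != "*" && tok != k[i] {
			return false
		}
	}
	return len(p) == len(k)
}

func main() {
	keys := []string{"services.orders", "services.orders.eu", "config.db"}
	for _, key := range keys {
		fmt.Printf("%-20s services.*=%v services.>=%v\n",
			key, matches("services.*", key), matches("services.>", key))
	}
}
```

Under these rules, a watcher on services.* would see services.orders and services.customers but not a deeper key such as services.orders.eu.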

The code block below creates a key watcher with the wildcard “services.*”.

Listing 8. Key Watcher for Key/Value store changes

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/nats-io/nats.go"
)

func main() {
	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}

	nc, err := nats.Connect(url)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	var kv nats.KeyValue
	if stream, _ := js.StreamInfo("KV_discovery"); stream == nil {
		// A key-value (KV) bucket is created by specifying a bucket name.
		kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
			Bucket: "discovery",
		})
	} else {
		kv, err = js.KeyValue("discovery")
	}
	if err != nil {
		log.Fatal(err)
	}

	// KeyWatcher for the wildcard "services.*"
	w, err := kv.Watch("services.*")
	if err != nil {
		log.Fatal(err)
	}
	defer w.Stop()

	// Ranging over Updates blocks until the channel is closed, so the
	// program keeps printing changes until it is interrupted.
	for kve := range w.Updates() {
		// A nil entry marks the end of the initial values; skip it.
		if kve != nil {
			fmt.Printf("%s @ %d -> %q (op: %s)\n", kve.Key(), kve.Revision(), string(kve.Value()), kve.Operation())
		}
	}
}
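
The nil entry that Watch sends is easiest to understand as a marker between two phases: replaying the values that already exist, and streaming live changes. The sketch below models that protocol with plain channels. It is an assumption-laden illustration rather than the client's implementation, and update and watch are hypothetical names.

```go
package main

import "fmt"

// update is a hypothetical stand-in for a KV entry in this sketch.
type update struct {
	key, value string
}

// watch replays the current values, sends a nil marker, then forwards
// live changes. The returned channel is closed once live is closed.
func watch(current []update, live <-chan update) <-chan *update {
	out := make(chan *update)
	go func() {
		defer close(out)
		for i := range current {
			out <- &current[i]
		}
		out <- nil // marker: all initial values have been delivered
		for u := range live {
			u := u // copy so each pointer refers to its own value
			out <- &u
		}
	}()
	return out
}

func main() {
	live := make(chan update, 1)
	live <- update{"services.orders", "https://localhost:8080/v2/orders"}
	close(live)

	current := []update{{"services.orders", "https://localhost:8080/v1/orders"}}
	for u := range watch(current, live) {
		if u == nil {
			fmt.Println("-- initial values delivered, now watching for changes --")
			continue
		}
		fmt.Printf("%s -> %q\n", u.key, u.value)
	}
}
```

A consumer that needs a consistent starting snapshot, for example to build a local configuration cache, can load entries until it sees the marker and only then start serving requests.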

The figure below shows the output of Listing 6.

Figure 3. Output of Listing 6

The figure below shows the output of Listing 8 (key watcher).

Figure 4. Output of Listing 8

Source Code

The full source code of the example is available from here.

Written by Shiju Varghese

Consulting Solutions Architect and Trainer on Go and Distributed Systems, with focus on Microservices and Event-Driven Architectures. Author of two books on Go.
