Using NATS JetStream Key/Value Store in Go

In this post, I give a brief introduction to the NATS JetStream Key/Value Store, using an example written in the Go programming language. If you’re already using the NATS ecosystem in your systems architecture, you can use the Key/Value Store as a consistent key/value store for use cases such as dynamic configuration for distributed systems and microservices.

Introduction to JetStream Key/Value Store

JetStream now includes a Key/Value Store, which allows applications to create buckets and use them as an immediately consistent key/value store by leveraging the existing capabilities of JetStream.

Persistent Store of Key/Value Store

The figure below shows how the bucket “discovery” is stored on the server: as a Stream (named KV_discovery) with file storage.

Figure 1. Stream for a Key/Value store bucket “discovery”
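To make the bucket-to-stream mapping concrete, here is a minimal sketch of the naming convention. The stream name KV_discovery comes from the example in this post; the "$KV.<bucket>.<key>" subject layout is my understanding of how current clients lay out keys, so treat it as an assumption. The helper names streamName and keySubject are hypothetical and are not part of nats.go.

```go
package main

import "fmt"

// streamName returns the name of the stream that backs a KV bucket.
// Hypothetical helper for illustration; not a nats.go function.
func streamName(bucket string) string {
	return "KV_" + bucket
}

// keySubject returns the subject on which values for a key are stored.
// The "$KV." prefix is an assumption about the client convention.
func keySubject(bucket, key string) string {
	return "$KV." + bucket + "." + key
}

func main() {
	fmt.Println(streamName("discovery"))
	fmt.Println(keySubject("discovery", "services.orders"))
}
```

Because every key is just a subject inside one stream, the usual stream features (file storage, replication, limits) apply to the bucket as a whole.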

You can write to and read from the Key/Value store using the NATS CLI tool, or programmatically using a client SDK.

JetStream Key/Value Store example in Go

Let’s set up the latest version of NATS Server and the nats.go client SDK for Go:

Listing 1. Install the NATS Server

# go install the latest NATS server
go install github.com/nats-io/nats-server/v2@latest

Listing 2. Install the latest nats.go Go client SDK in the project

# go get nats.go Go client SDK package
go get github.com/nats-io/nats.go/@latest

To use the Key/Value Store, you must run a NATS server with JetStream enabled. You can start nats-server with the -js flag (nats-server -js) or with a configuration file that enables JetStream (jetstream {}).

Figure 2. NATS JetStream Server is running

Example demo in Go

The code block below creates a Key/Value store with a bucket named discovery if it doesn’t exist; otherwise, it binds to the existing bucket by name.

Listing 3. Create a Key/Value Store with a bucket

url := os.Getenv("NATS_URL")
if url == "" {
	url = nats.DefaultURL
}

nc, err := nats.Connect(url)
if err != nil {
	log.Fatal(err)
}
defer nc.Drain()

js, err := nc.JetStream()
if err != nil {
	log.Fatal(err)
}

var kv nats.KeyValue
if stream, _ := js.StreamInfo("KV_discovery"); stream == nil {
	// The backing stream does not exist yet, so create the bucket.
	kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket: "discovery",
	})
} else {
	// The bucket already exists, so bind to it by name.
	kv, err = js.KeyValue("discovery")
}
if err != nil {
	log.Fatal(err)
}

Putting and getting Key/Value to and from the Store

The KeyValue interface provides the standard Put and Get methods, and every entry carries a revision number. The code block below puts a value for a key twice and, after each Put, gets the entry and prints its key, revision, and value.

Listing 4. Using Put and Get methods for putting and getting data

kv.Put("services.orders", []byte("https://localhost:8080/orders"))
entry, _ := kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

kv.Put("services.orders", []byte("https://localhost:8080/v1/orders"))
entry, _ = kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

You can also use the Put method to overwrite the value of an existing key. In addition, there is an Update method that updates the value only if the revision you pass matches the latest revision of the key. If the given revision number is wrong, Update returns an error.

The code block below updates a value by providing the matching latest revision, then gets the entry and prints its key, revision, and value.

Listing 5. Updating the value with matching revision number

kv.Update("services.orders", []byte("https://localhost:8080/v2/orders"), 2)
entry, _ = kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
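The revision check is a form of optimistic concurrency control. The sketch below is a toy in-memory model of that behavior, written only to illustrate the semantics; it is not how nats.go or the server implements it. The bucket type and the errWrongRevision error are hypothetical names. It assumes that revisions come from a single sequence shared by all keys in the bucket, which matches my understanding that a revision is the sequence number of the underlying stream message.

```go
package main

import (
	"errors"
	"fmt"
)

// errWrongRevision is a hypothetical error for this sketch.
var errWrongRevision = errors.New("wrong last revision")

type entry struct {
	value    string
	revision uint64
}

// bucket is a toy, single-goroutine model of a KV bucket.
type bucket struct {
	seq     uint64 // bucket-wide sequence shared by all keys
	entries map[string]entry
}

func newBucket() *bucket {
	return &bucket{entries: make(map[string]entry)}
}

// Put stores the value unconditionally and returns the new revision.
func (b *bucket) Put(key, value string) uint64 {
	b.seq++
	b.entries[key] = entry{value: value, revision: b.seq}
	return b.seq
}

// Update stores the value only if last equals the key's current revision.
// A missing key has revision 0 in this model.
func (b *bucket) Update(key, value string, last uint64) (uint64, error) {
	if b.entries[key].revision != last {
		return 0, errWrongRevision
	}
	return b.Put(key, value), nil
}

func main() {
	b := newBucket()
	b.Put("services.orders", "https://localhost:8080/orders")    // revision 1
	b.Put("services.orders", "https://localhost:8080/v1/orders") // revision 2

	// A writer that still believes revision 1 is current is rejected.
	if _, err := b.Update("services.orders", "stale write", 1); err != nil {
		fmt.Println("rejected:", err)
	}

	// A writer that read revision 2 succeeds and produces revision 3.
	rev, _ := b.Update("services.orders", "https://localhost:8080/v2/orders", 2)
	fmt.Println("accepted at revision", rev)
}
```

The practical consequence is that a writer should read the entry, remember its revision, and pass that revision to Update; if another writer got there first, the update is rejected instead of silently overwriting the newer value.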

The code block below provides the full source code of the simple example that uses Put, Get, Update and Delete methods.

Listing 6. Example code on Put, Get, Update and Delete methods

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/nats-io/nats.go"
)

func main() {
	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}

	nc, err := nats.Connect(url)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	var kv nats.KeyValue
	// Create the "discovery" bucket if its backing stream doesn't exist.
	// Otherwise, bind to the existing bucket by name.
	if stream, _ := js.StreamInfo("KV_discovery"); stream == nil {
		kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
			Bucket: "discovery",
		})
	} else {
		kv, err = js.KeyValue("discovery")
	}
	if err != nil {
		log.Fatal(err)
	}

	// The KeyValue interface provides the standard Put and Get methods.
	// Every entry carries a revision number.
	kv.Put("services.orders", []byte("https://localhost:8080/orders"))
	entry, _ := kv.Get("services.orders")
	fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

	kv.Put("services.orders", []byte("https://localhost:8080/v1/orders"))
	entry, _ = kv.Get("services.orders")
	fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

	// The revision numbers below assume a freshly created bucket.
	// This Update fails because revision 1 is no longer the latest.
	_, err = kv.Update("services.orders", []byte("https://localhost:8080/v1/orders"), 1)
	fmt.Printf("expected error because of wrong revision: %s\n", err)

	// This Update succeeds because revision 2 is the latest.
	kv.Update("services.orders", []byte("https://localhost:8080/v2/orders"), 2)
	entry, _ = kv.Get("services.orders")
	fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

	// StreamNames returns a channel of stream names; read the first one.
	name := <-js.StreamNames()
	fmt.Printf("KV stream name: %s\n", name)

	kv.Put("services.customers", []byte("https://localhost:8080/v2/customers"))
	entry, _ = kv.Get("services.customers")
	fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

	// After Delete, Get returns an error, so nothing is printed for the key.
	kv.Delete("services.customers")
	entry, err = kv.Get("services.customers")
	if err == nil {
		fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
	}
}

The code block below removes the bucket named discovery.

Listing 7. Removing a bucket

if err := js.DeleteKeyValue("discovery"); err != nil {
fmt.Println(err)
}

Creating a Key Watcher for Key/Value store changes

The code block below creates a key watcher with the wildcard “services.*”, so it receives changes for every key that has exactly one token after “services.”.

Listing 8. Key Watcher for Key/Value store changes

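package main

import (
	"fmt"
	"log"
	"os"
	"runtime"

	"github.com/nats-io/nats.go"
)

func main() {
	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}

	nc, err := nats.Connect(url)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	var kv nats.KeyValue
	if stream, _ := js.StreamInfo("KV_discovery"); stream == nil {
		// The backing stream does not exist yet, so create the bucket.
		kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
			Bucket: "discovery",
		})
	} else {
		kv, err = js.KeyValue("discovery")
	}
	if err != nil {
		log.Fatal(err)
	}

	// KeyWatcher for the wildcard "services.*"
	w, err := kv.Watch("services.*")
	if err != nil {
		log.Fatal(err)
	}
	defer w.Stop()

	// The watcher first delivers the current values of matching keys,
	// then a nil entry to mark the end of the initial values,
	// and then every later change. Skip the nil marker.
	for kve := range w.Updates() {
		if kve != nil {
			fmt.Printf("%s @ %d -> %q (op: %s)\n", kve.Key(), kve.Revision(), string(kve.Value()), kve.Operation())
		}
	}
	// The loop above blocks until the updates channel is closed.
	// Goexit then ends the main goroutine after running deferred calls.
	runtime.Goexit()
}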
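Which keys does “services.*” actually cover? Keys follow NATS subject rules, where “*” matches exactly one dot-separated token and “>” matches one or more trailing tokens. The sketch below is a small stand-alone matcher that illustrates those rules; the matches function is a hypothetical helper written for this post, not a function from nats.go, and the server does the real matching.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether key matches a NATS-style pattern.
// "*" matches exactly one token; ">" matches one or more trailing tokens.
func matches(pattern, key string) bool {
	p := strings.Split(pattern, ".")
	k := strings.Split(key, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs
			// at least one key token left to match.
			return i == len(p)-1 && i < len(k)
		}
		if i >= len(k) {
			return false // the key has fewer tokens than the pattern
		}
		if tok != "*" && tok != k[i] {
			return false // literal token mismatch
		}
	}
	// Without ">", pattern and key must have the same number of tokens.
	return len(p) == len(k)
}

func main() {
	keys := []string{"services.orders", "services.orders.v1", "config.db"}
	for _, key := range keys {
		fmt.Printf("%-20s services.*=%v services.>=%v\n",
			key, matches("services.*", key), matches("services.>", key))
	}
}
```

So the watcher in Listing 8 sees changes to services.orders and services.customers, but it would not see a deeper key such as services.orders.v1; for that you would watch “services.>” instead.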

The figure below shows the output of the code in Listing 6.

Figure 3. Output of the code in Listing 6

The figure below shows the output of the code in Listing 8 (Key Watcher).

Figure 4. Output of the code in Listing 8

Source Code


Shiju Varghese

Consulting Solutions Architect and Trainer on Go and Distributed Systems, with focus on Microservices and Event-Driven Architectures. Author of two books on Go.