Using NATS JetStream Key/Value Store in Go

Introduction to JetStream Key/Value Store

The NATS JetSteam, the persistence layer of NATS, provides a distributed streaming platform that lets you capture streams of messages (stream of events) from distributed systems, IoT sensors and Edge devices, and persist these data streams into persistent stores. Because you persist these data streams into persistent stores, you can replay it for retrieval, processing, and reactive to those stream of events by using an event-driven architecture.

Persistent Store of Key/Value Store

The Key/Value store uses Stream of JetStream to persist Key/Value store using buckets. The bucket of Key/Value store persist as a Stream of JetStream. For example, when you create a bucket with name discoveey for persisting Key/Value store associated with that particular bucket, it will create a Stream with name KV_discovery.

JetStream Key/Value Store example in Go

Let’s write a simple example to understand how to working with NATS JetStream Key/Value Store by using the nats.go Go client SDK.

# go install the latest NATS server
go install github.com/nats-io/nats-server/v2@latest
# go get nats.go Go client SDK package
go get github.com/nats-io/nats.go/@latest

Example demo in Go

In this example, we create a bucket named discovery so that a stream with name KV_discovery will be created for persisting the Key/Value store values associated with that bucket.

 url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}

nc, _ := nats.Connect(url)
defer nc.Drain()

js, _ := nc.JetStream()
if stream, _ := js.StreamInfo("KV_discovery"); stream == nil {
// A key-value (KV) bucket is created by specifying a bucket name.
kv, _ = js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "discovery",
})
} else {
kv, _ = js.KeyValue("discovery")
}
kv.Put("services.orders", []byte("https://localhost:8080/orders"))
entry, _ := kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

kv.Put("services.orders", []byte("https://localhost:8080/v1/orders"))
entry, _ = kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
kv.Update("services.orders", []byte("https://localhost:8080/v2/orders"), 2)
entry, _ = kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
package main

import (
"fmt"
"os"

"github.com/nats-io/nats.go"
)

func main() {
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}

nc, _ := nats.Connect(url)
defer nc.Drain()

js, _ := nc.JetStream()
var kv nats.KeyValue
// Creates a Key/Value store with a bucket named discovery if it doesn't exist.
// Otherwise, it creates a Key/Value store by providing the existing bucket name.
if stream, _ := js.StreamInfo("KV_discovery"); stream == nil {
// A key-value (KV) bucket is created by specifying a bucket name.
kv, _ = js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "discovery",
})
} else {
kv, _ = js.KeyValue("discovery")
}
// `KeyValue` interface provides the
// standard `Put` and `Get` methods, with a revision number of the entry.
kv.Put("services.orders", []byte("https://localhost:8080/orders"))
entry, _ := kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

kv.Put("services.orders", []byte("https://localhost:8080/v1/orders"))
entry, _ = kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

_, err := kv.Update("services.orders", []byte("https://localhost:8080/v1/orders"), 1)
fmt.Printf("expected error because of wrong revision: %s\n", err)

kv.Update("services.orders", []byte("https://localhost:8080/v2/orders"), 2)
entry, _ = kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

name := <-js.StreamNames()
fmt.Printf("KV stream name: %s\n", name)

kv.Put("services.customers", []byte("https://localhost:8080/v2/customers"))
entry, _ = kv.Get("services.customers")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

kv.Delete("services.customers")
entry, err = kv.Get("services.customers")
if err == nil {
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
}
}
if err := js.DeleteKeyValue("discovery"); err != nil {
fmt.Println(err)
}

Creating a Key Watcher for Key/Value store changes

The Key/Value Store interface allows you to create a watcher for subscribing changes made on the Key/Value Store. The Watch method watch for any updates to keys that match the keys argument which could include wildcards. Watch will send a nil entry when it has received all initial values.

package main

import (
"fmt"
"os"
"runtime"

"github.com/nats-io/nats.go"
)

func main() {
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}

nc, _ := nats.Connect(url)
defer nc.Drain()

js, _ := nc.JetStream()
var kv nats.KeyValue
if stream, _ := js.StreamInfo("KV_discovery"); stream == nil {
// A key-value (KV) bucket is created by specifying a bucket name.
kv, _ = js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "discovery",
})
} else {
kv, _ = js.KeyValue("discovery")
}
// KeyWatcher for the wildcard "services.*"
w, _ := kv.Watch("services.*")
defer w.Stop()
for kve := range w.Updates() {
if kve != nil {
fmt.Printf("%s @ %d -> %q (op: %s)\n", kve.Key(), kve.Revision(), string(kve.Value()), kve.Operation())
}

}
runtime.Goexit()
}

Source Code

The full source code of the example is available form here.

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Shiju Varghese

Shiju Varghese

2.8K Followers

Consulting Solutions Architect and Trainer on Go and Distributed Systems, with focus on Microservices and Event-Driven Architectures. Author of two books on Go.