Using NATS JetStream Key/Value Store in Go
In this post, I will provide a brief introduction to the NATS JetStream Key/Value Store using an example with Go programming language. If you’re using NATS ecosystem in your systems architecture, you can further leverage the Key/Value Store as a mechanism for a consistent key/value store for various use cases like dynamic configurations for distributed systems and microservices.
Introduction to JetStream Key/Value Store
The NATS JetSteam, the persistence layer of NATS, provides a distributed streaming platform that lets you capture streams of messages (stream of events) from distributed systems, IoT sensors and Edge devices, and persist these data streams into persistent stores. Because you persist these data streams into persistent stores, you can replay it for retrieval, processing, and reactive to those stream of events by using an event-driven architecture.
The JetSteam platform now introduces a Key/Value Store, which allows applications to create buckets
and use them as immediately consistent, Key/Value store by leveraging the existing capabilities of JetSteam.
Persistent Store of Key/Value Store
The Key/Value store uses Stream
of JetStream to persist Key/Value store using buckets. The bucket
of Key/Value store persist as a Stream
of JetStream. For example, when you create a bucket with name discoveey
for persisting Key/Value store associated with that particular bucket, it will create a Stream
with name KV_discovery
.
The figure below shows the storage representation of a bucket
“discovery” as a Stream
with file storage at the server.
Figure 1. Stream for a Key/Value store bucket “discovery”
You can persist and retrieve to and from the Key/Value store using the NATS CLI tool as well as programmatically using a Client SDK.
JetStream Key/Value Store example in Go
Let’s write a simple example to understand how to working with NATS JetStream Key/Value Store by using the nats.go Go client SDK.
Let’s set up the latest version of NATS Server and nats.go Go client SDK:
Listing 1. Install the NATS Server
# go install the latest NATS server
go install github.com/nats-io/nats-server/v2@latest
Listing 2. Install the latest nats.go Go client SDK in the project
# go get nats.go Go client SDK package
go get github.com/nats-io/nats.go/@latest
In order to use Key/Value Store, you should run the NATS server enabled with JetStream. You can run the nats-server
with js
flag enabled (nats-server -js
) or with a configuration that enabled JetStream (jetstream {}
)
Figure 2. NATS JetStream Server is running
Example demo in Go
In this example, we create a bucket
named discovery so that a stream
with name KV_discovery
will be created for persisting the Key/Value store values associated with that bucket
.
The code block below creates a Key/Value store with a bucket
named discovery if it doesn’t exist, otherwise it creates a Key/Value store by providing the bucket
name.
Listing 3. Create a Key/Value Store with a bucket
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}
nc, _ := nats.Connect(url)
defer nc.Drain()
js, _ := nc.JetStream()
if stream, _ := js.StreamInfo("KV_discovery"); stream == nil {
// A key-value (KV) bucket is created by specifying a bucket name.
kv, _ = js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "discovery",
})
} else {
kv, _ = js.KeyValue("discovery")
}
Putting and getting Key/Value to and from the Store
The Key/Value Store interface provides the standard Put
and Get
methods, with a revision number of the entry. The code block below puts a Key/Value values and gets the value and corresponding information.
Listing 4. Using Put and Get methods for putting and getting data
kv.Put("services.orders", []byte("https://localhost:8080/orders"))
entry, _ := kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
kv.Put("services.orders", []byte("https://localhost:8080/v1/orders"))
entry, _ = kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
You can also use the Put
method for updating the data with a key. At the same time, there is an Update
method that updates the value only if the latest revision matches. If the given revision number is wrong, you will get an error value.
The code block below update a value by providing a matching latest revision and getting the value and corresponding information.
Listing 5. Updating the value with matching revision number
kv.Update("services.orders", []byte("https://localhost:8080/v2/orders"), 2)
entry, _ = kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
The code block below provides the full source code of the simple example that uses Put
, Get
, Update
and Delete
methods.
Listing 6. Example code on Put, Get, Update and Delete methods
package main
import (
"fmt"
"os"
"github.com/nats-io/nats.go"
)
func main() {
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}
nc, _ := nats.Connect(url)
defer nc.Drain()
js, _ := nc.JetStream()
var kv nats.KeyValue
// Creates a Key/Value store with a bucket named discovery if it doesn't exist.
// Otherwise, it creates a Key/Value store by providing the existing bucket name.
if stream, _ := js.StreamInfo("KV_discovery"); stream == nil {
// A key-value (KV) bucket is created by specifying a bucket name.
kv, _ = js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "discovery",
})
} else {
kv, _ = js.KeyValue("discovery")
}
// `KeyValue` interface provides the
// standard `Put` and `Get` methods, with a revision number of the entry.
kv.Put("services.orders", []byte("https://localhost:8080/orders"))
entry, _ := kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
kv.Put("services.orders", []byte("https://localhost:8080/v1/orders"))
entry, _ = kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
_, err := kv.Update("services.orders", []byte("https://localhost:8080/v1/orders"), 1)
fmt.Printf("expected error because of wrong revision: %s\n", err)
kv.Update("services.orders", []byte("https://localhost:8080/v2/orders"), 2)
entry, _ = kv.Get("services.orders")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
name := <-js.StreamNames()
fmt.Printf("KV stream name: %s\n", name)
kv.Put("services.customers", []byte("https://localhost:8080/v2/customers"))
entry, _ = kv.Get("services.customers")
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
kv.Delete("services.customers")
entry, err = kv.Get("services.customers")
if err == nil {
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
}
}
The code block below removes the bucket
named discovery.
Listing 7. Removes a bucket
if err := js.DeleteKeyValue("discovery"); err != nil {
fmt.Println(err)
}
Creating a Key Watcher for Key/Value store changes
The Key/Value Store interface allows you to create a watcher for subscribing changes made on the Key/Value Store. The Watch
method watch for any updates to keys that match the keys argument which could include wildcards. Watch will send a nil entry when it has received all initial values.
The code block below creates a key watcher with wildcard “services.*”
Listing 8. Key Watcher for Key/Value store changes
package main
import (
"fmt"
"os"
"runtime"
"github.com/nats-io/nats.go"
)
func main() {
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}
nc, _ := nats.Connect(url)
defer nc.Drain()
js, _ := nc.JetStream()
var kv nats.KeyValue
if stream, _ := js.StreamInfo("KV_discovery"); stream == nil {
// A key-value (KV) bucket is created by specifying a bucket name.
kv, _ = js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "discovery",
})
} else {
kv, _ = js.KeyValue("discovery")
}
// KeyWatcher for the wildcard "services.*"
w, _ := kv.Watch("services.*")
defer w.Stop()
for kve := range w.Updates() {
if kve != nil {
fmt.Printf("%s @ %d -> %q (op: %s)\n", kve.Key(), kve.Revision(), string(kve.Value()), kve.Operation())
}
}
runtime.Goexit()
}
The figure below shows the output of the code Listing 6
Figure 3. Output the of the code Listing 6
The figure below shows the output of the code Listing 8 (Key Watcher)
Figure 3. Output the of the code Listing 8
Source Code
The full source code of the example is available form here.