Building Event-Driven Distributed Systems in Go with gRPC, NATS JetStream and CockroachDB

In this post, I will give an overview about how to write event-driven distributed systems in Go, with gRPC, NATS JetStream and CockroachDB. As a Go developer and a solutions architect, my favorite technologies include gRPC, NATS ecosystem and CockroachDB amongst several technologies. I will use these technologies to demonstrate a simple demo on event-driven architecture to solve some of the practical challenges of building distributed systems in general, and Microservices based distributed systems in particular. This post will also give a brief idea about Event Sourcing and CQRS.

Practical challenges of building distributed systems and microservices

Challenges for managing distributed transactions and querying data from decentralised data stores

Even though you don’t follow Microservices architecture for building distributed systems, you may end with most of the difficulties which we have discussed in the preceding section.

Building event-driven architectures for building distributed systems

In event-driven architecture, an application is composed of independent components (a microservice or a component of a distributed system) that react to domain events (a state change of aggregates) published by other components. For example, when a new order is created from OrderService, it can publish a domain event “OrderCreated” (Order is created) to let other components know that a new Order has been created, and thus by reactive to those events, other systems can do their own actions. For example, a PaymentService can do its own action by reactive to the event “OrderCreated”, and it can publish another event something like “PaymentDebited” (Payment is debited from the customer account). By reactive to “PaymentDebited” event other components can do their own actions. Here, all of these domain events are facts that represent a state change of aggregates in the domain model. In event-driven architecture, persistence logic of the business transactions are pushed to the consumers or subscribers and producers or publishers of a distributed messaging/streaming systems like NATS/NATS JetStream, by responding to these domain events such as “OrderCreated”, “PaymentDebited”, “OrderApproved”, and “OrderShipped”.

Figure 1. Event-Driven Architetcure

Event Sourcing: An event-driven architecture on Event Store

Event Sourcing deals with an event store of immutable log of events (domain events), in which each log (a state change made to an object or to an aggregate of Domain-Driven Design) represents an application state. An event store is like a version control system. In a Microservices architecture, we can persist state changes of aggregates as a sequence of events. Events are facts, which represent some actions that happened in the system. These are immutable, which can’t be changed or be retracted. The examples of events in an e-Commerce system are OrderCreated, PaymentDebited, OrderApproved, OrderRejected, OrderShipped, OrderDelivered, etc.

The picture below shows an example Events table of an Event Store where eventtype column is used to specify domain events and aggregateid is used to specify ID of the aggregate, and the event data is persisted as a JSON document in the example demo being used in this post.

Figure 2. Example schema of an Event table for persisting events and event data

Figure 3. Event Store on events of an aggregate Order

In your Event Sourcing architecture, when you publish one event from one microservice, other microservices can be reactive to those events and publish another set of events. A single transaction in a Microservices based system may span into multiple microservices where we can perform a transaction as a sequence of events by building reactive microservices. Each state change of an aggregate or an object, can be treated as a domain event, which is an immutable fact about the state change of the application.

CQRS for building query model for the views of aggregates

A simple demo on event-driven distributed system in Go with gRPC, NATS JetStream and CockroachDB

The example demo is available here: https://github.com/shijuvar/go-distributed-sys.

I’ve simplified the example for the sake of a conceptual demo in order to simply understand things, and get some insight on how to use technologies like gRPC, NATS JetStream, etc. for building distributed systems.

NATS JetStream for distributed event streaming systems

The streaming systems let you capture the streams of events from distributed systems and microservices, IoT sensors, Edge devices and software applications, and persist these streams into persistent stores. Because you persist these streams of events into persistent stores, you can replay it for retrieval, processing, and reactive to those event streams by using an event-driven architecture. Because NATS JetStream replay streams of messages from a persistent store that comes with an acknowledgement mechanism of message delivery, it provides the delivery of messages with “At Least Once Delivery” guarantee. In traditional messaging systems, messages are buffering at the server and forwarding those messages to subscriber systems, where if any of the subscriber systems are down while publishing messages, then those messages will be missed from the messaging server. The NATS JetStream, which is a streaming system, guarantees the delivery of messages.

Streams in NATS JetStream

Figure 4. ORDERS Stream of file based storage of NATS JetStream Server

gRPC for building APIs based on RPC

Example demo

Figure 5. Directory structure of the event-driven example at eventstream directory

Here’re the basic workflow of the example:

  1. A client app post an Order to an HTTP API (ordersvc)
  2. An HTTP API (ordersvc) receives the order, then executes a command onto Event Store, which is an immutable log of events of domain events, to create an event via its gRPC API (eventstoresvc).
  3. The Event Store API (eventstoresvc) executes the command and then publishes an event “ORDERS.created” to the ORDERS stream of NATS JetStream server to let other services know that a domain event is created.
  4. The Payment worker (paymentworker) subscribes the event “ORDERS.created”, then make the payment, and then create another event “ORDERS.paymentdebited” via Event Store API.
  5. The Event Store API executes a command onto Event Store to create an event “ORDERS.paymentdebited” and publishes an event to NATS JetStream server to let other services know that the payment has been debited.
  6. The Query synchronising worker (querymodelworker) subscribes the event “ORDERS.created” that synchronise the query data model to provide state of the aggregates for query views.
  7. The review worker (reviewworker) subscribes the event “ORDERS.paymentdebited” and finally approves the order, and then create another event “ORDERS.approved” via Event Store API.
  8. A Saga coordinator manages the distributed transactions and makes void transactions on failures (to be implemented)

Event Store for Event Sourcing

Listing 1. Structure of the message Event in protocol buffers organised at eventstore directory

Listing 1. Protocol buffers definition of the message Event

message Event {
string event_id = 1;
string event_type = 2;
string aggregate_id = 3;
string aggregate_type = 4;
string event_data = 5;
string stream = 6;
}

The Event Store API (eventstoresvc) provides a gRPC API to persist events into the Event Store database (Command in the CQRS model), which is implemented with CockroachDB database. Here’s the basic code block in the CreateEvent RPC implementation:

Listing 2. CreateEvent RPC implementation for persisting domain events

// CreateEvent creates a new event into the event store
func (s *server) CreateEvent(ctx context.Context, eventRequest *eventstore.CreateEventRequest) (*eventstore.CreateEventResponse, error) {
err := s.repository.CreateEvent(eventRequest.Event)
if err != nil {
return nil, status.Error(codes.Internal, "internal error")
}
log.Println("Event is created")
go publishEvent(s.nats, eventRequest.Event)
return &eventstore.CreateEventResponse{IsSuccess: true, Error: ""}, nil
}
// publishEvent publishes an event via NATS JetStream server
func publishEvent(component *natsutil.NATSComponent, event *eventstore.Event) {
// Creates JetStreamContext to publish messages into Stream
jetStreamContext, _ := component.JetStreamContext()
subject := event.EventType
eventMsg := []byte(event.EventData)
// Publish message on subject (channel)
jetStreamContext.Publish(subject, eventMsg)
log.Println("Published message on subject: " + subject)
}

Whenever a new Event is persisted into the Event Store via its gRPC API, it publishes messages into the ORDERS stream of NATS JetStream server to let other microservices know that a new domain event is published so that subscriber systems which are interested into those domain events, subscribe the events, and could reactive to those events. In this simple example demo, the events are published into the streams of the NATS JetStream system from the Event Store API itself. In real-world scenarios, it might be from individual microservices, and sometimes it might be from a saga coordinator (Distributed Saga) that coordinates a single business transaction, which spans into multiple microservices.

Subscribe events for building reactive microservices

Pull based Consumer and Push based Consumer in NATS JetStream

A Push based Consumer

Listing 3. A Push based consumer consumes messages from a Stream ORDERS

// Creates JetStreamContext for create consumer 
jetStreamContext
, err := natsComponent.JetStreamContext()
if err != nil {
log.Fatal(err)
}
// Create push based consumer as durable
jetStreamContext.QueueSubscribe(subscribeSubject, queueGroup, func(msg *nats.Msg) {
msg.Ack()
var order ordermodel.Order
// Unmarshal JSON that represents the Order data
err := json.Unmarshal(msg.Data, &order)
if err != nil {
log.Print(err)
return
}
log.Printf("Message subscribed on subject:%s, from:%s, data:%v",
subscribeSubject, clientID, order)

// Create OrderPaymentDebitedCommand from Order
command := ordermodel.PaymentDebitedCommand{
OrderID: order.ID,
CustomerID: order.CustomerID,
Amount: order.Amount,
}
// Create ORDERS.paymentdebited event
if err := executePaymentDebitedCommand(command); err != nil {
log.Println("error occured while executing the PaymentDebited command")
}

}, nats.Durable(clientID), nats.ManualAck())

The QueueSubscribe API lets you create Push based consumers with a queue group name, in order to create load balanced consumers. In the preceding code block, A consumer system subscribes messages from the subject “ORDERS.created" and then it does its own actions and creates another event called “ORDERS.paymentdebited” by calling the gRPC API. Inside the gRPC API, it persist event and event data into the Event Store and publishes message into the ORDERS Stream of NATS JetStream Server.

The code block below shows the implementation of executePaymentDebitedCommand method

Listing 4. executePaymentDebitedCommand method from the paymentworker

func executePaymentDebitedCommand(command ordermodel.PaymentDebitedCommand) error {

conn, err := grpc.Dial(grpcUri, grpc.WithInsecure())
if err != nil {
log.Fatalf("Unable to connect: %v", err)
}
defer conn.Close()
client := eventstore.NewEventStoreClient(conn)
paymentJSON, _ := json.Marshal(command)
eventid, _ := uuid.NewUUID()
event := &eventstore.Event{
EventId: eventid.String(),
EventType: event,
AggregateId: command.OrderID,
AggregateType: aggregate,
EventData: string(paymentJSON),
Stream: stream,
}
createEventRequest := &eventstore.CreateEventRequest{Event: event}

resp, err := client.CreateEvent(context.Background(), createEventRequest)
if err != nil {
return fmt.Errorf("error from RPC server: %w", err)
}
if resp.IsSuccess {
return nil
}
return errors.New("error from RPC server")
}

A Pull based consumer: Subscribe events for building query model for the views of aggregates

Listing 5. A Pull based consumer for syncing data for the query model

func pullSubscribeOnOrder(js nats.JetStreamContext) {
// Create Pull based consumer with maximum 128 inflight.
// PullMaxWaiting defines the max inflight pull requests.
sub, _ := js.PullSubscribe(subscribeSubject, clientID, nats.PullMaxWaiting(128))
for {

msgs, _ := sub.Fetch(batch)
for _, msg := range msgs {
msg.Ack()
var order ordermodel.Order
// Unmarshal JSON that represents the Order data
err := json.Unmarshal(msg.Data, &order)
if err != nil {
log.Print(err)
return
}
log.Printf("Message subscribed on subject:%s, from:%s, data:%v", subscribeSubject, clientID, order)
orderDB, _ := sqldb.NewOrdersDB()
repository, _ := ordersyncrepository.New(orderDB.DB)
// Sync query model with event data
if err := repository.CreateOrder(context.Background(), order); err != nil {
log.Printf("Error while replicating the query model %+v", err)
}
}
}
}

The PullSubscribe API of JetStream lets you create Pull based consumers. Once you create a Pull based consumers. Once you create a Pull based consumer, a call to the method Fetch pulls a batch of messages from a stream of NATS JetStream. In the preceding code block, messages are fetched from ORDERS stream, and acknowledges to the Server using method msg.Ack() and then executes some logic to sync the data model with the events stored in Event Store and creates a denormalized views of aggregates by persisting data into the data store to be used for Query model in CQRS architecture.

Persistent store with CockroachDB

Listing 6. Transaction implementation in CockroachDB using package crdb

// CreateOrder persist Order data into the query model
func (repo *repository) CreateOrder(ctx context.Context, order order.Order) error {

// Run a transaction to sync the query model.
err := crdb.ExecuteTx(ctx, repo.db, nil, func(tx *sql.Tx) error {
return createOrder(tx, order)
})
if err != nil {
return err
}
return nil
}

func createOrder(tx *sql.Tx, order order.Order) error {

// Insert into the "orders" table.
sql := `
INSERT INTO orders (id, customerid, status, createdon, restaurantid, amount)
VALUES ($1,$2,$3,$4,$5,$6)`
_, err := tx.Exec(sql, order.ID, order.CustomerID, order.Status, order.CreatedOn, order.RestaurantId, order.Amount)
if err != nil {
return err
}
// Insert items into the "orderitems" table.
// Because it's store for read model, we can insert denormalized data
for _, v := range order.OrderItems {
sql = `
INSERT INTO orderitems (orderid, customerid, code, name, unitprice, quantity)
VALUES ($1,$2,$3,$4,$5,$6)`

_, err := tx.Exec(sql, order.ID, order.CustomerID, v.ProductCode, v.Name, v.UnitPrice, v.Quantity)
if err != nil {
return err
}
}
return nil
}

Source Code

Summary

When you build distributed systems, don’t try to become a poster child of any buzzwords and named patterns, including Event Sourcing and CQRS. Instead of it, try to learn all of the named patterns, and then understand what it does and does not, with advantages and limitations. When you build your own systems, try to provide a context-driven architecture for your own unique problems rather than becoming a poster child because there is not any perfect solution architecture existed. Distributed systems based on Microservices architecture, is really painful. Event Sourcing and CQRS are painful. When you choose a solution architecture, a pragmatic and middle ground solution might be better choice rather than blindly following some buzzwords, intellectual thoughts and named patterns.

You can follow me on twitter at @shijucv. As a Consulting Solutions Architect, I do provide training and consulting on Go programming language (Golang) and distributed systems architectures, in India.

--

--

Consulting Solutions Architect and Trainer on Go and Distributed Systems, with focus on Microservices and Event-Driven Architectures. Author of two books on Go.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Shiju Varghese

Consulting Solutions Architect and Trainer on Go and Distributed Systems, with focus on Microservices and Event-Driven Architectures. Author of two books on Go.