Building Event-Driven Distributed Systems in Go with gRPC, NATS JetStream and CockroachDB
In this post, I give an overview of how to write event-driven distributed systems in Go with gRPC, NATS JetStream and CockroachDB. As a Go developer and a solutions architect, I count gRPC, the NATS ecosystem and CockroachDB among my favorite technologies. I will use them in a simple demo of an event-driven architecture that addresses some of the practical challenges of building distributed systems in general, and Microservices based distributed systems in particular. This post also gives a brief introduction to Event Sourcing and CQRS.
Practical challenges of building distributed systems and microservices
Although people who have never worked on distributed systems from an engineering perspective have been aggressively claiming and marketing that Microservices architecture is all about containers and Kubernetes, in reality it is all about functional decomposition. We may use containers and Kubernetes for infrastructure, but the fundamental idea of Microservices architecture is functional decomposition for building highly scalable systems. Because we decompose larger systems into several autonomous services, this brings new kinds of complexity: in particular, it becomes hard to make transactions that span multiple microservices, and to query data that is scattered across multiple databases owned by individual microservices.
Challenges for managing distributed transactions and querying data from decentralised data stores
Because we break up a monolithic system into several autonomous services, the data is scattered amongst several databases owned by individual microservices. This adds a lot of complexity to your applications and architecture. For example, a business transaction may span multiple microservices. Let’s say you build an e-commerce system with Microservices architecture, where placing an order is initially handled by one microservice (OrderService), payment processing is done by another (PaymentService), and so on. Another challenge is querying data from multiple databases. With a monolithic database, you can easily perform join queries within a single database. Once that database is split into several databases as part of the functional decomposition, there is no centralized database to run join queries against, so you must gather the data from multiple databases.
Even if you don’t follow Microservices architecture for building distributed systems, you may end up with most of the difficulties discussed in the preceding section.
Building event-driven architectures for building distributed systems
In a distributed system, the systems and components of an application are distributed across different networked computers, and they need a lot of inter-process communication to coordinate their actions by passing messages to one another. Although RPC is one viable option for inter-process communication, it may not be adequate for complex business transactions, where a single transaction may span multiple systems and services and therefore persist into multiple databases because of the isolated persistence model of microservices and individual systems. In this context, an event-driven architecture is a better approach for handling the complexity and managing failures while performing transactions.
In event-driven architecture, an application is composed of independent components (a microservice or a component of a distributed system) that react to domain events (state changes of aggregates) published by other components. For example, when a new order is created by OrderService, it can publish a domain event “OrderCreated” to let other components know that a new Order has been created, and other systems can perform their own actions by reacting to that event. A PaymentService can react to “OrderCreated”, debit the payment, and publish another event such as “PaymentDebited” (payment is debited from the customer account). Other components can in turn react to “PaymentDebited”. All of these domain events are facts that represent a state change of aggregates in the domain model. In event-driven architecture, the persistence logic of a business transaction is pushed to the producers (publishers) and consumers (subscribers) of a distributed messaging or streaming system like NATS or NATS JetStream, which respond to domain events such as “OrderCreated”, “PaymentDebited”, “OrderApproved”, and “OrderShipped”.
Figure 1. Event-Driven Architecture
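The chain of reactions described above can be sketched with a tiny in-memory publish/subscribe bus. This is only an illustration under stated assumptions: the Bus type, its methods and RunOrderFlow are hypothetical names, delivery is synchronous, and nothing here talks to NATS.

```go
package main

import "fmt"

// Event is a domain event: a fact about a state change of an aggregate.
type Event struct {
	Type        string // for example "OrderCreated"
	AggregateID string // ID of the Order the event belongs to
}

// Bus is a hypothetical in-memory stand-in for a messaging system.
type Bus struct {
	handlers map[string][]func(Event)
	Log      []string // event types in the order they were published
}

// NewBus returns an empty bus.
func NewBus() *Bus {
	return &Bus{handlers: make(map[string][]func(Event))}
}

// Subscribe registers a handler that reacts to events of the given type.
func (b *Bus) Subscribe(eventType string, h func(Event)) {
	b.handlers[eventType] = append(b.handlers[eventType], h)
}

// Publish records the event and synchronously invokes every subscriber.
func (b *Bus) Publish(e Event) {
	b.Log = append(b.Log, e.Type)
	for _, h := range b.handlers[e.Type] {
		h(e)
	}
}

// RunOrderFlow wires the components together and returns the published event types.
func RunOrderFlow(orderID string) []string {
	bus := NewBus()
	// PaymentService reacts to OrderCreated and publishes PaymentDebited.
	bus.Subscribe("OrderCreated", func(e Event) {
		bus.Publish(Event{Type: "PaymentDebited", AggregateID: e.AggregateID})
	})
	// A review component reacts to PaymentDebited and publishes OrderApproved.
	bus.Subscribe("PaymentDebited", func(e Event) {
		bus.Publish(Event{Type: "OrderApproved", AggregateID: e.AggregateID})
	})
	// OrderService publishes the first event of the business transaction.
	bus.Publish(Event{Type: "OrderCreated", AggregateID: orderID})
	return bus.Log
}

func main() {
	fmt.Println(RunOrderFlow("order-1"))
}
```

No component calls another one directly; each only knows the event types it reacts to, which is what keeps the services autonomous.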
Event Sourcing: An event-driven architecture on Event Store
Event Sourcing is an event-driven architecture based on the idea of an event store that persists events and their corresponding event data. In the traditional model of application engineering, we persist domain entities into persistent stores. In Event Sourcing, persistence is focused on events and event data.
Event Sourcing deals with an event store that holds an immutable log of events (domain events), in which each entry records a state change made to an object or to an aggregate of Domain-Driven Design, and the application state is derived from the sequence of entries. An event store is like a version control system. In a Microservices architecture, we can persist state changes of aggregates as a sequence of events. Events are facts that represent actions that happened in the system. They are immutable: they can’t be changed or retracted. Examples of events in an e-commerce system are OrderCreated, PaymentDebited, OrderApproved, OrderRejected, OrderShipped, OrderDelivered, etc.
The picture below shows an example Events table of an Event Store where eventtype column is used to specify domain events and aggregateid is used to specify ID of the aggregate, and the event data is persisted as a JSON document in the example demo being used in this post.
Figure 2. Example schema of an Event table for persisting events and event data
Figure 3. Event Store on events of an aggregate Order
In an Event Sourcing architecture, when one microservice publishes an event, other microservices can react to it and publish another set of events. A single transaction in a Microservices based system may span multiple microservices, and we can perform it as a sequence of events by building reactive microservices. Each state change of an aggregate or an object can be treated as a domain event, which is an immutable fact about the state change of the application.
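To make the "state is derived from events" idea concrete, here is a minimal sketch of an append-only log and a replay function. The EventStore type, the ReplayStatus function and the status names ("Pending", "Paid", and so on) are assumptions made for illustration; the demo persists events in CockroachDB rather than in memory.

```go
package main

import "fmt"

// Event is one immutable entry in the event log.
type Event struct {
	AggregateID string
	Type        string
}

// EventStore is a hypothetical append-only, in-memory log.
type EventStore struct {
	events []Event
}

// Append adds an event to the end of the log; existing entries are never modified.
func (s *EventStore) Append(e Event) {
	s.events = append(s.events, e)
}

// Load returns the events of one aggregate in the order they were appended.
func (s *EventStore) Load(aggregateID string) []Event {
	var out []Event
	for _, e := range s.events {
		if e.AggregateID == aggregateID {
			out = append(out, e)
		}
	}
	return out
}

// ReplayStatus folds over an aggregate's events to derive its current status.
func ReplayStatus(events []Event) string {
	status := "Unknown"
	for _, e := range events {
		switch e.Type {
		case "OrderCreated":
			status = "Pending"
		case "PaymentDebited":
			status = "Paid"
		case "OrderApproved":
			status = "Approved"
		case "OrderRejected":
			status = "Rejected"
		case "OrderShipped":
			status = "Shipped"
		}
	}
	return status
}

func main() {
	store := &EventStore{}
	store.Append(Event{AggregateID: "order-1", Type: "OrderCreated"})
	store.Append(Event{AggregateID: "order-2", Type: "OrderCreated"})
	store.Append(Event{AggregateID: "order-1", Type: "PaymentDebited"})
	store.Append(Event{AggregateID: "order-1", Type: "OrderApproved"})
	fmt.Println("order-1:", ReplayStatus(store.Load("order-1")))
	fmt.Println("order-2:", ReplayStatus(store.Load("order-2")))
}
```

Because the log is never rewritten, the same replay can be run at any time to rebuild the state or to audit how an order reached it.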
CQRS for building query model for the views of aggregates
The Event Sourcing model persists data as a sequence of events, but this data is not in a queryable form that can be presented to a business user. Because the write model (commands) is just an event store, you need an architectural approach to produce queryable data for your microservices. Command Query Responsibility Segregation (CQRS) is an ideal pattern for implementing query models for your microservices. As the name implies, CQRS segregates an application into two parts: Commands, which perform an action to change the state of aggregates, and Queries, which provide a query model for the views of aggregates. Although CQRS is an independent architectural pattern, we often use it with Event Sourcing as the model for implementing commands. We may also use different databases for write operations and query operations, which lets you build a highly performant query model by loading denormalized data sets into the data stores of the read models. Because you maintain separate query models, you can solve many of the practical challenges of querying data from the decentralized data models that are often seen in Microservices architecture. The query model can be designed as denormalized data that views can use directly, without executing too many join queries. When you use CQRS with Event Sourcing, you can write background workers that subscribe to domain events and, by reacting to those events, build queryable data for the query model, denormalized as much as the views require.
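The query side can be pictured as a projector that applies events to a denormalized view. The following is a rough sketch, not the demo's worker: the Projector type, the OrderView fields, the status names and the JSON layout of the event data are all assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event carries an event type and its JSON encoded event data.
type Event struct {
	Type string
	Data string
}

// orderData is the assumed (hypothetical) JSON layout of the event data.
type orderData struct {
	OrderID    string  `json:"order_id"`
	CustomerID string  `json:"customer_id"`
	Amount     float64 `json:"amount"`
}

// OrderView is a denormalized row of the query model.
type OrderView struct {
	OrderID    string
	CustomerID string
	Amount     float64
	Status     string
}

// Projector reacts to events and keeps the read model up to date.
type Projector struct {
	views map[string]OrderView
}

// NewProjector returns a projector with an empty read model.
func NewProjector() *Projector {
	return &Projector{views: make(map[string]OrderView)}
}

// Apply updates the query model for a single event.
func (p *Projector) Apply(e Event) error {
	var d orderData
	if err := json.Unmarshal([]byte(e.Data), &d); err != nil {
		return fmt.Errorf("decode %s: %w", e.Type, err)
	}
	switch e.Type {
	case "ORDERS.created":
		p.views[d.OrderID] = OrderView{OrderID: d.OrderID, CustomerID: d.CustomerID, Amount: d.Amount, Status: "Pending"}
	case "ORDERS.paymentdebited":
		p.setStatus(d.OrderID, "Paid")
	case "ORDERS.approved":
		p.setStatus(d.OrderID, "Approved")
	}
	return nil
}

// setStatus changes the status of a known order and ignores unknown ones.
func (p *Projector) setStatus(orderID, status string) {
	if v, ok := p.views[orderID]; ok {
		v.Status = status
		p.views[orderID] = v
	}
}

// Get is the query side: a direct lookup with no joins.
func (p *Projector) Get(orderID string) (OrderView, bool) {
	v, ok := p.views[orderID]
	return v, ok
}

func main() {
	p := NewProjector()
	events := []Event{
		{Type: "ORDERS.created", Data: `{"order_id":"o1","customer_id":"c1","amount":25.5}`},
		{Type: "ORDERS.paymentdebited", Data: `{"order_id":"o1"}`},
	}
	for _, e := range events {
		if err := p.Apply(e); err != nil {
			fmt.Println("error:", err)
			return
		}
	}
	v, _ := p.Get("o1")
	fmt.Printf("%+v\n", v)
}
```

The write side never reads from this view, so the view can be dropped and rebuilt from the event log whenever its shape needs to change.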
A simple demo on event-driven distributed system in Go with gRPC, NATS JetStream and CockroachDB
The primary objective of the demo is to give an overview of building event-driven distributed systems, not to explore the individual technologies used in the demo.
The example demo is available here: https://github.com/shijuvar/go-distributed-sys.
I’ve simplified the example for the sake of a conceptual demo, so that it is easy to understand and gives some insight into how to use technologies like gRPC and NATS JetStream for building distributed systems.
NATS JetStream for distributed event streaming systems
In Event Sourcing, or in any kind of event-driven architecture, when a microservice or a component of a distributed system publishes a domain event, other microservices can react to it and publish another set of events after completing their own local transactions. A single transaction in a Microservices based system may span multiple microservices, and we can perform it as a sequence of events by building reactive microservices. Each state change of an aggregate can be treated as an event, which is an immutable fact about your system. To publish events that let other microservices know that something has happened in the system, we need a distributed messaging or streaming system. In this example, we use NATS JetStream as the event streaming system. An event-driven, reactive architecture is a great choice for building massively scalable distributed systems based on Microservices architecture.
Streaming systems let you capture streams of events from distributed systems and microservices, IoT sensors, Edge devices and software applications, and persist these streams into persistent stores. Because the streams of events are persisted, you can replay them for retrieval and processing, and react to them by using an event-driven architecture. Because NATS JetStream replays streams of messages from a persistent store and comes with an acknowledgement mechanism for message delivery, it delivers messages with an “At Least Once Delivery” guarantee. In traditional messaging systems, the server buffers messages and forwards them to subscriber systems; if a subscriber system is down while messages are being published, it misses those messages. NATS JetStream, being a streaming system, persists the messages so that such a subscriber can still receive them once it is back.
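One practical consequence of “At Least Once Delivery” is that a consumer may see the same message more than once, for example when an acknowledgement is lost. A common way to cope is to make the handler idempotent. The sketch below assumes every message carries a unique ID; the Deduplicator type is hypothetical, and a real service would keep the set of seen IDs in a durable store rather than in memory.

```go
package main

import "fmt"

// Message is a delivered event; ID is assumed to be unique per event.
type Message struct {
	ID   string
	Data string
}

// Deduplicator remembers processed IDs so that redelivered messages are skipped.
type Deduplicator struct {
	seen map[string]bool
}

// NewDeduplicator returns a deduplicator with no processed messages.
func NewDeduplicator() *Deduplicator {
	return &Deduplicator{seen: make(map[string]bool)}
}

// Process runs handle only the first time a message ID is seen and
// reports whether the handler was invoked.
func (d *Deduplicator) Process(m Message, handle func(Message)) bool {
	if d.seen[m.ID] {
		return false
	}
	handle(m)
	d.seen[m.ID] = true
	return true
}

func main() {
	d := NewDeduplicator()
	debits := 0
	deliveries := []Message{
		{ID: "evt-1", Data: "order-1"},
		{ID: "evt-1", Data: "order-1"}, // the same event delivered again
		{ID: "evt-2", Data: "order-2"},
	}
	for _, m := range deliveries {
		d.Process(m, func(Message) { debits++ })
	}
	fmt.Println("debits performed:", debits)
}
```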
Streams in NATS JetStream
In JetStream, Streams are ‘message stores’, where many related subjects are stored in a single Stream. In this example demo, all events are published around the aggregate Order, so a stream named ORDERS is created in the NATS JetStream server. Messages are produced to and consumed from the ORDERS stream with related subjects such as “ORDERS.created”, “ORDERS.paymentdebited”, “ORDERS.approved”, “ORDERS.rejected”, and “ORDERS.shipped”. All of these related subjects (“ORDERS.*”) belong to the “ORDERS” Stream.
Figure 4. ORDERS Stream of file based storage of NATS JetStream Server
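The “ORDERS.*” notation uses NATS subject wildcards: subjects are dot-separated tokens, `*` matches exactly one token, and `>` matches one or more trailing tokens. The function below is a simplified re-implementation written only to illustrate those rules; MatchSubject is a hypothetical helper and not part of the NATS client library.

```go
package main

import (
	"fmt"
	"strings"
)

// MatchSubject reports whether subject matches pattern using NATS style
// wildcards: "*" matches one token, ">" matches one or more trailing tokens.
func MatchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" is only valid as the last token and needs at least one token to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	for _, subject := range []string{"ORDERS.created", "ORDERS.shipped", "PAYMENTS.created"} {
		fmt.Println(subject, MatchSubject("ORDERS.*", subject))
	}
}
```

With a stream bound to “ORDERS.*”, a new subject such as “ORDERS.delivered” would be captured without any change to the stream configuration.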
gRPC for building APIs based on RPC
In this example, the Event Store provides a gRPC based API to execute commands. gRPC is a high performance, open-source remote procedure call (RPC) framework that can run anywhere. It enables client and server applications to communicate transparently, and makes it easier to build connected systems. gRPC is widely used as a communication protocol in Microservices architecture. If your microservices communicate with each other over an API, gRPC is a good choice.
Example demo
Here’s the directory structure of the example, located in the eventstream directory:
Figure 5. Directory structure of the event-driven example at eventstream directory
Here’s the basic workflow of the example:
- A client app posts an Order to an HTTP API (ordersvc).
- The HTTP API (ordersvc) receives the order and then, via the gRPC API of the Event Store (eventstoresvc), executes a command to create an event in the Event Store, which is an immutable log of domain events.
- The Event Store API (eventstoresvc) executes the command and then publishes an event “ORDERS.created” to the ORDERS stream of the NATS JetStream server to let other services know that a domain event has been created.
- The Payment worker (paymentworker) subscribes to the event “ORDERS.created”, makes the payment, and then creates another event “ORDERS.paymentdebited” via the Event Store API.
- The Event Store API executes a command against the Event Store to create the event “ORDERS.paymentdebited” and publishes it to the NATS JetStream server to let other services know that the payment has been debited.
- The Query synchronising worker (querymodelworker) subscribes to the event “ORDERS.created” and synchronises the query data model, which provides the state of the aggregates for query views.
- The review worker (reviewworker) subscribes to the event “ORDERS.paymentdebited”, approves the order, and then creates another event “ORDERS.approved” via the Event Store API.
- A Saga coordinator manages the distributed transactions and makes void transactions on failures (to be implemented).
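The Saga coordinator in the last step is not implemented in the demo. Purely as an illustration of the general idea, and not of how the demo will do it, a saga can be modelled as a list of steps, each paired with a compensating action that is run in reverse order when a later step fails. Step, RunSaga and ErrPaymentFailed are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrPaymentFailed is a sample failure used by the demo run in main.
var ErrPaymentFailed = errors.New("payment failed")

// Step is one local transaction of the saga and the action that voids it.
type Step struct {
	Name       string
	Action     func() error
	Compensate func()
}

// RunSaga executes the steps in order. When a step fails, the steps that
// already completed are compensated in reverse order.
func RunSaga(steps []Step) error {
	for i, s := range steps {
		if err := s.Action(); err != nil {
			for j := i - 1; j >= 0; j-- {
				steps[j].Compensate()
			}
			return fmt.Errorf("step %q failed: %w", s.Name, err)
		}
	}
	return nil
}

func main() {
	var trace []string
	steps := []Step{
		{
			Name:       "create order",
			Action:     func() error { trace = append(trace, "order created"); return nil },
			Compensate: func() { trace = append(trace, "order cancelled") },
		},
		{
			Name:       "debit payment",
			Action:     func() error { return ErrPaymentFailed },
			Compensate: func() {},
		},
	}
	fmt.Println(RunSaga(steps))
	fmt.Println(trace)
}
```

In an event-driven system the compensations would themselves be events (for example an order rejection), so the event log keeps a record of both the attempt and its reversal.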
Event Store for Event Sourcing
Here’s the structure of the message Event. Every state change in the example is treated as an event and is persisted by executing a command against the Event Store.
Listing 1. Structure of the message Event in protocol buffers, organised in the eventstore directory
message Event {
  string event_id = 1;
  string event_type = 2;
  string aggregate_id = 3;
  string aggregate_type = 4;
  string event_data = 5;
  string stream = 6;
}
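To show how the fields fit together, here is a small sketch that builds such an event with a JSON payload in the event data field. It uses a plain Go struct rather than the generated protocol buffers type, and the OrderCreated payload, the "order" aggregate type and the random hex ID (standing in for a UUID) are assumptions for illustration.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// Event mirrors the fields of the protocol buffers message as a plain Go struct.
type Event struct {
	EventID       string
	EventType     string
	AggregateID   string
	AggregateType string
	EventData     string // JSON document
	Stream        string
}

// OrderCreated is a hypothetical payload; the demo's Order model has more fields.
type OrderCreated struct {
	OrderID    string  `json:"order_id"`
	CustomerID string  `json:"customer_id"`
	Amount     float64 `json:"amount"`
}

// newID returns a random 128-bit identifier in hex (a stand-in for a UUID).
func newID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// NewOrderCreatedEvent wraps the payload into an Event for the ORDERS stream.
func NewOrderCreatedEvent(o OrderCreated) (Event, error) {
	data, err := json.Marshal(o)
	if err != nil {
		return Event{}, fmt.Errorf("marshal payload: %w", err)
	}
	id, err := newID()
	if err != nil {
		return Event{}, fmt.Errorf("generate event id: %w", err)
	}
	return Event{
		EventID:       id,
		EventType:     "ORDERS.created",
		AggregateID:   o.OrderID,
		AggregateType: "order",
		EventData:     string(data),
		Stream:        "ORDERS",
	}, nil
}

func main() {
	e, err := NewOrderCreatedEvent(OrderCreated{OrderID: "o1", CustomerID: "c1", Amount: 10})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(e.EventType, e.AggregateID, e.EventData)
}
```

Keeping the payload as an opaque JSON string lets one table and one message type carry every kind of domain event.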
The Event Store API (eventstoresvc) provides a gRPC API to persist events into the Event Store database (Command in the CQRS model), which is implemented with CockroachDB database. Here’s the basic code block in the CreateEvent RPC implementation:
Listing 2. CreateEvent RPC implementation for persisting domain events
// CreateEvent persists a new event into the event store and then publishes it
func (s *server) CreateEvent(ctx context.Context, eventRequest *eventstore.CreateEventRequest) (*eventstore.CreateEventResponse, error) {
	err := s.repository.CreateEvent(eventRequest.Event)
	if err != nil {
		return nil, status.Error(codes.Internal, "internal error")
	}
	log.Println("Event is created")
	// Publish in a separate goroutine so that the RPC does not wait for NATS
	go publishEvent(s.nats, eventRequest.Event)
	return &eventstore.CreateEventResponse{IsSuccess: true, Error: ""}, nil
}

// publishEvent publishes an event via NATS JetStream server
func publishEvent(component *natsutil.NATSComponent, event *eventstore.Event) {
	// Creates JetStreamContext to publish messages into Stream
	jetStreamContext, err := component.JetStreamContext()
	if err != nil {
		log.Printf("Unable to create JetStreamContext: %v", err)
		return
	}
	subject := event.EventType
	eventMsg := []byte(event.EventData)
	// Publish message on subject (channel)
	if _, err := jetStreamContext.Publish(subject, eventMsg); err != nil {
		log.Printf("Unable to publish message on subject %s: %v", subject, err)
		return
	}
	log.Println("Published message on subject: " + subject)
}
Whenever a new Event is persisted into the Event Store via its gRPC API, the API publishes a message into the ORDERS stream of the NATS JetStream server to let other microservices know that a new domain event has been published, so that subscriber systems interested in those domain events can subscribe to them and react. In this simple example demo, the events are published into the streams of NATS JetStream from the Event Store API itself. In real-world scenarios, they might be published from individual microservices, and sometimes from a saga coordinator (Distributed Saga) that coordinates a single business transaction spanning multiple microservices.
Subscribe events for building reactive microservices
The subscriber systems (consumers) consume messages from Streams of NATS JetStream and perform their own actions, and they may publish messages into Streams of NATS JetStream to let other microservices know that another domain event has been created.
Pull based Consumer and Push based Consumer in NATS JetStream
In NATS JetStream, consumers can be created as either Pull based or Push based. For Push based consumers, the JetStream Server delivers messages from the Streams to the consumer systems as fast as possible, based on the subject of choice (the Server pushes messages to consumers). Pull based consumers, on the other hand, are in control: they ask the JetStream Server for messages from the Streams (consumers pull messages from the Server). In the example demo, paymentworker and reviewworker are Push based consumers, because consuming messages as fast as possible is critical for performing their actions in the distributed transaction process. querymodelworker is a Pull based consumer, which processes the Event Store data and persists queryable data (the Query model in CQRS) for views. If you consume messages to execute jobs, Pull based consumers are a better choice than Push based consumers.
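The difference in who controls the pace can be shown with a toy in-memory stream. This is only a conceptual sketch: the Stream type and its Push and Fetch methods are hypothetical and much simpler than JetStream consumers, which also track acknowledgements and redelivery.

```go
package main

import "fmt"

// Stream is a hypothetical in-memory message store with a single cursor.
type Stream struct {
	msgs   []string
	cursor int
}

// Push delivers every pending message to the handler as soon as possible;
// the stream decides the pace.
func (s *Stream) Push(handler func(string)) {
	for s.cursor < len(s.msgs) {
		handler(s.msgs[s.cursor])
		s.cursor++
	}
}

// Fetch returns at most batch pending messages; the consumer decides the pace.
func (s *Stream) Fetch(batch int) []string {
	if batch <= 0 {
		return nil
	}
	end := s.cursor + batch
	if end > len(s.msgs) {
		end = len(s.msgs)
	}
	out := s.msgs[s.cursor:end]
	s.cursor = end
	return out
}

func main() {
	pushed := &Stream{msgs: []string{"m1", "m2", "m3"}}
	pushed.Push(func(m string) { fmt.Println("pushed:", m) })

	pulled := &Stream{msgs: []string{"m1", "m2", "m3"}}
	for {
		batch := pulled.Fetch(2) // the worker asks only for what it can process
		if len(batch) == 0 {
			break
		}
		fmt.Println("pulled batch:", batch)
	}
}
```

A job-style worker such as a query model synchroniser benefits from the pull form because it can size each batch to what its database can absorb.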
A Push based Consumer
The code block in Listing 3 (from paymentworker) creates a Push based consumer as a durable subscription. A durable subscription in NATS JetStream resumes message consumption from where it previously stopped.
Listing 3. A Push based consumer consumes messages from a Stream ORDERS
// Creates JetStreamContext for create consumer
jetStreamContext, err := natsComponent.JetStreamContext()
if err != nil {
	log.Fatal(err)
}
// Create push based consumer as durable
_, err = jetStreamContext.QueueSubscribe(subscribeSubject, queueGroup, func(msg *nats.Msg) {
	// The message is acknowledged up front, so it is not redelivered
	// if one of the steps below fails.
	msg.Ack()
	var order ordermodel.Order
	// Unmarshal JSON that represents the Order data
	err := json.Unmarshal(msg.Data, &order)
	if err != nil {
		log.Print(err)
		return
	}
	log.Printf("Message subscribed on subject:%s, from:%s, data:%v",
		subscribeSubject, clientID, order)
	// Create OrderPaymentDebitedCommand from Order
	command := ordermodel.PaymentDebitedCommand{
		OrderID:    order.ID,
		CustomerID: order.CustomerID,
		Amount:     order.Amount,
	}
	// Create ORDERS.paymentdebited event
	if err := executePaymentDebitedCommand(command); err != nil {
		log.Printf("error occurred while executing the PaymentDebited command: %v", err)
	}
}, nats.Durable(clientID), nats.ManualAck())
if err != nil {
	log.Fatal(err)
}
The QueueSubscribe API lets you create Push based consumers with a queue group name, in order to create load balanced consumers. In the preceding code block, a consumer system subscribes to messages on the subject “ORDERS.created”, performs its own actions, and creates another event called “ORDERS.paymentdebited” by calling the gRPC API. The gRPC API persists the event and event data into the Event Store and publishes a message into the ORDERS Stream of the NATS JetStream Server.
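The load balancing effect of a queue group, where each message is handled by exactly one member of the group, can be imitated with goroutines that share one channel. This is an analogy in plain Go rather than NATS code; RunQueueGroup is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// RunQueueGroup delivers every message to exactly one of the workers that
// share the queue and returns how many times each message was handled.
func RunQueueGroup(messages []string, workers int) map[string]int {
	if workers < 1 {
		workers = 1
	}
	queue := make(chan string)
	handled := make(map[string]int)
	var mu sync.Mutex
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each receive takes the message away from the other workers.
			for m := range queue {
				mu.Lock()
				handled[m]++
				mu.Unlock()
			}
		}()
	}
	for _, m := range messages {
		queue <- m
	}
	close(queue)
	wg.Wait()
	return handled
}

func main() {
	counts := RunQueueGroup([]string{"order-1", "order-2", "order-3", "order-4"}, 3)
	for _, id := range []string{"order-1", "order-2", "order-3", "order-4"} {
		fmt.Println(id, "handled", counts[id], "time(s)")
	}
}
```

Running several instances of a worker in the same queue group therefore scales throughput without debiting the same order twice in the normal case.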
The code block below shows the implementation of the executePaymentDebitedCommand method.
Listing 4. executePaymentDebitedCommand method from the paymentworker
func executePaymentDebitedCommand(command ordermodel.PaymentDebitedCommand) error {
	conn, err := grpc.Dial(grpcUri, grpc.WithInsecure())
	if err != nil {
		return fmt.Errorf("unable to connect: %w", err)
	}
	defer conn.Close()
	client := eventstore.NewEventStoreClient(conn)
	paymentJSON, err := json.Marshal(command)
	if err != nil {
		return fmt.Errorf("unable to marshal command: %w", err)
	}
	eventid, err := uuid.NewUUID()
	if err != nil {
		return fmt.Errorf("unable to generate event ID: %w", err)
	}
	// On the right-hand side, event, aggregate and stream are package-level
	// identifiers; the local variable event is only in scope after this statement.
	event := &eventstore.Event{
		EventId:       eventid.String(),
		EventType:     event,
		AggregateId:   command.OrderID,
		AggregateType: aggregate,
		EventData:     string(paymentJSON),
		Stream:        stream,
	}
	createEventRequest := &eventstore.CreateEventRequest{Event: event}
	resp, err := client.CreateEvent(context.Background(), createEventRequest)
	if err != nil {
		return fmt.Errorf("error from RPC server: %w", err)
	}
	if resp.IsSuccess {
		return nil
	}
	return errors.New("error from RPC server")
}
A Pull based consumer: Subscribe events for building query model for the views of aggregates
CQRS segregates an application into two parts: Commands and Queries. Here, commands are implemented by using Event Sourcing, which uses an Event Store of an immutable log of events to construct the application state. To build the query models, we can subscribe to the events that are published when command operations are executed. In this example, querymodelworker consumes messages on the subject “ORDERS.created” from the ORDERS Stream of the NATS JetStream server and persists the data into the database used for the query model.
Listing 5. A Pull based consumer for syncing data for the query model
func pullSubscribeOnOrder(js nats.JetStreamContext) {
	// Create Pull based consumer with maximum 128 inflight.
	// PullMaxWaiting defines the max inflight pull requests.
	sub, err := js.PullSubscribe(subscribeSubject, clientID, nats.PullMaxWaiting(128))
	if err != nil {
		log.Fatal(err)
	}
	// Create the repository once instead of once per message
	orderDB, err := sqldb.NewOrdersDB()
	if err != nil {
		log.Fatal(err)
	}
	repository, err := ordersyncrepository.New(orderDB.DB)
	if err != nil {
		log.Fatal(err)
	}
	for {
		// Fetch returns an error (for example a timeout) when no messages arrive in time
		msgs, err := sub.Fetch(batch)
		if err != nil {
			if !errors.Is(err, nats.ErrTimeout) {
				log.Printf("Error while fetching messages: %v", err)
			}
			continue
		}
		for _, msg := range msgs {
			msg.Ack()
			var order ordermodel.Order
			// Unmarshal JSON that represents the Order data
			if err := json.Unmarshal(msg.Data, &order); err != nil {
				log.Print(err)
				continue // skip the malformed message and keep consuming
			}
			log.Printf("Message subscribed on subject:%s, from:%s, data:%v", subscribeSubject, clientID, order)
			// Sync query model with event data
			if err := repository.CreateOrder(context.Background(), order); err != nil {
				log.Printf("Error while replicating the query model %+v", err)
			}
		}
	}
}
The PullSubscribe API of JetStream lets you create Pull based consumers. Once you create a Pull based consumer, a call to the method Fetch pulls a batch of messages from a stream of NATS JetStream. In the preceding code block, messages are fetched from the ORDERS stream and acknowledged to the Server using the method msg.Ack(). The worker then executes some logic to sync the data model with the events stored in the Event Store, creating denormalized views of aggregates by persisting data into the data store used for the Query model in the CQRS architecture.
Persistent store with CockroachDB
One major benefit of using CQRS is that you can have different data models for write operations and query operations, and thus you can also use different database technologies. In this example demo, we use CockroachDB for both the command side (the Event Store) and the query models. CockroachDB is a distributed SQL database for building global, scalable data stores that can survive disasters. In your Go applications, you can work with package database/sql using a PostgreSQL-compatible driver like github.com/lib/pq. If you run transactions with package database/sql, use CockroachDB’s Go package github.com/cockroachdb/cockroach-go/crdb. In the example demo, persistence logic with CockroachDB is implemented in the directory cockroachdb. The function ExecuteTx of package crdb lets you execute transactions against CockroachDB.
Listing 6. Transaction implementation in CockroachDB using package crdb
// CreateOrder persists Order data into the query model
func (repo *repository) CreateOrder(ctx context.Context, order order.Order) error {
	// Run a transaction to sync the query model.
	return crdb.ExecuteTx(ctx, repo.db, nil, func(tx *sql.Tx) error {
		return createOrder(tx, order)
	})
}

func createOrder(tx *sql.Tx, order order.Order) error {
	// Insert into the "orders" table.
	// The statement is named query so that it does not shadow package sql.
	query := `
		INSERT INTO orders (id, customerid, status, createdon, restaurantid, amount)
		VALUES ($1,$2,$3,$4,$5,$6)`
	_, err := tx.Exec(query, order.ID, order.CustomerID, order.Status, order.CreatedOn, order.RestaurantId, order.Amount)
	if err != nil {
		return err
	}
	// Insert items into the "orderitems" table.
	// Because it's a store for the read model, we can insert denormalized data
	for _, v := range order.OrderItems {
		query = `
			INSERT INTO orderitems (orderid, customerid, code, name, unitprice, quantity)
			VALUES ($1,$2,$3,$4,$5,$6)`
		_, err := tx.Exec(query, order.ID, order.CustomerID, v.ProductCode, v.Name, v.UnitPrice, v.Quantity)
		if err != nil {
			return err
		}
	}
	return nil
}
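crdb.ExecuteTx runs the closure inside a transaction and retries it when CockroachDB reports a retryable error, which is why the closure should be safe to run more than once. The sketch below illustrates that retry idea with the standard library only; ExecuteWithRetry and ErrRetryable are hypothetical names and are not part of package crdb.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrRetryable stands in for a retryable transaction error reported by the database.
var ErrRetryable = errors.New("retryable transaction error")

// ExecuteWithRetry calls fn until it succeeds, fails with a non-retryable
// error, or maxAttempts is reached. It returns the number of attempts made.
func ExecuteWithRetry(maxAttempts int, fn func() error) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err = fn()
		if err == nil {
			return attempt, nil
		}
		if !errors.Is(err, ErrRetryable) {
			return attempt, err
		}
	}
	return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	attempts, err := ExecuteWithRetry(5, func() error {
		calls++
		if calls < 3 {
			return ErrRetryable // simulate two conflicts before success
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "error:", err)
}
```

Because the closure may run several times, it should contain only database statements on the provided transaction and no side effects such as publishing events.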
Source Code
The source code of the example demo is available here: https://github.com/shijuvar/go-distributed-sys
Summary
The primary objective of this post is to give some insight into building event-driven distributed systems with an approach loosely based on Event Sourcing and CQRS. Building real-world Microservices based distributed systems is very complex, and the hardest part is dealing with data that is scattered amongst several databases owned by individual microservices. This makes it difficult to build business transactions that span multiple microservices, and to query data when a join query across multiple databases is not possible. Thus, you need architectural approaches that work from a real-world perspective, and you can consider an event-driven architecture like Event Sourcing, which is best paired with CQRS. In this simple example demo, I’ve used three of my favourite technologies: gRPC, NATS and CockroachDB.
When you build distributed systems, don’t try to become a poster child for any buzzwords or named patterns, including Event Sourcing and CQRS. Instead, learn the named patterns, and understand what each one does and does not do, with its advantages and limitations. When you build your own systems, aim for a context-driven architecture for your own unique problems, because no perfect solution architecture exists. Distributed systems based on Microservices architecture are really painful. Event Sourcing and CQRS are painful. When you choose a solution architecture, a pragmatic, middle-ground solution might be a better choice than blindly following buzzwords, intellectual thoughts and named patterns.
You can follow me on twitter at @shijucv. As a Consulting Solutions Architect, I do provide training and consulting on Go programming language (Golang) and distributed systems architectures, in India.