Building Event-Driven Distributed Systems in Go with gRPC, NATS JetStream and CockroachDB

Practical challenges of building distributed systems and microservices

Challenges for managing distributed transactions and querying data from decentralised data stores

Building event-driven architectures for building distributed systems

Event Sourcing: An event-driven architecture on Event Store

CQRS for building query model for the views of aggregates

A simple demo on event-driven distributed system in Go with gRPC, NATS JetStream and CockroachDB

NATS JetStream for distributed event streaming systems

Streams in NATS JetStream

gRPC for building APIs based on RPC

Example demo

  1. A client app post an Order to an HTTP API (ordersvc)
  2. An HTTP API (ordersvc) receives the order, then executes a command onto Event Store, which is an immutable log of events of domain events, to create an event via its gRPC API (eventstoresvc).
  3. The Event Store API (eventstoresvc) executes the command and then publishes an event “ORDERS.created” to the ORDERS stream of NATS JetStream server to let other services know that a domain event is created.
  4. The Payment worker (paymentworker) subscribes the event “ORDERS.created”, then make the payment, and then create another event “ORDERS.paymentdebited” via Event Store API.
  5. The Event Store API executes a command onto Event Store to create an event “ORDERS.paymentdebited” and publishes an event to NATS JetStream server to let other services know that the payment has been debited.
  6. The Query synchronising worker (querymodelworker) subscribes the event “ORDERS.created” that synchronise the query data model to provide state of the aggregates for query views.
  7. The review worker (reviewworker) subscribes the event “ORDERS.paymentdebited” and finally approves the order, and then create another event “ORDERS.approved” via Event Store API.
  8. A Saga coordinator manages the distributed transactions and makes void transactions on failures (to be implemented)

Event Store for Event Sourcing

message Event {
string event_id = 1;
string event_type = 2;
string aggregate_id = 3;
string aggregate_type = 4;
string event_data = 5;
string stream = 6;
}
// CreateEvent creates a new event into the event store
func (s *server) CreateEvent(ctx context.Context, eventRequest *eventstore.CreateEventRequest) (*eventstore.CreateEventResponse, error) {
err := s.repository.CreateEvent(eventRequest.Event)
if err != nil {
return nil, status.Error(codes.Internal, "internal error")
}
log.Println("Event is created")
go publishEvent(s.nats, eventRequest.Event)
return &eventstore.CreateEventResponse{IsSuccess: true, Error: ""}, nil
}
// publishEvent publishes an event via NATS JetStream server
func publishEvent(component *natsutil.NATSComponent, event *eventstore.Event) {
// Creates JetStreamContext to publish messages into Stream
jetStreamContext, _ := component.JetStreamContext()
subject := event.EventType
eventMsg := []byte(event.EventData)
// Publish message on subject (channel)
jetStreamContext.Publish(subject, eventMsg)
log.Println("Published message on subject: " + subject)
}

Subscribe events for building reactive microservices

Pull based Consumer and Push based Consumer in NATS JetStream

A Push based Consumer

// Creates JetStreamContext for create consumer 
jetStreamContext
, err := natsComponent.JetStreamContext()
if err != nil {
log.Fatal(err)
}
// Create push based consumer as durable
jetStreamContext.QueueSubscribe(subscribeSubject, queueGroup, func(msg *nats.Msg) {
msg.Ack()
var order ordermodel.Order
// Unmarshal JSON that represents the Order data
err := json.Unmarshal(msg.Data, &order)
if err != nil {
log.Print(err)
return
}
log.Printf("Message subscribed on subject:%s, from:%s, data:%v",
subscribeSubject, clientID, order)

// Create OrderPaymentDebitedCommand from Order
command := ordermodel.PaymentDebitedCommand{
OrderID: order.ID,
CustomerID: order.CustomerID,
Amount: order.Amount,
}
// Create ORDERS.paymentdebited event
if err := executePaymentDebitedCommand(command); err != nil {
log.Println("error occured while executing the PaymentDebited command")
}

}, nats.Durable(clientID), nats.ManualAck())
func executePaymentDebitedCommand(command ordermodel.PaymentDebitedCommand) error {

conn, err := grpc.Dial(grpcUri, grpc.WithInsecure())
if err != nil {
log.Fatalf("Unable to connect: %v", err)
}
defer conn.Close()
client := eventstore.NewEventStoreClient(conn)
paymentJSON, _ := json.Marshal(command)
eventid, _ := uuid.NewUUID()
event := &eventstore.Event{
EventId: eventid.String(),
EventType: event,
AggregateId: command.OrderID,
AggregateType: aggregate,
EventData: string(paymentJSON),
Stream: stream,
}
createEventRequest := &eventstore.CreateEventRequest{Event: event}

resp, err := client.CreateEvent(context.Background(), createEventRequest)
if err != nil {
return fmt.Errorf("error from RPC server: %w", err)
}
if resp.IsSuccess {
return nil
}
return errors.New("error from RPC server")
}

A Pull based consumer: Subscribe events for building query model for the views of aggregates

func pullSubscribeOnOrder(js nats.JetStreamContext) {
// Create Pull based consumer with maximum 128 inflight.
// PullMaxWaiting defines the max inflight pull requests.
sub, _ := js.PullSubscribe(subscribeSubject, clientID, nats.PullMaxWaiting(128))
for {

msgs, _ := sub.Fetch(batch)
for _, msg := range msgs {
msg.Ack()
var order ordermodel.Order
// Unmarshal JSON that represents the Order data
err := json.Unmarshal(msg.Data, &order)
if err != nil {
log.Print(err)
return
}
log.Printf("Message subscribed on subject:%s, from:%s, data:%v", subscribeSubject, clientID, order)
orderDB, _ := sqldb.NewOrdersDB()
repository, _ := ordersyncrepository.New(orderDB.DB)
// Sync query model with event data
if err := repository.CreateOrder(context.Background(), order); err != nil {
log.Printf("Error while replicating the query model %+v", err)
}
}
}
}

Persistent store with CockroachDB

// CreateOrder persist Order data into the query model
func (repo *repository) CreateOrder(ctx context.Context, order order.Order) error {

// Run a transaction to sync the query model.
err := crdb.ExecuteTx(ctx, repo.db, nil, func(tx *sql.Tx) error {
return createOrder(tx, order)
})
if err != nil {
return err
}
return nil
}

func createOrder(tx *sql.Tx, order order.Order) error {

// Insert into the "orders" table.
sql := `
INSERT INTO orders (id, customerid, status, createdon, restaurantid, amount)
VALUES ($1,$2,$3,$4,$5,$6)`
_, err := tx.Exec(sql, order.ID, order.CustomerID, order.Status, order.CreatedOn, order.RestaurantId, order.Amount)
if err != nil {
return err
}
// Insert items into the "orderitems" table.
// Because it's store for read model, we can insert denormalized data
for _, v := range order.OrderItems {
sql = `
INSERT INTO orderitems (orderid, customerid, code, name, unitprice, quantity)
VALUES ($1,$2,$3,$4,$5,$6)`

_, err := tx.Exec(sql, order.ID, order.CustomerID, v.ProductCode, v.Name, v.UnitPrice, v.Quantity)
if err != nil {
return err
}
}
return nil
}

Source Code

Summary

--

--

--

Consulting Solutions Architect and Trainer on Go and Distributed Systems, with focus on Microservices and Event-Driven Architectures. Author of two books on Go.

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

5 Best Sites to Learn Coding During The Summer

Google coding interview template

[New Coin Listing] FamousBlock (FOB) to be Listed on DigiFinex!

Scaling the translation process

Multi — Threading in Java

Multi — Threading

What is :: a (double colon) in ruby?

Code Optimization- Game Dev Series 145

Enable Unique And Dynamic SMTP Mail Settings For Each User — Laravel

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Shiju Varghese

Shiju Varghese

Consulting Solutions Architect and Trainer on Go and Distributed Systems, with focus on Microservices and Event-Driven Architectures. Author of two books on Go.

More from Medium

Streaming an avalanche of data with Go’s io package

How to Implement Memento Pattern in Go

Builder Pattern in Go