Writing gRPC Interceptors in Go
In this post, we’ll look at how to write gRPC Interceptors in Go. When you write HTTP applications, you can wrap route-specific handlers with HTTP middleware that lets you execute common logic before and after those handlers run. We typically use middleware for cross-cutting concerns such as authorization, logging, and caching. gRPC offers the same kind of functionality through a concept called Interceptors.
gRPC Interceptors
By using Interceptors, you can intercept the execution of RPC methods on both the client and the server. On both the client and the server, there are two types of Interceptors:
- UnaryInterceptor
- StreamInterceptor
UnaryInterceptor intercepts the unary RPCs, while StreamInterceptor intercepts the streaming RPCs.
In unary RPCs, the client sends a single request to the server and gets a single response back. In streaming RPCs, the client, the server, or both sides (bi-directional streaming) send a sequence of messages over a stream, and the receiving side reads from the stream until there are no more messages.
Writing Interceptors in gRPC Client
In gRPC client applications, you write two types of Interceptors:
- UnaryClientInterceptor: UnaryClientInterceptor intercepts the execution of a unary RPC on the client.
- StreamClientInterceptor: StreamClientInterceptor intercepts the creation of ClientStream. It may return a custom ClientStream to intercept all I/O operations.
UnaryClientInterceptor
To set a UnaryClientInterceptor, call the WithUnaryInterceptor function with a UnaryClientInterceptor func value. It returns a grpc.DialOption that specifies the interceptor for unary RPCs:
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption
The returned grpc.DialOption value is then passed as an argument to the grpc.Dial function to apply the Interceptor for unary RPCs.
Here’s the definition of UnaryClientInterceptor func type:
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
The parameter invoker is the handler to complete the RPC and it is the responsibility of the interceptor to call it. The UnaryClientInterceptor func value provides interceptor logic. Here’s an example interceptor that implements the UnaryClientInterceptor:
func clientInterceptor(
	ctx context.Context,
	method string,
	req interface{},
	reply interface{},
	cc *grpc.ClientConn,
	invoker grpc.UnaryInvoker,
	opts ...grpc.CallOption,
) error {
	// Logic before invoking the invoker
	start := time.Now()
	// Calls the invoker to execute RPC
	err := invoker(ctx, method, req, reply, cc, opts...)
	// Logic after invoking the invoker
	grpcLog.Infof("Invoked RPC method=%s; Duration=%s; Error=%v", method,
		time.Since(start), err)
	return err
}
The function below calls WithUnaryInterceptor with the UnaryClientInterceptor func value and returns the resulting grpc.DialOption value:
func withClientUnaryInterceptor() grpc.DialOption {
	return grpc.WithUnaryInterceptor(clientInterceptor)
}
The returned grpc.DialOption value is passed to the grpc.Dial function to apply the Interceptor:
conn, err := grpc.Dial(grpcUri, grpc.WithInsecure(), withClientUnaryInterceptor())
StreamClientInterceptor
To set a StreamClientInterceptor, call the WithStreamInterceptor function with a StreamClientInterceptor func value. It returns a grpc.DialOption that specifies the Interceptor for streaming RPCs:
func WithStreamInterceptor(f StreamClientInterceptor) DialOption
The returned grpc.DialOption value is then passed as an argument to the grpc.Dial function to apply the Interceptor for streaming RPCs.
Here’s the definition of StreamClientInterceptor func type:
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
You can pass both the UnaryClientInterceptor and the StreamClientInterceptor options to the grpc.Dial function in the same call:
conn, err := grpc.Dial(grpcUri, grpc.WithInsecure(), withClientUnaryInterceptor(), withClientStreamInterceptor())
Writing Interceptors in gRPC Server
Like gRPC client applications, gRPC server applications provide two types of Interceptors:
- UnaryServerInterceptor: UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server.
- StreamServerInterceptor: StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC on the server.
UnaryServerInterceptor
To set a UnaryServerInterceptor, call the UnaryInterceptor function with the UnaryServerInterceptor func value as an argument. It returns a grpc.ServerOption value that sets the UnaryServerInterceptor for the server.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption
The returned grpc.ServerOption value is then passed as an argument to the grpc.NewServer function to register the UnaryServerInterceptor.
Here’s the definition of UnaryServerInterceptor func:
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
The parameter info contains all the information about this RPC that the interceptor can operate on, and handler is the wrapper of the service method implementation. It is the responsibility of the interceptor to invoke handler to complete the RPC.
Example Server Unary Interceptor for Authorization
Here’s the example Interceptor for authorizing the RPC methods:
Note that the interceptor logic in the preceding code block uses the packages google.golang.org/grpc/codes and google.golang.org/grpc/status.
Sending metadata between client and server
The serverInterceptor function uses the authorize function to authorize the RPC call, which receives the authorization token from metadata. gRPC supports sending metadata between client and server through the Context value. The package google.golang.org/grpc/metadata provides the functionality for metadata.
Here’s the code block that sends a JWT token from client to server:
ctx := context.Background()
md := metadata.Pairs("authorization", jwtToken)
ctx = metadata.NewOutgoingContext(ctx, md)
// Calls RPC method CreateEvent using the stub client,
// passing the context that carries the metadata
resp, err := client.CreateEvent(ctx, event)
The Pairs function of the metadata package returns an MD value (type MD map[string][]string) built from the given key-value pairs.
The code block below reads the metadata from the incoming context on the gRPC server, in our interceptor logic:
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
	return status.Errorf(codes.InvalidArgument, "Retrieving metadata failed")
}
authHeader, ok := md["authorization"]
if !ok {
	return status.Errorf(codes.Unauthenticated, "Authorization token is not supplied")
}
token := authHeader[0]
// validateToken function validates the token
err := validateToken(token)
The FromIncomingContext function of the metadata package returns the MD value from which you can read the metadata.
Register Interceptor on Server
Here’s the code block that registers the Interceptor when creating the gRPC server:
StreamServerInterceptor
To set a StreamServerInterceptor, call the StreamInterceptor function with the StreamServerInterceptor func value as an argument. It returns a grpc.ServerOption value that sets the StreamServerInterceptor for the server.
func StreamInterceptor(i StreamServerInterceptor) ServerOption
The returned grpc.ServerOption value is then passed as an argument to the grpc.NewServer function to register the StreamServerInterceptor.
Here’s the definition of StreamServerInterceptor func type:
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
Interceptor Chaining using Go gRPC Middleware
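A single grpc.UnaryInterceptor or grpc.StreamInterceptor option (and likewise WithUnaryInterceptor or WithStreamInterceptor on the client) installs only one interceptor. The go-grpc-middleware package provides interceptor chaining, which lets you combine multiple interceptors into one. Newer grpc-go releases also ship built-in chaining options such as grpc.ChainUnaryInterceptor and grpc.WithChainUnaryInterceptor.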
Here’s an example for server chaining:
myServer := grpc.NewServer(
	grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(loggingStream, monitoringStream, authStream)),
	grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(loggingUnary, monitoringUnary, authUnary)),
)
These interceptors will be executed from left to right: logging, monitoring and auth.
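To make the left-to-right order concrete, here is a minimal sketch of how chaining can be implemented. It is not the go-grpc-middleware implementation: handler and interceptor are simplified stand-ins for grpc.UnaryHandler and grpc.UnaryServerInterceptor (without the info argument), and chain, named, and demo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// handler and interceptor are simplified stand-ins that mirror the shape of
// grpc.UnaryHandler and grpc.UnaryServerInterceptor.
type handler func(ctx context.Context, req interface{}) (interface{}, error)
type interceptor func(ctx context.Context, req interface{}, next handler) (interface{}, error)

// chain composes interceptors so that the first one listed runs outermost.
func chain(interceptors ...interceptor) interceptor {
	return func(ctx context.Context, req interface{}, final handler) (interface{}, error) {
		next := final
		// Wrap from right to left so that execution proceeds left to right.
		for i := len(interceptors) - 1; i >= 0; i-- {
			ic, inner := interceptors[i], next
			next = func(ctx context.Context, req interface{}) (interface{}, error) {
				return ic(ctx, req, inner)
			}
		}
		return next(ctx, req)
	}
}

// named returns an interceptor that records its name when it runs.
func named(name string, trace *[]string) interceptor {
	return func(ctx context.Context, req interface{}, next handler) (interface{}, error) {
		*trace = append(*trace, name)
		return next(ctx, req)
	}
}

// demo runs a chain of three interceptors and returns the execution order.
func demo() []string {
	var trace []string
	final := func(ctx context.Context, req interface{}) (interface{}, error) {
		trace = append(trace, "handler")
		return nil, nil
	}
	chained := chain(named("logging", &trace), named("monitoring", &trace), named("auth", &trace))
	chained(context.Background(), nil, final)
	return trace
}

func main() {
	fmt.Println(demo()) // prints "[logging monitoring auth handler]"
}
```

Because each interceptor decides whether to call next, an early one such as auth placed first could reject a request before the later interceptors run at all.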
Here’s an example for client side chaining:
clientConn, err = grpc.Dial(
	address,
	grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(monitoringClientUnary, retryUnary)),
	grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(monitoringClientStream, retryStream)),
)
client = pb_testproto.NewTestServiceClient(clientConn)
resp, err := client.PingEmpty(s.ctx, &myservice.Request{Msg: "hello"})
These interceptors will be executed from left to right: monitoring and then retry logic.
You can follow me on Twitter at @shijucv. I provide training and consulting on the Go programming language (Golang) and distributed systems architectures in India.