通过拦截器实现外部逻辑

在某些情况下,某些请求头可能仅适用于一个端点,但通常我们希望能够在不同的端点间应用相同的逻辑。以 auth_token 请求头为例,如果我们有多个路由只能在用户登录后访问,我们不希望在每个端点中重复之前做的检查。这会导致代码膨胀,不易维护,并且可能会让开发人员在查找端点的核心逻辑时分心。因此,我们将使用身份验证拦截器。我们将提取身份验证逻辑,并在每次调用 API 之前都执行该逻辑。

我们的拦截器将被称为 authInterceptor。服务器端的拦截器将简单地执行我们在前面章节中进行的所有检查,如果一切顺利,才会启动端点的执行。否则,拦截器会返回错误,端点不会被调用。

要定义服务器端拦截器,我们有两种选择。第一种适用于我们处理单一 RPC 端点时(例如 AddTasks)。拦截器函数如下所示:

func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error)

接下来是处理流的拦截器。它们如下所示:

func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error

它们非常相似,主要的区别是参数类型。由于我们在这个用例中不使用所有参数,我建议你查看文档( https://pkg.go.dev/google.golang.org/grpc )中的 UnaryServerInterceptorStreamServerInterceptor,并试着进行一些实验。

现在让我们开始编写将应用于 AddTasks 的单一拦截器。我们首先将检查提取到一个函数中,以便在拦截器间共享。在一个名为 interceptors.go 的文件中,我们可以编写如下代码:

import (
    "context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
)

const authTokenKey string = "auth_token"
const authTokenValue string = "authd"

func validateAuthToken(ctx context.Context) error {
    md, _ := metadata.FromIncomingContext(ctx)
    if t, ok := md[authTokenKey]; ok {
        switch {
        case len(t) != 1:
            return status.Errorf(
                codes.InvalidArgument,
                fmt.Sprintf("%s should contain only 1 value", authTokenKey),
            )
        case t[0] != authTokenValue:
            return status.Errorf(
                codes.Unauthenticated,
                fmt.Sprintf("incorrect %s", authTokenKey),
            )
        }
    } else {
        return status.Errorf(
            codes.Unauthenticated,
            fmt.Sprintf("failed to get %s", authTokenKey),
        )
    }
    return nil
}

与我们直接在 UpdateTasks 中所做的没有什么不同。但现在,编写我们的拦截器非常简单。我们只需要调用 validateAuthToken 函数并检查是否有错误。如果有错误,我们会直接返回错误;如果没有错误,我们将调用处理程序函数,后者实际上会调用端点:

func unaryAuthInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    if err := validateAuthToken(ctx); err != nil {
        return nil, err
    }

    return handler(ctx, req)
}

我们可以对流式处理做同样的事情。唯一的变化是处理程序的参数以及我们如何获取上下文。

func streamAuthInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    if err := validateAuthToken(ss.Context()); err != nil {
        return err
    }
    return handler(srv, ss)
}

现在,你可能会想,虽然我们有了这些函数,但没有人调用它们。你完全正确。我们需要注册这些拦截器,以便我们的服务器知道它们的存在。这是在 server/main.go 中完成的,我们可以将拦截器作为选项添加到 gRPC 服务器中。现在,我们是这样创建服务器的:

var opts []grpc.ServerOption
s := grpc.NewServer(opts...)

要添加拦截器,我们只需将它们添加到 opts 变量中。

opts := []grpc.ServerOption{
    grpc.UnaryInterceptor(unaryAuthInterceptor),
    grpc.StreamInterceptor(streamAuthInterceptor),
}

现在我们可以运行服务器:

在运行服务器之前,您可以删除 server/impl.goUpdateTasks 函数的所有身份验证逻辑。由于拦截器将自动进行请求认证,这部分逻辑不再需要。

$ go run ./server 0.0.0.0:50051
listening at 0.0.0.0:50051

然后,我们可以运行客户端:

$ go run ./client 0.0.0.0:50051
--------ADD--------
rpc error: code = Unauthenticated desc = failed to get auth_token
exit status 1

正如预期的那样,我们得到了一个错误,因为我们在客户端的 addTask 函数中从未添加 auth_token 请求头。

显然,我们不希望手动为所有调用添加请求头。我们将创建一个客户端拦截器,在发送请求之前为我们添加请求头。在客户端拦截器中,我们有两种方式来定义拦截器。对于单一调用,我们有如下代码:

func unaryInterceptor(ctx context.Context, method string, req interface{}, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error

对于流,我们有如下代码:

func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error)

正如你所看到的,客户端拦截器有更多的参数。尽管大多数参数在我们的用例中并不重要,但我还是建议你查看文档( https://pkg.go.dev/google.golang.org/grpc )中的 UnaryClientInterceptorStreamClientInterceptor,并进行一些实验。

在客户端拦截器中,我们将简单地创建一个新的上下文,并在调用端点之前附加元数据。我们甚至不需要创建一个单独的函数来共享逻辑,因为这和调用 AppendToOutgoingContext 函数一样简单。

client/interceptors.go 中,我们可以编写如下代码:

import (
    "context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
)

const authTokenKey string = "auth_token"
const authTokenValue string = "authd"

func unaryAuthInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    ctx = metadata.AppendToOutgoingContext(ctx, authTokenKey, authTokenValue)
    err := invoker(ctx, method, req, reply, cc, opts...)
    return err
}

func streamAuthInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    ctx = metadata.AppendToOutgoingContext(ctx, authTokenKey, authTokenValue)
    s, err := streamer(ctx, desc, cc, method, opts...)
    if err != nil {
        return nil, err
    }
    return s, nil
}

最后,和服务器端一样,我们也需要注册这些拦截器。这一次,这些拦截器将通过向我们在 main 中使用的 Dial 函数添加 DialOptions 来注册。目前,你应该有如下内容:

opts := []grpc.DialOption{
    grpc.WithTransportCredentials(insecure.NewCredentials()),
}

我们现在可以这样添加拦截器:

opts := []grpc.DialOption{
    //...
    grpc.WithUnaryInterceptor(unaryAuthInterceptor),
    grpc.WithStreamInterceptor(streamAuthInterceptor),
}

当它们被注册后,我们就可以运行我们的服务器:

$ go run ./server 0.0.0.0:50051
listening at 0.0.0.0:50051

然后,我们可以运行客户端:

在运行客户端之前,您可以删除 client/main.go 文件中 updateTask 函数内对 AppendToOutgoingContext 的调用。由于拦截器会自动处理这一操作,所以这部分逻辑不再需要。

$ go run ./client 0.0.0.0:50051

所有的调用现在应该都能够正常进行,不会出现任何错误。

总结:在本节中,我们展示了如何在服务器端和客户端编写单一和流拦截器。这些拦截器的目的是自动化多个端点间的重复工作。在我们的示例中,我们自动化了为身份验证添加和检查 auth_token 请求头的过程。