跟踪 API 调用

在这一部分,我们将实现一些指标,以便通过仪表板工具进行聚合。这些指标可能包括每秒请求数、状态分布(例如 “Ok”,“Internal” 等)以及其他许多内容。我们将使用 OpenTelemetryPrometheus 对代码进行监控,从而使工具如 Grafana 可以用来创建仪表板。

首先需要理解的是,我们将运行一个 HTTP 服务器来提供 Prometheus 的指标。Prometheus 会通过 /metrics 路径公开指标,外部工具可以查询该路径来获取可用的各种指标。

为了开始,我们需要获取 Prometheus Go 库的依赖。我们将通过获取 go-grpc-middleware/providers/prometheus 来获取依赖。这是 Prometheus Go 库的传递依赖,我们仍然需要注册一些在 Prometheus 提供程序中定义的拦截器:

$ go get github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus

现在我们可以创建一个 HTTP 服务器,该服务器将用于公开 /metrics 路径。我们将创建一个名为 newMetricsServer 的函数,该函数接受 HTTP 服务器的地址:

以下的代码解释了 server/main.go 文件的每一部分。由于完整展示整个文件可能会让人感到不知所措,因此我们将逐步讲解其中的代码,并且你将能够看到 main.go 文件中的导入和整体结构。请注意,为了更好地解释,我们会在后续部分逐步添加某些元素。如果你看到某段代码尚未展示,继续往下阅读,你将会得到对这段代码的详细解释。

func newMetricsServer(httpAddr string) *http.Server {
    httpSrv := &http.Server{Addr: httpAddr}
    m := http.NewServeMux()
    httpSrv.Handler = m
    return httpSrv
}

有了 HTTP 服务器之后,我们将重构 main 函数,将创建 gRPC 服务器的部分分离到另一个函数中:

func newGrpcServer(lis net.Listener) (*grpc.Server, error) {
    creds, err := credentials.NewServerTLSFromFile("./certs/server_cert.pem", "./certs/server_key.pem")
    if err != nil {
        return nil, err
    }
    logger := log.New(os.Stderr, "", log.Ldate|log.Ltime)
    opts := []grpc.ServerOption{
        // ...
    }
    s := grpc.NewServer(opts...)
    pb.RegisterTodoServiceServer(s, &server{
        d: New(),
    })
    return s, nil
}

实际上没有什么变化,只是为了方便,创建 gRPC 服务器的部分被提取到一个函数中,以便后续能并行运行两个服务器。

接下来,main 函数应该包含两个地址参数。第一个是 gRPC 服务器的地址,第二个是 HTTP 服务器的地址:

func main() {
    args := os.Args[1:]
    if len(args) != 2 {
        log.Fatalln("usage: server [GRPC_IP_ADDR] [METRICS_IP_ADDR]")
    }
    grpcAddr := args[0]
    httpAddr := args[1]
}

现在,我们需要同时运行两个服务器。为此,我们将使用 errgrouphttps://pkg.go.dev/golang.org/x/sync/errgroup )包。它允许我们将多个 goroutine 添加到一个组中并等待它们。

首先,我们需要为该组创建一个上下文。我们将创建一个可取消的上下文,以便稍后可以释放服务器的资源:

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

接着,我们可以开始处理 SIGTERM 信号。当我们想退出这两个服务器时,按下 【Ctrl + C】 会发送 SIGTERM 信号,我们希望服务器能优雅地关闭。为了处理这个信号,我们将创建一个通道,当收到 SIGTERM 信号时将触发:

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(quit)

接下来,我们可以创建一个包含两个服务器的 goroutine 组。我们首先使用创建的可取消上下文创建该组,然后通过 Go(func() error) 将 goroutines 添加到该组。第一个 goroutine 处理 gRPC 服务器的服务,第二个 goroutine 处理 HTTP 服务器:

lis, err := net.Listen("tcp", grpcAddr)
if err != nil {
    log.Fatalf("unexpected error: %v", err)
}
g, ctx := errgroup.WithContext(ctx)
grpcServer, err := newGrpcServer(lis)
if err != nil {
    log.Fatalf("unexpected error: %v", err)
}
g.Go(func() error {
    log.Printf("gRPC server listening at %s\n", grpcAddr)
    if err := grpcServer.Serve(lis); err != nil {
        log.Printf("failed to gRPC server: %v\n", err)
        return err
    }
    log.Println("gRPC server shutdown")
    return nil
})
metricsServer := newMetricsServer(httpAddr)
g.Go(func() error {
    log.Printf("metrics server listening at %s\n", httpAddr)
    if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
        log.Printf("failed to serve metrics: %v\n", err)
        return err
    }
    log.Println("metrics server shutdown")
    return nil
})

现在我们有了 goroutine 组,可以等待上下文完成或等待 quit 通道接收事件:

select {
    case <-quit:
        break
    case <-ctx.Done():
        break
}

当其中一个事件被接收时,我们将通过调用 cancel 函数来释放资源,最后,我们等待该组中所有 goroutine 完成:

cancel()

timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer timeoutCancel()

log.Println("shutting down servers, please wait...")

grpcServer.GracefulStop()
metricsServer.Shutdown(timeoutCtx)

if err := g.Wait(); err != nil {
    log.Fatal(err)
}

接下来,我们将为该系统添加追踪功能。指标服务器将公开 /metrics 路径,gRPC 服务器将收集这些指标并将其添加到 Prometheus 注册表中。该注册表是一个包含多个收集器的集合。我们通过注册一个或多个收集器来让注册表收集不同的指标,最后将其暴露出来。

在创建注册表之前,我们将首先创建一个通过 go-grpc-middleware/providers/prometheus 包中的 NewServerMetrics 函数提供的收集器。然后,我们将实际创建注册表,并将收集器注册到其中:

srvMetrics := grpcprom.NewServerMetrics(
    grpcprom.WithServerHandlingTimeHistogram(
        grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
    ),
)
reg := prometheus.NewRegistry()
reg.MustRegister(srvMetrics)

注意我们传递了 NewServerMetrics 的选项。这个选项让我们可以将调用的延迟分布到不同的桶中。也就是说,它会告诉我们有多少请求在 0.001 秒内完成、在 0.01 秒内完成,依此类推。

最后,我们将注册表传递给 HTTP 服务器,这样它就知道有哪些可用的指标,并将收集器传递给 gRPC 服务器,这样它就能将指标推送到收集器中:

func newMetricsServer(httpAddr string, reg *prometheus.Registry) *http.Server {
    //...
    m.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
    //...
    return httpSrv
}
func newGrpcServer(lis net.Listener, srvMetrics *grpcprom.ServerMetrics) (*grpc.Server, error) {
    //...
    opts := []grpc.ServerOption{
        //...
        grpc.ChainUnaryInterceptor(
            otelgrpc.UnaryServerInterceptor(),
            srvMetrics.UnaryServerInterceptor(),
            //...
        ),
        grpc.ChainStreamInterceptor(
            otelgrpc.StreamServerInterceptor(),
            srvMetrics.StreamServerInterceptor(),
            //...
        ),
    }
    //...
}

func main() {
    //...
    grpcServer, err := newGrpcServer(lis, srvMetrics)
    //...
    metricsServer := newMetricsServer(httpAddr, reg)
    //...
}

现在我们将使用 OpenTelemetry(otelgrpc)。这是一个自动生成 gRPC 服务器指标的工具。然后,Prometheus 会通过收集器(srvMetrics)收集这些指标,最终 HTTP 服务器能够暴露它们。

要为 gRPC 获取 OpenTelemetry,我们只需获取该依赖:

$ go get go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc

现在我们应该能够运行服务器:

$ go run ./server 0.0.0.0:50051 0.0.0.0:50052
metrics server listening at 0.0.0.0:50052
gRPC server listening at 0.0.0.0:50051

接着,我们可以通过运行客户端来测试 0.0.0.0:50051 地址:

$ go run ./client 0.0.0.0:50051

当客户端调用完成后,我们可以使用以下命令查看指标:

$ curl http://localhost:50052/metrics

你现在应该能看到类似如下的日志(简化,只显示 AddTask):

grpc_server_handled_total{grpc_code="OK",grpc_method="AddTask",grpc_service="todo.v2.TodoService",grpc_type="unary"} 3
grpc_server_handling_seconds_bucket{grpc_method="AddTask",grpc_service="todo.v2.TodoService",grpc_type="unary",le="0.001"} 3
grpc_server_handling_seconds_sum{grpc_method="AddTask",grpc_service="todo.v2.TodoService",grpc_type="unary"} 0.000119291
grpc_server_msg_received_total{grpc_method="AddTask",grpc_service="todo.v2.TodoService",grpc_type="unary"} 3
grpc_server_msg_sent_total{grpc_method="AddTask",grpc_service="todo.v2.TodoService",grpc_type="unary"} 3
grpc_server_started_total{grpc_method="AddTask",grpc_service="todo.v2.TodoService",grpc_type="unary"} 3

这些指标意味着服务器接收了三个 AddTask 请求,并且它们都在 0.001 秒内处理完成(总处理时间:0.000119291 秒),并返回了三个响应给客户端。

显然,关于这些指标还有很多工作要做。不过,这可能是一本书的内容。如果你对这个领域感兴趣,建议你深入学习如何将 Prometheus 与 Grafana 等工具集成,以便创建更加易于阅读的仪表板。

Bazel

我们需要更新依赖项,以便让 Prometheus 和 OpenTelemetry 正常工作。为此,我们将运行 gazelle-update-repos

$ bazel run //:gazelle-update-repos

然后,我们将运行 gazelle,自动将依赖项链接到我们的代码:

$ bazel run //:gazelle

最后,我们现在可以运行我们的服务器:

$ bazel run //:server:server 0.0.0.0:50051 0.0.0.0:50052

metrics server listening at 0.0.0.0:50052
gRPC server listening at 0.0.0.0:50051

接着运行我们的客户端:

$ bazel run //client:client 0.0.0.0:50051

总结:我们展示了如何通过使用 OpenTelemetry 和 Prometheus 从 gRPC 服务器获取指标。我们通过创建第二个服务器,在 /metrics 路由上导出指标,并通过使用 Prometheus 注册表和收集器,将 gRPC 服务器的指标交换到 HTTP 服务器中。